EOX GitLab Instance

Commit 9da9012b authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Adding implementation to harvest from OADS systems and

generate STAC Items.
parent f9ad2fb7
Pipeline #22606 passed with stage
in 47 seconds
from typing import Iterator
from io import BytesIO, TextIOWrapper
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
import logging
from urllib.parse import urljoin
from zipfile import ZipFile
import csv
import re
from lxml import etree
import requests
import pystac
from pystac.extensions.eo import EOExtension
from pystac.extensions.file import FileExtension
from pystac.extensions.sar import (
SarExtension, FrequencyBand, ObservationDirection, Polarization
)
from pystac.extensions.sat import SatExtension, OrbitState
from pystac.extensions.version import VersionExtension
from pystac.extensions.view import ViewExtension
from ..abc import Endpoint
logger = logging.getLogger()
CAMEL_TO_SNAKE_RE = re.compile(r'(?<!^)(?=[A-Z])')
def camel_to_snake_case(value: str) -> str:
return CAMEL_TO_SNAKE_RE.sub('_', value).lower()
class OADSEndpoint(Endpoint):
type = "OADS"
def __init__(self, scan_href: str, use_oads_ext: bool = False):
self.scan_href = scan_href
self.use_oads_ext = use_oads_ext
def harvest(self) -> Iterator[dict]:
raise NotImplementedError()
""" Harvests an OADS system. First performs a "Scan" operation to get the list of
index files which are subsequently retrieved and parsed.
"""
logger.info(f"Scanning index files from list '{self.scan_href}'")
response = requests.get(self.scan_href)
# parse as HTML
tree = etree.parse(BytesIO(response.content), etree.HTMLParser())
# assume that each listed item has the "href" property
index_file_names = [
elem.attrib["href"]
for elem in tree.xpath("//a[contains(@class, 'index-file')]")
]
logger.info(f"Found {len(index_file_names)} index files")
for index_file_name in index_file_names:
yield from self.harvest_index_file(index_file_name)
def harvest_index_file(self, index_file_name: str) -> Iterator[dict]:
""" Harvests a single index file and yields the contents as STAC Items
"""
# retrieve the full index file for later parsing
index_file_href = urljoin(self.scan_href, index_file_name)
logger.info(f"Harvesting from {index_file_href}")
response = requests.get(index_file_href)
response_file = BytesIO(response.content)
# handle zipped index files, as well as unzipped ones
if index_file_name.endswith(".zip"):
# zipped index files need to be extracted
# name is the same as zip-file without the '.zip' extension
zipfile = ZipFile(response_file)
index_file = zipfile.open(index_file_name[:-4])
else:
index_file = response_file
# create a CSV reader and parser and parse the contents of the file
reader = csv.DictReader(TextIOWrapper(index_file), delimiter="\t")
parser = IndexFileParser(self.use_oads_ext)
for line in reader:
yield parser.parse(line).to_dict()
def pairwise(iterable):
"s -> (s0, s1), (s2, s3), (s4, s5), ..."
a = iter(iterable)
return zip(a, a)
class IndexFileParser:
def __init__(self, use_oads_ext: bool):
self.use_oads_ext = use_oads_ext
def parse(self, record: Dict[str, str]) -> pystac.Item:
""" Parses a record to a STAC Item
"""
item = pystac.Item(
record["productId"],
self.parse_geometry(record),
self.parse_bbox(record),
None,
{
# Timings are all mandatory
"start_datetime": record["beginAcquisition"],
"end_datetime": record["endAcquisition"],
"created": record["availabilityTime"],
"updated": record["recordLastUpdate"],
# platform/instrument
"platform": record["platformShortName"],
"instruments": [record["instrumentShortName"]],
}
)
self.apply_properties(record, item)
self.apply_eo_extension(record, item)
self.apply_sar_extension(record, item)
self.apply_sat_extension(record, item)
self.apply_version_extension(record, item)
self.apply_view_extension(record, item)
if self.use_oads_ext:
self.apply_oads_extension(record, item)
self.apply_assets(record, item)
return item
def parse_geometry(self, record: Dict[str, str]) -> Optional[dict]:
""" Retrieve GeoJSON geometry from records "footprint", "nominalTrack" or
"sceneCentre" fields (in that preference)
"""
raw_footprint = record.get("footprint")
raw_nominal_track = record.get("nominalTrack")
raw_scene_centre = record.get("sceneCentre")
if raw_footprint:
ring = list(
[float(coord[1]), float(coord[0])]
for coord in pairwise(raw_footprint.split())
)
# close loop if not already closed
if ring[0] != ring[-1]:
ring.append(list(ring[0]))
return {
"type": "Polygon",
"coordinates": [ring]
}
elif raw_nominal_track:
return {
"type": "LineString",
"coordinates": list(
[float(coord[1]), float(coord[0])]
for coord in pairwise(raw_nominal_track.split())
)
}
elif raw_scene_centre:
parts = raw_scene_centre.split()
return {
"type": "Point",
"coordinates": [float(parts[1]), float(parts[0])]
}
return None
def parse_bbox(self, record: Dict[str, str]) -> Optional[List[float]]:
""" Parses the bbox from the potential "boundingBox" field.
"""
bounding_box = record.get("boundingBox")
if bounding_box:
bbox = [float(v) for v in bounding_box.split()]
bbox = [bbox[1], bbox[0], bbox[3], bbox[2]]
return bbox
return None
def apply_properties(self, record: Dict[str, str], item: pystac.Item):
""" Apply any optional properties currently in the core spec
"""
resolution = record.get("resolution")
if resolution:
item.properties["gsd"] = float(resolution)
def apply_eo_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the EO extension spec
"""
cloud_cover_percentage = record.get("cloudCoverPercentage")
if cloud_cover_percentage:
ext = EOExtension.ext(item, True)
ext.apply(cloud_cover=float(cloud_cover_percentage))
def apply_sat_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the SAT extension spec
"""
orbit_direction = record.get("orbitDirecton")
orbit_number = record.get("orbitNumber")
if orbit_direction or orbit_number:
ext = SatExtension.ext(item, True)
ext.apply(
OrbitState(orbit_direction) if orbit_direction else None,
relative_orbit=int(orbit_number) if orbit_number else None,
# TODO: startTimeFromAscendingNode? completionTimeFromAscendingNode?
# anx_datetime=
)
def apply_view_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the View extension spec
"""
illumination_azimuth_angle = record.get("illuminationAzimuthAngle")
illumination_elevation_angle = record.get("illuminationElevationAngle")
incidence_angle = record.get("incidenceAngle")
if illumination_azimuth_angle or illumination_elevation_angle or incidence_angle:
ext = ViewExtension.ext(item, True)
ext.apply(
incidence_angle=float(incidence_angle) if incidence_angle else None,
sun_azimuth=(
float(illumination_azimuth_angle)
if illumination_azimuth_angle else None
),
sun_elevation=(
float(illumination_elevation_angle)
if illumination_elevation_angle else None
),
)
def apply_sar_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the SAR extension spec
"""
polarisation_mode = record.get("polarisationMode")
polarisation_channels = record.get("polarisationChannels")
antenna_look_direction = record.get("antennaLookDirection")
doppler_frequency = record.get("dopplerFrequency")
if polarisation_mode or polarisation_channels or antenna_look_direction \
or doppler_frequency:
ext = SarExtension.ext(item, True)
ext.apply(
# TODO: currently defaults have to be used, as no info about frequencies
# is in the index files
frequency_band=FrequencyBand.C,
instrument_mode=polarisation_mode or "",
product_type=record.get("productType", ""),
polarizations=(
[
Polarization(v.strip()) for v in polarisation_channels.split(",")
]
if polarisation_channels and polarisation_channels != "UNDEFINED"
else []
),
observation_direction=(
ObservationDirection(antenna_look_direction)
if antenna_look_direction else None
),
center_frequency=float(doppler_frequency) if doppler_frequency else None,
)
def apply_version_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the Version extension spec
"""
product_version = record.get("productVersion")
if product_version:
ext = VersionExtension.ext(item, True)
ext.apply(product_version)
def apply_oads_extension(self, record: Dict[str, str], item: pystac.Item):
""" Apply properties from the non-standard OADS extension spec
"""
item.stac_extensions.append("oads") # TODO: make schema and add an href
oads_properties: List[Tuple[str, Callable[[str], Any]]] = [
("platformSerialIdentifier", str),
("sensorType", str),
("operationalMode", str),
("swathIdentifier", str),
("wrsLongitudeGrid", str),
("wrsLatitudeGrid", str),
("startTimeFromAscendingNode", int),
("completionTimeFromAscendingNode", int),
("illuminationZenithAngle", float),
("acrossTrackIncidenceAngle", float),
("alongTrackIncidenceAngle", float),
("timeliness", str),
("parentIdentifier", str),
("acquisitionType", str),
("acquisitionSubType", str),
("productType", str),
("productQualityDegradation", str),
("productQualityStatus", str),
("productQualityDegradationTag", str),
("productGroupId", str),
# TODO: parse and apply to all "browse" assets
# ("browseAvailabilityDateList", str),
("browseColRowList", lambda v: list(pairwise(int(i) for i in v.split()))),
("snowCoverPercentage", float),
# TODO: actually parse
# ("multiViewAngles", str),
# ("centreViewAngles", str),
("minimumIncidenceAngle", float),
("maximumIncidenceAngle", float),
("incidenceAngleVariation", float),
# TODO: actually parse
# ("occultationPoints", str),
]
for name, parser in oads_properties:
value = record.get(name)
if value:
item.properties[f"oads:{camel_to_snake_case(name)}"] = parser(value)
def apply_assets(self, record: Dict[str, str], item: pystac.Item):
""" Create and add STAC Assets from the provided information
"""
product_uri = record.get("productURI")
product_size = record.get("productSize")
thumbnail_list = record.get("thumbnailImageLocationList")
browse_list = record.get("browseImageLocationList")
metadata_list = record.get("browseMetadataLocationList")
quality_report = record.get("productQualityReportURL")
if product_uri:
asset = pystac.Asset(product_uri, roles=["data"])
item.add_asset("product", asset)
if product_size:
ext = FileExtension.ext(asset, True)
ext.apply(size=int(product_size))
if thumbnail_list:
thumbnails = thumbnail_list.split(",")
for i, thumbnail in enumerate(thumbnails):
item.add_asset(
"thumbnail" if len(thumbnails) == 1 and i == 0 else f"thumbnail-{i}",
pystac.Asset(
href=thumbnail,
roles=["thumbnail"]
)
)
if browse_list:
browses = browse_list.split(",")
for i, browse in enumerate(browses):
item.add_asset(
"browse" if len(browses) == 1 and i == 0 else f"browse-{i}",
pystac.Asset(
href=browse,
roles=["overview"]
)
)
if metadata_list:
metadatas = metadata_list.split(",")
for i, metadata in enumerate(metadatas):
item.add_asset(
"metadata" if len(metadatas) == 1 and i == 0 else f"metadata-{i}",
pystac.Asset(
href=metadata,
roles=["metadata"]
)
)
if quality_report:
item.add_asset(
"quality-report",
pystac.Asset(quality_report, roles=["metadata"])
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment