import logging
import re
from os.path import join

from .xml import read_xml, parse_metadata_schema, Parameter
from .context import Context
from .source import Source
from .exceptions import RegistrationError


logger = logging.getLogger(__name__)


class RegistrationScheme:
    """Base class of all registration schemes.

    A scheme inspects the files found under a path of a source and builds
    the ``Context`` describing the product to be registered.
    """

    def get_context(self, source: Source, path: str) -> Context:
        """Build the registration context for the product at ``path``.

        The signature mirrors the one implemented by the concrete schemes in
        this module, so that a scheme that forgets to override it fails with
        ``NotImplementedError`` instead of an argument-count ``TypeError``.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError


def parse_datetime(value):
    """Return ``value`` unchanged.

    Used as the parser callback for the time-valued schema parameters;
    currently a pass-through.
    """
    return value


def pairwise(iterable):
    """Group an iterable into non-overlapping pairs.

    s -> (s0,s1), (s2,s3), (s4, s5), ...

    Unlike ``itertools.pairwise`` the pairs do not overlap. A trailing
    unpaired element is silently dropped (``zip`` stops at the shorter side).
    """
    # Both zip arguments are the *same* iterator, so each step consumes two
    # consecutive items.
    a = iter(iterable)
    return zip(a, a)


def parse_footprint(value):
    """Convert a whitespace separated position list to a WKT polygon string.

    Each consecutive pair of tokens is read as ``(y, x)`` and emitted as
    ``"x y"``, i.e. the axis order of every coordinate is swapped.
    """
    coord_list = ','.join(
        f'{x} {y}' for y, x in pairwise(value.split())
    )
    return f'POLYGON(({coord_list}))'


class Sentinel2RegistrationScheme(RegistrationScheme):
    """Registration scheme reading ``MTD_MSIL2A.xml`` / ``MTD_TL.xml``."""

    # Parameters extracted from the product level metadata file.
    # NOTE(review): the second positional ``Parameter`` argument presumably
    # flags multi-valued results and the third is a value parser -- confirm
    # against ``Parameter`` in the xml module.
    MTD_MSIL2A_SCHEMA = {
        'begin_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_START_TIME/text()', False, parse_datetime),
        'end_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_STOP_TIME/text()', False, parse_datetime),
        'identifier': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_URI/text()'),
        'footprint': Parameter('/n1:Level-2A_User_Product/n1:Geometric_Info/Product_Footprint/Product_Footprint/Global_Footprint/EXT_POS_LIST/text()', False, parse_footprint),
        'level': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PROCESSING_LEVEL/text()'),
        'type': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_TYPE/text()'),
        'generation_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/GENERATION_TIME/text()', False, parse_datetime),
        'cloud_cover': Parameter('/n1:Level-2A_User_Product/n1:Quality_Indicators_Info/Cloud_Coverage_Assessment/text()'),
        'image_file_paths': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/Product_Organisation/Granule_List/Granule/IMAGE_FILE/text()', True),
    }

    # Parameters extracted from the tile (granule) level metadata file.
    MTD_TL_SCHEMA = {
        'mask_file_paths': Parameter('/n1:Level-2A_Tile_ID/n1:Quality_Indicators_Info/Pixel_Level_QI/MASK_FILENAME/text()', True),
    }

    # XML namespace prefix mappings used by the XPath expressions above.
    MTD_MSIL2A_NAMESPACES = {
        'n1': "https://psd-14.sentinel2.eo.esa.int/PSD/User_Product_Level-2A.xsd"
    }

    MTD_TL_NAMESPACES = {
        'n1': 'https://psd-14.sentinel2.eo.esa.int/PSD/S2_PDI_Level-2A_Tile_Metadata.xsd'
    }

    def get_context(self, source: Source, path: str) -> Context:
        """Build the ``Context`` for the Sentinel-2 product at ``path``.

        Reads ``MTD_MSIL2A.xml`` below ``path`` for the product metadata and
        the raster file list, then the first ``GRANULE/*/MTD_TL.xml`` for the
        mask file list.

        Args:
            source: the source the files are listed on and read from.
            path: the product directory on that source.

        Returns:
            The populated ``Context``.

        Raises:
            RegistrationError: if an image file path does not end in a
                band/resolution suffix, or no ``MTD_TL.xml`` is found.
        """
        metadata_file = join(path, 'MTD_MSIL2A.xml')
        tree = read_xml(source, metadata_file)

        # get product metadata
        metadata = parse_metadata_schema(
            tree, self.MTD_MSIL2A_SCHEMA, self.MTD_MSIL2A_NAMESPACES
        )

        # Raster files are keyed by the trailing "<band>_<resolution>m" part
        # of the image path; the metadata lists them without extension.
        band_re = re.compile(r'.*([A-Z0-9]{3}_[0-9]{2}m)$')
        raster_files = {}
        for image_file_path in metadata['image_file_paths']:
            band_match = band_re.match(image_file_path)
            if band_match is None:
                raise RegistrationError(
                    f'Unexpected image file path {image_file_path} '
                    f'in {metadata_file}'
                )
            raster_files[band_match.group(1)] = (
                f'{join(path, image_file_path)}.jp2'
            )

        # get granule metadata
        mtd_files = source.list_files(join(path, 'GRANULE'), '*/MTD_TL.xml')
        logger.info('%s', mtd_files)
        if not mtd_files:
            raise RegistrationError(
                f'Could not find a MTD_TL.xml file for the path {path}'
            )

        # Only the first tile metadata file is evaluated.
        tl_tree = read_xml(source, mtd_files[0])
        tile_metadata = parse_metadata_schema(
            tl_tree, self.MTD_TL_SCHEMA, self.MTD_TL_NAMESPACES
        )

        # Mask files are keyed by the mask type in "MSK_<type>_<xxx>.<ext>".
        # Paths that do not follow this naming are skipped; when several
        # files share a mask type the last one listed wins.
        mask_type_re = re.compile(r'.*/MSK_([A-Z]*)_([A-Z0-9]{3})\.[a-z0-9]+$')
        mask_files = {}
        for mask_file_path in tile_metadata['mask_file_paths']:
            mask_match = mask_type_re.match(mask_file_path)
            if mask_match is not None:
                mask_files[mask_match.group(1)] = join(path, mask_file_path)

        return Context(
            identifier=metadata['identifier'],
            path=path,
            product_type=metadata['type'],
            product_level=metadata['level'],
            raster_files=raster_files,
            mask_files=mask_files,
            metadata_files=[metadata_file],
            metadata={
                'begin_time': metadata['begin_time'],
                'end_time': metadata['end_time'],
                'generation_time': metadata['generation_time'],
                'cloud_cover': metadata['cloud_cover'],
                'footprint': metadata['footprint'],
            }
        )


def parse_ring(string):
    """Parse a whitespace separated position list into coordinate tuples.

    Each consecutive pair of tokens is read as ``(lat, lon)`` and returned
    as ``(lon, lat)``. The values stay strings.
    """
    raw_coords = string.split()
    return [(lon, lat) for lat, lon in pairwise(raw_coords)]


def parse_polygons_gsc(elem):
    """Serialize a ``gml:Polygon`` element to a WKT polygon string.

    The first ring is taken from ``gml:exterior``, followed by one ring per
    ``gml:interior`` (if any).

    Args:
        elem: the polygon element; must provide ``xpath`` and ``nsmap``.

    Returns:
        A string of the form ``POLYGON((...),(...),...)``.

    Raises:
        IndexError: if the element has no exterior position list.
    """
    def serialize_coord_list(coords):
        return ','.join(
            f'{x} {y}' for x, y in coords
        )

    exterior = serialize_coord_list(
        parse_ring(
            elem.xpath(
                "gml:exterior/gml:LinearRing/gml:posList",
                namespaces=elem.nsmap
            )[0].text.strip()
        )
    )

    interiors = [
        serialize_coord_list(parse_ring(poslist_elem.text.strip()))
        for poslist_elem in elem.xpath(
            "gml:interior/gml:LinearRing/gml:posList",
            namespaces=elem.nsmap
        )
    ]

    # WKT lists the exterior ring first, then the interior rings.
    rings = [f'({ring})' for ring in [exterior] + interiors]
    return f"POLYGON({','.join(rings)})"


class GSCRegistrationScheme(RegistrationScheme):
    """Registration scheme reading a ``GSC*.xml`` metadata file."""

    # Parameters extracted from the GSC metadata file; the namespace map is
    # taken from the document root at parse time.
    GSC_SCHEMA = {
        'identifier': Parameter('//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:identifier/text()'),
        'type': Parameter('//gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text()'),
        'level': Parameter('//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()'),
        'mask': Parameter('//gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:vendorSpecific/eop:SpecificInformation[eop:localAttribute/text() = "CF_POLY"]/eop:localValue/text()', True),
        'footprint': Parameter(
            '(gsc:sar_metadata|gsc:opt_metadata|gsc:report)/gml:target/eop:Footprint/gml:multiExtentOf/gml:MultiSurface/gml:surfaceMembers/gml:Polygon',
            True, parse_polygons_gsc
        ),
    }

    def __init__(self, level_re: str=r'.*(Level_[0-9]+)$'):
        """Store the regular expression used to derive the product level.

        Args:
            level_re: pattern matched against the parsed 'level' value; its
                first group becomes the product level.
        """
        self.level_re = level_re

    def get_context(self, source: Source, path: str) -> Context:
        """Build the ``Context`` for the GSC product at ``path``.

        Uses the first ``GSC*.xml`` / ``GSC*.XML`` file listed under ``path``
        as metadata file and all ``*.tif`` / ``*.TIF`` files as rasters,
        keyed by the parsed product type.

        Args:
            source: the source the files are listed on and read from.
            path: the product directory on that source.

        Returns:
            The populated ``Context``.

        Raises:
            RegistrationError: if no GSC metadata file is found.
        """
        gsc_filenames = source.list_files(path, ['GSC*.xml', 'GSC*.XML'])
        if not gsc_filenames:
            raise RegistrationError(
                f'Could not find a GSC metadata file for the path {path}'
            )

        metadata_file = gsc_filenames[0]

        tree = read_xml(source, metadata_file)
        metadata = parse_metadata_schema(
            tree, self.GSC_SCHEMA, tree.getroot().nsmap
        )

        tiff_files = {
            metadata['type']: source.list_files(path, ['*.tif', '*.TIF'])
        }

        # The level is the first group of ``level_re``; None when the parsed
        # value does not match the pattern.
        match = re.match(self.level_re, metadata['level'])
        level = match.groups()[0] if match else None

        # NOTE(review): the 'footprint' parsed via GSC_SCHEMA is not
        # forwarded to the context -- confirm whether that is intended.
        return Context(
            identifier=metadata['identifier'],
            path=path,
            product_type=metadata['type'],
            product_level=level,
            raster_files=tiff_files,
            masks={
                'validity': metadata['mask'][0] if metadata['mask'] else None
            },
            metadata_files=[metadata_file],
            metadata={
            }
        )


# Mapping of the configured scheme 'type' to the implementing class.
REGISTRATION_SCHEMES = {
    'gsc': GSCRegistrationScheme,
    'sentinel-2': Sentinel2RegistrationScheme,
}


def get_scheme(config: dict, path: str) -> RegistrationScheme:
    """Instantiate the first configured scheme applicable to ``path``.

    The entries of ``config['schemes']`` are tried in order. An entry
    without a (truthy) 'filter' applies to every path, otherwise its
    'filter' is used as a regular expression that must match the beginning
    of ``path``.

    Args:
        config: configuration with a 'schemes' list; each entry provides a
            'type' and optionally 'filter', 'args' and 'kwargs'.
        path: the path to find a scheme for.

    Returns:
        The scheme instance, constructed with the entry's 'args'/'kwargs'.

    Raises:
        RegistrationError: if no configured scheme applies to ``path``.
        KeyError: if the selected entry's 'type' is not a known scheme.
    """
    cfg_schemes = config['schemes']

    for cfg_scheme in cfg_schemes:
        scheme_filter = cfg_scheme.get('filter')
        if not scheme_filter or re.match(scheme_filter, path):
            break
    else:
        # no scheme matched (or none configured)
        raise RegistrationError(f'Could not find a suitable scheme for the path {path}')

    return REGISTRATION_SCHEMES[cfg_scheme['type']](
        *cfg_scheme.get('args', []),
        **cfg_scheme.get('kwargs', {}),
    )