import logging
import re
from os.path import join

from .xml import read_xml, parse_metadata_schema, Parameter
from .context import Context
from .source import Source
from .exceptions import RegistrationError


logger = logging.getLogger(__name__)


class RegistrationScheme:
    """Base class for registration schemes.

    Subclasses implement :meth:`get_context` to describe the product found
    at a given path.
    """

    def get_context(self, source: Source, path: str) -> Context:
        """Build the registration context for the product at ``path``.

        The signature mirrors the concrete implementation in this module so
        that calling an unimplemented scheme fails with
        ``NotImplementedError`` rather than a ``TypeError`` about arguments.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError


def parse_datetime(value):
    """Return ``value`` unchanged.

    Placeholder parser used by the metadata schemas below; no conversion is
    performed here.
    """
    return value


def pairwise(iterable):
    """Group an iterable into non-overlapping pairs.

    s -> (s0,s1), (s2,s3), (s4, s5), ...

    Both slots of each pair are drawn from the same iterator, so a trailing
    unpaired item is silently dropped by ``zip``.
    """
    a = iter(iterable)
    return zip(a, a)


def parse_footprint(value):
    """Convert a whitespace separated coordinate list to a POLYGON string.

    Consecutive values are read as ``(y, x)`` pairs and written out as
    ``'x y'``, i.e. the order within each pair is swapped.
    NOTE(review): presumably the input is "lat lon" and the output
    "lon lat" -- confirm against the metadata specification.

    >>> parse_footprint('1 2 3 4 1 2')
    'POLYGON((2 1,4 3,2 1))'
    """
    coord_list = ','.join(
        f'{x} {y}' for y, x in pairwise(value.split())
    )
    return f'POLYGON(({coord_list}))'


class Sentinel2RegistrationScheme(RegistrationScheme):
    """Registration scheme reading ``MTD_MSIL2A.xml`` and ``MTD_TL.xml``."""

    # XPath expressions evaluated against MTD_MSIL2A.xml.
    # NOTE(review): the second positional Parameter argument looks like a
    # "multiple values" flag and the third like a value parser -- confirm
    # against the Parameter definition in .xml.
    MTD_MSIL2A_SCHEMA = {
        'begin_time': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_START_TIME/text()',
            False, parse_datetime
        ),
        'end_time': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_STOP_TIME/text()',
            False, parse_datetime
        ),
        'identifier': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_URI/text()'
        ),
        'footprint': Parameter(
            '/n1:Level-2A_User_Product/n1:Geometric_Info/Product_Footprint/Product_Footprint/Global_Footprint/EXT_POS_LIST/text()',
            False, parse_footprint
        ),
        'level': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PROCESSING_LEVEL/text()'
        ),
        'type': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_TYPE/text()'
        ),
        'generation_time': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/GENERATION_TIME/text()',
            False, parse_datetime
        ),
        'cloud_cover': Parameter(
            '/n1:Level-2A_User_Product/n1:Quality_Indicators_Info/Cloud_Coverage_Assessment/text()'
        ),
        'image_file_paths': Parameter(
            '/n1:Level-2A_User_Product/n1:General_Info/Product_Info/Product_Organisation/Granule_List/Granule/IMAGE_FILE/text()',
            True
        ),
    }

    # XPath expressions evaluated against the granule's MTD_TL.xml.
    MTD_TL_SCHEMA = {
        'mask_file_paths': Parameter(
            '/n1:Level-2A_Tile_ID/n1:Quality_Indicators_Info/Pixel_Level_QI/MASK_FILENAME/text()',
            True
        ),
    }

    # Namespace prefix mappings for the two documents.
    MTD_MSIL2A_NAMESPACES = {
        'n1': "https://psd-14.sentinel2.eo.esa.int/PSD/User_Product_Level-2A.xsd"
    }

    MTD_TL_NAMESPACES = {
        'n1': 'https://psd-14.sentinel2.eo.esa.int/PSD/S2_PDI_Level-2A_Tile_Metadata.xsd'
    }

    def get_context(self, source: Source, path: str) -> Context:
        """Collect metadata, raster files and mask files of a product.

        Args:
            source: the source used to read and list the product files.
            path: the product directory containing ``MTD_MSIL2A.xml``.

        Returns:
            A ``Context`` describing the product.

        Raises:
            RegistrationError: if an image file path does not end in a
                recognisable band/resolution suffix, or if no
                ``MTD_TL.xml`` is found below ``GRANULE``.
        """
        metadata_file = join(path, 'MTD_MSIL2A.xml')
        tree = read_xml(source, metadata_file)

        # get product metadata
        metadata = parse_metadata_schema(
            tree, self.MTD_MSIL2A_SCHEMA, self.MTD_MSIL2A_NAMESPACES
        )

        # Image paths are keyed by their trailing "<band>_<resolution>m"
        # suffix; the metadata lists them without the file extension.
        band_re = re.compile(r'.*([A-Z0-9]{3}_[0-9]{2}m)$')
        raster_files = {}
        for image_file_path in metadata['image_file_paths']:
            band_match = band_re.match(image_file_path)
            if band_match is None:
                # Fail with a meaningful error instead of an
                # AttributeError on None.
                raise RegistrationError(
                    f'Could not determine the band of image file '
                    f'{image_file_path}'
                )
            raster_files[band_match.group(1)] = (
                f'{join(path, image_file_path)}.jp2'
            )

        # get granule metadata
        mtd_files = source.list_files(join(path, 'GRANULE'), '*/MTD_TL.xml')
        logger.info('%s', mtd_files)
        if not mtd_files:
            # Avoid an opaque IndexError when the granule metadata is missing.
            raise RegistrationError(
                f'Could not find a MTD_TL.xml file for the path {path}'
            )

        # Only the first granule metadata file is evaluated.
        tl_tree = read_xml(source, mtd_files[0])
        tile_metadata = parse_metadata_schema(
            tl_tree, self.MTD_TL_SCHEMA, self.MTD_TL_NAMESPACES
        )

        # The dot before the file extension is escaped so that it only
        # matches a literal '.'.
        mask_type_re = re.compile(
            r'.*/MSK_([A-Z]*)_([A-Z0-9]{3})\.[a-z0-9]+$'
        )
        # Masks are keyed by mask type only; paths that do not match the
        # pattern are skipped, and a later path of the same type replaces
        # an earlier one.
        mask_files = {}
        for mask_file_path in tile_metadata['mask_file_paths']:
            mask_match = mask_type_re.match(mask_file_path)
            if mask_match is not None:
                mask_files[mask_match.group(1)] = join(path, mask_file_path)

        return Context(
            identifier=metadata['identifier'],
            path=path,
            product_type=metadata['type'],
            product_level=metadata['level'],
            raster_files=raster_files,
            mask_files=mask_files,
            metadata_files=[metadata_file],
            metadata={
                'begin_time': metadata['begin_time'],
                'end_time': metadata['end_time'],
                'generation_time': metadata['generation_time'],
                'cloud_cover': metadata['cloud_cover'],
                'footprint': metadata['footprint'],
            }
        )


class GSCRegistrationScheme(RegistrationScheme):
    """GSC registration scheme; ``get_context`` is not implemented here."""


# Maps the ``type`` value of a scheme configuration to its implementation.
REGISTRATION_SCHEMES = {
    'gsc': GSCRegistrationScheme,
    'sentinel-2': Sentinel2RegistrationScheme,
}


def get_scheme(config: dict, path: str) -> RegistrationScheme:
    """Instantiate the first configured scheme applicable to ``path``.

    The entries of ``config['schemes']`` are tried in order. An entry is
    selected when it has no (or an empty) ``filter``, or when its ``filter``
    regular expression matches at the start of ``path``. The selected
    entry's ``type`` picks the class from ``REGISTRATION_SCHEMES``, which is
    instantiated with the entry's optional ``args`` and ``kwargs``.

    Raises:
        RegistrationError: if no configured scheme applies to ``path``.
        KeyError: if the selected entry's ``type`` is not a known scheme.
    """
    for cfg_scheme in config['schemes']:
        # A missing 'filter' key is treated like an empty filter.
        path_filter = cfg_scheme.get('filter')
        if not path_filter or re.match(path_filter, path):
            break
    else:
        # no scheme found
        raise RegistrationError(f'Could not find a suitable scheme for the path {path}')

    return REGISTRATION_SCHEMES[cfg_scheme['type']](
        *cfg_scheme.get('args', []),
        **cfg_scheme.get('kwargs', {}),
    )