EOX GitLab Instance

Commit 933650f2 authored by Fabian Schindler

Further developing preprocessor. Added GSC metadata generation.

parent c0422814
gsc_generator.py (new file):

from textwrap import dedent


def positions_to_poslist(positions, projection):
    # TODO: maybe reproject if not lat, lon
    # coordinates may be numbers, so stringify them before joining
    return ' '.join(
        ' '.join(str(coord) for coord in pair)
        for pair in positions
    )


def get_footprint_from_browse(browse):
    btype = browse['browse_type']
    if btype == 'rectified_browse':
        low, high = browse['rectified']['coord_list']
        minx, miny = low
        maxx, maxy = high
        positions = [
            (minx, miny),
            (maxx, miny),
            (maxx, maxy),
            (minx, maxy),
            (minx, miny),  # repeat the first position to close the ring
        ]
    elif btype == 'footprint_browse':
        positions = browse['footprint']
    elif btype == 'model_in_geotiff_browse':
        # TODO: read from input dataset
        raise NotImplementedError('Model in geotiff browses are not supported')
    elif btype == 'regular_grid_browse':
        raise NotImplementedError('Regular grid browses are not supported')
    else:
        # fail early instead of hitting a NameError on 'positions' below
        raise ValueError('Unknown browse type %r' % btype)
    return positions_to_poslist(
        positions,
        browse['reference_system_identifier'],
    )
def generate_gsc_metadata(metadata):
    return dedent("""\
    <?xml version='1.0' encoding='UTF-8'?>
    <gsc:report xmlns:sar="http://earth.esa.int/sar"
                xmlns:gml="http://www.opengis.net/gml"
                xmlns:eop="http://earth.esa.int/eop"
                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                xmlns:opt="http://earth.esa.int/opt"
                xmlns:gsc="http://earth.esa.int/gsc"
                xmlns:atm="http://earth.esa.int/atm"
                xmlns:xlink="http://www.w3.org/1999/xlink" version="2.0">
      <gsc:responsibleOrgName>EOX</gsc:responsibleOrgName>
      <gsc:reportType>CIRCULATION</gsc:reportType>
      <gsc:dateTime>{now_time}</gsc:dateTime>
      <gsc:orderReference></gsc:orderReference>
      <gsc:opt_metadata version="1.2.1">
        <gml:metaDataProperty>
          <gsc:EarthObservationMetaData>
            <eop:identifier>{identifier}</eop:identifier>
            <!--<eop:parentIdentifier>D2_MG2b_FRTX_004a/other/FRSC_0059_001/SpotImage</eop:parentIdentifier>-->
            <eop:acquisitionType>NOMINAL</eop:acquisitionType>
            <eop:productType>{product_type}</eop:productType>
            <eop:status>ARCHIVED</eop:status>
            <eop:archivedIn>
              <eop:ArchivingInformation>
                <eop:archivingCenter>CDS</eop:archivingCenter>
                <eop:archivingDate>{archiving_time}</eop:archivingDate>
              </eop:ArchivingInformation>
            </eop:archivedIn>
            <gsc:deliveryInfo>
              <gsc:deliveryDateTime>{delivery_time}</gsc:deliveryDateTime>
              <gsc:deliveryMethod>ELECTRONIC</gsc:deliveryMethod>
            </gsc:deliveryInfo>
          </gsc:EarthObservationMetaData>
        </gml:metaDataProperty>
        <gml:validTime>
          <gml:TimePeriod>
            <gml:beginPosition>{begin_time}</gml:beginPosition>
            <gml:endPosition>{end_time}</gml:endPosition>
          </gml:TimePeriod>
        </gml:validTime>
        <gml:using>
          <!--<eop:EarthObservationEquipment>
            <eop:platform>
              <eop:Platform>
                <eop:shortName>PH1A</eop:shortName>
                <eop:serialIdentifier>1A</eop:serialIdentifier>
              </eop:Platform>
            </eop:platform>
            <eop:instrument>
              <eop:Instrument>
                <eop:shortName>HR</eop:shortName>
              </eop:Instrument>
            </eop:instrument>
            <eop:sensor>
              <eop:Sensor>
                <eop:sensorType>OPTICAL</eop:sensorType>
                <eop:operationalMode>FUS</eop:operationalMode>
                <eop:resolution uom="m">0.5</eop:resolution>
              </eop:Sensor>
            </eop:sensor>
            <eop:acquisitionParameters>
              <opt:Acquisition>
                <eop:orbitNumber>118</eop:orbitNumber>
                <eop:orbitDirection>DESCENDING</eop:orbitDirection>
                <eop:acrossTrackIncidenceAngle uom="deg">-4.070247073869651</eop:acrossTrackIncidenceAngle>
                <eop:alongTrackIncidenceAngle uom="deg">2.304231907410827</eop:alongTrackIncidenceAngle>
                <opt:illuminationAzimuthAngle uom="deg">164.3516878667332</opt:illuminationAzimuthAngle>
              </opt:Acquisition>
            </eop:acquisitionParameters>
          </eop:EarthObservationEquipment>-->
        </gml:using>
        <gml:target>
          <eop:Footprint>
            <gml:multiExtentOf>
              <gml:MultiSurface srsName="EPSG:4326">
                <gml:surfaceMembers>
                  <gml:Polygon>
                    <gml:exterior>
                      <gml:LinearRing>
                        <gml:posList>{footprint}</gml:posList>
                      </gml:LinearRing>
                    </gml:exterior>
                  </gml:Polygon>
                </gml:surfaceMembers>
              </gml:MultiSurface>
            </gml:multiExtentOf>
          </eop:Footprint>
        </gml:target>
        <gml:resultOf/>
      </gsc:opt_metadata>
    </gsc:report>""".format(**metadata))
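A minimal usage sketch for the new module (hypothetical browse and metadata values; the dict keys mirror those used by generate_metadata_file in the diff below):

import gsc_generator

browse = {
    'browse_type': 'footprint_browse',
    'footprint': [(48.0, 16.0), (48.0, 17.0),
                  (49.0, 17.0), (49.0, 16.0), (48.0, 16.0)],
    'reference_system_identifier': 'EPSG:4326',
}

metadata = {
    'identifier': 'EXAMPLE_PRODUCT_01',      # illustrative values only
    'now_time': '2020-01-01T00:00:00Z',
    'archiving_time': '2020-01-01T00:00:00Z',
    'delivery_time': '2020-01-01T00:00:00Z',
    'begin_time': '2019-12-31T10:00:00Z',
    'end_time': '2019-12-31T10:00:10Z',
    'product_type': 'footprint_browse',
    'footprint': gsc_generator.get_footprint_from_browse(browse),
}
print(gsc_generator.generate_gsc_metadata(metadata))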
Diff of the preprocessor module:

@@ -32,6 +32,7 @@
 import sys
 import os
 import os.path
+from os.path import join, basename, splitext
 import argparse
 import textwrap
 import logging
@@ -43,11 +44,13 @@ import re
 import subprocess
 from urllib.parse import urlparse
 from urllib.request import urlretrieve
+from datetime import datetime

 from swiftclient.multithreading import OutputManager
 from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject

 import transform_chain
+import gsc_generator

 # collection: [name]
 COLLECTION_MAP = {
@@ -164,7 +167,7 @@ class BasePreprocessor:
                 tempfile.TemporaryDirectory() as tmpdir, \
                 OutputManager():
-            for container, path in self.get_input_container_and_filenames():
+            for container, path, extra in self.get_input_container_and_filenames():
                 if not self.replace and self.check_output_exists(output_swift, container, path):
                     logger.critical(
                         "Aborting, package '%s' already exists at "
@@ -174,7 +177,7 @@ class BasePreprocessor:
             source_file = self.download_source(input_swift, container, path, tmpdir)
             unpacked_files = self.unpack_source(source_file, tmpdir)
-            processed_files = self.process_image(unpacked_files, tmpdir)
+            processed_files = self.process_image(unpacked_files, extra, tmpdir)
             self.upload_files(output_swift, container, path, processed_files, tmpdir)

     def check_output_exists(self, swift, container, path):
@@ -201,7 +204,7 @@ class BasePreprocessor:
     def cleanup_source(self, filename):
         os.unlink(filename)

-    def process_image(self, files, tmpdir):
+    def process_image(self, files, extra, tmpdir):
         raise NotImplementedError

     def upload_files(self, swift, container, base_path, filenames, tmpdir):
@@ -216,7 +219,7 @@ class PackagePreprocessor(BasePreprocessor):
     def get_input_container_and_filenames(self):
         container = self.tar_object_path.split("/")[1]
         package = "/".join(self.tar_object_path.split("/")[2:])
-        return [(container, package)]
+        return [(container, package, None)]

     def unpack_source(self, filename, tmpdir):
         tf = tarfile.open(filename, mode="r")
@@ -266,9 +269,12 @@ class PackagePreprocessor(BasePreprocessor):
         logger.debug("Opening file using GDAL.")
         return transform_chain.open_gdal_dataset(source_name)

-    def process_image(self, files, tmpdir, src_ds):
+    def process_image(self, files, extra, tmpdir):
         data_files, metadata_file = files[:-1], files[-1]

+        # get initial source dataset
+        src_ds = self.open_source_dataset(data_files, tmpdir)
+
         # perform transformation steps as necessary
         logger.debug("Changing geotransform.")
         src_ds = transform_chain.correct_geo_transform(src_ds)
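Taken together, the hunks above change the preprocessor's internal protocol: get_input_container_and_filenames() now yields (container, path, extra) 3-tuples, and the run loop threads extra through to process_image(). A minimal, self-contained sketch of that flow with hypothetical stand-in classes (illustration only, not the commit's code):

class BasePreprocessorSketch:
    def get_input_container_and_filenames(self):
        raise NotImplementedError

    def process_image(self, files, extra, tmpdir):
        raise NotImplementedError

    def run(self, tmpdir="/tmp"):
        # 'extra' is per-item context: None for tar packages, the browse
        # dict for browse reports; it is passed on to process_image().
        for container, path, extra in self.get_input_container_and_filenames():
            files = [path]  # download and unpacking elided in this sketch
            self.process_image(files, extra, tmpdir)


class PackageSketch(BasePreprocessorSketch):
    def get_input_container_and_filenames(self):
        return [("my-container", "path/to/package.tar", None)]

    def process_image(self, files, extra, tmpdir):
        print("processing", files, "extra:", extra)


PackageSketch().run()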
@@ -289,34 +295,82 @@ class BrowseReportPreprocessor(BasePreprocessor):
         super().__init__(replace)
         self.browse_report = browse_report

-    def get_input_container_filenames(self):
-        def _get_browse_container_filename(filename):
+    def get_input_container_and_filenames(self):
+        def _get_browse_container_filename(filename, browse):
             parsed = urlparse(filename)
             if parsed.scheme:
                 return (None, filename)
             container = filename.split("/")[1]
             filename = "/".join(filename.split("/")[2:])
-            return container, filename
+            return container, filename, browse

         return [
-            _get_browse_container_filename(browse['filename'])
+            _get_browse_container_filename(browse['filename'], browse)
             for browse in self.browse_report['browses']
         ]

     def download_source(self, swift, container, object_name, tmpdir):
         local_path = os.path.join(tmpdir, os.path.basename(object_name))
-        if not container:
+        # download source from either swift or HTTP
+        if container:
+            # construct local path
+            swift_download_file(swift, container, object_name, local_path)
+        else:
             urlretrieve(object_name, local_path)
-        # construct local path
-        swift_download_file(swift, container, object_name, local_path)

         return local_path

     def unpack_source(self, filename, tmpdir):
         # should already be a simple file
-        return filename
+        return [filename]
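For the browse report case, a filename is treated as a plain URL when it carries a scheme, and as a /container/object Swift path otherwise. A small illustration of that dispatch, using hypothetical references:

from urllib.parse import urlparse

for ref in ("/my-container/browses/b1.tif", "https://example.com/b1.tif"):
    if urlparse(ref).scheme:
        print(ref, "-> plain URL, fetched with urlretrieve()")
    else:
        # leading slash stripped, first segment is the container
        container, _, name = ref[1:].partition("/")
        print(ref, "-> swift object", (container, name))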
+
+    def process_image(self, files, browse, tmpdir):
+        data_file = files[0]
+
+        src_ds = transform_chain.open_gdal_dataset(data_file)
+
+        # TODO: preprocessing from ngeo
+        # # perform transformation steps as necessary
+        # logger.debug("Changing geotransform.")
+        # src_ds = transform_chain.correct_geo_transform(src_ds)
+
+        # save file with given options - should use ENV
+        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
+        logger.debug("Saving resulting file.")
+        output_filename = os.path.join(tmpdir, "%s.tmp" % data_file)
+        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
+        src_ds = None
+        os.rename(output_filename, data_file)
+
+        # generate GSC metadata
+        metadata_file = self.generate_metadata_file(data_file, browse, tmpdir)
+
+        return (data_file, metadata_file)
+
+    def generate_metadata_file(self, data_file, browse, tmpdir):
+        now_time = datetime.now().isoformat().rpartition('.')[0] + 'Z'
+        metadata = {
+            'identifier': browse['browse_identifier'],
+            'now_time': now_time,
+            'archiving_time': now_time,
+            'delivery_time': now_time,
+            'begin_time': browse['start_time'],
+            'end_time': browse['end_time'],
+            'product_type': browse['browse_type'],
+            'footprint': gsc_generator.get_footprint_from_browse(browse)
+        }
+        out_filename = join(tmpdir, splitext(basename(browse['filename']))[0] + '.xml')
+        with open(out_filename, 'w') as f:
+            f.write(
+                gsc_generator.generate_gsc_metadata(metadata)
+            )
+        return out_filename

 def preprocessor(
         collection, tar_object_path, replace=False,
...
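The process_image() added above writes each browse out as a Cloud Optimized GeoTIFF via transform_chain.write_gdal_dataset with the listed creation options. Roughly the same conversion can be expressed directly against GDAL's COG driver (a sketch assuming GDAL >= 3.1 Python bindings; how write_gdal_dataset maps onto gdal.Translate is an assumption):

from osgeo import gdal

# One-off COG conversion with the same creation options as the commit
# (illustrative filenames; requires the GDAL >= 3.1 COG driver).
gdal.Translate(
    "output.tif", "input.tif", format="COG",
    creationOptions=[
        "BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8",
        "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC",
    ],
)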