"""Integration test for preprocessed GeoTIFFs stored in a Swift container."""
import os
import subprocess
import sys

import pytest
from osgeo import gdal
from swiftclient.service import SwiftService


def _parse_swift_auth(output):
    """Extract the storage URL and auth token from ``swift auth`` output.

    Lines are matched by key instead of by position, so extra or reordered
    lines do not break the lookup.

    Args:
        output: Decoded text printed by the ``swift auth`` command.

    Returns:
        A ``(storage_url, auth_token)`` tuple of stripped strings.

    Raises:
        RuntimeError: If either value is absent from ``output``.
    """
    wanted = ("OS_STORAGE_URL", "OS_AUTH_TOKEN")
    found = {}
    for line in output.splitlines():
        for key in wanted:
            _, marker, value = line.partition(key + "=")
            if marker:
                # strip() drops a trailing "\r" or spaces left by the CLI.
                found[key] = value.strip()
    missing = [key for key in wanted if not found.get(key)]
    if missing:
        raise RuntimeError(
            "swift auth output is missing: %s" % ", ".join(missing)
        )
    return found["OS_STORAGE_URL"], found["OS_AUTH_TOKEN"]


def set_gdal_swift_auth():
    """Configure GDAL's Swift credentials from the ``swift auth`` command.

    Runs ``swift auth``, parses its output and stores the results in the
    GDAL config options ``SWIFT_STORAGE_URL`` and ``SWIFT_AUTH_TOKEN``.

    Raises:
        subprocess.CalledProcessError: If ``swift auth`` exits non-zero.
        RuntimeError: If the output lacks the storage URL or the token.
    """
    raw_output = subprocess.check_output(["swift", "auth"])
    # sys.stdout.encoding may be None when stdout is replaced or redirected.
    encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
    storage_url, auth_token = _parse_swift_auth(raw_output.decode(encoding))

    gdal.SetConfigOption("SWIFT_STORAGE_URL", storage_url)
    gdal.SetConfigOption("SWIFT_AUTH_TOKEN", auth_token)


@pytest.fixture
def list_tifs(product='KS03_AIS_PSH_1G_20160101T113020_20160101T113022_KGS_019339_75CE.tar'):
    """Return the names of ``.tif`` objects in the preprocessor results bucket.

    The container name is read from the ``PREPROCESSOR_RESULTS_BUCKET``
    environment variable.

    Args:
        product: Product file name; only its first character is used as the
            listing prefix.

    Returns:
        A list of object names ending in ``.tif``.

    Raises:
        Exception: The error reported by swiftclient when a listing page
            fails, so a broken listing is not mistaken for an empty bucket.
    """
    container = os.getenv('PREPROCESSOR_RESULTS_BUCKET')
    tif_names = []
    with SwiftService() as swift:
        # NOTE(review): product[0] is a single character ("K" for the default
        # product), so this matches every object starting with it. Confirm
        # whether a longer prefix was intended.
        pages = swift.list(
            container=container,
            options={"prefix": product[0]},
        )
        for page in pages:
            if not page["success"]:
                raise page["error"]
            tif_names.extend(
                item["name"]
                for item in page["listing"]
                if item["name"].endswith(".tif")
            )
    return tif_names


def test_preprocessor(list_tifs):
    """Check that every preprocessed GeoTIFF opens and has a positive checksum."""
    # check if there are preprocessed results in the buckets
    assert len(list_tifs) > 0

    container = os.getenv('PREPROCESSOR_RESULTS_BUCKET')
    # The credentials do not depend on the item, so fetch them once instead
    # of spawning ``swift auth`` for every file.
    set_gdal_swift_auth()

    for item in list_tifs:
        path = '/vsiswift/%s/%s' % (container, item)
        image = gdal.Open(path, gdal.GA_ReadOnly)
        # gdal.Open returns None on failure unless GDAL exceptions are enabled.
        assert image is not None, "GDAL could not open %s" % path

        checksum = image.GetRasterBand(1).Checksum()
        assert checksum is not None
        assert checksum > 0