import os
import subprocess
import sys

import pytest
from osgeo import gdal
from swiftclient.service import SwiftService


def set_gdal_swift_auth():
    """Configure GDAL's Swift credentials from the output of ``swift auth``.

    Runs the ``swift auth`` command line tool, extracts the values that
    follow ``OS_STORAGE_URL=`` and ``OS_AUTH_TOKEN=`` in its output and
    stores them in the GDAL config options ``SWIFT_STORAGE_URL`` and
    ``SWIFT_AUTH_TOKEN``.

    Raises:
        subprocess.CalledProcessError: if ``swift auth`` exits non-zero.
        RuntimeError: if either credential is missing from the output.
    """
    # sys.stdout.encoding can be None when stdout has been replaced (e.g. by
    # an output-capturing wrapper), so fall back to UTF-8 instead of failing.
    encoding = sys.stdout.encoding or "utf-8"
    output = subprocess.check_output(["swift", "auth"]).decode(encoding)

    # Look the credentials up by key rather than by line position so that
    # extra or reordered output lines do not break the parsing.
    credentials = {}
    for line in output.splitlines():
        for key in ("OS_STORAGE_URL", "OS_AUTH_TOKEN"):
            marker = key + "="
            if marker in line:
                credentials[key] = line.split(marker, 1)[1].strip()

    missing = [
        key
        for key in ("OS_STORAGE_URL", "OS_AUTH_TOKEN")
        if not credentials.get(key)
    ]
    if missing:
        # Only the key names are reported; the token itself is never echoed.
        raise RuntimeError(
            "'swift auth' output did not contain: %s" % ", ".join(missing)
        )

    # setting gdal config
    gdal.SetConfigOption("SWIFT_STORAGE_URL", credentials["OS_STORAGE_URL"])
    gdal.SetConfigOption("SWIFT_AUTH_TOKEN", credentials["OS_AUTH_TOKEN"])


@pytest.fixture
def list_tifs(product):
    """Return the names of all ``.tif`` objects stored for ``product``.

    Lists the ``preprocessor_results`` container with ``product[0]`` as the
    object-name prefix and keeps every listed name ending in ``.tif``.

    Args:
        product: value of the ``product`` fixture, which is not defined in
            this part of the file; only its first element is used here.

    Returns:
        list of object names, in the order the listing returned them.
    """
    tif_names = []
    with SwiftService() as swift:
        pages = swift.list(
            container="preprocessor_results",
            options={"prefix": product[0]},
        )
        for page in pages:
            if not page["success"]:
                # Surface listing failures instead of silently returning a
                # partial (or empty) list of results.
                raise page["error"]
            tif_names.extend(
                item["name"]
                for item in page["listing"]
                if item["name"].endswith(".tif")
            )
    return tif_names


def test_preprocessor(list_tifs):
    """Check that every preprocessed GeoTIFF can be opened and has data.

    For each listed object the first raster band is read through GDAL's
    ``/vsiswift/`` path and its checksum must be a positive number.
    """
    # check if there are preprocessed results in the buckets
    assert list_tifs, "no .tif objects found in preprocessor_results"

    # The credentials do not depend on the item, so fetch them once instead
    # of spawning a ``swift auth`` subprocess for every file.
    set_gdal_swift_auth()

    for item in list_tifs:
        image = gdal.Open(
            "/vsiswift/preprocessor_results/%s" % item, gdal.GA_ReadOnly
        )
        # Without this check a failed open would surface as an
        # AttributeError on None rather than a readable test failure.
        assert image is not None, "GDAL could not open %s" % item

        srcband = image.GetRasterBand(1)
        # Compute the checksum once; it requires reading the whole band.
        checksum = srcband.Checksum()
        assert checksum is not None
        assert checksum > 0