import csv
import pytest
import subprocess
from swiftclient.service import SwiftService
def products():
with open('./preprocessed_list.csv') as f:
yield csv.reader(f)
# parsing command line output of swift auth
auth_keys = subprocess.check_output(["swift", "auth"]).decode(sys.stdout.encoding).split("\n")
storage_url = auth_keys[0].split("OS_STORAGE_URL=")[1]
auth_token = auth_keys[1].split("OS_AUTH_TOKEN=")[1]
# setting gdal config
gdal.SetConfigOption("SWIFT_STORAGE_URL", storage_url)
gdal.SetConfigOption("SWIFT_AUTH_TOKEN", auth_token)
def list_tifs(products):
preprocessed_list = []
for product in products:
with SwiftService() as swift:
# auth_options["prefix"] = product[0]
list_parts_gen = swift.list(
container='preprocessor_results', options= {"prefix": product[0]},
for page in list_parts_gen:
if page["success"]:
for item in page["listing"]:
if item["name"].endswith(".tif"):
return preprocessed_list
def test_preprocessor(list_tifs):
# check if there are preprocessed results in the buckets
assert len(list_tifs) > 0
for item in list_tifs:
image = gdal.Open('/vsiswift/preprocessor_results/%s' % item, gdal.GA_ReadOnly)
srcband = image.GetRasterBand(1)
assert srcband.Checksum() != None
assert srcband.Checksum() > 0