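# NOTE(review): stray "Newer" / "Older" diff-viewer navigation labels stood here; they are not part of this module.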
import os
import sys
import pytest
import subprocess
from osgeo import gdal
from swiftclient.service import SwiftService
def set_gdal_swift_auth():
    """Copy the current Swift credentials into GDAL's configuration.

    Runs ``swift auth`` and stores the ``OS_STORAGE_URL`` and
    ``OS_AUTH_TOKEN`` values found in its output in the GDAL config
    options ``SWIFT_STORAGE_URL`` and ``SWIFT_AUTH_TOKEN``.

    Raises:
        subprocess.CalledProcessError: If ``swift auth`` exits with a
            non-zero status.
        IndexError: If either value is missing from the command output
            (the same exception type the earlier positional parsing
            raised, now with a descriptive message).
    """
    # sys.stdout.encoding is None when stdout has been replaced by a stream
    # without an encoding (e.g. io.StringIO); fall back to UTF-8 then.
    encoding = sys.stdout.encoding or "utf-8"
    output = subprocess.check_output(["swift", "auth"]).decode(encoding)

    # Look the values up by key instead of by line position so that extra,
    # blank or reordered output lines do not break the parsing.
    wanted = ("OS_STORAGE_URL", "OS_AUTH_TOKEN")
    found = {}
    for line in output.splitlines():
        for key in wanted:
            marker = key + "="
            if key not in found and marker in line:
                found[key] = line.split(marker, 1)[1].strip()

    missing = [key for key in wanted if key not in found]
    if missing:
        raise IndexError(
            "'swift auth' output did not contain: %s" % ", ".join(missing)
        )

    # setting gdal config
    gdal.SetConfigOption("SWIFT_STORAGE_URL", found["OS_STORAGE_URL"])
    gdal.SetConfigOption("SWIFT_AUTH_TOKEN", found["OS_AUTH_TOKEN"])
def list_tifs(product='KS03_AIS_PSH_1G_20160101T113020_20160101T113022_KGS_019339_75CE.tar'):
    """Return the names of the ``.tif`` objects listed for *product*.

    Lists the Swift container named by the ``PREPROCESSOR_RESULTS_BUCKET``
    environment variable and collects every object name ending in ``.tif``.

    Args:
        product: Product identifier; only ``product[0]`` is passed to
            Swift as the listing prefix.

    Returns:
        A new list of matching object names (empty if nothing matched).
    """
    container = os.getenv('PREPROCESSOR_RESULTS_BUCKET')
    # Build the result in a fresh local list on every call; the name was
    # previously used without being initialised in this function.
    preprocessed_list = []
    with SwiftService() as swift:
        # NOTE(review): for a str product, product[0] is only its first
        # character, so the listing is broader than one product -- confirm
        # whether this is intended.
        list_parts_gen = swift.list(
            container=container, options={"prefix": product[0]},
        )
        for page in list_parts_gen:
            # Pages that do not report success are skipped without error.
            if not page["success"]:
                continue
            preprocessed_list.extend(
                item["name"]
                for item in page["listing"]
                if item["name"].endswith(".tif")
            )
    return preprocessed_list
# check if there are preprocessed results in the buckets
# NOTE(review): the statements below use `item` and `srcband`, neither of which
# is assigned anywhere in the visible source. They look like the body of a test
# whose `def` line and setup lines are missing -- confirm against the full file.
container = os.getenv('PREPROCESSOR_RESULTS_BUCKET')
# Open the object read-only via a GDAL /vsiswift/<container>/<object> path.
image = gdal.Open('/vsiswift/%s/%s' % (container, item), gdal.GA_ReadOnly)
# The band checksum must exist and be strictly positive.
# NOTE(review): presumably `srcband` is a raster band of `image` -- TODO confirm.
assert srcband.Checksum() is not None
assert srcband.Checksum() > 0