EOX GitLab Instance
Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
V
VS
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Container Registry
Model registry
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ESA
PRISM
VS
Commits
01c45828
Commit
01c45828
authored
4 years ago
by
Lubomir Dolezal
Browse files
Options
Downloads
Patches
Plain Diff
cleanup
parent
22a496d0
No related branches found
Branches containing commit
No related tags found
Tags containing commit
2 merge requests
!36
Staging to master to prepare 1.0.0 release
,
!32
Registrar modularization
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
README.md
+1
-1
1 addition, 1 deletion
README.md
core/registrar.py
+0
-498
0 additions, 498 deletions
core/registrar.py
core/setup.py
+1
-1
1 addition, 1 deletion
core/setup.py
with
2 additions
and
500 deletions
README.md
+
1
−
1
View file @
01c45828
...
...
@@ -253,7 +253,7 @@ docker stack deploy -c docker-compose.emg.yml -c docker-compose.emg.dev.yml -c d
Deploy base & logging stack in production environment:
```
docker stack deploy -c docker-compose.base.ops.yml base-pvs
docker stack deploy -c docker-compose.logging.yml docker-compose.logging.ops.yml logging
docker stack deploy -c docker-compose.logging.yml
-c
docker-compose.logging.ops.yml logging
```
Deploy the stack in production environment:
Please note that in order to reuse existing database volumes,
<stack-name>
needs to be the same. Here we use
`vhr18-pvs`
but in operational service
`vhr18-pdas`
is used.
...
...
This diff is collapsed.
Click to expand it.
core/registrar.py
deleted
100644 → 0
+
0
−
498
View file @
22a496d0
#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: registrar.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to register products.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------
import
sys
import
os
import
argparse
import
textwrap
import
logging
import
traceback
from
xml.sax.saxutils
import
escape
import
subprocess
import
redis
import
lxml.etree
from
swiftclient.service
import
SwiftService
import
django
from
django.db
import
transaction
from
django.contrib.gis.geos
import
GEOSGeometry
from
osgeo
import
gdal
# Make the Django project of the instance importable: it is looked up in the
# "pvs_instance" directory below INSTALL_DIR (default "/var/www/pvs").
path = os.path.join(os.getenv('INSTALL_DIR', "/var/www/pvs"), "pvs_instance")
if path not in sys.path:
    sys.path.append(path)

# Select the instance settings unless DJANGO_SETTINGS_MODULE is already set,
# then initialise Django.
# NOTE(review): the eoxserver imports that follow presumably rely on Django
# being set up first - keep this block ahead of them.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pvs_instance.settings")
django.setup()
from
eoxserver.backends
import
access
from
eoxserver.contrib
import
vsi
from
eoxserver.backends
import
models
as
backends
from
eoxserver.core.util.timetools
import
isoformat
from
eoxserver.resources.coverages
import
models
from
eoxserver.resources.coverages.registration.product
import
(
ProductRegistrator
)
from
eoxserver.resources.coverages.registration.registrators.gdal
import
(
GDALRegistrator
)
# Module-level logger; its level and stream handler are set by setup_logging().
logger = logging.getLogger(__name__)
def setup_logging(verbosity):
    """Configure the module logger according to a numeric verbosity.

    ``verbosity`` maps to a log level as follows: 0 is CRITICAL, 1 is ERROR,
    2 is WARNING, 3 is INFO and every other value is DEBUG. The level is set
    on the module logger and on a new ``StreamHandler`` that is added to it;
    records are formatted as ``"<asctime> <levelname>: <message>"``.
    """
    level_table = (
        (0, logging.CRITICAL),
        (1, logging.ERROR),
        (2, logging.WARNING),
        (3, logging.INFO),
    )
    # First matching entry wins; anything not listed means DEBUG.
    log_level = next(
        (
            candidate
            for threshold, candidate in level_table
            if verbosity == threshold
        ),
        logging.DEBUG,
    )

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(log_level)
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    )

    logger.setLevel(log_level)
    logger.addHandler(stream_handler)
def set_gdal_swift_auth():
    """Hand the Swift credentials printed by ``swift auth`` over to GDAL.

    Runs the ``swift auth`` command, reads the storage URL from its first
    output line and the auth token from its second one, and stores them in
    the GDAL config options ``SWIFT_STORAGE_URL`` and ``SWIFT_AUTH_TOKEN``.
    """
    raw_output = subprocess.check_output(["swift", "auth"])
    output_lines = raw_output.decode(sys.stdout.encoding).split("\n")

    # The value is whatever follows the "KEY=" marker on the respective line.
    url_line, token_line = output_lines[0], output_lines[1]
    swift_options = (
        ("SWIFT_STORAGE_URL", url_line.split("OS_STORAGE_URL=")[1]),
        ("SWIFT_AUTH_TOKEN", token_line.split("OS_AUTH_TOKEN=")[1]),
    )
    for option_name, option_value in swift_options:
        gdal.SetConfigOption(option_name, option_value)
def add_mask(product):
    """Create a mask for ``product`` from the ``CF_POLY`` metadata value.

    The first metadata item of the product is opened and parsed as XML. The
    first ``eop:localValue`` whose ``eop:localAttribute`` is ``CF_POLY`` is
    turned into a geometry and stored as a ``Mask`` using the mask type that
    belongs to the product's product type.
    """
    first_metadata_item = product.metadata_items.all()[0]
    with access.vsi_open(first_metadata_item) as metadata_file:
        document = lxml.etree.parse(metadata_file)

    cf_poly_xpath = (
        '//gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData'
        '/eop:vendorSpecific/eop:SpecificInformation'
        '[eop:localAttribute/text() = "CF_POLY"]/eop:localValue/text()'
    )
    namespace_map = document.getroot().nsmap
    matches = document.xpath(cf_poly_xpath, namespaces=namespace_map)

    mask_geometry = GEOSGeometry(matches[0])
    product_mask_type = models.MaskType.objects.get(
        product_type=product.product_type
    )
    logger.debug("Adding mask")
    models.Mask.objects.create(
        product=product,
        mask_type=product_mask_type,
        geometry=mask_geometry,
    )
def get_product_type_and_level(metadata_item):
    """Read the product type name and processing level from a metadata file.

    The metadata item is opened through ``access.vsi_open`` and parsed as
    XML. The product type name is the first ``eop:shortName`` of the
    platform; the level is derived from the suffix of the first
    ``eop:parentIdentifier`` (``Level_1`` or ``Level_3``).

    Lookup failures are logged at debug level and not raised.

    Returns:
        A ``(product_type_name, level)`` tuple. Either element is ``None``
        when it could not be determined.
    """
    # Initialised up front so a failed lookup yields None rather than an
    # UnboundLocalError at the return statement.
    product_type_name = None
    level = None

    with access.vsi_open(metadata_item) as f:
        tree = lxml.etree.parse(f)
    root = tree.getroot()

    try:
        xp = '//gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text()'
        product_type_name = tree.xpath(xp, namespaces=root.nsmap)[0]
    except Exception as e:
        logger.debug(
            'Failed to determine product type of %s, error was %s'
            % (metadata_item.location, e)
        )

    try:
        xp = '//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()'
        parent_identifier = tree.xpath(xp, namespaces=root.nsmap)[0]

        # Must be one chain: with two separate ifs a Level_1 identifier fell
        # through to the else branch and was reported as a failure.
        if parent_identifier.endswith('Level_1'):
            level = 'Level_1'
        elif parent_identifier.endswith('Level_3'):
            level = 'Level_3'
        else:
            raise Exception(
                'Invalid parent identifier type name %s' % parent_identifier
            )
    except Exception as e:
        logger.debug(
            'Failed to determine product level of %s, error was %s'
            % (metadata_item.location, e)
        )

    return product_type_name, level
def get_product_collection(metadata_file):
    """Derive the collection name from the parent identifier in the metadata.

    Used when the collection has to be determined from the metadata itself.
    For paths starting with ``/vsiswift`` the GDAL Swift credentials are set
    first. The result is the part of the first ``eop:parentIdentifier``
    before its first ``/``.

    Returns:
        The extracted name, or ``None`` when anything fails (the error is
        logged at debug level).
    """
    parent_identifier_xpath = (
        '//gml:metaDataProperty/gsc:EarthObservationMetaData'
        '/eop:parentIdentifier/text()'
    )
    try:
        needs_swift_auth = metadata_file.startswith("/vsiswift")
        if needs_swift_auth:
            set_gdal_swift_auth()

        with vsi.open(metadata_file, "r") as handle:
            document = lxml.etree.parse(handle)

        namespace_map = document.getroot().nsmap
        matches = document.xpath(
            parent_identifier_xpath, namespaces=namespace_map
        )
        return matches[0].split('/')[0]
    except Exception as error:
        logger.debug(
            'Failed to determine product collection for metadata file %s, error was %s'
            % (metadata_file, error)
        )
        return None
def get_product_type_from_band_count(product_type_name, file_path):
    """Look up the product type named ``<product_type_name>_<band count>``.

    The raster at ``file_path`` is opened with GDAL (after setting the Swift
    credentials for ``/vsiswift`` paths) to obtain its band count.

    Returns:
        The matching ``ProductType`` model instance.

    Raises:
        RegistrationError: if GDAL cannot open the file or no product type
            with the suffixed name exists.
    """
    logger.debug("Opening file using GDAL: %s" % file_path)

    if file_path.startswith("/vsiswift"):
        set_gdal_swift_auth()

    dataset = gdal.Open(file_path)
    if dataset is None:
        raise RegistrationError(
            "Band check: failed to open dataset: %s" % file_path
        )

    # Product types may be registered per band count, e.g. "<name>_4".
    suffixed_name = "%s_%s" % (product_type_name, dataset.RasterCount)
    try:
        return models.ProductType.objects.get(name=suffixed_name)
    except models.ProductType.DoesNotExist:
        raise RegistrationError(
            "Product Type: '%s' was not found" % suffixed_name
        )
class RegistrationError(Exception):
    """Raised when a product cannot be registered."""
def _find_product_packages(upload_container, objects_prefix):
    """Return ``(metadata_package, data_package)`` object names from Swift.

    Lists the objects in ``upload_container`` whose names start with
    ``objects_prefix``. The ``.xml`` object is the metadata package, the
    ``.TIF``/``.tif`` object is the data package, ``.tar`` objects are
    ignored and anything else is rejected.

    Raises:
        RegistrationError: on a failed listing, unexpected content, or when
            either package is missing.
    """
    metadata_package, data_package = None, None
    with SwiftService() as swift:
        list_parts_gen = swift.list(
            container=upload_container, options={"prefix": objects_prefix},
        )
        for page in list_parts_gen:
            if not page["success"]:
                logger.error(page["error"])
                raise RegistrationError(
                    "No product found with objects prefix '%s'."
                    % objects_prefix
                )
            for item in page["listing"]:
                name = item["name"]
                if name.endswith(".xml"):
                    metadata_package = name
                elif name.endswith((".TIF", ".tif")):
                    data_package = name
                elif not name.endswith(".tar"):
                    raise RegistrationError(
                        "Product with objects prefix '%s' has "
                        "wrong content '%s'."
                        % (objects_prefix, name)
                    )

    if metadata_package is None or data_package is None:
        raise RegistrationError(
            "Product with objects prefix '%s' has missing content."
            % objects_prefix
        )
    return metadata_package, data_package


def _write_registration_report(reporting_dir, service_url, product):
    """Write the ``DataAccessItem`` XML report for a registered product.

    The file is named ``item_<inserted timestamp>_<identifier>.xml`` and is
    created in ``reporting_dir``.
    """
    timestamp = product.inserted.strftime("%Y%m%dT%H%M%S")
    report_path = os.path.join(
        reporting_dir, 'item_%s_%s.xml' % (timestamp, product.identifier)
    )
    # The backslash after the opening quotes keeps the XML declaration on the
    # very first line of the file; leading whitespace before it makes the
    # document ill-formed.
    with open(report_path, 'w') as f:
        f.write(textwrap.dedent("""\
            <?xml version="1.0" encoding="UTF-8"?>
            <DataAccessItem
                xsi:schemaLocation="http://www.telespazio.com/CSCDA/CDD/PDAS PDAS_interfaces%2020190924_1916.xsd"
                xmlns="http://www.telespazio.com/CSCDA/CDD/PDAS"
                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <identifier>{identifier}</identifier>
                <BROWSE_AVAILABILITY_DATETIME>{availability_time}</BROWSE_AVAILABILITY_DATETIME>
                <URL>
                    <Service>WCS</Service>
                    <URL>{wcs_capabilities_url}</URL>
                </URL>
                <URL>
                    <Service>WMS</Service>
                    <URL>{wms_capabilities_url}</URL>
                </URL>
            </DataAccessItem>
        """.format(
            identifier=escape(product.identifier),
            availability_time=escape(isoformat(product.inserted)),
            wcs_capabilities_url=escape(
                '%s/ows?service=wcs&request=GetCapabilities&cql=identifier="%s"'
                % (service_url, product.identifier)
            ),
            wms_capabilities_url=escape(
                '%s/ows?service=wms&request=GetCapabilities&cql=identifier="%s"'
                % (service_url, product.identifier)
            ),
        )))


@transaction.atomic
def registrar(
    collection_stack,
    objects_prefix, upload_container=None, replace=False, client=None,
    registered_set_key=None, reporting_dir=None, service_url=None
):
    """Register one product (metadata plus raster) found in Swift storage.

    Steps: locate the metadata and data objects, determine the product type
    (falling back to a band-count suffixed type), register the product,
    insert it into its collections, register the coverage, try to add a
    mask, and optionally record the result in redis and in an XML report.

    Args:
        collection_stack: identifier of the target collection; also the
            prefix of the product type name
            (``<collection_stack>_Product_<type>``).
        objects_prefix: prefix of the product's objects. When
            ``upload_container`` is not given it is expected in the form
            ``<container>/<prefix>``.
        upload_container: name of the Swift container, which is also the
            name of the storage looked up for the metadata item.
        replace: forwarded to the product and coverage registrators.
        client: optional redis client. When given, ``<begin>/<end>`` of the
            product is added to the set ``registered_set_key``.
        registered_set_key: name of that redis set.
        reporting_dir: when given, an XML report is written to this
            directory.
        service_url: base URL used for the capabilities URLs in the report.

    Raises:
        RegistrationError: when the objects are missing or unexpected, the
            product type has no allowed coverage type, or no footprint
            could be extracted.
    """
    logger.info("Starting registration of product '%s'." % objects_prefix)

    if not upload_container:
        # assuming objects_prefix = bucket/itemname
        upload_container, _, objects_prefix = objects_prefix.partition("/")

    metadata_package, data_package = _find_product_packages(
        upload_container, objects_prefix
    )
    logger.debug(
        "Found objects '%s' and '%s'." % (data_package, metadata_package)
    )

    storage = backends.Storage.objects.get(name=upload_container)
    metadata_item = models.MetaDataItem(
        storage=storage, location=metadata_package
    )

    product_type, level = get_product_type_and_level(metadata_item)
    if collection_stack == 'DEM':
        # special for DEM files, collection name === product_type
        gdal_metadata_file_path = "/vsiswift/%s/%s" % (
            upload_container, metadata_package
        )
        product_type = get_product_collection(gdal_metadata_file_path)

    logger.debug("Registering product")
    product_type_name = "%s_Product_%s" % (collection_stack, product_type)

    try:
        # first find product type by name from path
        product_type_model = models.ProductType.objects.get(
            name=product_type_name
        )
    except models.ProductType.DoesNotExist:
        # if not found, maybe there are more product types with _bandcount suffix
        gdal_file_path = "/vsiswift/%s/%s" % (upload_container, data_package)
        product_type_model = get_product_type_from_band_count(
            product_type_name, gdal_file_path
        )
        product_type_name = product_type_model.name

    coverage_types = list(product_type_model.allowed_coverage_types.all())
    if not coverage_types:
        # Fail with a meaningful error instead of a bare IndexError below.
        raise RegistrationError(
            "Product Type: '%s' has no allowed coverage types."
            % product_type_name
        )
    if len(coverage_types) > 1:
        logger.warning(
            "More available 'CoverageType' found, selecting the first one."
        )
    coverage_type_name = coverage_types[0].name

    product, _ = ProductRegistrator().register(
        metadata_locations=[[upload_container, metadata_package, ], ],
        type_name=product_type_name,
        replace=replace,
        extended_metadata=True,
        mask_locations=None,
        package_path=None,
        simplify_footprint_tolerance=0.0001,  # ~10meters
        overrides={},
    )
    if product.footprint.empty:
        product.delete()
        raise RegistrationError(
            "No footprint was extracted. full product: %s" % product
        )

    collection = models.Collection.objects.get(
        identifier=collection_stack
    )
    logger.debug("Inserting product into collection %s" % collection_stack)
    models.collection_insert_eo_object(collection, product)

    if collection_stack == "DEM":
        # also insert it to its own collection
        collection_own = models.Collection.objects.get(
            identifier="%s_%s" % (collection, product_type)
        )
        logger.debug(
            "Inserting product to collection %s_%s"
            % (collection, product_type)
        )
        models.collection_insert_eo_object(collection_own, product)

    if level == 'Level_1':
        collection_level_1 = models.Collection.objects.get(
            identifier="%s_Level_1" % collection
        )
        logger.debug(
            "Inserting product to collection %s_Level_1" % collection
        )
        models.collection_insert_eo_object(collection_level_1, product)
    elif level == 'Level_3':
        collection_level_3 = models.Collection.objects.get(
            identifier="%s_Level_3" % collection
        )
        logger.debug(
            "Inserting product to collection %s_Level_3" % collection
        )
        models.collection_insert_eo_object(collection_level_3, product)

    logger.debug("Registering coverage")
    report = GDALRegistrator().register(
        data_locations=[[upload_container, data_package, ], ],
        metadata_locations=[[upload_container, metadata_package, ], ],
        coverage_type_name=coverage_type_name,
        overrides={
            "identifier": "%s__coverage" % product.identifier,
            "footprint": None,
        },
        replace=replace,
    )
    logger.debug("Adding coverage to product")
    models.product_add_coverage(product, report.coverage)

    # The mask is best effort: a failure is logged but does not abort the
    # registration.
    try:
        add_mask(product)
    except Exception as e:
        logger.debug("Couldn't add mask.")
        logger.debug(traceback.format_exc())
        logger.debug("%s: %s\n" % (type(e).__name__, str(e)))

    if client is not None:
        logger.debug(
            "Storing times in redis queue '%s'" % registered_set_key
        )
        client.sadd(
            registered_set_key, "%s/%s"
            % (
                product.begin_time.strftime("%Y%m%dT%H%M%S"),
                product.end_time.strftime("%Y%m%dT%H%M%S")
            )
        )

    if reporting_dir is not None:
        _write_registration_report(reporting_dir, service_url, product)

    logger.info(
        "Successfully finished registration of product '%s'."
        % objects_prefix
    )
def registrar_redis_wrapper(
    collection,
    upload_container,
    replace=False, host="localhost", port=6379,
    register_queue_key="register_queue",
    registered_set_key="registered_set",
    reporting_dir=None,
    service_url=None,
):
    """Register products whose object prefixes arrive on a redis queue.

    Connects to redis at ``host``/``port`` and loops forever: it blocks on
    ``register_queue_key`` and calls ``registrar`` with each popped value as
    the objects prefix. A failing registration is logged and the loop
    continues with the next queue entry. This function does not return.

    Args:
        collection: collection identifier passed to ``registrar``.
        upload_container: container name passed to ``registrar``.
        replace: passed to ``registrar``.
        host: redis host.
        port: redis port.
        register_queue_key: redis list to pop object prefixes from.
        registered_set_key: redis set name passed to ``registrar``.
        reporting_dir: passed to ``registrar``.
        service_url: passed to ``registrar``.
    """
    # "encoding" replaces the deprecated "charset" keyword of redis.Redis.
    client = redis.Redis(
        host=host, port=port, encoding="utf-8", decode_responses=True
    )
    while True:
        logger.debug("waiting for redis queue '%s'..." % register_queue_key)
        # brpop returns a (queue name, value) pair; the value is the prefix.
        value = client.brpop(register_queue_key)
        try:
            registrar(
                collection,
                value[1],
                upload_container,
                replace=replace,
                client=client,
                registered_set_key=registered_set_key,
                reporting_dir=reporting_dir,
                service_url=service_url,
            )
        except Exception as e:
            # Keep the worker alive: log the failure and wait for the next item.
            logger.debug(traceback.format_exc())
            logger.error("%s: %s\n" % (type(e).__name__, str(e)))
# Command line entry point: parse the options, read COLLECTION and
# UPLOAD_CONTAINER from the environment and run the registrar either once
# ("standard" mode) or as a redis queue worker ("redis" mode).
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = textwrap.dedent("""\
    Register products.
    """)

    parser.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the registrar. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    parser.add_argument(
        "--objects-prefix", default=None,
        help=(
            "Prefix to objects holding the metadata and data of product."
        )
    )
    parser.add_argument(
        "--replace", action="store_true",
        help=(
            "Replace existing products instead of skipping the registration."
        )
    )
    parser.add_argument(
        "--redis-register-queue-key", default="register_queue"
    )
    parser.add_argument(
        "--redis-registered-set-key", default="registered_set"
    )
    parser.add_argument(
        "--redis-host", default="localhost"
    )
    parser.add_argument(
        "--redis-port", type=int, default=6379
    )
    parser.add_argument(
        "--reporting-dir",
    )
    parser.add_argument(
        "--service-url",
    )
    parser.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )

    arg_values = parser.parse_args()

    setup_logging(arg_values.verbosity)

    # The target collection is mandatory and comes from the environment.
    collection = os.environ.get('COLLECTION')
    if collection is None:
        logger.critical("Collection environment variable not set.")
        sys.exit(1)

    # Without UPLOAD_CONTAINER the container is taken from the objects prefix.
    upload_container = os.environ.get('UPLOAD_CONTAINER')
    if upload_container is None:
        # logger.warn is a deprecated alias of logger.warning.
        logger.warning(
            "UPLOAD_CONTAINER environment variable not set. Assuming part of path bucket/item"
        )

    if arg_values.mode == "standard":
        registrar(
            collection,
            arg_values.objects_prefix,
            upload_container,
            replace=arg_values.replace,
            reporting_dir=arg_values.reporting_dir,
            service_url=arg_values.service_url,
        )
    else:
        registrar_redis_wrapper(
            collection,
            upload_container,
            replace=arg_values.replace,
            host=arg_values.redis_host,
            port=arg_values.redis_port,
            register_queue_key=arg_values.redis_register_queue_key,
            registered_set_key=arg_values.redis_registered_set_key,
            reporting_dir=arg_values.reporting_dir,
            service_url=arg_values.service_url,
        )
This diff is collapsed.
Click to expand it.
core/setup.py
+
1
−
1
View file @
01c45828
...
...
@@ -9,7 +9,7 @@ setup(
version
=
"
0.0.1
"
,
author
=
""
,
author_email
=
""
,
description
=
"
p
re
processo
r for PVS
"
,
description
=
"
re
gistra
r for PVS
"
,
long_description
=
long_description
,
long_description_content_type
=
"
text/markdown
"
,
url
=
"
https://gitlab.eox.at/esa/prism/vs/-/tree/master/core
"
,
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment