EOX GitLab Instance
Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
View Server 2
harvester
Commits
2db5ae45
Commit
2db5ae45
authored
Nov 24, 2021
by
Nikola Jankovic
💻
Browse files
added tests
parent
51cdc51d
Changes
53
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
2db5ae45
stages
:
-
test
-
deploy
-
publish
-
chart
variables
:
...
...
@@ -11,14 +11,18 @@ test:
stage
:
test
script
:
-
python3 setup.py install
-
pip3 install -r requirements.txt
-
pip3 install -r requirements-test.txt
-
pytest
-
pip3 install -r requirements-dev.txt
-
flake8
-
mypy .
-
pytest --cov harvester --cov-report term-missing
deploy
_latest
:
publish
_latest
:
image
:
docker:20.10.8
services
:
-
docker:20.10.8-dind
stage
:
deploy
stage
:
publish
script
:
-
docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
-
docker build -t $CI_REGISTRY_IMAGE .
...
...
@@ -26,11 +30,11 @@ deploy_latest:
only
:
-
main
deploy
:
publish
:
image
:
docker:20.10.8
services
:
-
docker:20.10.8-dind
stage
:
deploy
stage
:
publish
script
:
-
docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
-
docker build --cache-from $CI_REGISTRY_IMAGE:latest -t $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG .
...
...
@@ -47,4 +51,3 @@ helm:
-
curl -u $HELM_CHART_REPOSITORY_CREDENTIALS -v -X POST https://charts-public.hub.eox.at/api/charts --data-binary "@${upload_filename}"
only
:
-
tags
-
main
LICENSE
0 → 100644
View file @
2db5ae45
MIT License
Copyright (C) 2021 EOX IT Services GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
MANIFEST.in
0 → 100644
View file @
2db5ae45
include ./harvester/config-schema.yaml
README.md
View file @
2db5ae45
# harvester
Service used to harvest data from various endpoints
## Tagging
**Before tagging remember to increment the chart version (`.chart/Chart.yaml`) manually!**
This repository uses
`bump2version`
for managing tags. To bump a version use
```
bump2version --new-version <new_version> # or bump2version <major|minor|patch>
git push && git push --tags
```
harvester/cli.py
View file @
2db5ae45
from
os.path
import
join
,
dirname
"""
cli.py
======
Contains command line interface
"""
import
logging.config
import
click
import
yaml
import
jsonschema
from
.daemon
import
run_daemon
from
.config
import
load_config
from
.daemon
import
run_daemon
,
init_client
from
.config
import
load_config
,
validate_config
from
.harvester
import
main
from
.utils
import
get_params
def
setup_logging
(
debug
=
False
):
logging
.
config
.
dictConfig
({
'version'
:
1
,
'disable_existing_loggers'
:
False
,
'formatters'
:
{
'brief'
:
{
'format'
:
'%(levelname)s %(name)s: %(message)s'
}
},
'handlers'
:
{
'console'
:
{
'class'
:
'logging.StreamHandler'
,
'level'
:
'DEBUG'
if
debug
else
'INFO'
,
'formatter'
:
'brief'
,
}
},
'root'
:
{
'handlers'
:
[
'console'
],
'level'
:
'DEBUG'
if
debug
else
'INFO'
,
logging
.
config
.
dictConfig
(
{
"version"
:
1
,
"disable_existing_loggers"
:
False
,
"formatters"
:
{
"brief"
:
{
"format"
:
"%(levelname)s %(name)s: %(message)s"
}},
"handlers"
:
{
"console"
:
{
"class"
:
"logging.StreamHandler"
,
"level"
:
"DEBUG"
if
debug
else
"INFO"
,
"formatter"
:
"brief"
,
}
},
"root"
:
{
"handlers"
:
[
"console"
],
"level"
:
"DEBUG"
if
debug
else
"INFO"
,
},
}
})
def
validate_config
(
config
):
with
open
(
join
(
dirname
(
__file__
),
'config-schema.yaml'
))
as
f
:
schema
=
yaml
.
load
(
f
)
jsonschema
.
validate
(
config
,
schema
)
)
@
click
.
group
()
...
...
@@ -45,36 +40,52 @@ def cli():
pass
@
cli
.
command
(
help
=
'Run the harvester daemon, attaching to a Redis queue'
)
@
click
.
option
(
'--config-file'
,
type
=
click
.
File
(
'r'
))
@
click
.
option
(
'--validate/--no-validate'
,
default
=
False
)
@
click
.
option
(
'--host'
,
type
=
str
)
@
click
.
option
(
'--port'
,
type
=
int
)
@
click
.
option
(
'--listen-queue'
,
type
=
str
)
@
click
.
option
(
'--debug/--no-debug'
,
default
=
False
)
def
daemon
(
config_file
=
None
,
validate
=
False
,
host
=
None
,
port
=
None
,
listen_queue
=
None
,
debug
=
False
):
@
cli
.
command
(
help
=
"Run the harvester daemon, attaching to a Redis queue"
)
@
click
.
option
(
"--host"
,
type
=
str
,
required
=
True
)
@
click
.
option
(
"--port"
,
type
=
int
,
required
=
True
)
@
click
.
option
(
"--listen-queue"
,
type
=
str
,
required
=
True
)
@
click
.
option
(
"--config-file"
,
type
=
click
.
File
(
"r"
),
required
=
True
)
@
click
.
option
(
"--validate/--no-validate"
,
default
=
False
)
@
click
.
option
(
"--debug/--no-debug"
,
default
=
False
)
def
daemon
(
host
:
str
,
port
:
int
,
listen_queue
:
str
,
config_file
:
str
,
validate
:
bool
=
False
,
debug
:
bool
=
False
,
):
setup_logging
(
debug
)
config
=
load_config
(
config_file
)
if
validate
:
validate_config
(
config
)
run_daemon
(
config
,
host
,
port
,
listen_queue
)
client
=
init_client
(
host
,
port
)
run_daemon
(
config
,
client
,
listen_queue
)
@
cli
.
command
(
help
=
'Run a single, one-off harvest'
)
@
click
.
argument
(
'harvester_name'
,
type
=
str
)
@
click
.
option
(
'--config-file'
,
type
=
click
.
File
(
'r'
))
@
click
.
option
(
'--validate/--no-validate'
,
default
=
False
)
@
click
.
option
(
'--debug/--no-debug'
,
default
=
False
)
@
click
.
option
(
'--param'
,
'-p'
,
multiple
=
True
)
def
harvest
(
harvester_name
,
config_file
:
str
=
None
,
validate
:
bool
=
False
,
debug
:
bool
=
False
,
param
:
tuple
=
()):
@
cli
.
command
(
help
=
"Run a single, one-off harvest"
)
@
click
.
argument
(
"harvester_name"
,
type
=
str
)
@
click
.
option
(
"--host"
,
type
=
str
,
required
=
True
)
@
click
.
option
(
"--port"
,
type
=
int
,
required
=
True
)
@
click
.
option
(
"--config-file"
,
type
=
click
.
File
(
"r"
),
required
=
True
)
@
click
.
option
(
"--validate/--no-validate"
,
default
=
False
)
@
click
.
option
(
"--debug/--no-debug"
,
default
=
False
)
def
harvest
(
harvester_name
:
str
,
host
:
str
,
port
:
int
,
config_file
:
str
,
validate
:
bool
=
False
,
debug
:
bool
=
False
,
):
setup_logging
(
debug
)
config
=
load_config
(
config_file
)
if
validate
:
validate_config
(
config
)
kwargs
=
get_params
(
param
)
kwargs
[
'name'
]
=
harvester_name
main
(
config
,
harvester_name
,
**
kwargs
)
client
=
init_client
(
host
,
port
)
main
(
config
,
harvester_name
,
client
)
if
__name__
==
'
__main__
'
:
if
__name__
==
"
__main__
"
:
cli
()
harvester/config-schema.yaml
View file @
2db5ae45
...
...
@@ -2,16 +2,6 @@ $id: https://example.com/address.schema.json
$schema
:
http://json-schema.org/draft-07/schema#
type
:
object
properties
:
redis
:
description
:
Redis configuration
type
:
object
properties
:
host
:
description
:
Host address for Redis
type
:
string
port
:
description
:
Port for Redis
type
:
integer
harvesters
:
description
:
List of harvesters
type
:
array
...
...
@@ -20,7 +10,7 @@ properties:
type
:
object
properties
:
name
:
description
:
Name of the harvester. Should be unique
description
:
Name of the harvester. Should be unique
type
:
string
queue
:
description
:
Name of queue to send queried data to
...
...
@@ -38,7 +28,7 @@ properties:
type
:
description
:
type of the endpoint
type
:
string
enum
:
enum
:
-
STACAPI
-
STACCatalog
-
OpenSearch
...
...
@@ -50,11 +40,11 @@ properties:
time_property
:
description
:
what time to extract from queried results.
type
:
string
enum
:
enum
:
-
sensed
-
updated
-
created
-
modified
-
modified
bbox
:
description
:
Bounding box to be queried
type
:
string
...
...
@@ -69,6 +59,8 @@ properties:
required
:
-
begin
required
:
required
:
-
queue
-
name
-
endpoint
-
mode
harvester/config.py
View file @
2db5ae45
import
os
import
re
import
datetime
from
typing
import
TextIO
import
jsonschema
import
yaml
ENV_PATTERN
=
re
.
compile
(
r
'.*?\${(\w+)}.*?'
)
ENV_PATTERN
=
re
.
compile
(
r
".*?\${(\w+)}.*?"
)
LOADER
=
yaml
.
SafeLoader
def
constructor_env_variables
(
loader
,
node
):
"""
Extracts the environment variable from the node's value
:param yaml.Loader loader: the yaml loader
:param node: the current node in the yaml
:return: the parsed string that contains the value of the environment
variable
"""
value
=
loader
.
construct_scalar
(
node
)
match
=
ENV_PATTERN
.
findall
(
value
)
# to find all env variables in line
if
match
:
full_value
=
value
for
g
in
match
:
env_variable
=
os
.
environ
.
get
(
g
,
)
if
env_variable
is
not
None
:
full_value
=
full_value
.
replace
(
f
'${{
{
g
}
}}'
,
env_variable
)
else
:
return
None
return
full_value
return
value
def
now
(
loader
,
node
):
"""
Extracts the environment variable from the node's value
:param yaml.Loader loader: the yaml loader
:param node: the current node in the yaml
:return: the parsed string that contains the value of the environment
variable
"""
value
=
loader
.
construct_scalar
(
node
)
match
=
ENV_PATTERN
.
findall
(
value
)
# to find all env variables in line
if
match
:
full_value
=
value
for
g
in
match
:
env_variable
=
os
.
environ
.
get
(
g
,
)
if
env_variable
is
not
None
:
full_value
=
full_value
.
replace
(
f
"${{
{
g
}
}}"
,
env_variable
)
else
:
return
None
return
full_value
return
value
def constructor_now(loader, node):
    """Return the current UTC time as an ISO 8601 string.

    Has the ``(loader, node)`` signature of a YAML constructor; neither
    argument is read, the timestamp is taken at call time.

    :param loader: unused
    :param node: unused
    :return: timezone-aware UTC timestamp formatted with ``isoformat()``
    """
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return utc_now.isoformat()
tags
=
{
'!env'
:
constructor_env_variables
,
'!now'
:
now
,
}
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
LOADER
.
add_implicit_resolver
(
"!env"
,
ENV_PATTERN
,
None
)
LOADER
.
add_constructor
(
"!env"
,
constructor_env_variables
)
# this tag resolves !now to datetime.now
LOADER
.
add_constructor
(
"!now"
,
constructor_now
)
def
load_config
(
input_file
:
TextIO
)
->
dict
:
loader
=
yaml
.
SafeLoader
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
for
tag
,
func
in
tags
.
items
():
if
tag
==
'!env'
:
loader
.
add_implicit_resolver
(
tag
,
ENV_PATTERN
,
None
)
loader
.
add_constructor
(
tag
,
func
)
return
yaml
.
load
(
input_file
,
Loader
=
loader
)
def load_config(input_file: str) -> dict:
    """Parse a harvester configuration using the module-level ``LOADER``.

    :param input_file: YAML source, passed unchanged to ``yaml.load``
    :return: whatever ``yaml.load`` produces for the document
    """
    parsed = yaml.load(input_file, Loader=LOADER)
    return parsed
def validate_config(config):
    """Validate ``config`` against the ``config-schema.yaml`` beside this module.

    The schema file is read as UTF-8 and parsed with the module-level
    ``LOADER``. Any error raised by ``jsonschema.validate`` propagates to
    the caller.

    :param config: configuration object to check
    """
    module_dir = os.path.dirname(__file__)
    schema_path = os.path.join(module_dir, "config-schema.yaml")
    with open(schema_path, encoding="utf-8") as schema_file:
        schema = yaml.load(schema_file, LOADER)
    jsonschema.validate(config, schema)
harvester/daemon.py
View file @
2db5ae45
"""
daemon.py
==========
Contains functionality related to running the daemon
"""
import
logging
import
r
edis
from
redis
import
R
edis
from
.harvester
import
main
logger
=
logging
.
getLogger
(
__name__
)
def
run_daemon
(
config
:
dict
,
host
:
str
,
port
:
str
,
listen_queue
:
str
):
""" Run the harvester daemon, listening on a redis queue
for harvest jobs.
def init_client(host: str, port: int) -> Redis:
    """Create the Redis client used by the harvester.

    :param host: host passed to ``Redis``
    :param port: port passed to ``Redis``
    :return: a ``Redis`` client created with ``decode_responses=True`` and
        UTF-8 encoding
    """
    # ``charset`` is a deprecated alias of ``encoding`` in redis-py; use the
    # supported keyword so the call does not rely on the alias.
    return Redis(host=host, port=port, encoding="utf-8", decode_responses=True)
def
run_daemon
(
config
:
dict
,
client
:
Redis
,
listen_queue
:
str
):
"""Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
client
=
redis
.
Redis
(
host
=
host
,
port
=
port
,
charset
=
"utf-8"
,
decode_responses
=
True
)
logger
.
debug
(
"waiting for redis queue '%s'..."
%
listen_queue
)
logger
.
debug
(
"waiting for redis queue '%s'"
,
listen_queue
)
while
True
:
# fetch an item from the queue to be harvested
_
,
value
=
client
.
brpop
(
listen_queue
)
# the first element of the returned pair is the name of the queue the result came from
_
,
value
=
client
.
brpop
(
listen_queue
)
# start the harvesting
try
:
main
(
config
,
value
,
client
)
except
Exception
as
e
:
logger
.
exception
(
e
)
main
(
config
,
value
,
client
)
harvester/endpoint/Endpoint.py
deleted
100644 → 0
View file @
51cdc51d
from
datetime
import
datetime
import
logging
from
typing
import
Type
,
List
from
pygeofilter.parsers.cql_json
import
parse
as
json_parse
from
pygeofilter.backends.native.evaluate
import
NativeEvaluator
from
..query
import
Query
logger
=
logging
.
getLogger
(
__name__
)
class Endpoint:
    """Base class for harvestable endpoints.

    Subclasses set a class-level ``type`` string, which ``from_config`` uses
    to pick the class to instantiate, and override ``harvest``.
    """

    def __init__(self, url: str, query: dict, filter: dict, *args, **kwargs):
        """Store the URL and build the query and filter objects.

        :param url: endpoint URL, stored as given
        :param query: keyword arguments used to construct ``Query``
        :param filter: filter definition handed to the CQL JSON parser
        """
        self.url = url
        self.query = Query(**query)
        self.filter = json_parse(filter)

    def harvest(self) -> list:
        """Harvest the endpoint; must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class
        """
        # All endpoints should extend function to do following
        # 1. prepare query
        # 2. query endpoint
        # 3. convert to stac items
        # 4. filter data if necessary
        # 5. return list of stac items
        raise NotImplementedError()

    @classmethod
    def from_config(cls, endpoint_config: dict) -> 'Endpoint':
        """Instantiate the direct subclass whose ``type`` matches the config.

        :param endpoint_config: mapping with a ``type`` key; every other key
            is passed to the subclass constructor as a keyword argument
        :return: an instance of the matching subclass
        :raises KeyError: if no direct subclass declares the requested type
        """
        # Work on a shallow copy: popping ``type`` from the caller's dict
        # would make a second call with the same config look up ``None``.
        params = dict(endpoint_config)
        endpoint_type = params.pop('type', None)

        subclass_map = {
            subclass.type: subclass for subclass in cls.__subclasses__()
        }
        SubClass = subclass_map[endpoint_type]
        return SubClass(**params)

    def filter_data(self, data: List[dict]) -> List[dict]:
        """Return the items of ``data`` accepted by ``self.filter``.

        :param data: items to test
        :return: new list holding the items for which the evaluated filter
            returned a truthy value, in their original order
        """
        attr_map = {
            'point_attr': 'geometry',
            '*': 'properties.*'
        }
        e = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
        evaluator = e.evaluate(self.filter)
        # ``filter`` here is the builtin, not the constructor argument
        result = list(filter(evaluator, data))
        return result
harvester/endpoint/FTPEndpoint.py
deleted
100644 → 0
View file @
51cdc51d
from
.Endpoint
import
Endpoint
class FTPEndpoint(Endpoint):
    # Endpoint subclass identified by the type string 'FTP'; it defines no
    # behaviour of its own here.
    type = 'FTP'
harvester/endpoint/OADSEndpoint.py
deleted
100644 → 0
View file @
51cdc51d
from
.Endpoint
import
Endpoint
class OADSEndpoint(Endpoint):
    # Endpoint subclass identified by the type string 'OADS'; it defines no
    # behaviour of its own here.
    type = 'OADS'
harvester/endpoint/OGCAPIEndpoint.py
deleted
100644 → 0
View file @
51cdc51d
from
.Endpoint
import
Endpoint
class OGCAPIEndpoint(Endpoint):
    # Endpoint subclass identified by the type string 'OGCAPI'; it defines no
    # behaviour of its own here.
    type = 'OGCAPI'
harvester/endpoint/OpenSearchEndpoint.py
deleted
100644 → 0
View file @
51cdc51d
import
logging
from
typing
import
List
,
Optional
,
Tuple
,
Type
from
dataclasses
import
dataclass
import
requests
import
lxml.etree
as
ET
import
pystac
from
.Endpoint
import
Endpoint
from
..stac
import
STACItemComposer
logger
=
logging
.
getLogger
(
__name__
)
# NOTE: a former ``SearchPage = Tuple[List[dict], Optional[str]]`` alias was
# rebound by this class immediately after its definition, so it was never
# observable; only the dataclass is kept.
@dataclass
class SearchPage:
    """One page of search results."""

    # parsed records of this page
    records: List[dict]
    # start index reported for this page
    index: int
    # total number of results reported by the response
    total: int
class OpenSearchFormat:
    """Base class for response formats, selected by ``mimetype``."""

    # Subclasses override this with the mimetype string they handle.
    mimetype = None

    @classmethod
    def from_config(cls, config: dict) -> 'OpenSearchFormat':
        """Instantiate the direct subclass whose ``mimetype`` matches the config.

        :param config: mapping with a ``type`` key holding the mimetype;
            every other key is passed to the subclass constructor
        :return: an instance of the matching subclass
        :raises KeyError: if no direct subclass declares the requested mimetype
        """
        # Work on a shallow copy: popping ``type`` from the caller's dict
        # would make a second call with the same config look up ``None``.
        params = dict(config)
        type_ = params.pop('type', None)

        subclass_map = {
            subclass.mimetype: subclass for subclass in cls.__subclasses__()
        }
        SubClass = subclass_map[type_]
        return SubClass(**params)
class GeoJSONFormat(OpenSearchFormat):
    """Format handler for JSON responses (``application/json``)."""

    mimetype = 'application/json'

    def __init__(self, property_mapping):
        """Remember how output property names map to source property keys.

        :param property_mapping: mapping of output property name to the key
            looked up in each feature's ``properties``
        """
        self.property_mapping = property_mapping

    def parse(self, response: requests.Response) -> SearchPage:
        """Turn a JSON response into a ``SearchPage``.

        :param response: response whose JSON body has ``features`` and a
            ``properties`` object with ``startIndex`` and ``totalResults``
        :return: page holding the converted features and both counters
        """
        payload = response.json()

        records = []
        for raw_feature in payload['features']:
            records.append(self._parse_feature(raw_feature))

        # read the paging counters only after the features, as before
        start_index = payload['properties']['startIndex']
        total_results = payload['properties']['totalResults']
        return SearchPage(records, start_index, total_results)

    def _parse_feature(self, feature: dict) -> dict:
        """Convert one feature into the dict form of a ``pystac.Item``.

        :param feature: feature with ``id``, ``geometry`` and ``properties``
        :return: ``to_dict()`` of the item built from the mapped properties
        """
        mapped = {}
        for target_name, source_key in self.property_mapping.items():
            mapped[target_name] = feature['properties'][source_key]

        item = pystac.Item(
            feature['id'],
            geometry=feature['geometry'],
            bbox=None,
            datetime=None,
            properties=mapped,
        )
        return item.to_dict()
class AtomFormat(OpenSearchFormat):
    """Format marker for Atom responses (``application/atom+xml``).

    Only the mimetype is declared; no parsing is implemented in this class.
    """

    mimetype = 'application/atom+xml'

    def __init__(self):
        """Take no configuration and set no state."""
        # TODO: maybe add mapping from XML -> properties in output STAC
class
OpenSearchEndpoint
(
Endpoint
):
type
=
'OpenSearch'
NS
=
{
''
:
'http://www.w3.org/2005/Atom'
,
'opensearch'
:
'http://a9.com/-/spec/opensearch/1.1/'
,
'parameters'
:
'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/'
,
'georss'
:
'http://www.georss.org/georss/'
,
'media'
:
'http://search.yahoo.com/mrss/'
,
'owc'
:
'http://www.opengis.net/owc/1.0/'
,
'eo'
:
'http://a9.com/-/opensearch/extensions/eo/1.0/'
,
'geo'
:
'http://a9.com/-/opensearch/extensions/geo/1.0/'
,
'time'
:
'http://a9.com/-/opensearch/extensions/time/1.0/'
,
'cql'
:
'http://a9.com/-/opensearch/extensions/cql/1.0/'
,
'dc'
:
'http://purl.org/dc/elements/1.1/'
,
}
def
__init__
(
self
,
format_config
,
*
args
,
**
kwargs
):
super
(
OpenSearchEndpoint
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
format
=
OpenSearchFormat
.
from_config
(
format_config
)
def
harvest
(
self
)
->
list
:
logger
.
info
(
"Starting OpenSearch harvesting"
)
# prepare query
parser
=
ET
.
XMLParser
(
recover
=
True
)
data
=
ET
.
fromstring
(
requests
.
get
(
self
.
url
).
content
,
parser
)
urls
=
self
.
_get_urls_params
(
data
)
result
=
[]
url
,
search_params
,
(
index
,
index_val
),
_
=
self
.
query
.
opensearch_params
(
urls
,
self
.
format
.
mimetype
)
while
True
:
response
=
requests
.
get
(
url
,
params
=
search_params
)
response
.
raise_for_status
()
search_params
[
index
]
+=
index_val