import os
import json
from datetime import datetime
from dateutil import parser as date_parser
from pywps import Process
from pywps import LiteralInput
from pywps import ComplexInput
from pywps import LiteralOutput
from pywps import ComplexOutput
from pywps import Format, FORMATS
from pywps.app.Common import Metadata
from malleefowl.esgf.search import ESGSearch
import logging
LOGGER = logging.getLogger(__name__)
[docs]class ESGSearchProcess(Process):
"""
The ESGF search process runs a ESGF search request with constraints (project, experiment, ...)
to get a list of matching files on ESGF data nodes.
It is using `esgf-pyclient <https://github.com/ESGF/esgf-pyclient>`_ Python client
for the ESGF search API.
In addition to the esgf-pyclient the process checks if local replicas are available
and would return the replica files instead of the original one.
The result is a JSON document with a list of ``http://`` URLs to files on ESGF data nodes.
TODO: bbox constraint for datasets
"""
def __init__(self):
inputs = [
LiteralInput('url', 'URL',
data_type='string',
abstract="URL of ESGF Search Index which is used for search queries."
" Example: http://esgf-data.dkrz.de/esg-search",
min_occurs=0,
max_occurs=1,
default="http://esgf-data.dkrz.de/esg-search",
),
LiteralInput('distrib', 'Distributed',
data_type='boolean',
abstract="If flag is set then a distributed search will be run.",
min_occurs=0,
max_occurs=1,
default='0',
),
LiteralInput('replica', 'Replica',
data_type='boolean',
abstract="If flag is set then search will include replicated datasets.",
min_occurs=0,
max_occurs=1,
default='False',
),
LiteralInput('latest', 'Latest',
data_type='boolean',
abstract="If flag is set then search will include only latest datasets.",
min_occurs=0,
max_occurs=1,
default='True',
),
LiteralInput('temporal', 'Temporal',
data_type='boolean',
abstract="If flag is set then search will use temporal filter.",
min_occurs=0,
max_occurs=1,
default='1',
),
LiteralInput('search_type', 'Search Type',
data_type='string',
abstract="Search on Datasets, Files or Aggregations.",
min_occurs=0,
max_occurs=1,
default='Dataset',
allowed_values=['Dataset', 'File', 'Aggregation']
),
LiteralInput('constraints', 'Constraints',
data_type='string',
abstract="Constraints as list of key/value pairs."
"Example: project:CORDEX, time_frequency:mon, variable:tas",
min_occurs=0,
max_occurs=1,
default="project:CORDEX, time_frequency:mon, variable:tas",
),
LiteralInput('query', 'Query',
data_type='string',
abstract="Freetext query. For Example: temperatue",
min_occurs=0,
max_occurs=1,
default='*',
),
LiteralInput('start', 'Start',
data_type='dateTime',
abstract="Startime: 2000-01-11T12:00:00Z",
min_occurs=0,
max_occurs=1,
default=datetime(2000, 1, 1)
),
LiteralInput('end', 'End',
data_type='dateTime',
abstract="Endtime: 2005-12-31T12:00:00Z",
min_occurs=0,
max_occurs=1,
default=datetime(2001, 12, 31)
),
LiteralInput('limit', 'Limit',
data_type='integer',
abstract="Maximum number of datasets in search result",
min_occurs=0,
max_occurs=1,
default='10',
allowed_values=[0, 1, 2, 5, 10, 20, 50, 100, 200]
),
LiteralInput('offset', 'Offset',
data_type='integer',
abstract="Start search of datasets at offset.",
min_occurs=0,
max_occurs=1,
default='0',
),
]
outputs = [
ComplexOutput('output', 'Search Result',
abstract="JSON document with search result,"
" a list of URLs to files on ESGF archive nodes.",
as_reference=True,
supported_formats=[FORMATS.JSON]),
ComplexOutput('summary', 'Search Result Summary',
abstract="JSON document with search result summary",
as_reference=True,
supported_formats=[FORMATS.JSON]),
ComplexOutput('facet_counts', 'Facet Counts',
abstract="JSON document with facet counts for constraints.",
as_reference=True,
supported_formats=[FORMATS.JSON]),
]
super(ESGSearchProcess, self).__init__(
self._handler,
identifier="esgsearch",
title="ESGF Search",
version="0.6",
abstract="Search ESGF datasets, files and aggreations.",
metadata=[
Metadata('Birdhouse', 'http://bird-house.github.io/'),
Metadata('User Guide', 'http://malleefowl.readthedocs.io/en/latest/'),
],
inputs=inputs,
outputs=outputs,
status_supported=True,
store_supported=True,
)
def _handler(self, request, response):
distrib = False
if 'distrib' in request.inputs:
distrib = request.inputs['distrib'][0].data
replica = False
if 'replica' in request.inputs:
replica = request.inputs['replica'][0].data
latest = True
if 'latest' in request.inputs:
latest = request.inputs['latest'][0].data
esgsearch = ESGSearch(
url=request.inputs['url'][0].data,
distrib=distrib,
replica=replica,
latest=latest,
)
constrains_str = request.inputs['constraints'][0].data.strip()
constraints = []
for constrain in constrains_str.split(','):
key, value = constrain.split(':')
constraints.append((key.strip(), value.strip()))
if 'start' in request.inputs:
start = request.inputs['start'][0].data
else:
start = None
if 'end' in request.inputs:
end = request.inputs['end'][0].data
else:
end = None
if 'offset' in request.inputs:
offset = request.inputs['offset'][0].data
else:
offset = 0
if 'limit' in request.inputs:
limit = request.inputs['limit'][0].data
else:
limit = 10
if 'query' in request.inputs:
query = request.inputs['query'][0].data
else:
query = '*'
if 'search_type' in request.inputs:
search_type = request.inputs['search_type'][0].data
else:
search_type = 'Dataset'
temporal = True
if 'temporal' in request.inputs:
temporal = request.inputs['temporal'][0].data
(result, summary, facet_counts) = esgsearch.search(
constraints=constraints,
query=query,
start=start, end=end,
search_type=search_type,
limit=limit,
offset=offset,
temporal=temporal)
with open(os.path.join(self.workdir, 'out.json'), 'w') as fp:
json.dump(obj=result, fp=fp, indent=4, sort_keys=True)
response.outputs['output'].file = fp.name
with open(os.path.join(self.workdir, 'summary.json'), 'w') as fp:
json.dump(obj=summary, fp=fp, indent=4, sort_keys=True)
response.outputs['summary'].file = fp.name
with open(os.path.join(self.workdir, 'counts.json'), 'w') as fp:
json.dump(obj=facet_counts, fp=fp, indent=4, sort_keys=True)
response.outputs['facet_counts'].file = fp.name
return response