Commit f650b7ff authored by Theodore Chang

Resolve "Improve ArchiveReader performance"

parent 81d03231
......@@ -236,6 +236,20 @@ deploy prod test:
- /^dev-.*$/
when: manual
deploy prod util:
stage: release
before_script:
- mkdir -p /etc/deploy
- echo ${CI_K8S_PROD_CONFIG} | base64 -d > ${KUBECONFIG}
script:
- helm dependency update ops/helm/nomad
- helm upgrade --install nomad-util-v1 ops/helm/nomad -f ops/helm/nomad/deployments/prod-util-values.yaml --set image.tag=$CI_COMMIT_REF_NAME,roll=true --wait
- docker pull $TEST_IMAGE
- docker run -t -e NOMAD_KEYCLOAK_REALM_NAME=fairdi_nomad_prod $TEST_IMAGE python -m nomad.cli client -n https://nomad-lab.eu/prod/v1/util/api -u test -w $CI_NOMAD_TEST_PASSWORD integrationtests --skip-publish --skip-doi
except:
- /^dev-.*$/
when: manual
release latest image:
stage: release
script:
......
......@@ -17,6 +17,10 @@
#
from datetime import datetime
import functools
from itertools import zip_longest
import multiprocessing
import operator
from typing import Optional, Set, Union, Dict, Iterator, Any, List
from fastapi import (
......@@ -38,7 +42,6 @@ from nomad.datamodel import EditableUserMetadata
from nomad.files import StreamedFile, create_zipstream
from nomad.utils import strip
from nomad.archive import RequiredReader, RequiredValidationError, ArchiveQueryError
from nomad.archive import ArchiveQueryError
from nomad.search import AuthenticationRequiredError, SearchError, update_metadata as es_update_metadata
from nomad.search import search, QueryValidationError
from nomad.metainfo.elasticsearch_extension import entry_type
......@@ -69,7 +72,7 @@ archive_required_documentation = strip('''
The `required` part allows you to specify what parts of the requested archives
should be returned. The NOMAD Archive is a hierarchical data format and
you can *require* certain branches (i.e. *sections*) in the hierarchy.
By specifing certain sections with specific contents or all contents (via
By specifying certain sections with specific contents or all contents (via
the directive `"*"`), you can determine what sections and what quantities should
be returned. The default is the whole archive, i.e., `"*"`.
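A minimal sketch (not part of this commit) of such a `required` specification; the section names `results`, `material`, `run`, `system` and `atoms` are only illustrative here:

# request all of results.material, but only the atoms of the first ten
# systems of the first run
required = {
    'results': {
        'material': '*',
    },
    'run[0]': {
        'system[:10]': {
            'atoms': '*',
        },
    },
}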
......@@ -441,13 +444,20 @@ def _do_exaustive_search(owner: Owner, query: Query, include: List[str], user: U
break
class _Uploads():
class _Uploads:
'''
A helper class that caches subsequent access to upload files of the same upload.
'''
def __init__(self):
self._upload_files = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def get_upload_files(self, upload_id: str) -> files.UploadFiles:
if self._upload_files is not None and self._upload_files.upload_id != upload_id:
self._upload_files.close()
......@@ -492,13 +502,9 @@ def _answer_entries_rawdir_request(
required=MetadataRequired(include=['entry_id', 'upload_id', 'mainfile']),
user_id=user.user_id if user is not None else None)
uploads = _Uploads()
try:
response_data = [
_create_entry_rawdir(entry_metadata, uploads)
for entry_metadata in search_response.data]
finally:
uploads.close()
with _Uploads() as uploads:
response_data = [_create_entry_rawdir(
entry_metadata, uploads) for entry_metadata in search_response.data]
return EntriesRawDirResponse(
owner=search_response.owner,
......@@ -670,68 +676,107 @@ def _read_archive(entry_metadata, uploads, required_reader: RequiredReader):
return {
'entry_id': entry_id,
'parser_name': entry_metadata['parser_name'],
'archive': required_reader.read(archive, entry_id)
}
'archive': required_reader.read(archive, entry_id)}
except ArchiveQueryError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
def _validate_required(required: ArchiveRequired) -> RequiredReader:
try:
return RequiredReader(required)
except RequiredValidationError as e:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=[dict(
msg=e.msg, loc=['required'] + e.loc)])
raise HTTPException(
status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=[dict(msg=e.msg, loc=['required'] + e.loc)])
def _read_entry_from_archive(entry, uploads, required_reader: RequiredReader):
entry_id, upload_id = entry['entry_id'], entry['upload_id']
# all other exceptions are handled by the caller `_read_entries_from_archive`
try:
upload_files = uploads.get_upload_files(upload_id)
with upload_files.read_archive(entry_id, True) as archive:
return {
'entry_id': entry_id,
'upload_id': upload_id,
'parser_name': entry['parser_name'],
'archive': required_reader.read(archive, entry_id)}
except ArchiveQueryError as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
except KeyError as e:
logger.error('missing archive', exc_info=e, entry_id=entry_id)
return None
def _read_entries_from_archive(entries, required):
'''
Takes pickleable arguments so that it can be offloaded to worker processes.
It is important to ensure the return values are also pickleable.
'''
with _Uploads() as uploads:
required_reader = _validate_required(required)
responses = [_read_entry_from_archive(
entry, uploads, required_reader) for entry in entries if entry is not None]
return list(filter(None, responses))
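The pattern the helper above relies on, sketched in isolation and not part of the commit: the worker function receives and returns only plain Python objects (lists, dicts, strings), so it can be pickled and dispatched to a process pool; the names below are illustrative only.

import functools
import multiprocessing.pool

def _work(entries, required):
    # arguments and return value are plain, pickleable Python objects
    return [{'entry_id': entry['entry_id'], 'required': required} for entry in entries]

if __name__ == '__main__':
    chunks = [[{'entry_id': 'a'}, {'entry_id': 'b'}], [{'entry_id': 'c'}]]
    with multiprocessing.pool.Pool(processes=2) as pool:
        # the partial fixes the non-varying argument; each chunk becomes one task
        results = pool.map(functools.partial(_work, required='*'), chunks)
    print(results)  # one result list per chunk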
def _answer_entries_archive_request(
owner: Owner, query: Query, pagination: MetadataPagination, required: ArchiveRequired,
user: User):
if owner == Owner.all_:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
The owner=all is not allowed for this operation as it will search for entries
that you might not be allowed to access.
'''))
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail=strip(
'''The owner=all is not allowed for this operation as it will search for entries
that you might not be allowed to access.'''))
if required is None:
required = '*'
required_reader = _validate_required(required)
search_response = perform_search(
owner=owner, query=query,
pagination=pagination,
required=MetadataRequired(include=['entry_id', 'upload_id', 'parser_name']),
user_id=user.user_id if user is not None else None)
uploads = _Uploads()
response_data = {}
for entry_metadata in search_response.data:
entry_id, upload_id = entry_metadata['entry_id'], entry_metadata['upload_id']
# fewer than 8 entries per process is not useful
# more than config.max_process_number processes is too much for the server
number: int = min(
len(search_response.data) // config.archive.min_entires_per_process,
config.archive.max_process_number)
archive_data = None
if number <= 1:
request_data: list = _read_entries_from_archive(search_response.data, required)
else:
entries_per_process = len(search_response.data) // number + 1
try:
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError as e:
logger.error('missing archive', exc_info=e, entry_id=entry_id)
continue
response_data[entry_id] = {
'entry_id': entry_id,
'upload_id': upload_id,
'parser_name': entry_metadata['parser_name'],
'archive': archive_data}
# use process pool
pool: multiprocessing.pool.Pool = multiprocessing.pool.Pool(processes=number)
uploads.close()
try:
responses = pool.map(
functools.partial(_read_entries_from_archive, required=required),
zip_longest(*[iter(search_response.data)] * entries_per_process))
finally:
# gracefully shutdown the pool
pool.close()
pool.join()
# collect results from each process
# https://stackoverflow.com/a/45323085
request_data = functools.reduce(operator.iconcat, responses, [])
return EntriesArchiveResponse(
owner=search_response.owner,
query=search_response.query,
pagination=search_response.pagination,
required=required,
data=list(response_data.values()))
data=request_data)
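The chunking and flattening idioms used above, shown on toy data (the values are illustrative, not from the commit): `zip_longest(*[iter(data)] * n)` splits a list into groups of n padded with `None`, which is why the worker skips `None` entries, and `functools.reduce(operator.iconcat, ...)` concatenates the per-process result lists.

import functools
import operator
from itertools import zip_longest

data = [1, 2, 3, 4, 5]
entries_per_process = 2

# group into chunks of two, padding the last chunk with None
chunks = list(zip_longest(*[iter(data)] * entries_per_process))
# -> [(1, 2), (3, 4), (5, None)]

# flatten the per-chunk results back into a single list
flat = functools.reduce(operator.iconcat, chunks, [])
# -> [1, 2, 3, 4, 5, None]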
_entries_archive_docstring = strip('''
......@@ -805,7 +850,6 @@ def _answer_entries_archive_download_request(
'The limit of maximum number of entries in a single download (%d) has been '
'exceeded (%d).' % (config.max_entry_download, response.pagination.total)))
uploads = _Uploads()
manifest = []
search_includes = ['entry_id', 'upload_id', 'parser_name']
......@@ -833,12 +877,10 @@ def _answer_entries_archive_download_request(
manifest_content = json.dumps(manifest, indent=2).encode()
yield StreamedFile(path='manifest.json', f=io.BytesIO(manifest_content), size=len(manifest_content))
try:
with _Uploads() as uploads:
# create the streaming response with zip file contents
content = create_zipstream(streamed_files(), compress=files_params.compress)
return StreamingResponse(content, media_type='application/zip')
finally:
uploads.close()
_entries_archive_download_docstring = strip('''
......@@ -936,11 +978,9 @@ async def get_entry_rawdir(
status_code=status.HTTP_404_NOT_FOUND,
detail='The entry with the given id does not exist or is not visible to you.')
uploads = _Uploads()
try:
return EntryRawDirResponse(entry_id=entry_id, data=_create_entry_rawdir(response.data[0], uploads))
finally:
uploads.close()
with _Uploads() as uploads:
return EntryRawDirResponse(
entry_id=entry_id, data=_create_entry_rawdir(response.data[0], uploads))
@router.get(
......@@ -1044,13 +1084,12 @@ def answer_entry_archive_request(query: Dict[str, Any], required: ArchiveRequire
entry_metadata = response.data[0]
entry_id = entry_metadata['entry_id']
uploads = _Uploads()
try:
with _Uploads() as uploads:
try:
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status.HTTP_404_NOT_FOUND,
detail='The entry does exist, but it has no archive.')
return {
......@@ -1060,10 +1099,7 @@ def answer_entry_archive_request(query: Dict[str, Any], required: ArchiveRequire
'entry_id': entry_id,
'upload_id': entry_metadata['upload_id'],
'parser_name': entry_metadata['parser_name'],
'archive': archive_data
}}
finally:
uploads.close()
'archive': archive_data}}
@router.get(
......
......@@ -35,7 +35,7 @@ section annotations/categories.
from .storage import (
write_archive, read_archive, ArchiveError, ArchiveReader, ArchiveWriter,
ArchiveObject, ArchiveList, ArchiveItem)
ArchiveDict, ArchiveList, ArchiveItem)
from .query import query_archive, filter_archive, ArchiveQueryError
from .partial import (
read_partial_archive_from_mongo, read_partial_archives_from_mongo,
......
......@@ -16,13 +16,70 @@
# limitations under the License.
#
from typing import Any, Tuple, Dict, Callable, Union, List
from io import BytesIO
import functools
import re
from typing import Any, Dict, Callable, Union, Tuple
from io import BytesIO
from nomad import utils
from .storage import ArchiveReader, ArchiveList, ArchiveObject
from .storage import ArchiveReader, ArchiveList, ArchiveDict, _to_son
_query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?])?$')
@functools.lru_cache(maxsize=1024)
def _fix_index(index, length):
if index is None:
return index
return max(-length, index) if index < 0 else min(length, index)
@functools.lru_cache(maxsize=1024)
def _extract_key_and_index(match) -> Tuple[str, Union[Tuple[int, int], int]]:
key = match.group(1)
# noinspection PyTypeChecker
index: Union[Tuple[int, int], int] = None
# check if we have indices
if match.group(2) is not None:
group = match.group(3)
first_index = None if group == '' else int(group)
if match.group(4) is None:
index = first_index # one item
else:
group = match.group(5)
last_index = None if group == '' else int(group)
index = (0 if first_index is None else first_index, last_index)
return key, index
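An illustrative trace (not part of the commit) of how keys with index suffixes are interpreted by this pattern and by `_extract_key_and_index`; the section names are made up:

import re

# the same pattern as _query_archive_key_pattern above
pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?])?$')

for key in ('workflow', 'system[0]', 'system[1:5]', 'system[:]'):
    match = pattern.match(key)
    print(key, '->', match.groups())

# applying the logic of _extract_key_and_index to these matches yields:
#   'workflow'    -> ('workflow', None)       no index, take the section as is
#   'system[0]'   -> ('system', 0)            a single item
#   'system[1:5]' -> ('system', (1, 5))       a slice
#   'system[:]'   -> ('system', (0, None))    an open-ended slice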
# @cached(thread_safe=False, max_size=1024)
def _extract_child(archive_item, prop, index) -> Union[dict, list]:
archive_child = archive_item[prop]
is_list = isinstance(archive_child, (ArchiveList, list))
if index is None and is_list:
index = (0, None)
elif index is not None and not is_list:
raise ArchiveQueryError(f'cannot use list key on non-list {prop}')
if index is not None:
length = len(archive_child)
if isinstance(index, tuple):
index = (_fix_index(index[0], length), _fix_index(index[1], length))
if index[0] == index[1]:
archive_child = [archive_child[index[0]]]
else:
archive_child = archive_child[index[0]: index[1]]
else:
archive_child = [archive_child[_fix_index(index, length)]]
return archive_child
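The clamping that `_fix_index` performs, sketched on plain lists (a standalone copy for illustration, not the archive access itself):

def fix_index(index, length):
    # clamp an index into [-length, length]; None means "open end"
    if index is None:
        return None
    return max(-length, index) if index < 0 else min(length, index)

child = ['a', 'b', 'c']
assert fix_index(10, len(child)) == 3     # over-long slice ends are clamped
assert fix_index(-10, len(child)) == -3   # so are overly negative ones
assert child[fix_index(0, 3):fix_index(None, 3)] == ['a', 'b', 'c']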
class ArchiveQueryError(Exception):
......@@ -33,9 +90,6 @@ class ArchiveQueryError(Exception):
pass
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def query_archive(
f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict,
**kwargs) -> Dict:
......@@ -72,43 +126,25 @@ def query_archive(
* exclude (in combination with wildcard keys), replaces the value with null
'''
def _to_son(data):
if isinstance(data, (ArchiveList, List)):
data = [_to_son(item) for item in data]
elif isinstance(data, ArchiveObject):
data = data.to_dict()
return data
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject) -> Dict:
query_dict_with_fixed_ids = {
utils.adjust_uuid_size(key): value for key, value in query_dict.items()}
return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
if isinstance(f_or_archive_reader, ArchiveReader):
return _load_data(query_dict, f_or_archive_reader)
elif isinstance(f_or_archive_reader, (BytesIO, str)):
with ArchiveReader(f_or_archive_reader, **kwargs) as archive:
return _load_data(query_dict, archive)
else:
raise TypeError('%s is neither a file-like nor ArchiveReader' % f_or_archive_reader)
raise TypeError(f'{f_or_archive_reader} is neither a file-like nor ArchiveReader')
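A hedged usage sketch of `query_archive` (not part of the commit); the file path, entry id and section names are placeholders:

from nomad.archive import query_archive

# read only the requested branches of one entry from an archive file
results = query_archive(
    'archive.msg',
    {'entry_id_0': {'results': {'material': '*'}}})
print(results)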
def filter_archive(
required: Union[str, Dict[str, Any]], archive_item: Union[Dict, ArchiveObject, str],
transform: Callable, result_root: Dict = None, resolve_inplace: bool = False) -> Dict:
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveDict) -> Dict:
query_dict_with_fixed_ids = {utils.adjust_uuid_size(
key): value for key, value in query_dict.items()}
return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
def _fix_index(index, length):
if index is None:
return index
if index < 0:
return max(-(length), index)
else:
return min(length, index)
def filter_archive(
required: Union[str, Dict[str, Any]], archive_item: Union[Dict, ArchiveDict, str],
transform: Callable, result_root: Dict = None, resolve_inplace: bool = False) -> Dict:
if archive_item is None:
return None
......@@ -119,51 +155,31 @@ def filter_archive(
if required == 'resolve':
# TODO this requires to reflect on the definition to determine what are references!
pass
elif required in ['*', 'include']:
pass
else:
raise ArchiveQueryError(f'unknown directive {required}')
return transform(archive_item)
elif not isinstance(required, dict):
if not isinstance(required, dict):
raise ArchiveQueryError('a value in required is neither dict nor string directive')
if isinstance(archive_item, str):
# The archive item is a reference, the required is still a dict, the references
# needs to be resolved
# TODO
raise ArchiveQueryError(f'resolving references in non partial archives is not yet implemented')
raise ArchiveQueryError(
f'resolving references in non partial archives is not yet implemented')
result: Dict[str, Any] = {}
for key, val in required.items():
key = key.strip()
# process array indices
match = __query_archive_key_pattern.match(key)
index: Union[Tuple[int, int], int] = None
match = _query_archive_key_pattern.match(key)
if match:
key = match.group(1)
# check if we have indices
if match.group(2) is not None:
first_index, last_index = None, None
group = match.group(3)
first_index = None if group == '' else int(group)
if match.group(4) is not None:
group = match.group(5)
last_index = None if group == '' else int(group)
index = (0 if first_index is None else first_index, last_index)
else:
index = first_index # one item
else:
index = None
key, index = _extract_key_and_index(match)
elif key == '*':
# TODO
raise ArchiveQueryError('key wildcards not yet implemented')
......@@ -171,32 +187,11 @@ def filter_archive(
raise ArchiveQueryError('invalid key format: %s' % key)
try:
archive_child = archive_item[key]
is_list = isinstance(archive_child, (ArchiveList, list))
if index is None and is_list:
index = (0, None)
elif index is not None and not is_list:
raise ArchiveQueryError('cannot use list key on none list %s' % key)
if index is None:
pass
else:
length = len(archive_child)
if isinstance(index, tuple):
index = (_fix_index(index[0], length), _fix_index(index[1], length))
if index[0] == index[1]:
archive_child = [archive_child[index[0]]]
else:
archive_child = archive_child[index[0]: index[1]]
else:
archive_child = [archive_child[_fix_index(index, length)]]
archive_child = _extract_child(archive_item, key, index)
if isinstance(archive_child, (ArchiveList, list)):
result[key] = [
filter_archive(val, item, transform=transform)
for item in archive_child]
result[key] = [filter_archive(
val, item, transform=transform) for item in archive_child]
else:
result[key] = filter_archive(val, archive_child, transform=transform)
......
......@@ -16,14 +16,14 @@
# limitations under the License.
#
from typing import cast, Union, Dict, Tuple, List, Any, Callable
import re
import functools
from typing import cast, Union, Dict, Tuple, Any
from nomad import utils
from nomad.metainfo import Definition, Section, Quantity, SubSection, Reference, QuantityReference
from .storage import ArchiveReader, ArchiveList, ArchiveObject, ArchiveError
from .query import ArchiveQueryError # TODO
from .storage import ArchiveReader, ArchiveList, ArchiveError
from .query import ArchiveQueryError, _to_son, _query_archive_key_pattern, _extract_key_and_index, \
_extract_child
class RequiredValidationError(Exception):
......@@ -33,6 +33,29 @@ class RequiredValidationError(Exception):
self.loc = loc
@functools.lru_cache(maxsize=1024)
def _parse_required_key(key: str) -> Tuple[str, Union[Tuple[int, int], int]]:
key = key.strip()
match = _query_archive_key_pattern.match(key)
if not match:
raise Exception(f'invalid key format: {key}')
return _extract_key_and_index(match)
def _setdefault(target: Union[dict, list], key, value_type: type):
if isinstance(target, list):
if target[key] is None:
target[key] = value_type()
return target[key]
if key not in target:
target[key] = value_type()
return target[key]
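What `_setdefault` does for the two container types, illustrated with plain data (the helper is reproduced here only for the example and is not part of the commit):

def setdefault(target, key, value_type):
    # lists are assumed to be pre-sized with None placeholders
    if isinstance(target, list):
        if target[key] is None:
            target[key] = value_type()
        return target[key]
    if key not in target:
        target[key] = value_type()
    return target[key]

result: dict = {}
setdefault(result, 'systems', list)        # -> {'systems': []}
result['systems'].extend([None, None])
setdefault(result['systems'], 0, dict)     # -> {'systems': [{}, None]}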
class RequiredReader:
'''
Clients can read only the required parts of an archive. They specify the required
......@@ -54,7 +77,7 @@ class RequiredReader:
}
}
The structure has to adheres to metainfo definitions of an archive's sub-sections and
The structure has to adhere to metainfo definitions of an archive's subsections and
quantities. At each point in the specification, children can be replaced with certain
directives.
......@@ -70,38 +93,23 @@ class RequiredReader:
- required: The requirement specification as a python dictionary or directive string.
'''
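A hedged sketch of how a client might use this class, assuming an archive file on disk; the file path, entry id and required sections are placeholders:

from nomad.archive import ArchiveReader, RequiredReader

required_reader = RequiredReader({'results': {'material': '*'}})
with ArchiveReader('archive.msg') as archive:
    # returns only the required branches of the given entry
    data = required_reader.read(archive, 'entry_id_0')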
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def __init__(self, required: Union[dict, str], root_section_def: Section = None, resolve_inplace: bool = False):
def __init__(
self, required: Union[dict, str], root_section_def: Section = None,
resolve_inplace: bool = False):
if root_section_def is None:
from nomad import datamodel
self.root_section_def = datamodel.EntryArchive.m_def
else:
self.root_section_def = root_section_def
self.__result_root: dict = None
self.__archive_root: dict = None # it is actually an ArchvieReader, but we use it as dict
# noinspection PyTypeChecker
self._result_root: dict = None
# noinspection PyTypeChecker
self._archive_root: dict = None # it is actually an ArchiveReader, but we use it as dict
self.resolve_inplace = resolve_inplace