Commit 3e2a9d0a authored by Theodore Chang

Merge branch '747-multiprocess' into 'v1.0.8'

Resolve "Improve ArchiveReader performance"

See merge request !606
parents 81d03231 f650b7ff
Pipeline #129056 failed with stages in 20 minutes and 27 seconds
@@ -236,6 +236,20 @@ deploy prod test:
     - /^dev-.*$/
   when: manual

+deploy prod util:
+  stage: release
+  before_script:
+    - mkdir -p /etc/deploy
+    - echo ${CI_K8S_PROD_CONFIG} | base64 -d > ${KUBECONFIG}
+  script:
+    - helm dependency update ops/helm/nomad
+    - helm upgrade --install nomad-util-v1 ops/helm/nomad -f ops/helm/nomad/deployments/prod-util-values.yaml --set image.tag=$CI_COMMIT_REF_NAME,roll=true --wait
+    - docker pull $TEST_IMAGE
+    - docker run -t -e NOMAD_KEYCLOAK_REALM_NAME=fairdi_nomad_prod $TEST_IMAGE python -m nomad.cli client -n https://nomad-lab.eu/prod/v1/util/api -u test -w $CI_NOMAD_TEST_PASSWORD integrationtests --skip-publish --skip-doi
+  except:
+    - /^dev-.*$/
+  when: manual
+
 release latest image:
   stage: release
   script:
...
@@ -17,6 +17,10 @@
 #

 from datetime import datetime
+import functools
+from itertools import zip_longest
+import multiprocessing
+import operator
 from typing import Optional, Set, Union, Dict, Iterator, Any, List
 from fastapi import (
@@ -38,7 +42,6 @@ from nomad.datamodel import EditableUserMetadata
 from nomad.files import StreamedFile, create_zipstream
 from nomad.utils import strip
 from nomad.archive import RequiredReader, RequiredValidationError, ArchiveQueryError
-from nomad.archive import ArchiveQueryError
 from nomad.search import AuthenticationRequiredError, SearchError, update_metadata as es_update_metadata
 from nomad.search import search, QueryValidationError
 from nomad.metainfo.elasticsearch_extension import entry_type
@@ -69,7 +72,7 @@ archive_required_documentation = strip('''
    The `required` part allows you to specify what parts of the requested archives
    should be returned. The NOMAD Archive is a hierarchical data format and
    you can *require* certain branches (i.e. *sections*) in the hierarchy.
-   By specifing certain sections with specific contents or all contents (via
+   By specifying certain sections with specific contents or all contents (via
    the directive `"*"`), you can determine what sections and what quantities should
    be returned. The default is the whole archive, i.e., `"*"`.
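
To make the directive concrete, a hedged illustration of a `required` specification; the section names below are hypothetical placeholders and are not taken from this merge request:

    required = {
        'metadata': '*',           # everything below this (hypothetical) section
        'results': {
            'material': '*',       # all quantities of a nested section
            'properties': 'include'
        }
    }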
@@ -441,13 +444,20 @@ def _do_exaustive_search(owner: Owner, query: Query, include: List[str], user: U
             break


-class _Uploads():
+class _Uploads:
     '''
     A helper class that caches subsequent access to upload files the same upload.
     '''
     def __init__(self):
         self._upload_files = None

+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
     def get_upload_files(self, upload_id: str) -> files.UploadFiles:
         if self._upload_files is not None and self._upload_files.upload_id != upload_id:
             self._upload_files.close()
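
With `__enter__`/`__exit__` added, callers can manage the cached upload files through a `with` block instead of explicit try/finally, as the hunks below do. A minimal usage sketch (the upload id is a placeholder):

    with _Uploads() as uploads:
        # repeated lookups for the same upload reuse the cached UploadFiles object
        upload_files = uploads.get_upload_files('some_upload_id')
    # on exit, close() releases the cached files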
@@ -492,13 +502,9 @@ def _answer_entries_rawdir_request(
         required=MetadataRequired(include=['entry_id', 'upload_id', 'mainfile']),
         user_id=user.user_id if user is not None else None)

-    uploads = _Uploads()
-    try:
-        response_data = [
-            _create_entry_rawdir(entry_metadata, uploads)
-            for entry_metadata in search_response.data]
-    finally:
-        uploads.close()
+    with _Uploads() as uploads:
+        response_data = [_create_entry_rawdir(
+            entry_metadata, uploads) for entry_metadata in search_response.data]

     return EntriesRawDirResponse(
         owner=search_response.owner,
@@ -670,68 +676,107 @@ def _read_archive(entry_metadata, uploads, required_reader: RequiredReader):
             return {
                 'entry_id': entry_id,
                 'parser_name': entry_metadata['parser_name'],
-                'archive': required_reader.read(archive, entry_id)
-            }
+                'archive': required_reader.read(archive, entry_id)}
     except ArchiveQueryError as e:
-        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))


 def _validate_required(required: ArchiveRequired) -> RequiredReader:
     try:
         return RequiredReader(required)
     except RequiredValidationError as e:
-        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=[dict(
-            msg=e.msg, loc=['required'] + e.loc)])
+        raise HTTPException(
+            status.HTTP_422_UNPROCESSABLE_ENTITY,
+            detail=[dict(msg=e.msg, loc=['required'] + e.loc)])
+
+
+def _read_entry_from_archive(entry, uploads, required_reader: RequiredReader):
+    entry_id, upload_id = entry['entry_id'], entry['upload_id']
+
+    # all other exceptions are handled by the caller `_read_entries_from_archive`
+    try:
+        upload_files = uploads.get_upload_files(upload_id)
+
+        with upload_files.read_archive(entry_id, True) as archive:
+            return {
+                'entry_id': entry_id,
+                'upload_id': upload_id,
+                'parser_name': entry['parser_name'],
+                'archive': required_reader.read(archive, entry_id)}
+    except ArchiveQueryError as e:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
+    except KeyError as e:
+        logger.error('missing archive', exc_info=e, entry_id=entry_id)
+        return None
+
+
+def _read_entries_from_archive(entries, required):
+    '''
+    Takes pickleable arguments so that it can be offloaded to worker processes.
+
+    It is important to ensure the return values are also pickleable.
+    '''
+    with _Uploads() as uploads:
+        required_reader = _validate_required(required)
+
+        responses = [_read_entry_from_archive(
+            entry, uploads, required_reader) for entry in entries if entry is not None]
+
+        return list(filter(None, responses))
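
A rough aside on the pickleability the docstring insists on (illustrative, not part of the change): the per-entry results are plain dicts and strings and therefore survive the pickle round trip that `multiprocessing` performs, whereas objects wrapping open archive files generally do not, which is why each worker builds its own `_Uploads` cache instead of receiving one from the parent process.

    import pickle

    # a worker result made only of plain values (placeholder content) round-trips cleanly
    result = {'entry_id': 'abc123', 'parser_name': 'parsers/example', 'archive': {}}
    assert pickle.loads(pickle.dumps(result)) == result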
 def _answer_entries_archive_request(
         owner: Owner, query: Query, pagination: MetadataPagination, required: ArchiveRequired,
         user: User):
     if owner == Owner.all_:
-        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
-            The owner=all is not allowed for this operation as it will search for entries
-            that you might now be allowed to access.
-            '''))
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED, detail=strip(
+                '''The owner=all is not allowed for this operation as it will search for entries
+                that you might now be allowed to access.'''))

     if required is None:
         required = '*'

-    required_reader = _validate_required(required)
-
     search_response = perform_search(
         owner=owner, query=query,
         pagination=pagination,
         required=MetadataRequired(include=['entry_id', 'upload_id', 'parser_name']),
         user_id=user.user_id if user is not None else None)

-    uploads = _Uploads()
-    response_data = {}
-    for entry_metadata in search_response.data:
-        entry_id, upload_id = entry_metadata['entry_id'], entry_metadata['upload_id']
-
-        archive_data = None
-        try:
-            archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
-        except KeyError as e:
-            logger.error('missing archive', exc_info=e, entry_id=entry_id)
-            continue
-
-        response_data[entry_id] = {
-            'entry_id': entry_id,
-            'upload_id': upload_id,
-            'parser_name': entry_metadata['parser_name'],
-            'archive': archive_data}
-
-    uploads.close()
+    # fewer than 8 entries per process is not useful
+    # more than config.max_process_number processes is too much for the server
+    number: int = min(
+        len(search_response.data) // config.archive.min_entires_per_process,
+        config.archive.max_process_number)
+
+    if number <= 1:
+        request_data: list = _read_entries_from_archive(search_response.data, required)
+    else:
+        entries_per_process = len(search_response.data) // number + 1
+
+        # use process pool
+        pool: multiprocessing.pool.Pool = multiprocessing.pool.Pool(processes=number)
+
+        try:
+            responses = pool.map(
+                functools.partial(_read_entries_from_archive, required=required),
+                zip_longest(*[iter(search_response.data)] * entries_per_process))
+        finally:
+            # gracefully shutdown the pool
+            pool.close()
+            pool.join()
+
+        # collect results from each process
+        # https://stackoverflow.com/a/45323085
+        request_data = functools.reduce(operator.iconcat, responses, [])

     return EntriesArchiveResponse(
         owner=search_response.owner,
         query=search_response.query,
         pagination=search_response.pagination,
         required=required,
-        data=list(response_data.values()))
+        data=request_data)


 _entries_archive_docstring = strip('''
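
The chunking idiom `zip_longest(*[iter(...)] * n)` used with `pool.map` above groups consecutive search results into fixed-size tuples, padding the last tuple with `None` (which the workers filter out again), and `functools.reduce(operator.iconcat, ...)` flattens the per-chunk result lists. A standalone sketch with made-up data:

    import functools
    import operator
    from itertools import zip_longest

    data = list(range(10))
    entries_per_process = 4

    # consecutive items grouped into tuples of 4; the last tuple is padded with None
    chunks = list(zip_longest(*[iter(data)] * entries_per_process))
    # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9, None, None)]

    # flattening a list of per-chunk result lists mirrors the reduce/iconcat call
    flat = functools.reduce(operator.iconcat, [[0, 1], [2], []], [])
    # [0, 1, 2]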
@@ -805,7 +850,6 @@ def _answer_entries_archive_download_request(
                 'The limit of maximum number of entries in a single download (%d) has been '
                 'exeeded (%d).' % (config.max_entry_download, response.pagination.total)))

-    uploads = _Uploads()
     manifest = []
     search_includes = ['entry_id', 'upload_id', 'parser_name']

@@ -833,12 +877,10 @@ def _answer_entries_archive_download_request(
         manifest_content = json.dumps(manifest, indent=2).encode()
         yield StreamedFile(path='manifest.json', f=io.BytesIO(manifest_content), size=len(manifest_content))

-    try:
+    with _Uploads() as uploads:
         # create the streaming response with zip file contents
         content = create_zipstream(streamed_files(), compress=files_params.compress)
         return StreamingResponse(content, media_type='application/zip')
-    finally:
-        uploads.close()


 _entries_archive_download_docstring = strip('''
@@ -936,11 +978,9 @@ async def get_entry_rawdir(
             status_code=status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.')

-    uploads = _Uploads()
-    try:
-        return EntryRawDirResponse(entry_id=entry_id, data=_create_entry_rawdir(response.data[0], uploads))
-    finally:
-        uploads.close()
+    with _Uploads() as uploads:
+        return EntryRawDirResponse(
+            entry_id=entry_id, data=_create_entry_rawdir(response.data[0], uploads))


 @router.get(
@@ -1044,13 +1084,12 @@ def answer_entry_archive_request(query: Dict[str, Any], required: ArchiveRequire
     entry_metadata = response.data[0]
     entry_id = entry_metadata['entry_id']

-    uploads = _Uploads()
-    try:
+    with _Uploads() as uploads:
         try:
             archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
         except KeyError:
             raise HTTPException(
-                status_code=status.HTTP_404_NOT_FOUND,
+                status.HTTP_404_NOT_FOUND,
                 detail='The entry does exist, but it has no archive.')

         return {
@@ -1060,10 +1099,7 @@ def answer_entry_archive_request(query: Dict[str, Any], required: ArchiveRequire
                 'entry_id': entry_id,
                 'upload_id': entry_metadata['upload_id'],
                 'parser_name': entry_metadata['parser_name'],
-                'archive': archive_data
-            }}
-    finally:
-        uploads.close()
+                'archive': archive_data}}


 @router.get(
...
@@ -35,7 +35,7 @@ section annotations/categories.
 from .storage import (
     write_archive, read_archive, ArchiveError, ArchiveReader, ArchiveWriter,
-    ArchiveObject, ArchiveList, ArchiveItem)
+    ArchiveDict, ArchiveList, ArchiveItem)
 from .query import query_archive, filter_archive, ArchiveQueryError
 from .partial import (
     read_partial_archive_from_mongo, read_partial_archives_from_mongo,
...
@@ -16,13 +16,70 @@
 # limitations under the License.
 #

-from typing import Any, Tuple, Dict, Callable, Union, List
-from io import BytesIO
+import functools
 import re
+from typing import Any, Dict, Callable, Union, Tuple
+from io import BytesIO

 from nomad import utils
-from .storage import ArchiveReader, ArchiveList, ArchiveObject
+from .storage import ArchiveReader, ArchiveList, ArchiveDict, _to_son
+
+_query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?])?$')
+
+
+@functools.lru_cache(maxsize=1024)
+def _fix_index(index, length):
+    if index is None:
+        return index
+
+    return max(-length, index) if index < 0 else min(length, index)
+
+
+@functools.lru_cache(maxsize=1024)
+def _extract_key_and_index(match) -> Tuple[str, Union[Tuple[int, int], int]]:
+    key = match.group(1)
+
+    # noinspection PyTypeChecker
+    index: Union[Tuple[int, int], int] = None
+
+    # check if we have indices
+    if match.group(2) is not None:
+        group = match.group(3)
+        first_index = None if group == '' else int(group)
+
+        if match.group(4) is None:
+            index = first_index  # one item
+        else:
+            group = match.group(5)
+            last_index = None if group == '' else int(group)
+            index = (0 if first_index is None else first_index, last_index)
+
+    return key, index
+
+
+# @cached(thread_safe=False, max_size=1024)
+def _extract_child(archive_item, prop, index) -> Union[dict, list]:
+    archive_child = archive_item[prop]
+    is_list = isinstance(archive_child, (ArchiveList, list))
+
+    if index is None and is_list:
+        index = (0, None)
+    elif index is not None and not is_list:
+        raise ArchiveQueryError(f'cannot use list key on none list {prop}')
+
+    if index is not None:
+        length = len(archive_child)
+
+        if isinstance(index, tuple):
+            index = (_fix_index(index[0], length), _fix_index(index[1], length))
+            if index[0] == index[1]:
+                archive_child = [archive_child[index[0]]]
+            else:
+                archive_child = archive_child[index[0]: index[1]]
+        else:
+            archive_child = [archive_child[_fix_index(index, length)]]
+
+    return archive_child


 class ArchiveQueryError(Exception):
@@ -33,9 +90,6 @@ class ArchiveQueryError(Exception):
     pass


-__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
-
-
 def query_archive(
         f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict,
         **kwargs) -> Dict:
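
As a standalone illustration (the keys are chosen for the example only) of how `_query_archive_key_pattern` and `_extract_key_and_index` above split a required key into a name plus an optional index or slice:

    import re

    pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?])?$')

    # 'systems'      -> ('systems', no index)
    # 'systems[0]'   -> ('systems', single index 0)
    # 'systems[1:3]' -> ('systems', slice (1, 3))
    # 'systems[-1:]' -> ('systems', slice (-1, None))
    for key in ('systems', 'systems[0]', 'systems[1:3]', 'systems[-1:]'):
        match = pattern.match(key)
        print(match.group(1), match.group(3), match.group(5))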
@@ -72,43 +126,25 @@ def query_archive(
        * exclude (in combination with wildcard keys), replaces the value with null
    '''

-    def _to_son(data):
-        if isinstance(data, (ArchiveList, List)):
-            data = [_to_son(item) for item in data]
-        elif isinstance(data, ArchiveObject):
-            data = data.to_dict()
-        return data
-
-    def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject) -> Dict:
-        query_dict_with_fixed_ids = {
-            utils.adjust_uuid_size(key): value for key, value in query_dict.items()}
-        return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
-
     if isinstance(f_or_archive_reader, ArchiveReader):
         return _load_data(query_dict, f_or_archive_reader)
     elif isinstance(f_or_archive_reader, (BytesIO, str)):
         with ArchiveReader(f_or_archive_reader, **kwargs) as archive:
             return _load_data(query_dict, archive)
     else:
-        raise TypeError('%s is neither a file-like nor ArchiveReader' % f_or_archive_reader)
+        raise TypeError(f'{f_or_archive_reader} is neither a file-like nor ArchiveReader')
-def filter_archive(
-        required: Union[str, Dict[str, Any]], archive_item: Union[Dict, ArchiveObject, str],
-        transform: Callable, result_root: Dict = None, resolve_inplace: bool = False) -> Dict:
-
-    def _fix_index(index, length):
-        if index is None:
-            return index
-        if index < 0:
-            return max(-(length), index)
-        else:
-            return min(length, index)
+def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveDict) -> Dict:
+    query_dict_with_fixed_ids = {utils.adjust_uuid_size(
+        key): value for key, value in query_dict.items()}
+    return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
+
+
+def filter_archive(
+        required: Union[str, Dict[str, Any]], archive_item: Union[Dict, ArchiveDict, str],
+        transform: Callable, result_root: Dict = None, resolve_inplace: bool = False) -> Dict:

     if archive_item is None:
         return None
@@ -119,51 +155,31 @@ def filter_archive(
         if required == 'resolve':
             # TODO this requires to reflect on the definition to determine what are references!
             pass
         elif required in ['*', 'include']:
             pass
         else:
             raise ArchiveQueryError(f'unknown directive {required}')

         return transform(archive_item)

-    elif not isinstance(required, dict):
+    if not isinstance(required, dict):
         raise ArchiveQueryError('a value in required is neither dict not string directive')

     if isinstance(archive_item, str):
         # The archive item is a reference, the required is still a dict, the references
         # needs to be resolved
         # TODO
-        raise ArchiveQueryError(f'resolving references in non partial archives is not yet implemented')
+        raise ArchiveQueryError(
+            f'resolving references in non partial archives is not yet implemented')

     result: Dict[str, Any] = {}

     for key, val in required.items():
         key = key.strip()

         # process array indices
-        match = __query_archive_key_pattern.match(key)
-        index: Union[Tuple[int, int], int] = None
+        match = _query_archive_key_pattern.match(key)
+
         if match:
-            key = match.group(1)
+            key, index = _extract_key_and_index(match)