"""Helper functions shared between modules."""
from collections.abc import Iterable
from collections.abc import Sequence
from datetime import datetime
from pytz import UTC
import os
import time
import json
from colander import Schema
from multipledispatch import dispatch
from pyramid.compat import is_nonstr_iter
from pyramid.location import lineage
from pyramid.request import Request
from pyramid.registry import Registry
from pyramid.traversal import resource_path
from substanced.util import acquire
from substanced.util import find_catalog
from substanced.util import get_dotted_name
from zope.interface import directlyProvidedBy
from zope.interface import providedBy
from zope.interface.interfaces import IInterface
from adhocracy_core.interfaces import ChangelogMetadata
from adhocracy_core.interfaces import IResource
from adhocracy_core.interfaces import VisibilityChange
from adhocracy_core.events import ResourceWillBeDeleted
from adhocracy_core.events import ResourceSheetModified
[docs]def find_graph(context) -> object:
"""Get the Graph object in the lineage of `context` or None.
:rtype: :class:`adhocracy_core.graph.Graph`
"""
return acquire(context, '__graph__', None)
[docs]def get_iresource(context) -> IInterface:
"""Get the :class:`adhocracy_core.interfaces.IResource` of `context`.
:return: :class:`IInterface` or None to ease testing
"""
ifaces = list(directlyProvidedBy(context))
iresources = [i for i in ifaces if i.isOrExtends(IResource)]
return iresources[0] if iresources else None
[docs]def get_matching_isheet(context, isheet: IInterface) -> IInterface:
"""
Get `isheet` or a subclass of it if `context` provides it.
If `context` provides neither `isheet` nor any of its subclasses, None
is returned.
"""
ifaces = list(providedBy(context))
for iface in ifaces:
if iface.isOrExtends(isheet):
return iface
return None
[docs]def log_compatible_datetime(dt: datetime=datetime.now()):
"""Format a datetime in the same way as the logging framework.
Mimics the output of the '%(asctime)' placeholder.
"""
return '{}-{:02}-{:02} {:02}:{:02}:{:02},{:03}'.format(
dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second,
dt.microsecond // 1000)
[docs]def to_dotted_name(context) -> str:
"""Get the dotted name of `context`.
:returns:
The dotted name of `context`, if it's a type. If `context` is a string
it is returned as is (since we suppose that it already
represents a type name).
"""
if isinstance(context, str):
return context
else:
return get_dotted_name(context)
[docs]def remove_keys_from_dict(dictionary: dict, keys_to_remove=()) -> dict:
"""Remove keys from `dictionary`.
:param keys_to_remove: Tuple with keys or one key
"""
if not is_nonstr_iter(keys_to_remove):
keys_to_remove = (keys_to_remove,)
dictionary_copy = {}
for key, value in dictionary.items():
if key not in keys_to_remove:
dictionary_copy[key] = value
return dictionary_copy
[docs]def exception_to_str(err: Exception):
"""Convert an exception to a string.
:param err: the exception
:return: "{type}: {str}", where {type} is the class name of the exception
and {str} is the result of calling `str(err)`; or just "{type}"
if {str} is empty
"""
name = err.__class__.__name__
desc = str(err)
if desc:
return '{}: {}'.format(name, desc)
else:
return name
[docs]def normalize_to_tuple(context) -> tuple:
"""Convert `context` to :class:`tuple`."""
if isinstance(context, tuple):
return context
elif isinstance(context, str):
return context,
elif isinstance(context, Sequence):
return tuple(context)
else:
return context,
[docs]def nested_dict_set(d: dict, keys: list, value: object):
"""
Set a nested key in a dictionary.
The following two expressions are equivalent, if ``d['key']['subkey']``
already exists::
nested_dict_set(d, ['key', 'subkey', 'subsubkey'], value)
d['key']['subkey']['subsubkey'] = value
If parent elements such as ``d['key']['subkey']`` or ``d['key']`` don't
yet exist, this function will initialize them as dictionaries.
"""
for key in keys[:-1]:
d = d.setdefault(key, {})
d[keys[-1]] = value
[docs]def unflatten_multipart_request(request: Request) -> dict:
"""Convert a multipart/form-data request into the usual dict structure."""
result = {}
for key, value in request.POST.items():
keyparts = key.split(':')
nested_dict_set(result, keyparts, value)
return result
[docs]def set_batchmode(request: Request, value=True):
"""Set 'batchmode' marker for the current request.
This is called by :class:`adhocracy_core.rest.batchview.BatchView`.
Other code can check :func:`is_batchmode` to modify behavior.
"""
request.__is_batchmode__ = value
[docs]def is_batchmode(request: Request) -> bool:
"""Get 'batchmode' marker for the current request."""
return getattr(request, '__is_batchmode__', False)
[docs]def is_hidden(resource: IResource) -> dict:
"""Check whether a resource is hidden.
This also returns True for descendants of hidden resources, as a positive
hidden status is inherited.
"""
for context in lineage(resource):
if getattr(context, 'hidden', False):
return True
return False
[docs]def get_reason_if_blocked(resource: IResource) -> str:
"""Check if a resource is blocked and return Reason, None otherwise."""
reason = None
if is_hidden(resource):
reason = 'hidden'
return reason
[docs]def list_resource_with_descendants(resource: IResource) -> Iterable:
"""List all descendants of a resource, including the resource itself."""
system_catalog = find_catalog(resource, 'system')
if system_catalog is None: # ease testing
return []
path_index = system_catalog['path']
query = path_index.eq(resource_path(resource), include_origin=True)
return query.execute()
@dispatch(ResourceSheetModified)
def get_visibility_change(event):
"""Return changed visbility for `event.object`."""
was_visible = not event.old_appstruct['hidden']
is_visible = not event.new_appstruct.get('hidden', False)
if was_visible:
if is_visible:
return VisibilityChange.visible
else:
return VisibilityChange.concealed
else:
if is_visible:
return VisibilityChange.revealed
else:
return VisibilityChange.invisible
@dispatch(ResourceWillBeDeleted) # flake8: noqa
def get_visibility_change(event):
"""Return changed visbility for `event.object`."""
return VisibilityChange.concealed
[docs]def now() -> datetime:
"""Return current date time with 'UTC' time zone."""
date = datetime.utcnow().replace(tzinfo=UTC)
return date
[docs]def get_modification_date(registry: Registry) -> datetime:
"""Get the shared modification date for the current transaction.
This way every date created in one batch/post request
can use this as default value.
The frontend relies on this to ease sorting.
"""
date = getattr(registry, '__modification_date__', None)
if date is None:
date = now()
registry.__modification_date__ = date
return date
[docs]def create_filename(directory='.', prefix='', suffix='.csv') -> str:
"""Use current time to generate a unique filename.
:params dir: directory path for the filename.
If non existing the directory is created.
:params prefix: prefix for the generated filename
:params suffix: type suffix for the generated filename, like 'csv'
"""
if not os.path.exists(directory):
os.makedirs(directory)
time_str = time.strftime('%Y%m%d-%H%M%S')
name = '{0}-{1}{2}'.format(prefix, time_str, suffix)
path = os.path.join(directory, name)
return path
[docs]def load_json(filename):
"""Load a json file from the disk."""
with open(filename, 'r') as f:
return json.load(f)
[docs]def has_annotation_sheet_data(resource: IResource) -> bool:
"""Check if `resource` has no data stored in AnnotationResourceSheets."""
for attribute in resource.__dict__:
if attribute.startswith('_sheet_'):
return True
else:
return False
[docs]def is_created_in_current_transaction(resource: IResource,
registry: Registry) -> bool:
"""Check if `resource` is created during the current transaction."""
changelog = get_changelog_metadata(resource, registry)
return changelog.created
[docs]def create_schema(schema_class, context, request, **kwargs) -> Schema:
"""Create `schema` from `schema_class` and add bindings.
The default bindings are: `context`, `request` , `registry`, `creating`
(defaults to False). These can be overridden or extended by `**kwargs`.
"""
bindings = {'request': request,
'registry': request and getattr(request, 'registry'),
'context': context,
'creating': False,
}
bindings.update(**kwargs)
schema = schema_class().bind(**bindings)
return schema