"""Create resources, get sheets/metadata, permission checks."""
from functools import lru_cache
from pyramid.request import Request
from pyramid.traversal import resource_path
from pyramid.util import DottedNameResolver
from pyramid.decorator import reify
from substanced.content import ContentRegistry
from substanced.content import add_content_type
from substanced.content import add_service_type
from substanced.workflow import IWorkflow
from zope.interface.interfaces import IInterface
from adhocracy_core.authentication import is_created_anonymized
from adhocracy_core.authorization import is_password_required_to_edit_some
from adhocracy_core.exceptions import RuntimeConfigurationError
from adhocracy_core.interfaces import IItem
from adhocracy_core.interfaces import ISheet
from adhocracy_core.interfaces import IPool
from adhocracy_core.interfaces import ResourceMetadata
from adhocracy_core.interfaces import SheetMetadata
from adhocracy_core.interfaces import IResourceSheet
from adhocracy_core.sheets.anonymize import IAllowAddAnonymized
from adhocracy_core.sheets.anonymize import ANONYMIZE_PERMISSION
from adhocracy_core.utils import get_iresource
# Shared resolver used to turn dotted names (e.g. sheet interface
# identifiers) into the Python objects they refer to.
resolver = DottedNameResolver()
class ResourceContentRegistry(ContentRegistry):
    """Extend substanced content registry to work with resources.

    Holds the metadata mappings for resource types, sheets and workflows
    and offers helpers to create sheets and to check permissions.
    """

    def __init__(self, registry):
        """Initialize self with empty resource/sheet/workflow mappings."""
        super().__init__(registry)
        self.resources_meta = {}
        """Resources meta mapping.

        Dictionary with key iresource (`resource type` interface) and value
        :class:`adhocracy_core.interfaces.ResourceMetadata`.
        """
        self.sheets_meta = {}
        """Sheets meta mapping.

        Dictionary with key isheet (`sheet type` interface) and value
        :class:`adhocracy_core.interfaces.SheetMetadata`.
        """
        self.workflows_meta = {}
        """Dictionary with key workflow name and value data.

        The value data structure is defined in
        :class:`adhocracy_core.workflows.schemas.Workflow`
        """
        self.workflows = {}
        """Dictionary with key workflow name and value
        :class:`substanced.workflow.IWorkflow`.
        """

    def get_resources_meta_addable(self, context: object,
                                   request: Request) -> [ResourceMetadata]:
        """Get addable resource meta for context, mind permissions.

        :param context: resource whose type determines the addable types.
        :param request: used to check the create permission of each type.
        :return: metadata of the addable resource types for which
            :meth:`can_add_resource` succeeds.
        :raise KeyError: if the resource type of `context` has no entry in
            :attr:`resources_meta_addable`.
        """
        iresource = get_iresource(context)
        addables = self.resources_meta_addable[iresource]
        return [meta for meta in addables
                if self.can_add_resource(request, meta, context)]

    def can_add_resource(self, request: Request, meta: ResourceMetadata,
                         context: IPool) -> bool:
        """Check that the resource type in `meta` is addable to `context`.

        Only the `permission_create` permission of `meta` is checked here.
        """
        return request.has_permission(meta.permission_create, context)

    @reify
    def resources_meta_addable(self) -> dict:
        """Addable resources metadata mapping.

        Dictionary with key iresource (`resource type` interface) and value
        list of :class:`adhocracy_core.interfaces.ResourceMetadata`.
        The value includes only addables for a context with `resource type`.

        A resource type is addable if it is one of the `element_types` of
        the context type, or if it is implicit addable and extends one of
        them. Every addable type is listed only once, even if it matches
        several element types.

        The mapping is computed on first access and cached by `reify`.
        """
        resources_addables = {}
        for iresource, resource_meta in self.resources_meta.items():
            element_types = resource_meta.element_types
            resources_addables[iresource] = [
                meta for candidate, meta in self.resources_meta.items()
                if self._is_addable(candidate, meta, element_types)]
        return resources_addables

    @staticmethod
    def _is_addable(iresource: IInterface,
                    resource_meta: ResourceMetadata,
                    element_types) -> bool:
        """Check if `iresource` is one of `element_types` or a subtype."""
        is_implicit = resource_meta.is_implicit_addable
        # The identity check is done in addition to `extends`, mirroring
        # the two separate conditions of the original implementation.
        return any(iresource is element_type
                   or (is_implicit and iresource.extends(element_type))
                   for element_type in element_types)

    @property
    def permissions(self) -> set:
        """Set of all permissions defined in the system.

        Collects the builtin permissions and the permissions referenced by
        resource types, sheets, workflow transitions and the `permissions`
        category of the registry introspector.
        """
        # `_builtin_permissions` builds a new set on every access, so it is
        # safe to extend it in place.
        perms = self._builtin_permissions
        for resource_meta in self.resources_meta.values():
            perms.update(self._get_resource_permissions(resource_meta))
        for sheet_meta in self.sheets_meta.values():
            perms.update(self._get_sheet_permissions(sheet_meta, perms))
        for workflow_meta in self.workflows_meta.values():
            perms.update(self._get_workflow_permissions(workflow_meta))
        perms.update(self._get_views_permissions())
        return perms

    @property
    def _builtin_permissions(self):
        """Return a fresh set with the hard coded permission names."""
        return {'hide', 'edit_group', 'do_transition'}

    def _get_resource_permissions(self, resource_meta):
        """Return the non empty create permission of `resource_meta`."""
        permission = resource_meta.permission_create
        return [permission] if permission != '' else []

    def _get_sheet_permissions(self, sheet_meta, perms):
        """Return the non empty view/edit/create permissions of a sheet.

        `perms` is not used, it is only kept for compatibility.
        """
        candidates = (sheet_meta.permission_view,
                      sheet_meta.permission_edit,
                      sheet_meta.permission_create)
        return [p for p in candidates if p != '']

    def _get_views_permissions(self):
        """Return the titles of all introspectables of type `permissions`."""
        permissions = self.registry.introspector.get_category('permissions')
        if permissions is None:
            return []
        return [v['introspectable'].title for v in permissions]

    def _get_workflow_permissions(self, workflow_meta):
        """Return the permission of every transition in `workflow_meta`."""
        transitions = workflow_meta['transitions'].values()
        return [transition['permission'] for transition in transitions]

    def get_sheet(self, context: object,
                  isheet: IInterface,
                  request: Request = None,
                  creating: ResourceMetadata = None) -> IResourceSheet:
        """Get sheet for `context` and set the 'context' attribute.

        The sheet instance comes from the cache of :meth:`_create_sheet`.
        Its `context`, `request`, `registry` and `creating` attributes are
        (re)set on every call.

        :param creating: if set, `context` does not need to provide
            `isheet`.
        :raise adhocracy_core.exceptions.RuntimeConfigurationError:
           if there is no `isheet` sheet registered for context.
        :raise KeyError: if `isheet` has no entry in :attr:`sheets_meta`.
        """
        if not creating and not isheet.providedBy(context):
            msg = 'Sheet {0} is not provided by resource {1}'\
                .format(isheet.__identifier__, resource_path(context))
            raise RuntimeConfigurationError(msg)
        meta = self.sheets_meta[isheet]
        sheet = self._create_sheet(meta, context, request=request)
        sheet.context = context
        sheet.request = request
        sheet.registry = self.registry
        sheet.creating = creating
        return sheet

    # NOTE(review): the cache is keyed on (self, meta, context, request) and
    # keeps up to 256 of these alive; the same sheet instance is handed out
    # again for equal arguments. This looks deliberate, confirm before
    # changing it.
    @lru_cache(maxsize=256)
    def _create_sheet(self, meta: SheetMetadata,
                      context: object,
                      request: Request,
                      creating: ResourceMetadata = None) -> IResourceSheet:
        """Instantiate the sheet class of `meta` for `context`.

        :meth:`get_sheet` never passes `creating`, it sets the attribute on
        the returned sheet instead.
        """
        return meta.sheet_class(meta, context, self.registry,
                                request=request,
                                creating=creating,
                                )

    def get_sheet_field(self, context: object,
                        isheet: IInterface,
                        field: str,
                        request: Request = None) -> object:
        """Get sheet for `context` and return the value for field `field`.

        :raise adhocracy_core.exceptions.RuntimeConfigurationError:
           if there is no `isheet` sheet registered for context.
        :raise KeyError: if `field` does not exists for sheet `isheet`.
        """
        sheet = self.get_sheet(context, isheet, request=request)
        return sheet.get()[field]

    def get_sheets_all(self, context: object,
                       request: Request = None) -> [IResourceSheet]:
        """Get all sheets for `context`."""
        iresource = get_iresource(context)
        return [self.get_sheet(context, meta.isheet, request)
                for meta in self._get_sheets_meta(iresource)]

    def get_sheets_create(self, context: object,
                          request: Request = None,
                          iresource: IInterface = None) -> [IResourceSheet]:
        """Get creatable sheets for `context` or `iresource`.

        :param request: If not None filter by sheet create permission.
        :param iresource: If not None return sheets for this resource type.
            The `creating` sheet attribute is set to the resource metadata
            of this type. The returned sheets should only be used to
            deserialize data to create a new resource.
        """
        if iresource:
            creating = self.resources_meta[iresource]
        else:
            creating = None
            iresource = get_iresource(context)
        metas = self._get_sheets_meta(iresource, filter_attr='creatable')
        if request:
            metas = self._filter_permission(
                metas, context, request,
                permission_attr='permission_create')
        return [self.get_sheet(context, meta.isheet, request=request,
                               creating=creating)
                for meta in metas]

    def get_sheets_edit(self, context: object,
                        request: Request = None) -> [IResourceSheet]:
        """Get editable sheets for `context`.

        :param request: If not None filter by sheet edit permission.
        """
        iresource = get_iresource(context)
        metas = self._get_sheets_meta(iresource, filter_attr='editable')
        if request:
            metas = self._filter_permission(
                metas, context, request,
                permission_attr='permission_edit')
        return [self.get_sheet(context, meta.isheet, request=request)
                for meta in metas]

    def get_sheets_read(self, context: object,
                        request: Request = None) -> [IResourceSheet]:
        """Get readable sheets for `context`.

        :param request: If not None filter by sheet view permission.
        """
        iresource = get_iresource(context)
        metas = self._get_sheets_meta(iresource, filter_attr='readable')
        if request:
            metas = self._filter_permission(
                metas, context, request,
                permission_attr='permission_view')
        return [self.get_sheet(context, meta.isheet, request=request)
                for meta in metas]

    def _get_sheets_meta(self, iresource: IInterface,
                         filter_attr='') -> [SheetMetadata]:
        """Yield the metadata of the basic and extended sheets.

        :param filter_attr: if not empty, yield only metadata for which
            this attribute is true.
        """
        resource_meta = self.resources_meta[iresource]
        isheets = resource_meta.basic_sheets + resource_meta.extended_sheets
        for isheet in isheets:
            meta = self.sheets_meta[isheet]
            if not filter_attr or getattr(meta, filter_attr):
                yield meta

    @staticmethod
    def _filter_permission(metas: [SheetMetadata],
                           context: object,
                           request: Request,
                           permission_attr: str) -> [SheetMetadata]:
        """Yield those `metas` whose permission is granted for `context`.

        :param permission_attr: metadata attribute that holds the name of
            the permission to check.
        """
        for meta in metas:
            permission = getattr(meta, permission_attr)
            if request.has_permission(permission, context):
                yield meta

    def resolve_isheet_field_from_dotted_string(self, dotted: str) -> tuple:
        """Resolve `dotted` string to isheet and field name and schema node.

        :dotted: isheet.__identifier__ and field_name separated by ':'.
            The string is split at the last ':'.
        :return: tuple with isheet (ISheet), field_name (str), field schema
                 node (colander.SchemaNode).
        :raise ValueError: If the string is not dotted or it cannot be
            resolved to isheet and field name.
        """
        # Split at the last colon only, everything before it is the sheet.
        name, separator, field = dotted.rpartition(':')
        if not separator:
            raise ValueError(
                'Not a colon-separated dotted string: {}'.format(dotted))
        try:
            isheet = resolver.resolve(name)
        except ImportError as err:
            raise ValueError('No such sheet: {}'.format(name)) from err
        if not (IInterface.providedBy(isheet) and isheet.isOrExtends(ISheet)):
            raise ValueError('Not a sheet: {}'.format(name))
        try:
            sheet_meta = self.sheets_meta[isheet]
        except KeyError as err:
            # A sheet interface without metadata cannot be resolved, report
            # this with the documented exception type.
            msg = 'Not a registered sheet: {}'.format(name)
            raise ValueError(msg) from err
        node = sheet_meta.schema_class().get(field, None)
        if node is None:
            raise ValueError('No such field: {}'.format(dotted))
        return isheet, field, node

    def get_workflow(self, context: object) -> IWorkflow:
        """Get workflow of `context` or None.

        None is returned if `context` does not provide the workflow
        assignment sheet or if the workflow name is empty.

        :raise KeyError: if the workflow name is not in :attr:`workflows`.
        """
        # Imported here and not at module level, as in the original code.
        from adhocracy_core.sheets.workflow import IWorkflowAssignment
        try:
            name = self.get_sheet_field(context,
                                        IWorkflowAssignment,
                                        'workflow')
        except RuntimeConfigurationError:
            return None
        return self.workflows[name] if name else None

    def can_add_anonymized(self, context: object, request: Request) -> bool:
        """Check if children can be created/added anonymized to `context`."""
        if IItem.providedBy(context):
            # if item was anonymized you can also add versions anonymized
            is_allowed = is_created_anonymized(context)
        else:
            is_allowed = IAllowAddAnonymized.providedBy(context)
        has_permission = request.has_permission(ANONYMIZE_PERMISSION, context)
        return is_allowed and has_permission

    def can_edit_anonymized(self, context: object, request: Request) -> bool:
        """Check if `context` may be edited anonymously."""
        return _is_anonymized_and_has_permission(context, request)

    def can_delete_anonymized(self, context: object,
                              request: Request) -> bool:
        """Check if `context` may be deleted anonymously."""
        return _is_anonymized_and_has_permission(context, request)

    def is_password_required(self, context: object, request: Request) -> bool:
        """Check if some sheets of `context` require a password for editing."""
        sheets_edit = self.get_sheets_edit(context, request)
        return is_password_required_to_edit_some(sheets_edit)
def _is_anonymized_and_has_permission(context: object,
                                      request: Request) -> bool:
    """Check that `context` was created anonymized and may be anonymized.

    Both the anonymized lookup and the permission check are always
    executed, in this order.
    """
    created_anonymized = is_created_anonymized(context)
    may_anonymize = request.has_permission(ANONYMIZE_PERMISSION, context)
    if not created_anonymized:
        return created_anonymized
    return may_anonymize
def includeme(config):  # pragma: no cover
    """Set up the resource content registry and the substanced directives.

    Stores a :class:`ResourceContentRegistry` as `config.registry.content`
    and adds the `add_content_type` and `add_service_type` directives.
    """
    registry = config.registry
    registry.content = ResourceContentRegistry(registry)
    directives = (('add_content_type', add_content_type),
                  ('add_service_type', add_service_type))
    for directive_name, directive in directives:
        config.add_directive(directive_name, directive)