"""Finite state machines for resources."""
from colander import Invalid
from pyramid.interfaces import IRequest
from pyramid.registry import Registry
from pyramid.renderers import render
from pyramid.request import Request
from pyramid.threadlocal import get_current_registry
from pyrsistent import freeze
from pyrsistent import PMap
from substanced.workflow import Workflow
from substanced.workflow import WorkflowError
from substanced.util import find_service
from zope.deprecation import deprecated
from zope.interface import implementer
from zope.interface import Interface
from adhocracy_core.authorization import acm_to_acl
from adhocracy_core.authorization import create_fake_god_request
from adhocracy_core.authorization import add_local_roles
from adhocracy_core.authorization import set_acl
from adhocracy_core.exceptions import ConfigurationError
from adhocracy_core.interfaces import DEFAULT_USER_GROUP_NAME
from adhocracy_core.interfaces import IAdhocracyWorkflow
from adhocracy_core.interfaces import IPool
from adhocracy_core.interfaces import search_query
from adhocracy_core.sheets.workflow import IWorkflowAssignment
from adhocracy_core.workflows.schemas import create_workflow_meta_schema
[docs]class ISample(Interface):
"""Sample workflow."""
deprecated('ISample', 'Backward compatible code, remove after migration')
[docs]class ACLLocalRolesState(dict):
"""Workflow state setting :term:`acl` and adding :term:`local_roles`."""
def __init__(self, acl: list=None, local_roles: dict=None, **kw):
self.acl = acl
""":term:`acl` set for `context`"""
self.local_roles = local_roles
""":class:`adhocracy_core.schema.LocalRoles` added to
:term:`local_roles` of `context.`
"""
def __call__(self, context, request, transition, workflow):
registry = getattr(request, 'registry', None)
if registry is None:
registry = get_current_registry(context)
if self.acl is not None:
set_acl(context, self.acl, registry)
if self.local_roles is not None:
add_local_roles(context,
self.local_roles,
registry)
@implementer(IAdhocracyWorkflow)
[docs]class ACLLocalRolesWorkflow(Workflow):
"""Workflow using `ACLLocalRolesState` to setup states."""
_state_factory = ACLLocalRolesState
[docs] def get_next_states(self, context, request: IRequest) -> list:
"""Get states you can trigger a transition to."""
state = self.state_of(context)
transitions = self.get_transitions(context, request, from_state=state)
states = [t['to_state'] for t in transitions]
return list(set(states))
[docs] def update_acl(self, context) -> list:
"""Reset the local permission :term:`acl` for `context`."""
state = self.state_of(context)
self._states[state](context, None, None, self)
[docs]def update_workflow_state_acls(context: IPool, registry: Registry):
"""Update :term:`acl` of current workflow state for all resources."""
catalog = find_service(context, 'catalogs')
query = search_query._replace(interfaces=IWorkflowAssignment)
resources = catalog.search(query).elements
for resource in resources:
workflow = registry.content.get_workflow(resource)
if workflow is None:
continue
workflow.update_acl(resource)
[docs]def add_workflow(registry: Registry, workflow_asset: str, name: str):
"""Create and add workflow to registry.
:param registry: registry to register the workflow and store meta data.
:param workfow_asset: yaml asset file to import the workflow metadata.
The data schema is :class:`adhocracy_core.workflows.schemas.Workflow`.
:param name: identifier for the workflow
:raises adhocracy_core.exceptions.ConfigurationError: if the validation
for :term:`cstruct` or the sanity checks in
class:`substanced.workflow.Workflow` fail.
"""
cstruct = _get_meta(workflow_asset, registry)
appstruct = _deserialize_meta(cstruct, name)
appstruct = _add_defaults(appstruct, registry)
registry.content.workflows_meta[name] = appstruct
workflow = _create_workflow(appstruct, name)
registry.content.workflows[name] = workflow
def _get_meta(workflow_asset: str, registry: Registry) -> dict:
dummy_request = Request.blank('/') # pass the local registry in tests
dummy_request.registry = registry
cstruct = render(workflow_asset, {}, request=dummy_request)
return cstruct
def _deserialize_meta(cstruct: dict, name: str) -> PMap:
schema = create_workflow_meta_schema(cstruct)
try:
appstruct = schema.deserialize(cstruct)
except Invalid as err:
msg = 'Error add workflow with name {0}: {1}'
raise ConfigurationError(msg.format(name, str(err.asdict())))
return freeze(appstruct)
def _add_defaults(appstruct: PMap, registry: Registry) -> PMap:
"""Add values form default workflow to `appstruct`."""
default_name = appstruct.get('defaults', '')
if not default_name:
return appstruct
updated = registry.content.workflows_meta[default_name]
for key, value in appstruct.items():
if key in ['initial_state', 'defaults', 'auto_transition',
'add_local_role_participant_to_default_group']:
updated = updated.transform([key], value)
elif key == 'transitions':
for transition_name, transition in value.items():
updated = updated.transform(['transitions', transition_name],
transition)
elif key == 'states': # pragma: no branch
for state_name, state in value.items():
for permission in state.get('acm', {}).get('permissions', []):
name = permission[0]
permissions = \
updated['states'][state_name]['acm']['permissions']
overwriting = name in [p[0] for p in permissions]
if overwriting:
updated = updated.transform(
['states', state_name, 'acm', 'permissions',
match_permission(updated, state_name, name)],
permission)
else:
updated_permissions = permissions.append(permission)
updated = updated.transform(
['states', state_name, 'acm', 'permissions'],
updated_permissions)
return updated
def _create_workflow(appstruct: PMap,
name: str) -> Workflow:
initial_state = appstruct['initial_state']
workflow = ACLLocalRolesWorkflow(initial_state=initial_state, type=name)
if appstruct.get('add_local_role_participant_to_default_group', False):
group = 'group:' + DEFAULT_USER_GROUP_NAME
local_roles = {group: {'role:participant'}}
else:
local_roles = None
for name, data in appstruct['states'].items():
acm = data.get('acm', {})
acl = acm and acm_to_acl(acm) or []
workflow.add_state(name, callback=None, acl=acl,
local_roles=local_roles)
for name, data in appstruct['transitions'].items():
workflow.add_transition(name, **data)
try:
workflow.check()
except WorkflowError as err:
msg = 'Error add workflow with name {0}: {1}'
raise ConfigurationError(msg.format(name, str(err)))
return workflow
[docs]def add_workflow_directive(config, workflow: str, name: str):
"""Create `add_workflow` pyramid config directive.
Example usage::
config.add_workflow('mypackage:myworkflow.yaml', 'myworkflow)
"""
config.action(('add_workflow', name),
add_workflow,
args=(config.registry, workflow, name))
[docs]def transition_to_states(context, states: [str], registry: Registry,
reset=False):
"""Initialize workflow if needed and do transitions to the given states.
:raises substanced.workflow.WorkflowError: if transition is missing to
do transitions to `states`.
"""
request = create_fake_god_request(registry)
workflow = registry.content.get_workflow(context)
# TODO: raise if workflow is None
if not workflow.has_state(context) or reset:
workflow.initialize(context)
for state in states:
workflow.transition_to_state(context, request, state)
[docs]def match_permission(acm, state, permission):
"""Create a function matching a permission in an ACM.
The function can be used as matcher with the `transform`
function of pyrsistent when transforming an existing ACM to select
a specific permission to change.
"""
def matcher(idx):
return acm['states'][state]['acm']['permissions'][idx][0] == permission
return matcher
[docs]def includeme(config): # pragma: no cover
"""Include workflows and add 'add_workflow' config directive."""
config.add_directive('add_workflow', add_workflow_directive)
config.add_workflow('adhocracy_core.workflows:sample.yaml',
'sample')
config.add_workflow('adhocracy_core.workflows:standard.yaml',
'standard')
config.add_workflow('adhocracy_core.workflows:standard_private.yaml',
'standard_private')
config.add_workflow('adhocracy_core.workflows:debate.yaml',
'debate')
config.add_workflow('adhocracy_core.workflows:debate_private.yaml',
'debate_private')
config.add_workflow('adhocracy_core.workflows:badge_assignment.yaml',
'badge_assignment')