# Source code for adhocracy_core.resources.subscriber

"""Autoupdate resources."""
from base64 import b64encode
from collections import Sequence
from io import BytesIO
from logging import getLogger
from os import urandom
import requests

from pyramid.interfaces import IApplicationCreated
from pyramid.registry import Registry
from pyramid.request import Request
from pyramid.traversal import find_interface
from pyramid.i18n import get_localizer
from pyramid.i18n import TranslationStringFactory
from substanced.util import find_service
from substanced.file import File
from substanced.file import USE_MAGIC

from adhocracy_core.activity import generate_activity_description
from adhocracy_core.authorization import set_acms_for_app_root
from adhocracy_core.interfaces import IResource
from adhocracy_core.interfaces import IItem
from adhocracy_core.interfaces import IItemVersion
from adhocracy_core.interfaces import IPool
from adhocracy_core.interfaces import ISimple
from adhocracy_core.interfaces import ISheet
from adhocracy_core.interfaces import IActivitiesGenerated
from adhocracy_core.interfaces import IResourceCreatedAndAdded
from adhocracy_core.interfaces import ISheetReferenceAutoUpdateMarker
from adhocracy_core.interfaces import ISheetReferenceNewVersion
from adhocracy_core.interfaces import IItemVersionNewVersionAdded
from adhocracy_core.interfaces import IResourceSheetModified
from adhocracy_core.interfaces import DEFAULT_USER_GROUP_NAME
from adhocracy_core.resources.activity import IActivity
from adhocracy_core.resources.principal import IGroup
from adhocracy_core.resources.principal import IUser
from adhocracy_core.resources.principal import IPasswordReset
from adhocracy_core.resources.asset import add_metadata
from adhocracy_core.resources.asset import IAsset
from adhocracy_core.resources.image import add_image_size_downloads
from adhocracy_core.resources.image import IImage
from adhocracy_core.sheets.principal import IPermissions
from adhocracy_core.sheets.tags import ITags
from adhocracy_core.exceptions import AutoUpdateNoForkAllowedError
from adhocracy_core.utils import find_graph
from adhocracy_core.utils import get_changelog_metadata
from adhocracy_core.utils import get_iresource
from adhocracy_core.utils import get_modification_date
from adhocracy_core.sheets.versions import IVersionable
from adhocracy_core.sheets.metadata import IMetadata
from adhocracy_core.sheets.asset import IAssetData
from adhocracy_core.sheets.image import IImageReference
from adhocracy_core.sheets.principal import IActivationConfiguration
from adhocracy_core.sheets.principal import IPasswordAuthentication
from adhocracy_core.sheets.principal import IEmailNew

import adhocracy_core.sheets.activity

logger = getLogger(__name__)

_ = TranslationStringFactory('adhocracy')


def update_modification_date_modified_by(event):
    """Update the IMetadata fields `modified_by` and `modification_date`.

    The modification date comes from `get_modification_date` for the
    event's registry.  `modified_by` is only written when the event carries
    a request, in which case it is set to `request.user`.
    """
    registry = event.registry
    request = event.request
    metadata_sheet = registry.content.get_sheet(event.object,
                                                IMetadata,
                                                request=request)
    changes = {'modification_date': get_modification_date(registry)}
    if request is not None:
        changes['modified_by'] = request.user
    # NOTE(review): the flags are handed to the sheet unchanged; presumably
    # they suppress a follow-up modified event and allow writing read-only
    # fields -- confirm against the sheet implementation.
    metadata_sheet.set(changes, send_event=False, omit_readonly=False)
def add_default_group_to_user(event):
    """Add default group to user if no group is set.

    Nothing is changed when no default group can be found or when the user
    (``event.object``) already belongs to at least one group.
    """
    user = event.object
    registry = event.registry
    default_group = _get_default_group(user)
    # The group lookup for the user is only done when a default group exists.
    needs_default_group = default_group is not None \
        and not _get_user_groups(user, registry)
    if needs_default_group:
        _add_user_to_group(user, default_group, registry)
def _get_default_group(context) -> IGroup:
    """Return the group named DEFAULT_USER_GROUP_NAME or None if missing."""
    groups_service = find_service(context, 'principals', 'groups')
    return groups_service.get(DEFAULT_USER_GROUP_NAME, None)


def _get_user_groups(user: IUser, registry: Registry):
    """Return the groups the roles/user locator reports for `user`.

    The locator is a multi adapter on (user, request), so a blank request
    bound to `registry` is created just for the lookup.
    """
    # Imported locally, as in the original code (reason not visible here).
    from pyramid.traversal import resource_path
    from adhocracy_core.interfaces import IRolesUserLocator
    dummy_request = Request.blank('/')
    dummy_request.registry = registry
    user_locator = registry.getMultiAdapter((user, dummy_request),
                                            IRolesUserLocator)
    # Users are identified by their resource path.
    return user_locator.get_groups(resource_path(user))


def _add_user_to_group(user: IUser, group: IGroup, registry: Registry):
    """Append `group` to the `groups` field of the user's IPermissions."""
    permissions_sheet = registry.content.get_sheet(user, IPermissions)
    current_groups = permissions_sheet.get()['groups']
    # Build a new list instead of mutating the value returned by the sheet.
    permissions_sheet.set({'groups': current_groups + [group]})
def autoupdate_versionable_has_new_version(event):
    """Auto update versionable resource if a reference has a new version.

    If a version was already created in this transaction, its sheet is
    updated in place; otherwise a new version is created when that is
    needed and does not fork.

    :raises AutoUpdateNoForkAllowedError: if a fork is created but not allowed
    """
    if not _is_in_root_version_subtree(event):
        return
    content = event.registry.content
    sheet = content.get_sheet(event.object, event.isheet)
    if not sheet.meta.editable:
        return
    updated = _get_updated_appstruct(event, sheet)
    already_created = _get_last_version_created_in_transaction(event)
    if already_created is not None:
        # Reuse the version created earlier instead of adding another one.
        content.get_sheet(already_created, event.isheet).set(updated)
        return
    if _new_version_needed_and_not_forking(event):
        _create_new_version(event, updated)
def _is_in_root_version_subtree(event: ISheetReferenceNewVersion) -> bool:
    """Tell whether `event.object` is covered by `event.root_versions`.

    An empty list of root versions means "no restriction".
    """
    roots = event.root_versions
    if roots == []:
        return True
    return find_graph(event.object).is_in_subtree(event.object, roots)


def _get_updated_appstruct(event: ISheetReferenceNewVersion,
                           sheet: ISheet) -> dict:
    """Return the sheet data with the old version replaced by the new one.

    For sequence fields the new version takes the position of the old one;
    any other field value is replaced as a whole.
    """
    data = sheet.get()
    field_name = event.isheet_field
    references = data[field_name]
    if not isinstance(references, Sequence):
        data[field_name] = event.new_version
        return data
    position = references.index(event.old_version)
    references.pop(position)
    references.insert(position, event.new_version)
    return data


def _get_last_version_created_in_transaction(event: ISheetReferenceNewVersion)\
        -> IItemVersion:
    """Return the version the changelog reports for this transaction.

    In batch mode the `last_version` of the parent item's changelog is
    used.  Otherwise the changelog of `event.object` decides: the object
    itself if it was just created, else its `followed_by` entry.
    """
    registry = event.registry
    if event.is_batchmode:
        parent_item = find_interface(event.object, IItem)
        return get_changelog_metadata(parent_item, registry).last_version
    metadata = get_changelog_metadata(event.object, registry)
    return event.object if metadata.created else metadata.followed_by


def _new_version_needed_and_not_forking(event: ISheetReferenceNewVersion)\
        -> bool:
    """Check whether to autoupdate if resource is non-forkable.

    If the given resource is the last version or there's no last version
    yet, do autoupdate.

    If it's not the last version, but references the same object (namely
    the one which caused the autoupdate), don't update.

    If it's not the last version, but references a different object, throw
    an AutoUpdateNoForkAllowedError. This should only happen in batch
    requests.
    """
    current = event.object
    last = _get_last_version(current, event.registry)
    if last is None or last is current:
        return True
    get_field = event.registry.content.get_sheet_field
    value = get_field(current, event.isheet, event.isheet_field)
    last_value = get_field(last, event.isheet, event.isheet_field)
    if last_value != value:
        raise AutoUpdateNoForkAllowedError(current, event)
    return False


def _get_last_version(resource: IItemVersion,
                      registry: Registry) -> IItemVersion:
    """Get the last version of `resource` according to the `LAST` tag."""
    parent_item = find_interface(resource, IItem)
    return registry.content.get_sheet_field(parent_item, ITags, 'LAST')


def _create_new_version(event, appstruct) -> IResource:
    """Create a follow-up version of `event.object` with updated data.

    The new version copies all writable sheet data of the old one, follows
    the old one and gets `appstruct` for the sheet named by the event.
    """
    predecessor = event.object
    sheet_data = _get_writable_appstructs(predecessor, event.registry)
    sheet_data[IVersionable.__identifier__]['follows'] = [predecessor]
    sheet_data[event.isheet.__identifier__] = appstruct
    resource_type = get_iresource(predecessor).__identifier__
    return event.registry.content.create(
        resource_type,
        parent=predecessor.__parent__,
        appstructs=sheet_data,
        creator=event.creator,
        registry=event.registry,
        root_versions=event.root_versions,
        is_batchmode=event.is_batchmode,
        autoupdated=True,
    )


def _get_writable_appstructs(resource, registry) -> dict:
    """Return the data of all editable or creatable sheets of `resource`.

    Keys are the sheet interface identifiers.
    """
    return {
        sheet.meta.isheet.__identifier__: sheet.get()
        for sheet in registry.content.get_sheets_all(resource)
        if sheet.meta.editable or sheet.meta.creatable
    }
def autoupdate_non_versionable_has_new_version(event):
    """Auto update non versionable resources if a reference has new version.

    The referencing sheet is updated in place and flagged as autoupdated.
    Resources outside the root version subtree and sheets that are not
    editable are left alone.
    """
    if _is_in_root_version_subtree(event):
        target_sheet = event.registry.content.get_sheet(event.object,
                                                        event.isheet)
        if target_sheet.meta.editable:
            updated = _get_updated_appstruct(event, target_sheet)
            target_sheet.set(updated, autoupdated=True)
def send_password_reset_mail(event):
    """Send mail with reset password link if a reset resource is created.

    The recipient is the `creator` stored in the IMetadata sheet of the
    reset resource (``event.object``).
    """
    registry = event.registry
    reset = event.object
    creator = registry.content.get_sheet_field(reset, IMetadata, 'creator')
    registry.messenger.send_password_reset_mail(creator, reset)
def send_password_change_mail(event):
    """Send mail notification when password has changed."""
    registry = event.registry
    # NOTE(review): the mail goes to the IMetadata `creator` of the modified
    # resource, not to the resource itself -- confirm that this is always
    # the user whose password changed.
    recipient = registry.content.get_sheet_field(event.object,
                                                 IMetadata,
                                                 'creator')
    registry.messenger.send_password_change_mail(recipient)
def apply_user_activation_configuration(event):
    """Activate user or send activation or invite email.

    The `activation` field of the user's IActivationConfiguration sheet
    selects the action: ``direct`` activates immediately,
    ``registration_mail`` sends an activation mail and ``invitation_mail``
    sends an invitation mail.  Any other value does nothing.
    """
    registry = event.registry
    user = event.object
    config_sheet = registry.content.get_sheet(user, IActivationConfiguration)
    mode = config_sheet.get()['activation']
    if mode == 'direct':
        user.activate()
        return
    if mode == 'registration_mail':
        _send_activation_mail(user, registry)
        return
    if mode == 'invitation_mail':  # pragma: no branch
        _send_invitation_mail(user, registry)
def _send_activation_mail(user, registry):
    """Store a fresh activation path on `user` and mail it if possible."""
    path = _generate_activation_path()
    user.activation_path = path
    messenger = getattr(registry, 'messenger', None)
    if messenger is None:  # ease testing
        return
    messenger.send_registration_mail(user, path)


def _generate_activation_path() -> str:
    """Return ``/activate/`` followed by a random URL friendly token."""
    # TODO: not DRY, .resources.generate_name does almost the same
    # '+_' serve as altchars because both are reliably recognized in URLs,
    # even at the very end.  A trailing '-' is not treated as part of the
    # URL by some programs such as Thunderbird, and '/' might cause problems
    # as well, especially when it occurs several times in a row.
    token = b64encode(urandom(18), altchars=b'+_').decode()
    return '/activate/' + token


def _send_invitation_mail(user, registry):
    """Create a password reset for `user` and mail it as an invitation."""
    resets_service = find_service(user, 'principals', 'resets')
    password_reset = registry.content.create(
        IPasswordReset.__identifier__,
        resets_service,
        creator=user,
        send_event=False,
    )
    messenger = getattr(registry, 'messenger', None)
    if messenger is None:  # ease testing
        return
    messenger.send_invitation_mail(user, password_reset)
def send_new_user_email_activation(event):
    """Send new email address activation email.

    Does nothing unless the `email` field of the user's IEmailNew sheet is
    set.  Otherwise a fresh activation path is stored on the user and, if
    the registry has a messenger, mailed to the new address.
    """
    registry = event.registry
    user = event.object
    new_email = registry.content.get_sheet_field(user, IEmailNew, 'email')
    if not new_email:
        return
    activation_path = _generate_activation_path()
    user.activation_path = activation_path
    messenger = getattr(registry, 'messenger', None)
    # Guard like the other mail helpers in this module: without it a
    # registry that has no messenger raises AttributeError on None.
    if messenger is not None:  # ease testing
        messenger.send_new_email_activation_mail(user,
                                                 activation_path,
                                                 new_email)
def update_asset_download(event):
    """Update asset download.

    Delegates to `add_metadata` for the modified asset.
    """
    asset, registry = event.object, event.registry
    add_metadata(asset, registry)
def update_image_downloads(event):
    """Update image downloads.

    Delegates to `add_image_size_downloads` for the modified image.
    """
    image, registry = event.object, event.registry
    add_image_size_downloads(image, registry)
def download_external_picture_for_created(event: IResourceCreatedAndAdded):
    """Download external_picture_url for new resources.

    Item versions are skipped here because
    `download_external_picture_for_version` takes care of them.
    """
    resource = event.object
    if IItemVersion.providedBy(resource):
        return
    registry = event.registry
    url = registry.content.get_sheet_field(resource,
                                           IImageReference,
                                           'external_picture_url')
    # A new resource has no previous URL, hence the empty old value.
    _download_picture_url(resource, '', url, registry)
def download_external_picture_for_version(event: IItemVersionNewVersionAdded):
    """Download external_picture_url for new item versions.

    The URL of the old version (``event.object``) is compared with the URL
    of ``event.new_version``; the picture of the new version is refreshed
    only when they differ.
    """
    registry = event.registry
    get_field = registry.content.get_sheet_field
    url_before = get_field(event.object,
                           IImageReference,
                           'external_picture_url')
    url_after = get_field(event.new_version,
                          IImageReference,
                          'external_picture_url')
    _download_picture_url(event.new_version, url_before, url_after, registry)
def download_external_picture_for_edited(event: IResourceSheetModified):
    """Download external_picture_url for edited resources.

    A missing `external_picture_url` entry in the old or new appstruct is
    treated as an empty URL.
    """
    field = 'external_picture_url'
    url_before = event.old_appstruct.get(field, '')
    url_after = event.new_appstruct.get(field, '')
    _download_picture_url(event.object, url_before, url_after, event.registry)
def _download_picture_url(context: IImageReference,
                          old_url: str,
                          new_url: str,
                          registry: Registry):
    """Sync the `picture` reference of `context` with its external URL.

    Nothing happens when the URL did not change.  An empty (or missing) new
    URL clears the reference; otherwise the URL is downloaded, stored as a
    new image resource and referenced.
    """
    if old_url == new_url:
        return
    if not new_url:
        # Covers '' as before and additionally None, which would otherwise
        # be handed to `requests.get` and fail there.
        _set_picture_reference(context, None, registry)
        return
    downloaded = _download(new_url)
    image = _create_image(context, downloaded, registry)
    _set_picture_reference(context, image, registry)


def _set_picture_reference(context: IResource,
                           value: IImage,
                           registry: Registry):
    """Set the IImageReference `picture` field without sending an event."""
    sheet = registry.content.get_sheet(context, IImageReference)
    sheet.set({'picture': value}, send_event=False)


def _download(url: str) -> File:
    """Fetch `url` (5 second timeout) and wrap the body in a `File`.

    The size attribute is the number of bytes actually received.
    """
    # NOTE(review): the HTTP status is not checked, so the body of an error
    # response is stored like any other download -- confirm this is wanted.
    resp = requests.get(url, timeout=5)
    body = resp.content
    file = File(stream=BytesIO(body), mimetype=USE_MAGIC)
    # The Content-Length header is optional (e.g. chunked responses), is a
    # string and describes the transferred, possibly compressed, payload.
    # Measuring the decoded body is always available and accurate.
    file.size = len(body)
    return file


def _create_image(context: IResource,
                  file: File,
                  registry: Registry) -> IImage:
    """Create an image resource holding `file` in the `assets` service."""
    assets = find_service(context, 'assets')
    appstructs = {IAssetData.__identifier__: {'data': file}}
    return registry.content.create(IImage.__identifier__,
                                   parent=assets,
                                   appstructs=appstructs,
                                   registry=registry,
                                   )
def add_activities_to_activity_stream(event: IActivitiesGenerated):
    """Add activity resources to activity_stream.

    Does nothing unless `adhocracy.activity_stream.enabled` is set in the
    registry's config.  For every activity of the event one IActivity
    resource is created in the `activity_stream` service; its name is the
    translated activity description.
    """
    request = event.request
    registry = request.registry
    if not registry['config'].adhocracy.activity_stream.enabled:
        return
    activities = event.activities
    stream = find_service(request.root, 'activity_stream')
    translate = get_localizer(request).translate
    sheet_id = adhocracy_core.sheets.activity.IActivity.__identifier__
    for activity in activities:
        name = translate(generate_activity_description(activity, request))
        sheet_data = {
            'subject': activity.subject,
            'type': activity.type.value,
            'object': activity.object,
            'target': activity.target,
            'name': name,
            'published': activity.published,
        }
        registry.content.create(IActivity.__identifier__,
                                appstructs={sheet_id: sheet_data},
                                parent=stream,
                                registry=registry)
def includeme(config):
    """Register subscribers.

    Each entry is (subscriber, event interface, predicates); the entries
    are registered in the order listed.
    """
    auto_update = ISheetReferenceAutoUpdateMarker
    registrations = (
        (set_acms_for_app_root, IApplicationCreated, {}),
        (autoupdate_versionable_has_new_version,
         ISheetReferenceNewVersion,
         dict(object_iface=IItemVersion, event_isheet=auto_update)),
        (autoupdate_non_versionable_has_new_version,
         ISheetReferenceNewVersion,
         dict(object_iface=IPool, event_isheet=auto_update)),
        (autoupdate_non_versionable_has_new_version,
         ISheetReferenceNewVersion,
         dict(object_iface=ISimple, event_isheet=auto_update)),
        (add_default_group_to_user,
         IResourceCreatedAndAdded,
         dict(object_iface=IUser)),
        (apply_user_activation_configuration,
         IResourceCreatedAndAdded,
         dict(object_iface=IUser)),
        (send_new_user_email_activation,
         IResourceSheetModified,
         dict(event_isheet=IEmailNew)),
        (update_modification_date_modified_by,
         IResourceSheetModified,
         dict(object_iface=IMetadata)),
        (send_password_reset_mail,
         IResourceCreatedAndAdded,
         dict(object_iface=IPasswordReset)),
        (send_password_change_mail,
         IResourceSheetModified,
         dict(event_isheet=IPasswordAuthentication)),
        (update_asset_download,
         IResourceSheetModified,
         dict(object_iface=IAsset, event_isheet=IAssetData)),
        (update_image_downloads,
         IResourceSheetModified,
         dict(object_iface=IImage, event_isheet=IAssetData)),
        (download_external_picture_for_created,
         IResourceCreatedAndAdded,
         dict(object_iface=IImageReference)),
        (download_external_picture_for_version,
         IItemVersionNewVersionAdded,
         dict(object_iface=IImageReference)),
        (download_external_picture_for_edited,
         IResourceSheetModified,
         dict(event_isheet=IImageReference)),
        (add_activities_to_activity_stream, IActivitiesGenerated, {}),
    )
    for subscriber, event_iface, predicates in registrations:
        config.add_subscriber(subscriber, event_iface, **predicates)