"""GET/POST/PUT requests processing."""
from collections import defaultdict
from copy import deepcopy
from logging import getLogger
from substanced.util import find_service
from substanced.stats import statsd_timer
from pyramid.interfaces import IRequest
from pyramid.view import view_defaults
from pyramid.security import remember
from pyramid.traversal import resource_path
from pyramid.registry import Registry
from zope.interface.interfaces import IInterface
from zope.interface import Interface
import colander
from adhocracy_core.authentication import UserPasswordHeader
from adhocracy_core.authentication import UserTokenHeader
from adhocracy_core.authentication import AnonymizeHeader
from adhocracy_core.interfaces import API_ROUTE_NAME
from adhocracy_core.authorization import is_password_required_to_edit_some
from adhocracy_core.authorization import is_password_required_to_edit
from adhocracy_core.interfaces import IResource
from adhocracy_core.interfaces import IItem
from adhocracy_core.interfaces import IItemVersion
from adhocracy_core.interfaces import ISimple
from adhocracy_core.interfaces import ISheet
from adhocracy_core.interfaces import IPool
from adhocracy_core.resources.asset import IAsset
from adhocracy_core.resources.asset import IAssetDownload
from adhocracy_core.resources.asset import IAssetsService
from adhocracy_core.resources.principal import IUsersService
from adhocracy_core.resources.principal import IPasswordReset
from adhocracy_core.resources.proposal import IProposalVersion
from adhocracy_core.resources.rate import IRateVersion
from adhocracy_core.resources.badge import IBadgeAssignmentsService
from adhocracy_core.rest import api_view
from adhocracy_core.rest.schemas import ResourceResponseSchema
from adhocracy_core.rest.schemas import ItemResponseSchema
from adhocracy_core.rest.schemas import POSTActivateAccountViewRequestSchema
from adhocracy_core.rest.schemas import POSTItemRequestSchema
from adhocracy_core.rest.schemas import POSTLoginEmailRequestSchema
from adhocracy_core.rest.schemas import POSTLoginUsernameRequestSchema
from adhocracy_core.rest.schemas import POSTMessageUserViewRequestSchema
from adhocracy_core.rest.schemas import POSTCreatePasswordResetRequestSchema
from adhocracy_core.rest.schemas import POSTPasswordResetRequestSchema
from adhocracy_core.rest.schemas import POSTReportAbuseViewRequestSchema
from adhocracy_core.rest.schemas import POSTResourceRequestSchema
from adhocracy_core.rest.schemas import PUTResourceRequestSchema
from adhocracy_core.rest.schemas import GETPoolRequestSchema
from adhocracy_core.rest.schemas import GETItemResponseSchema
from adhocracy_core.rest.schemas import GETResourceResponseSchema
from adhocracy_core.rest.schemas import DELETEResourceResponseSchema
from adhocracy_core.rest.schemas import POSTLoginServiceKontoSchema
from adhocracy_core.rest.schemas import options_resource_response_data_dict
from adhocracy_core.schema import SchemaNode
from adhocracy_core.schema import AbsolutePath
from adhocracy_core.schema import References
from adhocracy_core.sheets.badge import get_assignable_badges
from adhocracy_core.sheets.badge import IBadgeAssignment
from adhocracy_core.sheets.workflow import IWorkflowAssignment
from adhocracy_core.sheets.pool import IPool as IPoolSheet
from adhocracy_core.sheets.versions import IVersionable
from adhocracy_core.sheets.tags import ITags
from adhocracy_core.utils import extract_events_from_changelog_metadata
from adhocracy_core.utils import is_batchmode
from adhocracy_core.utils import to_dotted_name
from adhocracy_core.utils import is_created_in_current_transaction
from adhocracy_core.utils import create_schema
from adhocracy_core.resources.root import IRootPool
from adhocracy_core.workflows.schemas import create_workflow_meta_schema
logger = getLogger(__name__)
@view_defaults(
context=IResource,
)
[docs]class ResourceRESTView:
"""Default view for Resources, implements get and options."""
def __init__(self, context, request):
"""Initialize self."""
self.context = context
"""Context Resource."""
self.request = request
""":class:`pyramid.request.Request`."""
self.registry = request.registry
""":class:`pyramid.registry.Registry`."""
self.content = request.registry.content
""":class:`adhocracy_core.content.ResourceContentRegistry`."""
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Get possible request/response data structures and http methods."""
with statsd_timer('process.options', rate=.1, registry=self.registry):
cstruct = self._options(self.context, self.request)
return cstruct
def _options(self, context: IResource, request: IRequest) -> dict:
empty = {} # tiny performance tweak
cstruct = deepcopy(options_resource_response_data_dict)
if request.has_permission('edit_some', context):
edits = self.content.get_sheets_edit(context, request)
put_sheets = [(s.meta.isheet.__identifier__, empty) for s in edits]
can_anonymize = self.content.can_edit_anonymized(context, request)
allow_password = is_password_required_to_edit_some(edits)
headers_dict = {}
if put_sheets:
put_sheets_dict = dict(put_sheets)
self._add_workflow_edit_permission_info(put_sheets_dict, edits)
cstruct['PUT']['request_body']['data'] = put_sheets_dict
if can_anonymize:
headers_dict[AnonymizeHeader] = []
if allow_password:
headers_dict[UserPasswordHeader] = []
cstruct['PUT']['request_headers'] = headers_dict
else:
del cstruct['PUT']
else:
del cstruct['PUT']
if request.has_permission('view', context):
views = self.content.get_sheets_read(context, request)
get_sheets = [(s.meta.isheet.__identifier__, empty) for s in views]
if get_sheets:
cstruct['GET']['response_body']['data'] = dict(get_sheets)
else:
del cstruct['GET']
else:
del cstruct['GET']
if request.has_permission('delete', context):
can_anonymize = self.content.can_delete_anonymized(context,
request)
headers_dict = can_anonymize and {AnonymizeHeader: []} or {}
cstruct['DELETE']['request_headers'] = headers_dict
else:
del cstruct['DELETE']
is_users = IUsersService.providedBy(context) \
and request.has_permission('create_user', context)
# TODO move the is_user specific part the UsersRestView
if request.has_permission('create', self.context) or is_users:
addables = self.content.get_resources_meta_addable(context,
request)
can_anonymize = self.content.can_add_anonymized(context, request)
if addables:
for resource_meta in addables:
iresource = resource_meta.iresource
resource_typ = iresource.__identifier__
creates = self.content.get_sheets_create(context,
request,
iresource)
sheet_typs = [s.meta.isheet.__identifier__ for s
in creates]
sheets_dict = dict.fromkeys(sheet_typs, empty)
post_data = {'content_type': resource_typ,
'data': sheets_dict}
cstruct['POST']['request_body'].append(post_data)
headers_dict = can_anonymize and {AnonymizeHeader: []} or {}
cstruct['POST']['request_headers'] = headers_dict
else:
del cstruct['POST']
else:
del cstruct['POST']
return cstruct
def _add_workflow_edit_permission_info(self, cstruct: dict, edit_sheets):
"""Add info if a user may set the workflow_state workflow field."""
workflow_sheets = [s for s in edit_sheets
if s.meta.isheet.isOrExtends(IWorkflowAssignment)]
for sheet in workflow_sheets:
workflow_name = sheet.get()['workflow']
workflow = self.registry.content.workflows.get(workflow_name, None)
if workflow is None:
states = []
else:
states = workflow.get_next_states(self.context, self.request)
isheet = sheet.meta.isheet
cstruct[isheet.__identifier__] = {'workflow_state': states}
@api_view(
request_method='GET',
permission='view',
)
[docs] def get(self) -> dict:
"""Get resource data (unless hidden)."""
metric = self._get_get_metric_name()
with statsd_timer(metric, rate=.1, registry=self.registry):
schema = create_schema(GETResourceResponseSchema,
self.context,
self.request)
cstruct = schema.serialize()
cstruct['data'] = self._get_sheets_data_cstruct()
return cstruct
def _get_get_metric_name(self) -> str:
if self.request.validated:
return 'process.get'
else:
return 'process.get.query'
def _get_sheets_data_cstruct(self):
queryparams = self.request.validated if self.request.validated else {}
sheets_view = self.content.get_sheets_read(self.context,
self.request)
data_cstruct = {}
for sheet in sheets_view:
key = sheet.meta.isheet.__identifier__
if sheet.meta.isheet is IPoolSheet:
cstruct = sheet.serialize(params=queryparams)
else:
cstruct = sheet.serialize()
data_cstruct[key] = cstruct
return data_cstruct
def _build_updated_resources_dict(registry: Registry) -> dict:
result = defaultdict(list)
for meta in registry.changelog.values():
events = extract_events_from_changelog_metadata(meta)
for event in events:
result[event].append(meta.resource)
return result
@view_defaults(
context=ISimple,
)
[docs]class SimpleRESTView(ResourceRESTView):
"""View for simples (non versionable), implements get, options and put."""
@api_view(
request_method='PUT',
permission='edit_some',
schema=PUTResourceRequestSchema,
accept='application/json',
)
[docs] def put(self) -> dict:
"""Edit resource and get response data."""
with statsd_timer('process.put', rate=.1, registry=self.registry):
sheets = self.content.get_sheets_edit(self.context, self.request)
sheets = self._filter_require_password_header(sheets)
appstructs = self.request.validated.get('data', {})
for sheet in sheets:
name = sheet.meta.isheet.__identifier__
if name in appstructs:
sheet.set(appstructs[name])
appstruct = {}
if not is_batchmode(self.request): # pragma: no branch
updated = _build_updated_resources_dict(self.registry)
appstruct['updated_resources'] = updated
schema = create_schema(ResourceResponseSchema,
self.context,
self.request)
cstruct = schema.serialize(appstruct)
return cstruct
def _filter_require_password_header(self, sheets):
if hasattr(self.request, 'password') and self.request.password:
return sheets
sheets_without_password = [sheet for sheet in sheets
if not is_password_required_to_edit(sheet)]
return sheets_without_password
@api_view(
request_method='DELETE',
permission='delete',
)
[docs] def delete(self) -> dict:
"""Delete resource."""
parent = self.context.__parent__
name = self.context.__name__
parent.remove(name, registry=self.registry)
if is_batchmode(self.request):
appstruct = {}
else:
updated = _build_updated_resources_dict(self.registry)
appstruct = {'updated_resources': updated}
schema = create_schema(DELETEResourceResponseSchema,
self.context,
self.request)
# temporary undelete to make serialization work
self.context.__parent__ = parent
self.context.__name__ = name
# serialize appstruct
cstruct = schema.serialize(appstruct)
del self.context.__parent__
del self.context.__name__
return cstruct
# TODO refactor (method is to long and not DRY)
@view_defaults(
context=IPool,
)
[docs]class PoolRESTView(SimpleRESTView):
"""View for Pools, implements get, options, put and post."""
@api_view(
request_method='GET',
schema=GETPoolRequestSchema,
permission='view',
)
[docs] def get(self) -> dict:
"""Get resource data."""
# This delegation method is necessary since otherwise validation_GET
# won't be found.
return super().get()
@api_view(
request_method='DELETE',
permission='delete',
)
[docs] def delete(self) -> dict: # pragma: no cover
"""Delete resource."""
return super().delete()
[docs] def build_post_response(self, resource) -> dict:
"""Build response data structure for a POST request."""
appstruct = {}
if IItem.providedBy(resource):
first = self.registry.content.get_sheet_field(resource,
ITags,
'FIRST')
appstruct['first_version_path'] = first
schema = create_schema(ItemResponseSchema, resource, self.request)
else:
schema = create_schema(ResourceResponseSchema,
resource,
self.request)
if not is_batchmode(self.request):
updated = _build_updated_resources_dict(self.registry)
appstruct['updated_resources'] = updated
return schema.serialize(appstruct)
@api_view(
request_method='POST',
permission='create',
schema=POSTResourceRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Create new resource and get response data."""
metric = self._get_post_metric_name()
with statsd_timer(metric, rate=1, registry=self.registry):
resource = self._create()
cstruct = self.build_post_response(resource)
return cstruct
def _get_post_metric_name(self) -> str:
iresource = self.request.validated['content_type']
name = 'process.post'
if iresource.isOrExtends(IProposalVersion):
name = 'process.post.proposalversion'
elif iresource.isOrExtends(IRateVersion):
name = 'process.post.rateversion'
return name
def _create(self) -> IResource:
validated = self.request.validated
kwargs = dict(parent=self.context,
appstructs=validated.get('data', {}),
creator=self.request.user,
root_versions=validated.get('root_versions', []),
request=self.request,
is_batchmode=is_batchmode(self.request),
anonymized_creator=self.request.anonymized_user,
)
iresource = validated['content_type']
return self.content.create(iresource.__identifier__, **kwargs)
@api_view(
request_method='PUT',
permission='edit_some',
schema=PUTResourceRequestSchema,
accept='application/json',
)
[docs] def put(self) -> dict:
"""HTTP PUT."""
return super().put()
@view_defaults(
context=IItem,
)
[docs]class ItemRESTView(PoolRESTView):
"""View for Items and ItemVersions, overwrites GET and POST handling."""
@api_view(
request_method='GET',
schema=GETPoolRequestSchema,
permission='view',
)
[docs] def get(self) -> dict:
"""Get resource data."""
with statsd_timer('process.get', rate=.1, registry=self.registry):
first_version = self.registry.content.get_sheet_field(self.context,
ITags,
'FIRST')
appstruct = {}
if first_version is not None:
appstruct['first_version_path'] = first_version
schema = create_schema(GETItemResponseSchema,
self.context,
self.request)
cstruct = schema.serialize(appstruct)
cstruct['data'] = self._get_sheets_data_cstruct()
return cstruct
@api_view(
request_method='DELETE',
permission='delete',
)
[docs] def delete(self) -> dict: # pragma: no cover
"""Delete resource."""
return super().delete()
@api_view(
request_method='POST',
permission='create',
schema=POSTItemRequestSchema,
accept='application/json',
)
[docs] def post(self):
"""Create new resource and get response data.
For :class:`adhocracy_core.interfaces.IItemVersion`:
If a `new version` is already created in this transaction we don't want
to create a new one. Instead we modify the existing one.
This is needed to make :class:`adhocray_core.rest.batchview.BatchView`
work.
"""
metric = self._get_post_metric_name()
with statsd_timer(metric, rate=1, registry=self.registry):
if is_batchmode(self.request) and self._creating_new_version():
last = self.registry.content.get_sheet_field(self.context,
ITags,
'LAST')
if is_created_in_current_transaction(last, self.registry):
self._update_version(last)
resource = last
else:
resource = self._create()
else:
resource = self._create()
cstruct = self.build_post_response(resource)
return cstruct
def _creating_new_version(self) -> bool:
iresource = self.request.validated['content_type']
return IItemVersion.isEqualOrExtendedBy(iresource)
def _update_version(self, resource: IVersionable):
create_sheets = self.content.get_sheets_create(resource, self.request)
is_first = self.registry.content.get_sheet_field(self.context,
ITags,
'FIRST') == resource
appstructs = self.request.validated.get('data', {})
for sheet in create_sheets:
isheet = sheet.meta.isheet
is_version_sheet = IVersionable.isEqualOrExtendedBy(isheet)
if is_version_sheet and is_first:
continue
isheet_name = isheet.__identifier__
if isheet_name in appstructs: # pragma: no branch
sheet.set(appstructs[isheet.__identifier__])
@view_defaults(
context=IBadgeAssignmentsService,
)
[docs]class BadgeAssignmentsRESTView(PoolRESTView):
"""REST view for the badge assignment."""
@api_view(
request_method='GET',
permission='view',
)
[docs] def get(self) -> dict:
"""HTTP GET."""
return super().get()
@api_view(
request_method='POST',
permission='create',
schema=POSTResourceRequestSchema,
accept='application/json',
)
[docs] def post(self):
"""HTTP POST."""
return super().post()
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Get possible request/response data structures and http methods."""
cstruct = super().options()
if 'POST' not in cstruct:
return cstruct
for info in cstruct['POST']['request_body']:
if IBadgeAssignment.__identifier__ not in info['data']:
continue
assignables = get_assignable_badges(self.context, self.request)
urls = [self.request.resource_url(x, route_name=API_ROUTE_NAME)
for x in assignables]
# TODO: use colander schema to create cstruct
info['data'][IBadgeAssignment.__identifier__] =\
{'badge': urls}
return cstruct
@view_defaults(
context=IUsersService,
)
[docs]class UsersRESTView(PoolRESTView):
"""View the IUsersService pool overwrites POST handling."""
@api_view(
request_method='POST',
permission='create_user',
schema=POSTResourceRequestSchema,
accept='application/json',
)
[docs] def post(self):
"""HTTP POST."""
return super().post()
@view_defaults(
context=IAssetsService,
)
[docs]class AssetsServiceRESTView(PoolRESTView):
"""View allowing multipart requests for asset upload."""
@api_view(
request_method='POST',
permission='create_asset',
schema=POSTResourceRequestSchema,
accept='multipart/form-data',
)
[docs] def post(self):
"""HTTP POST."""
return super().post()
@view_defaults(
renderer='json',
context=IAsset,
)
[docs]class AssetRESTView(SimpleRESTView):
"""View for assets, allows PUTting new versions via multipart."""
@api_view(
request_method='PUT',
permission='create_asset',
schema=PUTResourceRequestSchema,
accept='multipart/form-data',
)
[docs] def put(self) -> dict:
"""HTTP PUT."""
return super().put()
@view_defaults(
context=IAssetDownload,
)
[docs]class AssetDownloadRESTView(ResourceRESTView):
"""View for downloading assets as binary blobs."""
@api_view(
request_method='GET',
permission='view',
)
[docs] def get(self) -> dict:
"""Get asset data."""
response = self.context.get_response(self.request.registry)
self.ensure_caching_headers(response)
return response
@view_defaults(
context=IRootPool,
name='meta_api',
)
def _get_base_ifaces(iface: IInterface, root_iface=Interface) -> [str]:
bases = []
current_bases = iface.getBases()
while current_bases:
old_bases = deepcopy(current_bases)
current_bases = ()
for base in old_bases:
if base.extends(root_iface):
bases.append(base.__identifier__)
current_bases += base.getBases()
return bases
@view_defaults(
context=IRootPool,
name='login_username',
)
[docs]class LoginUsernameView:
"""Log in a user via their name."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {}
@api_view(
request_method='POST',
schema=POSTLoginUsernameRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Create new resource and get response data."""
return _login_user(self.request)
def _login_user(request: IRequest) -> dict:
"""Set cookies and return a data for token header authentication."""
user = request.validated['user']
userid = resource_path(user)
headers = remember(request, userid)
cstruct = _get_api_auth_data(headers, request, user)
return cstruct
def _get_api_auth_data(headers: [tuple], request: IRequest, user: IResource)\
-> dict:
token_headers = dict([(x, y) for x, y in headers if x == UserTokenHeader])
token = token_headers[UserTokenHeader]
user_url = request.resource_url(user, route_name=API_ROUTE_NAME)
# TODO: use colander schema to create cstruct
return {'status': 'success',
'user_path': user_url,
'user_token': token,
}
@view_defaults(
context=IRootPool,
name='login_email',
)
[docs]class LoginEmailView:
"""Log in a user via their email address."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {}
@api_view(request_method='POST',
schema=POSTLoginEmailRequestSchema,
accept='application/json')
[docs] def post(self) -> dict:
"""Create new resource and get response data."""
return _login_user(self.request)
@view_defaults(
context=IRootPool,
name='activate_account',
)
[docs]class ActivateAccountView:
"""Log in a user via their name."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {}
@api_view(
request_method='POST',
schema=POSTActivateAccountViewRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Activate a user account and log the user in."""
return _login_user(self.request)
@view_defaults(
context=IRootPool,
name='report_abuse',
)
[docs]class ReportAbuseView:
"""Receive and process an abuse complaint."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
appstruct = {}
if self.request.has_permission('message_to_user', self.context):
schema = create_schema(POSTReportAbuseViewRequestSchema,
self.context,
self.request)
appstruct['POST'] = {'request_body': schema.serialize({}),
'response_body': ''}
return appstruct
@api_view(
request_method='POST',
permission='report_abuse',
schema=POSTReportAbuseViewRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Receive and process an abuse complaint."""
messenger = self.request.registry.messenger
messenger.send_abuse_complaint(url=self.request.validated['url'],
remark=self.request.validated['remark'],
user=self.request.user)
return ''
@view_defaults(
context=IRootPool,
name='message_user',
)
[docs]class MessageUserView:
"""Send a message to another user."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
result = {}
if self.request.has_permission('message_to_user', self.context):
schema = create_schema(POSTMessageUserViewRequestSchema,
self.context,
self.request)
result['POST'] = {'request_body': schema.serialize({}),
'response_body': ''}
return result
@api_view(
request_method='POST',
permission='message_to_user',
schema=POSTMessageUserViewRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Send a message to another user."""
messenger = self.request.registry.messenger
data = self.request.validated
user = self.request.user
messenger.send_message_to_user(recipient=data['recipient'],
title=data['title'],
text=data['text'],
from_user=user)
return ''
@view_defaults(
context=IRootPool,
name='create_password_reset',
)
[docs]class CreatePasswordResetView:
"""Create a password reset resource."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {'POST': {}}
@api_view(
request_method='POST',
schema=POSTCreatePasswordResetRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Create as password reset resource."""
resets = find_service(self.context, 'principals', 'resets')
user = self.request.validated['user']
self.request.registry.content.create(IPasswordReset.__identifier__,
resets,
creator=user)
return {'status': 'success'}
@view_defaults(
context=IRootPool,
name='password_reset',
)
[docs]class PasswordResetView:
"""Reset a user password."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {'POST': {}}
@api_view(
request_method='POST',
schema=POSTPasswordResetRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Reset password."""
reset = self.request.validated['path']
password = self.request.validated['password']
reset.reset_password(password)
return _login_user(self.request)
@view_defaults(
context=IRootPool,
name='login_service_konto',
)
[docs]class LoginServiceKontoView:
"""Log in a user via service konto token."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for view."""
return {}
@api_view(
request_method='POST',
schema=POSTLoginServiceKontoSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Login user."""
return _login_user(self.request)
[docs]def includeme(config):
"""Register Views."""
config.scan('.views')