"""POST batch requests processing."""
from json import dumps
from logging import getLogger
from pyramid.httpexceptions import HTTPException
from pyramid.httpexceptions import HTTPClientError
from adhocracy_core.interfaces import API_ROUTE_NAME
from adhocracy_core.rest.exceptions import handle_error_x0x_exception
from adhocracy_core.rest.exceptions import handle_error_40x_exception
from adhocracy_core.rest.exceptions import handle_error_500_exception
from adhocracy_core.rest.exceptions import JSONHTTPClientError
from adhocracy_core.rest.exceptions import get_json_body
from adhocracy_core.authentication import AnonymizeHeader
from adhocracy_core.authentication import UserPasswordHeader
from pyramid.request import Request
from pyramid.interfaces import IRequest
from pyramid.view import view_defaults
from adhocracy_core.resources.root import IRootPool
from adhocracy_core.rest import api_view
from adhocracy_core.rest.schemas import POSTBatchRequestSchema
from adhocracy_core.rest.schemas import UpdatedResourcesSchema
from adhocracy_core.rest.views import _build_updated_resources_dict
from adhocracy_core.utils import set_batchmode
from adhocracy_core.utils import create_schema
logger = getLogger(__name__)
[docs]class BatchItemResponse:
"""Wrap the response to a nested request in a batch request.
Attributes:
:code: the status code (int)
:title: the status title (str)
:body: the response body as a JSOn object (dict)
"""
def __init__(self, code: int, title: str, body: dict={}):
"""Initialize self."""
self.code = code
self.body = body
self.title = title
[docs] def was_successful(self):
"""Return true if batch was successful."""
return self.code == 200
[docs] def to_dict(self):
"""Convert to dict."""
return {'code': self.code, 'body': self.body}
@view_defaults(
context=IRootPool,
name='batch',
)
[docs]class BatchView:
"""Process batch requests."""
def __init__(self, context: IRootPool, request: IRequest):
self.context = context
self.request = request
self.registry = request.registry
@api_view(
request_method='POST',
schema=POSTBatchRequestSchema,
accept='application/json',
)
[docs] def post(self) -> dict:
"""Create new resource and get response data."""
response_list = []
path_map = {}
set_batchmode(self.request)
for pos, item in enumerate(self.request.validated):
item_response = self._process_nested_request(item, path_map)
response_list.append(item_response)
if not item_response.was_successful():
error = JSONHTTPClientError([],
code=item_response.code,
title=item_response.title,
request=self.request)
# TODO: the error response lists updated resources
json_body = error.json_body
response_list_json = self._response_list_to_json(response_list)
json_body.update(response_list_json)
error.json_body = json_body
msg = 'Failing batch request item position {0} request {1} {2}'
logger.warn(msg.format(pos,
item['method'],
item['path']))
raise error
response = self._response_list_to_json(response_list)
return response
@api_view(request_method='OPTIONS')
[docs] def options(self) -> dict:
"""Return options for batch view.
This currently only exist in order to satisfy the preflight request,
which browsers do in CORS situations before doing the actual POST.
"""
return {}
def _process_nested_request(self, nested_request: dict,
path_map: dict) -> BatchItemResponse:
result_path = nested_request['result_path']
result_first_version_path = nested_request['result_first_version_path']
nested_request['path'] = self._resolve_preliminary_paths(
nested_request['path'], path_map)
nested_request['body'] = self._resolve_preliminary_paths(
nested_request['body'], path_map)
subrequest = self._make_subrequest(nested_request)
item_response = self._invoke_subrequest_and_handle_errors(subrequest)
self._extend_path_map(path_map, result_path, result_first_version_path,
item_response)
return item_response
def _response_list_to_json(self, response_list: list) -> list:
"""
Convert the list of batch responses into a JSON dict.
The dict has two fields:
* responses: the list of batch responses, but without their
"updated_resources" child element
* updated_resources: the listing of resources affected by the
transaction
"""
responses = []
for response in response_list:
if 'updated_resources' in response.body:
del response.body['updated_resources']
responses.append(response.to_dict())
updated_resources = _build_updated_resources_dict(self.registry)
schema = create_schema(UpdatedResourcesSchema,
self.context,
self.request)
return {'responses': responses,
'updated_resources': schema.serialize(updated_resources)}
def _resolve_preliminary_paths(self, json_value: object,
path_map: dict) -> object:
"""Create a copy of `json_value` with preliminary paths resolved.
This method accepts arbitrary JSON values and calls itself
recursively, as needed. In trivial cases (no change necessary),
the original `json_value` may be returned instead of a copy.
"""
if not (path_map and json_value):
return json_value
if isinstance(json_value, str):
result = path_map.get(json_value, json_value)
elif isinstance(json_value, dict):
result = {}
for key, value in json_value.items():
result[key] = self._resolve_preliminary_paths(value, path_map)
elif isinstance(json_value, list):
result = []
for value in json_value:
result.append(self._resolve_preliminary_paths(value, path_map))
else:
result = json_value
return result
def _make_subrequest(self, nested_request: dict) -> IRequest:
path = nested_request['path']
method = nested_request['method']
json_body = nested_request['body']
keywords_args = {'method': method,
'base_url': self.request.host_url}
if json_body:
keywords_args['body'] = dumps(json_body).encode()
if method not in ['GET', 'OPTIONS', 'HEAD']:
keywords_args['content_type'] = 'application/json'
request = Request.blank(path, **keywords_args)
set_batchmode(request)
self.copy_attr_if_exists('root', request)
self.copy_attr_if_exists('__cached_principals__', request)
self.copy_attr_if_exists('__cached_userid__', request)
self.copy_header_if_exists('X-User-Path', request)
self.copy_header_if_exists('X-User-Token', request)
self.copy_header_if_exists(AnonymizeHeader, request)
self.copy_header_if_exists(UserPasswordHeader, request)
# properly setup subrequest in case script_name env is set,
# see https://github.com/Pylons/pyramid/issues/1434
request.script_name = self.request.script_name
# if a script_name (a prefix in front of backend paths, e.g. /api) is
# set, subrequest paths also need to start with it.
if not request.path_info.startswith(request.script_name):
raise Exception('Batch subrequest path (%s) does not start with '
'script name (%s)' % (request.path_info,
request.script_name))
request.path_info = request.path_info[len(self.request.script_name):]
return request
def _invoke_subrequest_and_handle_errors(
self, subrequest: IRequest) -> BatchItemResponse:
try:
subresponse = self.request.invoke_subrequest(subrequest)
except Exception as err:
error_view = self._get_error_view(err)
subresponse = error_view(err, subrequest)
body = get_json_body(subresponse)
return BatchItemResponse(subresponse.status_code,
subresponse.status,
body)
def _get_error_view(self, error: Exception) -> callable:
"""Return view callable to handle exception.
The error handler in :mod:`adhocracy_core.rest.exceptions` are only
called at end of the request, but not for sub requests. To make
sure batch requests show the same error messages as normal requests
we try to find the right error view manually here.
"""
if isinstance(error, HTTPException):
error_view = handle_error_x0x_exception
else:
error_view = handle_error_500_exception
if isinstance(error, HTTPClientError):
error_view = handle_error_40x_exception
instrospector = self.request.registry.introspector
for view in instrospector.get_category('views'):
context = view['introspectable']['context']
route_name = view['introspectable']['route_name']
if context == error.__class__ and route_name == API_ROUTE_NAME:
error_view = view['introspectable']['callable']
break
return error_view
def _extend_path_map(self, path_map: dict, result_path: str,
result_first_version_path: str,
item_response: BatchItemResponse):
if not (result_path and item_response.was_successful()):
return
path = item_response.body.get('path', '')
first_version_path = item_response.body.get('first_version_path', '')
if path:
path_map[result_path] = path
if first_version_path:
path_map[result_first_version_path] = first_version_path
[docs] def copy_attr_if_exists(self, attributename: str, request: IRequest):
"""Copy attr if exists."""
value = getattr(self.request, attributename, None)
if value is not None:
setattr(request, attributename, value)
[docs]def includeme(config): # pragma: no cover
"""Register batch view."""
config.scan('.batchview')