Source code for adhocracy_core.scripts

"""scripts."""

import logging
import json
import os
from pathlib import Path

from pyramid.asset import resolve_asset_spec
from pyramid.path import package_path
from pyramid.request import Request
from pyramid.registry import Registry
from pyramid.traversal import find_resource
from pyramid.traversal import get_current_registry
from pyramid.traversal import resource_path
from pyrsistent import PMap
from pyrsistent import freeze
from pyrsistent import ny
from substanced.interfaces import IUserLocator
from zope.interface.interfaces import IInterface

from adhocracy_core.authorization import create_fake_god_request
from adhocracy_core.authorization import set_local_roles
from adhocracy_core.exceptions import ConfigurationError
from adhocracy_core.interfaces import IResource
from adhocracy_core.interfaces import IPool
from adhocracy_core.interfaces import ISheet
from adhocracy_core.schema import ContentType
from adhocracy_core.sheets.name import IName
from adhocracy_core.scripts.ad_import_groups import import_groups
from adhocracy_core.scripts.ad_import_users import import_users
from adhocracy_core.scripts.ad_set_workflow_state import set_workflow_state


logger = logging.getLogger(__name__)


[docs]def import_resources(root: IResource, registry: Registry, filename: str): """Import resources from a JSON file with dummy `god` user.""" request = create_fake_god_request(registry) resources_info = _load_info(filename) for resource_info in resources_info: expected_path = _get_expected_path(resource_info) if _resource_exists(expected_path, root): logger.info('Skipping {}'.format(expected_path)) else: logger.info('Creating {}'.format(expected_path)) _create_resource(freeze(resource_info), request, registry, root)
def _get_expected_path(resource_info: dict) -> str: name_field = resource_info['data'].get(IName.__identifier__, {}) name = name_field.get('name', '') path = name and os.path.join(resource_info['path'], name) return path def _resource_exists(expected_path: dict, context: IResource) -> bool: try: find_resource(context, expected_path) return True except KeyError: return False def _create_resource(resource_info: PMap, request: Request, registry: Registry, root: IPool): iresource = _deserialize_content_type(resource_info) parent = find_resource(root, resource_info['path']) resource_info = _resolve_users(resource_info, root, registry, request) appstructs = _deserialize_data(resource_info, parent, registry, request) creator = _get_creator(resource_info, root, registry, request) registry.content.create(iresource.__identifier__, parent=parent, appstructs=appstructs, registry=registry, request=request, creator=creator, ) def _load_info(filename: str) -> [dict]: with open(filename, 'r') as f: return json.load(f) def _resolve_users(resource_info: PMap, root: IResource, registry: Registry, request: Request) -> PMap: """Resolve strings containing "user_by_name: <username>". Strings of this form are resolved to the user's path. """ def _resolve_user(s): if not isinstance(s, str) or not s.startswith('user_by_login:'): return s user_locator = registry.getMultiAdapter((root, request), IUserLocator) user_name = s.split('user_by_login:')[1] user = user_locator.get_user_by_login(user_name) if user is None: raise ValueError('No such user: {}.'.format(user_name)) return resource_path(user) return resource_info.transform(['data', ny, ny], _resolve_user) def _deserialize_content_type(resource_info: dict) -> IInterface: schema = ContentType().bind(creating=True) iresource = schema.deserialize(resource_info['content_type']) return iresource def _deserialize_data(resource_info: dict, parent: IPool, registry: Registry, request: Request) -> dict: appstructs = {} iresource = _deserialize_content_type(resource_info) data = resource_info['data'] sheets = registry.content.get_sheets_create(parent, iresource=iresource, request=request) for sheet in sheets: sheet_name = sheet.meta.isheet.__identifier__ if sheet_name not in data: continue appstruct = sheet.deserialize(data[sheet_name]) appstructs[sheet_name] = appstruct return appstructs def _get_creator(resource_info: dict, context: IResource, registry: Registry, request: Request) -> IResource: creator_name = resource_info.get('creator', None) if not creator_name: return None locator = registry.getMultiAdapter((context, request), IUserLocator) creator = locator.get_user_by_login(creator_name) return creator
[docs]def import_local_roles(context: IResource, registry: Registry, filename: str): """Import/set local roles from a JSON file.""" multi_local_roles_info = _load_info(filename) for local_roles_info in multi_local_roles_info: _set_local_roles(local_roles_info, context, registry)
def _set_local_roles(local_roles_info: dict, context: IResource, registry: Registry): resource = find_resource(context, local_roles_info['path']) local_roles_info['roles'] = _deserialize_roles(local_roles_info['roles']) set_local_roles(resource, local_roles_info['roles'], registry) def _deserialize_roles(roles: dict) -> dict: for k, v in roles.items(): roles[k] = set(v) return roles
[docs]def append_cvs_field(result: list, content: str): """Normalize and append content for CVS.""" result.append(normalize_text_for_cvs(content))
[docs]def normalize_text_for_cvs(s: str) -> str: """Normalize text to put it in CVS.""" return s.replace(';', '')
[docs]def get_sheet_field_for_partial(sheet: ISheet, field: str, resource: IResource) -> object: """Get sheet field with resource as last parameter to use with partial.""" registry = get_current_registry(resource) return registry.content.get_sheet_field(resource, sheet, field)
[docs]def import_fixture(asset: str, root: IPool, registry: Registry, log_only=False, print_stdout=True): """Import files in fixture directory defined by :term:`asset` .""" fixture = _asset_to_fixture(asset) imports = _get_import_files(fixture) for import_type, import_file in imports: if print_stdout: # pragma: no cover msg = 'Import type "{}" file "{}"'.format(import_type, import_file) print(msg) if log_only: continue if import_type == 'groups': import_groups(root, registry, import_file) elif import_type == 'users': import_users(root, registry, import_file) elif import_type == 'resources': import_resources(root, registry, import_file) elif import_type == 'local_roles': import_local_roles(root, registry, import_file) elif import_type == 'states': # pragma: no branch _import_workflow_states(root, registry, import_file)
def _asset_to_fixture(asset: str) -> Path: """Translate :term:`asset` to absolute fixture path.""" package_name, file_name = resolve_asset_spec(asset) if package_name: package = __import__(package_name) path = Path(package_path(package), file_name) else: path = Path(file_name) if not path.is_dir(): msg = 'This is not a directory {}'.format(asset) raise ConfigurationError(details=msg) return path def _get_import_files(fixture: Path) -> [tuple]: allowed_types = ['groups', 'users', 'resources', 'local_roles', 'states'] import_files = [] for sub_dir in fixture.iterdir(): if sub_dir.name in allowed_types: for import_file in sub_dir.iterdir(): import_files.append((sub_dir.name, str(import_file))) else: msg = 'This is not a valid type directory {}'.format(sub_dir) raise ConfigurationError(details=msg) return sorted(import_files, key=lambda x: allowed_types.index(x[0])) def _import_workflow_states(root: IResource, registry: Registry, filename: str): with open(filename) as import_file: lines = [str(line).strip() for line in import_file.readlines()] states = {line.split(':')[0]: line.split(':')[1].split('->') for line in lines} for path, states in states.items(): logger.info('Set workflow state for {} to {}'.format(path, states)) set_workflow_state(root, registry, path, states, absolute=True, reset=True)