Source code for adhocracy_core.catalog

"""Configure search catalogs."""
from itertools import chain

from zope.interface import Interface
from pyramid.registry import Registry
from itertools import islice
from collections.abc import Iterable
from substanced import catalog
from substanced.interfaces import IIndexingActionProcessor
from substanced.catalog import CatalogsService
from substanced.catalog.indexes import AllowsComparator
from substanced.util import find_objectmap
from substanced.util import get_oid
from hypatia.interfaces import IIndex
from hypatia.interfaces import IResultSet
from hypatia.query import Query
from hypatia.util import ResultSet
from adhocracy_core.interfaces import IServicePool
from adhocracy_core.interfaces import FieldComparator
from adhocracy_core.interfaces import FieldSequenceComparator
from adhocracy_core.interfaces import KeywordComparator
from adhocracy_core.interfaces import KeywordSequenceComparator
from adhocracy_core.interfaces import ReferenceComparator
from adhocracy_core.interfaces import SearchResult
from adhocracy_core.interfaces import SearchQuery
from adhocracy_core.interfaces import search_result
from adhocracy_core.interfaces import IResource
from adhocracy_core.resources.service import service_meta
from adhocracy_core.resources import add_resource_type_to_registry
from adhocracy_core.utils import normalize_to_tuple


class ICatalogsService(IServicePool):
    """Interface provided by the 'catalogs' service pool."""
class CatalogsServiceAdhocracy(CatalogsService):
    """Catalogs service that searches the `system` and `adhocracy` catalogs."""

    # Names of all comparators that may prefix a query parameter tuple,
    # e.g. `(comparator_name, value)`.
    _comparators = (
        set(FieldComparator.__members__)
        .union(KeywordComparator.__members__)
        .union(KeywordSequenceComparator.__members__)
        .union(FieldSequenceComparator.__members__)
        .union(ReferenceComparator.__members__)
    )

    def reindex_all(self, resource: IResource):
        """Reindex `resource` in every child of this service."""
        for child in self.values():
            child.reindex_resource(resource)

    def reindex_index(self, resource: IResource, index_name: str):
        """Reindex `resource` with index `index_name`.

        :raises KeyError: if `index_name` index does not exist.
        """
        index = self.get_index(index_name)
        if index is None:
            msg = 'catalog index {0} does not exist.'.format(index_name)
            raise KeyError(msg)
        index.reindex_resource(resource)

    def search(self, query: SearchQuery) -> SearchResult:
        """Search indexes in catalogs `adhocracy` and `system`.

        The matching elements are computed once; frequencies and groups are
        derived from the unsorted result, then the result is sorted, counted,
        sliced by `query.limit`/`query.offset` and resolved to objects.
        """
        elements = self._search_elements(query)
        frequency_of = self._get_frequency_of(elements, query)
        group_by = self._get_group_by(elements, query)
        sorted_elements = self._sort_elements(elements, query)
        # Count before slicing, so `count` is the total number of matches.
        count = len(sorted_elements)
        elements_slice = self._get_slice(sorted_elements, query)
        resolved = self._resolve(elements_slice, query)
        result = search_result._replace(elements=resolved,
                                        count=count,
                                        group_by=group_by,
                                        frequency_of=frequency_of)
        return result

    def _get_interfaces_index_query(self, query) -> Query:
        """Build the `interfaces` index query, None if nothing is requested.

        Without an explicit comparator the value is normalized to a tuple
        and matched with the index `all` comparator.
        """
        interfaces_value = self._get_query_value(query.interfaces)
        if not interfaces_value:
            return None
        interfaces_comparator = self._get_query_comparator(query.interfaces)
        interfaces_index = self.get_index('interfaces')
        if interfaces_comparator is None:
            interfaces_value = normalize_to_tuple(interfaces_value)
            index_query = interfaces_index.all(interfaces_value)
        else:
            index_comparator = getattr(interfaces_index,
                                       interfaces_comparator)
            index_query = index_comparator(interfaces_value)
        return index_query

    def _get_path_index_query(self, query) -> Query:
        """Build the `path` index query, None if `query.root` is None."""
        if query.root is None:
            return None
        depth = query.depth or None
        path_index = self.get_index('path')
        return path_index.eq(query.root, depth=depth, include_origin=False)

    def _get_indexes_index_query(self, query) -> [Query]:
        """Build one index query per entry in `query.indexes`.

        Entries without an explicit comparator are compared with `eq`.
        """
        if not query.indexes:
            return []
        indexes = []
        for index_name, value in query.indexes.items():
            index = self.get_index(index_name)
            comparator = self._get_query_comparator(value)
            if comparator is None:
                index_comparator = index.eq
            else:
                index_comparator = getattr(index, comparator)
            index_value = self._get_query_value(value)
            indexes.append(index_comparator(index_value))
        return indexes

    def _get_references_index_query(self, query) -> [Query]:
        """Build one `reference` index query per entry in `query.references`.

        An entry prefixed with the `traverse` comparator sets the
        `traverse` flag of the resulting index query.
        """
        if not query.references:
            # Same guard as the sibling query builders: nothing to query.
            return []
        indexes = []
        index = self.get_index('reference')
        for value in query.references:
            index_comparator = self._get_query_comparator(value)
            traverse = index_comparator == ReferenceComparator.traverse.value
            index_value = self._get_query_value(value)
            indexes.append(index.eq({'reference': index_value,
                                     'traverse': traverse}))
        return indexes

    def _get_private_visibility_index_query(self, query) -> Query:
        """Build the `private_visibility` query if `query.only_visible`."""
        if not query.only_visible:
            return None
        visibility_index = self.get_index('private_visibility')
        return visibility_index.eq('visible')

    def _get_allowed_index_query(self, query) -> Query:
        """Build the `allowed` index query, None if `query.allows` is unset."""
        if not query.allows:
            return None
        allowed_index = self.get_index('allowed')
        principals, permission = query.allows
        return allowed_index.allows(principals, permission)

    def _combine_indexes(self, query, *list_of_indexes) -> [Query]:
        """Flatten `list_of_indexes` into one list, dropping None entries."""
        maybes_indexes = chain.from_iterable(list_of_indexes)
        indexes = [idx for idx in maybes_indexes if idx is not None]
        return indexes

    def _execute_query(self, indexes) -> IResultSet:
        """Combine all query `indexes` with `&` and execute the query.

        If `indexes` is empty or it starts with a query from the `allows`
        index an empty result is returned. The allows index can only be used
        as a filter so you need a query that returns a search result first.
        """
        has_indexes = len(indexes) > 0
        is_starting_with_allows = has_indexes \
            and isinstance(indexes[0], AllowsComparator)
        if has_indexes and not is_starting_with_allows:
            index_query = indexes[0]
            for idx in indexes[1:]:
                index_query &= idx
            # Identity resolver: the result set yields the raw ids.
            elements = index_query.execute(resolver=lambda x: x)
        else:
            elements = ResultSet(set(), 0, None)
        return elements

    def _search_elements(self, query) -> IResultSet:
        """Run all index queries derived from `query` and return the result.

        The order of the partial queries matters: the `allowed` query comes
        last because it may not start the combined query.
        """
        if not self.values():
            # child catalogs/indexes are not created yet
            return ResultSet(set(), 0, None)
        indexes = self._combine_indexes(
            query,
            self._get_references_index_query(query),
            [self._get_path_index_query(query)],
            [self._get_interfaces_index_query(query)],
            self._get_indexes_index_query(query),
            [self._get_private_visibility_index_query(query)],
            [self._get_allowed_index_query(query)],
        )
        elements = self._execute_query(indexes)
        return elements

    def _get_frequency_of(self, elements: IResultSet,
                          query: SearchQuery) -> dict:
        """Count `elements` per unique value of index `query.frequency_of`.

        Values without any matching element are left out.
        """
        frequency_of = {}
        if not query.frequency_of:
            return frequency_of
        index = self.get_index(query.frequency_of)
        for value in index.unique_values():
            value_query = index.eq(value)
            value_elements = value_query.execute(resolver=None)
            intersect = elements.intersect(value_elements)
            count = len(intersect)
            if count == 0:
                continue
            frequency_of[value] = count
        return frequency_of

    def _get_group_by(self, elements: IResultSet,
                      query: SearchQuery) -> dict:
        """Group `elements` by the unique values of index `query.group_by`.

        Each group is sorted by `query.sort_by` (except for the `reference`
        sort order) and resolved like the main result. Empty groups are
        left out.
        """
        group_by = {}
        if not query.group_by:
            return group_by
        index = self.get_index(query.group_by)
        for value in index.unique_values():
            value_query = index.eq(value)
            value_elements = value_query.execute(resolver=None)
            intersect = elements.intersect(value_elements)
            if len(intersect) == 0:
                continue
            group_by[value] = intersect
        sort_index = self.get_index(query.sort_by)
        if sort_index is not None and query.sort_by != 'reference':
            # Only existing keys are reassigned, so iterating is safe.
            for key, intersect in group_by.items():
                intersect_sorted = intersect.sort(sort_index,
                                                  reverse=query.reverse,
                                                  limit=query.limit or None)
                group_by[key] = intersect_sorted
        for key, intersect in group_by.items():
            group_by[key] = self._resolve(intersect.all(), query)
        return group_by

    def _sort_elements(self, elements: IResultSet,
                       query: SearchQuery) -> IResultSet:
        """Sort `elements` according to `query.sort_by`.

        An empty `sort_by` leaves the result untouched. `reference` orders
        the result like the first reference in `query.references`; any
        other value is used as the name of the index to sort by.

        :raises NotImplementedError: if sorting by `reference` is combined
            with `query.reverse` or `query.limit`.
        """
        if query.sort_by == '':
            pass
        elif query.sort_by == 'reference':
            if query.reverse:
                raise NotImplementedError()
            if query.limit:
                raise NotImplementedError()
            references = [x for x in query.references or ()
                          if x[0] is not None]
            if not references:  # we need at least one reference
                return elements
            reference = self._get_query_value(references[0])
            references_index = self.get_index('reference')
            elements_sorted = references_index.search_with_order(reference)
            elements = elements_sorted.intersect(elements)
        else:
            sort_index = self.get_index(query.sort_by)
            # TODO: We should assert the IIndexSort interface here, but
            # hypatia.field.FieldIndex is missing this interface.
            assert 'sort' in sort_index.__dir__()
            elements = elements.sort(sort_index, reverse=query.reverse)
        return elements

    def _get_slice(self, elements: IResultSet,
                   query: SearchQuery) -> Iterable:
        """Get slice defined by `query.limit` and `query.offset`.

        `query.offset` is only applied if `query.limit` is set.

        :returns: Iterable or IResultSet if limit is not specified
        """
        elements_slice = elements
        if query.limit:
            elements_slice = islice(elements.all(resolve=None),
                                    query.offset,
                                    query.offset + query.limit)
        return elements_slice

    def _resolve(self, elements: [int], query: SearchQuery) -> Iterable:
        """Resolve oids from `elements`, convert to list if `query.resolve`.

        Without `query.resolve` a lazy generator is returned.
        """
        objectmap = find_objectmap(self)
        resolved = (objectmap.object_for(oid) for oid in elements)
        if query.resolve:
            resolved = list(resolved)
        return resolved

    def get_index(self, name) -> IIndex:
        """Return index `name` from catalog `system` or `adhocracy`.

        The `system` catalog takes precedence.

        :returns: the index, or None if neither catalog contains it.
        """
        system = self.get('system', {})
        adhocracy = self.get('adhocracy', {})
        index = system.get(name, None)
        # Compare with None instead of relying on truthiness, so an existing
        # index that evaluates falsy is not skipped.
        if index is None:
            index = adhocracy.get(name, None)
        return index

    def _get_query_value(self, query_parameter: tuple) -> object:
        """Return the value of `query_parameter` without its comparator."""
        if self._is_tuple_starting_with_comparator(query_parameter):
            return query_parameter[1]
        return query_parameter

    def _get_query_comparator(self, query_parameter: tuple) -> object:
        """Return the comparator name of `query_parameter`, None if unset."""
        if self._is_tuple_starting_with_comparator(query_parameter):
            return query_parameter[0]
        return None

    def _is_tuple_starting_with_comparator(self, parameter: tuple) -> bool:
        """Check if `parameter` is a `(comparator name, value)` tuple."""
        return (isinstance(parameter, tuple)
                and len(parameter) == 2
                and parameter[0] in self._comparators)

    def get_index_value(self, context: IResource, index_name: str):
        """Get value of index `index_name` for `context`."""
        index = self.get_index(index_name)
        oid = get_oid(context)
        return index.document_repr(oid)
def add_catalogs_system_and_adhocracy(context: ICatalogsService,
                                      registry: Registry,
                                      options: dict):
    """Add catalogs `system` and `adhocracy`."""
    # `system` is created first, then `adhocracy`.
    for catalog_name in ('system', 'adhocracy'):
        context.add_catalog(catalog_name)
# Resource metadata of the 'catalogs' service: a copy of the generic
# `service_meta` with the catalog specific fields replaced.
catalogs_service_meta = service_meta._replace(
    content_name='catalogs',
    iresource=ICatalogsService,
    content_class=CatalogsServiceAdhocracy,
    after_creation=[add_catalogs_system_and_adhocracy],
)
def includeme(config):
    """Register catalog utilities."""
    config.include('adhocracy_core.events')

    # View predicate and configurator directives taken from
    # `substanced.catalog`.
    config.add_view_predicate('catalogable', catalog._CatalogablePredicate)
    config.add_directive('add_catalog_factory', catalog.add_catalog_factory)
    config.add_directive('add_indexview',
                         catalog.add_indexview,
                         action_wrap=False)

    action_processor = catalog.deferred.BasicActionProcessor
    config.registry.registerAdapter(action_processor,
                                    (Interface,),
                                    IIndexingActionProcessor)

    add_resource_type_to_registry(catalogs_service_meta, config)

    for package in ('substanced.catalog', '.index'):
        config.scan(package)
    for module in ('.system', '.adhocracy', '.subscriber'):
        config.include(module)