Source code for adhocracy_core.websockets.client

"""Our own Websocket client that notifies the server of changes."""
from threading import Thread
import json
import logging
import time

from websocket import ABNF
from websocket import create_connection
from websocket import WebSocketException
from websocket import WebSocketConnectionClosedException
from websocket import WebSocketTimeoutException

from adhocracy_core.interfaces import IResource
from adhocracy_core.utils import exception_to_str
from adhocracy_core.utils import extract_events_from_changelog_metadata
from adhocracy_core.websockets.schemas import ServerNotification


# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class Client:
    """Websocket client that notifies the websocket server of changes.

    When created with a URL, a daemon thread keeps a connection to the
    server open, reconnecting after errors. Changelog notifications are
    pushed over that connection with :func:`Client.send_messages`.
    """

    def __init__(self, ws_url):
        """Create instance with running thread that talks to the server.

        :param ws_url: the URL of the websocket server to connect to;
                       if None, no connection will be set up (useful for
                       testing)
        """
        self.changelog_metadata_messages_to_send = set()
        """Set with
        :class:`adhocracy_core.resources.subscriber.ChangelogMetadata`
        that will be sent to the websocket server with
        :func:`Client.send_messages`."""
        self._ws_url = ws_url
        self._ws_connection = None
        # True while a connection is believed to be usable for sending.
        self._is_running = False
        # Set by :meth:`stop` to make the listener loop terminate.
        self._is_stopped = False
        if ws_url is not None:  # pragma: no cover
            self._init_listener_thread()

    def _init_listener_thread(self):  # pragma: no cover
        """Init thread that keeps the connection alive."""
        runner = Thread(target=self._run)
        # Daemon thread: it must not keep the interpreter alive on exit.
        runner.daemon = True
        runner.start()
        self._wait_a_bit_until_connected()

    def _run(self):  # pragma: no cover
        """Start and keep alive connection to the websocket server.

        Loops until :meth:`stop` was called. Errors are logged and
        followed by a one second pause before the next attempt.
        """
        assert self._ws_url
        while not self._is_stopped:
            try:
                self._connect_and_receive_and_log_messages()
            except (WebSocketConnectionClosedException,
                    ConnectionError,
                    OSError) as err:
                logger.info('Problem connecting to Websocket server: %s',
                            exception_to_str(err))
                # The connection is gone: stop sending until reconnected.
                self._is_running = False
                time.sleep(1)
            except WebSocketException as err:  # pragma: no cover
                logger.warning(
                    'Problem communicating with Websocket server: %s',
                    exception_to_str(err))
                time.sleep(1)

    def _wait_a_bit_until_connected(self):  # pragma: no cover
        """Wait until the connection has been set up, but at most 2.5 secs."""
        for _ in range(25):  # pragma: no branch
            if self._is_running:
                break
            time.sleep(.1)

    def _connect_and_receive_and_log_messages(self):
        """Connect if necessary, then receive and process one frame."""
        self._connect_to_server()
        frame = self._ws_connection.recv_frame()
        self._process_frame(frame)

    def _connect_to_server(self):  # pragma: no cover
        """Open a new connection unless one is already established."""
        if not self._is_connected():
            logger.debug('Try connecting to the Websocket server at %s',
                         self._ws_url)
            self._ws_connection = create_connection(self._ws_url)
            self._is_running = True
            logger.debug('Connected to the Websocket server')

    def _is_connected(self):
        """Tell whether a connection object exists and reports connected."""
        return (self._ws_connection is not None and
                self._ws_connection.connected)

    def _process_frame(self, frame: ABNF):
        """Log a received frame; answer pings and server side closes."""
        if not frame:
            logger.warning('Received invalid frame from Websocket server: %s',
                           frame)
        elif frame.opcode == ABNF.OPCODE_TEXT:
            logger.debug('Received text message from Websocket server: %s',
                         frame.data)
        elif frame.opcode == ABNF.OPCODE_CLOSE:
            self._close_connection(b'server triggered close')
            logger.error('Websocket server closed the connection!')
        elif frame.opcode == ABNF.OPCODE_PING:
            self._ws_connection.pong('Hi!')
        else:
            logger.warning('Received unexpected frame from Websocket server: '
                           'opcode=%s, data="%s"', frame.opcode, frame.data)

    def _close_connection(self, reason: bytes):
        """Send a close frame with `reason` and close the connection.

        The client is marked as not running first and the connection is
        closed even if sending the close frame raises, so a broken
        connection is never left open or considered usable. An exception
        raised while sending the close frame still propagates.
        """
        self._is_running = False
        try:
            self._ws_connection.send_close(reason=reason)
        finally:
            self._ws_connection.close()

    def send_messages(self, changelog_metadata=None):
        """Send all changelog messages to the websocket server.

        :param changelog_metadata: iterable of :class:`ChangelogMetadata`;
            defaults to no new entries. The original documentation states
            that entries with ``metadata.resource == None`` are ignored
            (presumably by `extract_events_from_changelog_metadata`, which
            is not visible here).

        Does nothing if the client is not running. Connection timeouts
        and :class:`OSError` are logged and swallowed, hoping the problems
        will be solved when you run this method again.
        """
        if not self._is_running:
            return
        if changelog_metadata is None:
            changelog_metadata = ()
        try:
            self._send_messages(changelog_metadata)
        except WebSocketTimeoutException:  # pragma: no cover
            logger.warning('Could not send message, connection timeout,'
                           ' try again later')
        except OSError:  # pragma: no cover
            logger.warning('Could not send message, connection is broken,'
                           ' try again later')

    def _send_messages(self, changelog_metadata: list):
        """Queue `changelog_metadata` and send everything that is queued.

        A message whose sending fails with a connection problem is put
        back into the queue before the exception propagates, so it is
        retried by the next call instead of being lost. Events of that
        message that were already sent may be sent again on retry.
        """
        metadata = [x._replace(modified_appstructs=None)  # remove non hashable
                    for x in changelog_metadata]
        pending = self.changelog_metadata_messages_to_send
        pending.update(metadata)
        while pending:
            meta = pending.pop()
            try:
                events = extract_events_from_changelog_metadata(meta)
                for event in events:
                    self._send_resource_event(meta.resource, event)
            except (OSError, WebSocketException):
                # Only connection problems are retried; any other error
                # drops the message so that it cannot block the queue.
                pending.add(meta)
                raise

    def _send_resource_event(self, resource: IResource, event_type: str):
        """Serialize one event for `resource` as JSON and send it."""
        schema = ServerNotification().bind(context=resource)
        message = schema.serialize({'event': event_type,
                                    'resource': resource})
        message_text = json.dumps(message)
        logger.debug('Sending message to Websocket server: %s', message_text)
        self._ws_connection.send(message_text)

    def stop(self):  # pragma: no cover
        """Stop the client."""
        self._is_stopped = True
        try:
            if self._is_connected():
                self._close_connection(b'done')
                logger.debug('Websocket client closed')
        except WebSocketException as err:  # pragma: no cover
            logger.warning('Error closing connection to Websocket server: %s',
                           exception_to_str(err))
def get_ws_client(registry) -> Client:
    """Return the websocket client stored on `registry`, or None.

    None is returned when `registry` is None or when it has no
    ``ws_client`` attribute.
    """
    if registry is None:
        return None
    return getattr(registry, 'ws_client', None)
def send_messages_after_commit_hook(success, registry):
    """Send transaction changelog messages to the websocket client.

    Nothing is sent when `success` is false or when no websocket client
    is available for `registry`.
    """
    ws_client = get_ws_client(registry)
    if not success or ws_client is None:
        return
    ws_client.send_messages(registry.changelog.values())
def includeme(config):
    """Add websocket client (`ws_client`) to the registry.

    You need to set the ws server url in your settings to make this work::

        adhocracy.ws_url = ws://localhost:6561

    Without that setting no client is created and a hint is logged.
    """
    registry = config.registry
    ws_url = registry['config'].adhocracy.ws_url
    if not ws_url:
        logger.info('No websocket server setup, add config "adhocracy.ws_url"')
        return
    registry.ws_client = Client(ws_url=ws_url)  # pragma: no cover