Source code for csbot.events

from datetime import datetime
from collections import deque
import re
import asyncio
import logging

from csbot.util import parse_arguments, maybe_future


LOG = logging.getLogger('csbot.events')


[docs]class HybridEventRunner: """ A hybrid synchronous/asynchronous event runner. *get_handlers* is called for each event passed to :meth:`post_event`, and should return an iterable of callables to handle that event, each of which will be called with the event object. Events are processed in the order they are received, with all handlers for an event being called before the handlers for the next event. If a handler returns an awaitable, it is added to a set of asynchronous tasks to wait on. The future returned by :meth:`post_event` completes only when all events have been processed and all asynchronous tasks have completed. :param get_handlers: Get functions to call for an event :param loop: asyncio event loop to use (default: use current loop) """ def __init__(self, get_handlers, loop=None): self.get_handlers = get_handlers self.loop = loop self.events = deque() self.new_events = asyncio.Event() self.futures = set() self.future = None def __enter__(self): LOG.debug('entering event runner') def __exit__(self, exc_type, exc_value, traceback): LOG.debug('exiting event runner') self.future = None
[docs] def post_event(self, event): """Post *event* to be handled soon. *event* is added to the queue of events. Returns a future which resolves when the handlers of *event* (and all events generated during those handlers) have completed. """ self.events.append(event) LOG.debug('added event %s, pending=%s', event, len(self.events)) self.new_events.set() if not self.future: self.future = self.loop.create_task(self._run()) return self.future
def _run_events(self): """Run event handlers, accumulating awaitables as futures. """ new_futures = set() while len(self.events) > 0: LOG.debug('processing events (%s remaining)', len(self.events)) # Get next event event = self.events.popleft() LOG.debug('processing event: %s', event) # Handle the event for handler in self.get_handlers(event): # Attempt to run the handler, but don't break everything if the handler fails LOG.debug('running handler: %r', handler) future = self._run_handler(handler, event) if future: new_futures.add(future) self.new_events.clear() if len(new_futures) > 0: LOG.debug('got %s new futures', len(new_futures)) return new_futures def _run_handler(self, handler, event): """Call *handler* with *event* and log any exception. If *handler* returns an awaitable, then it is wrapped in a coroutine that will log any exception from awaiting it. """ result = None try: result = handler(event) except Exception as e: self._handle_exception(exception=e, csbot_event=event) future = maybe_future(result, log=LOG) if future: future = asyncio.ensure_future(self._finish_async_handler(future, event), loop=self.loop) return future async def _finish_async_handler(self, future, event): """Await *future* and log any exception. """ try: await future except Exception: self._handle_exception(future=future, csbot_event=event) async def _run(self): """Run the event runner loop. Process events and await futures until all events and handlers have been processed. """ # Use self as context manager so an escaping exception doesn't break # the event runner instance permanently (i.e. we clean up the future) with self: # Run until no more events or lingering futures while len(self.events) + len(self.futures) > 0: # Synchronously run event handler and collect new futures new_futures = self._run_events() self.futures |= new_futures # Don't bother waiting if no futures to wait on if len(self.futures) == 0: continue # Run until one or more futures complete (or new events are added) new_events = self.loop.create_task(self.new_events.wait()) LOG.debug('waiting on %s futures', len(self.futures)) done, pending = await asyncio.wait(self.futures | {new_events}, return_when=asyncio.FIRST_COMPLETED) # Remove done futures from the set of futures being waited on done_futures = done - {new_events} LOG.debug('%s of %s futures done', len(done_futures), len(self.futures)) self.futures -= done_futures if new_events.done(): LOG.debug('new events to process') else: # If no new events, cancel the waiter, because we'll create a new one next iteration new_events.cancel() def _handle_exception(self, *, message='Unhandled exception in event handler', exception=None, future=None, csbot_event=None): if exception is None and future is not None: exception = future.exception() self.loop.call_exception_handler({ 'message': message, 'exception': exception, 'future': future, 'csbot_event': csbot_event, })
[docs]class Event(dict): """IRC event information. Events are dicts of event information, plus some attributes which are applicable for all events. """ #: The :class:`.Bot` which triggered the event. bot = None #: The name of the event. event_type = None #: The value of :meth:`datetime.datetime.now()` when the event was #: triggered. datetime = None def __init__(self, bot, event_type, data=None): dict.__init__(self, data if data is not None else {}) self.bot = bot self.event_type = event_type self.datetime = datetime.now() def __str__(self): return f'<Event {self.event_type!r} {self!r}>'
[docs] @classmethod def extend(cls, event, event_type=None, data=None): """Create a new event by extending an existing event. The main purpose of this classmethod is to duplicate an event as a new event type, preserving existing information. For example: """ # Duplicate event information e = cls(event.bot, event.event_type, event) e.datetime = event.datetime # Apply optional updates if event_type is not None: e.event_type = event_type if data is not None: e.update(data) return e
[docs] def reply(self, message): """Send a reply. For messages that have a ``reply_to`` key, instruct the :attr:`bot` to send a reply. """ self.bot.reply(self['reply_to'], message)
[docs]class CommandEvent(Event):
[docs] @classmethod def parse_command(cls, event, prefix, nick): """Attempt to create a :class:`CommandEvent` from a ``core.message.privmsg`` event. A command is signified by *event["message"]* starting with the command prefix string followed by one or more non-space characters. Returns None if *event['message']* wasn't recognised as being a command. """ pattern = r'({prefix}|{nick}[,:]\s*)(?P<command>[^\s]+)(\s+(?P<data>.+))?'.format( prefix=re.escape(prefix), nick=re.escape(nick), ) match = re.fullmatch(pattern, event['message'].strip()) if match is None: return None else: return cls.extend(event, 'core.command', {'command': match.group('command'), 'data': match.group('data') or ''})
[docs] def arguments(self): """Parse *self["data"]* into a list of arguments using :func:`~csbot.util.parse_arguments`. This might raise a :exc:`~exceptions.ValueError` if the string cannot be parsed, e.g. if there are unmatched quotes. """ return parse_arguments(self['data'])