Source code for csbot.plugins.cron

from csbot.plugin import Plugin
from csbot.events import Event
import asyncio
from datetime import datetime, timedelta
import pymongo


[docs]class Cron(Plugin): """ Time, that most mysterious of things. What is it? Is it discrete or continuous? What was before time? Does that even make sense to ask? This plugin will attempt to address some, perhaps all, of these questions. More seriously, this plugin allows the scheduling of events. Due to computers being the constructs of fallible humans, it's not guaranteed that a callback will be run precisely when you want it to be. Furthermore, if you schedule multiple events at the same time, don't make any assumptions about the order in which they'll be called. Example of usage: class MyPlugin(Plugin): cron = Plugin.use('cron') def setup(self): ... self.cron.after( "hello world", datetime.timedelta(days=1), "callback") def callback(self, when): self.log.info(u'I got called at {}'.format(when)) @Plugin.hook('cron.hourly') def hourlyevent(self, e): self.log.info(u'An hour has passed') """ tasks = Plugin.use('mongodb', collection='tasks')
[docs] def setup(self): super(Cron, self).setup() # Schedule own events with the same API other plugins will use self.cron = self.provide(self.plugin_name()) # An asyncio.Handle for the event runner delayed call self.scheduler = None # The datetime of the next task, which self.scheduler was created for self.scheduler_next = None # Now we need to remove the hourly, daily, and weekly events # (if there are any), because the scheduler just runs things # when their time has passed, but for these we want to run # them as close to the correct time as possible, so running a # past event is useless for our purposes. # # Sadly this can't happen in the teardown, as we want to do # this even if the bot crashes unexpectedly. self.cron.unschedule_all() # Add regular cron.hourly/daily/weekly events which plugins # can listen to. now = datetime.now() when = -timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) self.cron.schedule(name='hourly', when=now + when + timedelta(hours=1), interval=timedelta(hours=1), callback='fire_event', args=['cron.hourly']) when -= timedelta(hours=now.hour) self.cron.schedule(name='daily', when=now + when + timedelta(days=1), interval=timedelta(days=1), callback='fire_event', args=['cron.daily']) when -= timedelta(days=now.weekday()) self.cron.schedule(name='weekly', when=now + when + timedelta(weeks=1), interval=timedelta(weeks=1), callback='fire_event', args=['cron.weekly'])
[docs] def teardown(self): super().teardown() if self.scheduler is not None: self.scheduler.cancel()
[docs] def fire_event(self, now, name): """Fire off a regular event. This gets called by the scheduler at the appropriate time. """ self.bot.post_event(Event(None, name))
[docs] def provide(self, plugin_name): """Return the crond for the given plugin.""" return PluginCron(self, plugin_name)
[docs] def match_task(self, owner, name=None, args=None, kwargs=None): """Create a MongoDB search for a task definition.""" matcher = {'owner': owner} if name is not None: matcher['name'] = name if args is not None: matcher['args'] = args if kwargs is not None: matcher['kwargs'] = kwargs return matcher
[docs] def schedule(self, owner, name, when, interval=None, callback=None, args=None, kwargs=None): """Schedule a new task. :param owner: The plugin which created the task :param name: The name of the task :param when: The datetime to trigger the task at :param interval: Optionally, reschedule at when + interval when triggered. Gives rise to repeating tasks. :param callback: Call owner.callback when triggered; if None, call owner.name. :param args: Callback positional arguments. :param kwargs: Callback keyword arguments. The signature of a task is ``(owner, name, args, kwargs)``, and trying to create a task with the same signature as an existing task will raise :exc:`DuplicateTaskError`. Any subset of the signature can be used to :meth:`unschedule` all matching tasks (``owner`` is mandatory). """ # Create the new task secs = interval.total_seconds() if interval is not None else None task = {'owner': owner, 'name': name, 'when': when, 'interval': secs, 'callback': callback or name, 'args': args or [], 'kwargs': kwargs or {}} # See if this task duplicates another match = self.match_task(task['owner'], task['name'], task['args'], task['kwargs']) if self.tasks.find_one(match): raise DuplicateTaskError('Identical task already scheduled', match) # If we made it this far, save the task self.tasks.insert(task) # Reschedule the event runner in case it now needs to happen earlier self.schedule_event_runner()
[docs] def unschedule(self, owner, name=None, args=None, kwargs=None): """Unschedule a task. Removes all existing tasks that match based on the criteria passed as arguments (see :meth:`match_task`). This could result in the scheduler having nothing to do in its next call, but this isn't a problem as it's not a very intensive function, so there's no point in rescheduling it here. """ self.tasks.remove(self.match_task(owner, name, args, kwargs))
[docs] def schedule_event_runner(self): """Schedule the event runner. Set up a delayed call for :meth:`event_runner` to happen no sooner than is required by the next scheduled task. If a different call already exists it is replaced. """ now = datetime.now() # There will always be at least one event remaining because we # have three repeating ones, so this is safe. remaining_tasks = self.tasks.find().sort('when', pymongo.ASCENDING) next_run = remaining_tasks[0]['when'] if self.scheduler_next is None or next_run != self.scheduler_next: if self.scheduler is not None: self.scheduler.cancel() delay = (next_run - now).total_seconds() self.log.debug('calling event runner in %s seconds', delay) # TODO: need a better API for using the bot's event loop self.scheduler = asyncio.get_event_loop().call_later(delay, self.event_runner) self.scheduler_next = next_run else: self.log.debug('already scheduled for %s', self.scheduler_next)
[docs] def event_runner(self): """Run pending tasks. Run all tasks which have a trigger time in the past, and then reschedule self to run in time for the next task. """ now = datetime.now() self.log.debug('running event runner at %s', now) # Find and run every task from before now for taskdef in self.tasks.find({'when': {'$lt': now}}): # Going to be using this a lot task_name = u'{}/{}'.format( taskdef['owner'], taskdef['name']) self.log.info(u'Running task ' + task_name) # Now that we have the task, we need to remove it from the # database (or reschedule it for the future) straight # away, as if it schedules things itself, the scheduler # will be called again, but the task will still be there # (and so be run again), resulting in an error when it # tries to schedule the second time. if taskdef['interval'] is not None: taskdef['when'] += timedelta(seconds=taskdef['interval']) self.tasks.save(taskdef) else: self.unschedule(taskdef['owner'], taskdef['name']) # There are two things that could go wrong in running a # task. The method might not exist, this can arise in two # ways: a plugin scheduled it in a prior incarnation of # the bot, and then didn't register start up on this run, # resulting in there being no entry in self.bot.plugins, # or it could have just provided a bad method name. # # There is clearly no way to recover from this with any # degree of certainty, so we just drop it from the # database to prevent an error cropping up every time it # gets run. try: func = getattr(self.bot.plugins[taskdef['owner']], taskdef['callback']) except AttributeError: self.log.error( u'Couldn\'t find method {}.{} for task {}'.format( taskdef['owner'], taskdef['callback'], task_name)) self.unschedule(taskdef['owner'], taskdef['name']) continue # The second way is if the method does exist, but raises # an exception during its execution. There are two ways to # handle this. We could let the exception propagate # upwards and outwards, killing the bot, or we could log # it as an error and carry on. I went for the latter here, # on the assumption that, whilst exceptions are bad and # shouldn't get this far anyway, killing the bot is worse. try: func(taskdef['when'], *taskdef['args'], **taskdef['kwargs']) except Exception as e: # Don't really want exceptions to kill cron, so let's just log # them as an error. self.log.error( u'Exception raised when running task {}: {} {}'.format( task_name, type(e), e.args)) # Schedule the event runner for the next task self.schedule_event_runner()
[docs]class DuplicateTaskError(Exception): """Task with a given signature already exists. This can be raised by :meth:`Cron.schedule` if a plugin tries to register two events with the same name. """ pass
[docs]class PluginCron(object): """Interface to the cron methods restricted to *plugin* as the task owner.. How scheduling works -------------------- All of the scheduling functions have a signature of the form (name, time, method_name, *args, **kwargs). This means that at the appropriate time, the method plugin.method_name will be called with the arguments (time, *args, **kwargs), where the time argument is the time it was supposed to be run by the scheduler (which may not be identical to teh actual time it is run). These functions will raise a DuplicateNameException if you try to schedule two events with the same name. """ def __init__(self, cron, plugin): self.cron = cron self.plugin = plugin
[docs] def schedule(self, name, when, interval=None, callback=None, args=None, kwargs=None): """Pass through to :meth:`Cron.schedule`, adding *owner* argument.""" self.cron.schedule(self.plugin, name, when, interval, callback, args, kwargs)
[docs] def after(self, _delay, _name, _method_name, *args, **kwargs): """Schedule an event to occur after the timedelta delay has passed.""" self.schedule(_name, datetime.now() + _delay, callback=_method_name, args=args, kwargs=kwargs)
[docs] def at(self, _when, _name, _method_name, *args, **kwargs): """Schedule an event to occur at a given time.""" self.schedule(_name, _when, callback=_method_name, args=args, kwargs=kwargs)
[docs] def every(self, _freq, _name, _method_name, *args, **kwargs): """Schedule an event to occur every time the delay passes.""" self.schedule(_name, datetime.now() + _freq, interval=_freq, callback=_method_name, args=args, kwargs=kwargs)
[docs] def unschedule(self, name, args=None, kwargs=None): """Pass through to :meth:`Cron.unschedule`, adding *owner* argument.""" self.cron.unschedule(self.plugin, name, args, kwargs)
[docs] def unschedule_all(self): """Unschedule all tasks for this plugin. This could be supported by :meth:`unschedule`, but it's nice to prevent code accidentally wiping all of a plugin's tasks. """ self.cron.unschedule(self.plugin)