# Source code for csbot.core

import collections
import collections.abc
import itertools
from typing import Mapping, Sequence, Type

from csbot.plugin import Plugin, SpecialPlugin, find_plugins
from csbot.plugin import build_plugin_dict, PluginManager, PluginConfigError
import csbot.events as events
from csbot.events import Event, CommandEvent
from csbot.util import maybe_future_result

from .irc import IRCClient, IRCUser
from . import config

class PluginError(Exception):
    """Exception type for plugin-related errors."""
class Bot(SpecialPlugin, IRCClient):
    """The bot itself: an :class:`IRCClient` that is also a :class:`SpecialPlugin`.

    IRC client callbacks (``on_*``, ``irc_*``, ``line_*``) are turned into
    ``core.*`` events and posted to the event runner, which looks up the hooks
    to call through the plugin manager.  The bot also owns the registry of
    commands (``self.commands``) and dispatches ``core.command`` events to the
    registered callbacks.
    """
    # TODO: use IRCUser instances instead of raw user string

    class Config(config.Config):
        """Configuration options for the bot core."""
        ircv3 = config.option(
            bool, default=False,
            help="Enable IRCv3 features (i.e. 'client capabilities')")
        nickname = config.option(
            str, required=True, example="csyorkbot",
            help="IRC nick")
        username = config.option(
            str, default="csyorkbot",
            help="IRC user")
        realname = config.option(
            str, default="", example="cs-york bot",
            help="IRC 'real name'")
        auth_method = config.option(
            str, default="pass",
            help="Authentication method: 'pass' or 'sasl_plain'")
        password = config.option(
            str, env="IRC_PASS", example="password123",
            help="Authentication password")
        # NOTE(review): the example value is empty in this copy of the source;
        # the original example hostname may have been lost - confirm.
        irc_host = config.option(
            str, required=True, example="",
            help="IRC server hostname")
        irc_port = config.option(
            int, default=6667,
            help="IRC server port")
        command_prefix = config.option(
            str, default="!",
            help="Prefix for invoking commands")
        channels = config.option(
            config.WordList, example=["#cs-york-dev"],
            help="Channels to join")
        plugins = config.option(
            config.WordList,
            example=lambda: sorted(p.plugin_name() for p in find_plugins()),
            help="Plugins to load")
        # NOTE(review): declared as ``int`` but defaulted to ``True`` and only
        # used as a flag; ``bool`` may have been intended - confirm before
        # changing, since it could affect how config values are parsed.
        use_notice = config.option(
            int, default=True,
            help="Use NOTICE instead of PRIVMSG to send messages")
        client_ping = config.option(
            int, default=0,
            help="Send PING if no messages for this many seconds (0=disabled)")
        # NOTE(review): the example value is empty in this copy of the source;
        # the original example address may have been lost - confirm.
        bind_addr = config.option(
            str, example="",
            help="Bind to specific local address")
        rate_limit_period = config.option(
            int, default=0,
            help="Period (in seconds) to consider for rate limit")
        rate_limit_count = config.option(
            int, default=0,
            help="Maximum number of messages to send in rate limit period")

    #: Dictionary containing available plugins for loading, using
    #: straight.plugin to discover plugin classes under a namespace.
    available_plugins: Mapping[str, Type[Plugin]]

    #: ``(tag, fields)`` used for the "formatted WHO" query sent by
    #: :meth:`identify`; the tag is matched again in :meth:`irc_354`.
    _WHO_IDENTIFY = ('1', '%na')

    def __init__(self, config=None, *, plugins: Sequence[Type[Plugin]] = None, loop=None):
        """Create the bot from a configuration mapping.

        :param config: Dict-like configuration root, or ``None`` for an empty
                       configuration.
        :param plugins: Plugin classes that may be loaded; when ``None`` they
                        are discovered with :func:`find_plugins`.
        :param loop: Passed through to :class:`IRCClient`.
        :raises TypeError: if *config* is neither ``None`` nor a mapping.
        """
        # Record available plugins
        if plugins is None:
            self.available_plugins = build_plugin_dict(find_plugins())
        else:
            self.available_plugins = build_plugin_dict(plugins)

        # Load configuration
        self.config_root = config
        if self.config_root is None:
            self.config_root = {}
        if not isinstance(self.config_root, collections.abc.Mapping):
            raise TypeError("expected 'config' to be a dict-like object")

        # Initialise plugin
        SpecialPlugin.__init__(self, self)

        # Initialise IRCClient from Bot configuration
        IRCClient.__init__(
            self,
            loop=loop,
            ircv3=self.config.ircv3,
            nick=self.config.nickname,
            username=self.config.username,
            host=self.config.irc_host,
            port=self.config.irc_port,
            password=self.config.password,
            auth_method=self.config.auth_method,
            bind_addr=self.config.bind_addr,
            client_ping_enabled=(self.config.client_ping > 0),
            client_ping_interval=self.config.client_ping,
            rate_limit_enabled=(self.config.rate_limit_period > 0 and
                                self.config.rate_limit_count > 0),
            rate_limit_period=self.config.rate_limit_period,
            rate_limit_count=self.config.rate_limit_count,
        )

        # Only the 10 most recent received lines are kept
        self._recent_messages = collections.deque(maxlen=10)

        # Plumb in reply(...) method
        if self.config.use_notice:
            self.reply = self.notice
        else:
            self.reply = self.msg

        # Plugin management
        self.plugins = PluginManager([self], self.available_plugins,
                                     self.config.plugins, [self])
        self.commands = {}

        # Event runner
        # NOTE(review): attribute name restored as ``events`` - confirm.
        self.events = events.HybridEventRunner(self._get_hooks, self.loop)

        # Keeps partial name lists between RPL_NAMREPLY and
        # RPL_ENDOFNAMES events
        self.names_accumulator = collections.defaultdict(list)

    def bot_setup(self):
        """Load plugins defined in configuration and run setup methods.
        """
        self.plugins.setup()

    def bot_teardown(self):
        """Run plugin teardown methods.
        """
        self.plugins.teardown()

    def _get_hooks(self, event):
        """Return an iterator over every hook registered for *event*'s type."""
        return itertools.chain(*self.plugins.get_hooks(event.event_type))

    def post_event(self, event):
        """Post *event* to the event runner and return the runner's result."""
        return self.events.post_event(event)

    def register_command(self, cmd, metadata, f, tag=None):
        """Register *f* as the callback for command *cmd*.

        :param cmd: Command name.
        :param metadata: Mapping of command metadata (``help`` is read by the
                         ``help`` command).
        :param f: Callable invoked with the command event.
        :param tag: Value identifying the owner, used when unregistering.
        :returns: ``False`` if *cmd* was already registered (the existing
                  registration is kept), otherwise ``True``.
        """
        # Bail out if the command already exists
        if cmd in self.commands:
            self.log.warning('tried to overwrite command: {}'.format(cmd))
            return False

        self.commands[cmd] = (f, metadata, tag)
        self.log.info('registered command: ({}, {})'.format(cmd, tag))
        return True

    def unregister_command(self, cmd, tag=None):
        """Remove command *cmd*, but only if it was registered with *tag*.

        Unknown commands are ignored; a tag mismatch is logged as an error
        and the command is left in place.
        """
        if cmd not in self.commands:
            return

        _, _, registered_tag = self.commands[cmd]
        if registered_tag == tag:
            del self.commands[cmd]
            self.log.info('unregistered command: ({}, {})'.format(cmd, tag))
        else:
            self.log.error(('tried to remove command {} ' +
                            'with wrong tag {}').format(cmd, tag))

    def unregister_commands(self, tag):
        """Remove every command that was registered with *tag*."""
        # Collect first: the dict cannot be modified while iterating over it
        delcmds = [c for c, (_, _, t) in self.commands.items() if t == tag]
        for cmd in delcmds:
            del self.commands[cmd]
            self.log.info('unregistered command: ({}, {})'.format(cmd, tag))

    @Plugin.hook('core.self.connected')
    def signedOn(self, event):
        """Join every configured channel once the bot is connected."""
        for c in self.config.channels:
            # NOTE(review): loop body restored as a join on the event's bot -
            # confirm.
            event.bot.join(c)

    @Plugin.hook('core.message.privmsg')
    def privmsg(self, event):
        """Handle commands inside PRIVMSGs."""
        # See if this is a command
        # NOTE(review): final argument restored as the bot's current nick -
        # confirm against CommandEvent.parse_command.
        command = CommandEvent.parse_command(
            event, self.config.command_prefix, event.bot.nick)
        if command is not None:
            self.post_event(command)

    @Plugin.hook('core.command')
    async def fire_command(self, event):
        """Dispatch a command event to its callback.
        """
        # Ignore unknown commands
        if event['command'] not in self.commands:
            return

        f, _, _ = self.commands[event['command']]
        await maybe_future_result(f(event), log=self.log)

    @Plugin.command('help', help=('help [command]: show help for command, or '
                                  'show available commands'))
    def show_commands(self, e):
        """Reply with help for one command, or the sorted list of commands."""
        args = e.arguments()
        if not args:
            e.reply(', '.join(sorted(self.commands)))
            return

        cmd = args[0]
        if cmd in self.commands:
            _, meta, _ = self.commands[cmd]
            e.reply(meta.get('help', cmd + ': no help string'))
        else:
            e.reply(cmd + ': no such command')

    @Plugin.command('plugins')
    def show_plugins(self, e):
        """Reply with the names of the loaded plugins."""
        e.reply('loaded plugins: ' + ', '.join(self.plugins))

    # Implement IRCClient events

    def emit_new(self, event_type, data=None):
        """Shorthand for firing a new event.
        """
        event = Event(self, event_type, data)
        return self.events.post_event(event)

    def emit(self, event):
        """Shorthand for firing an existing event.
        """
        return self.events.post_event(event)

    async def connection_made(self):
        """Request IRCv3 capabilities if enabled, then fire ``core.raw.connected``."""
        await super().connection_made()
        if self.config.ircv3:
            await self.request_capabilities(enable={'account-notify', 'extended-join'})
        self.emit_new('core.raw.connected')

    async def connection_lost(self, exc):
        """Fire ``core.raw.disconnected`` with ``repr(exc)`` as the reason."""
        await super().connection_lost(exc)
        self.emit_new('core.raw.disconnected', {'reason': repr(exc)})

    def line_sent(self, line: str):
        """Fire ``core.raw.sent`` for every line sent."""
        super().line_sent(line)
        self.emit_new('core.raw.sent', {'message': line})

    def line_received(self, line):
        """Record *line*, fire ``core.raw.received``, then process the line.

        :returns: The result of posting the ``core.raw.received`` event.
        """
        self._recent_messages.append(line)
        # The raw event is posted before the client handles the line
        fut = self.emit_new('core.raw.received', {'message': line})
        super().line_received(line)
        return fut

    @property
    def recent_messages(self):
        """A list copy of the most recently received lines, oldest first."""
        return list(self._recent_messages)

    def on_welcome(self):
        """Fire ``core.self.connected``."""
        self.emit_new('core.self.connected')

    def on_joined(self, channel):
        """Look up accounts for users in *channel* and fire ``core.self.joined``."""
        self.identify(channel)
        self.emit_new('core.self.joined', {'channel': channel})

    def on_left(self, channel):
        """Fire ``core.self.left``."""
        self.emit_new('core.self.left', {'channel': channel})

    def _message_data(self, user, channel, message):
        """Build the event payload shared by PRIVMSG, NOTICE and ACTION events."""
        # A message addressed to the bot's own nick is private, and is
        # answered to the sender rather than to the "channel".
        is_private = channel == self.nick
        return {
            'channel': channel,
            'user': user.raw,
            'message': message,
            'is_private': is_private,
            'reply_to': user.nick if is_private else channel,
        }

    def on_privmsg(self, user, channel, message):
        """Fire ``core.message.privmsg``."""
        self.emit_new('core.message.privmsg',
                      self._message_data(user, channel, message))

    def on_notice(self, user, channel, message):
        """Fire ``core.message.notice``."""
        self.emit_new('core.message.notice',
                      self._message_data(user, channel, message))

    def on_action(self, user, channel, message):
        """Fire ``core.message.action``."""
        self.emit_new('core.message.action',
                      self._message_data(user, channel, message))

    def on_user_joined(self, user, channel):
        """Fire the event for another user joining *channel*."""
        # NOTE(review): event name restored as 'core.channel.joined' - confirm.
        self.emit_new('core.channel.joined', {
            'channel': channel,
            'user': user.raw,
        })

    def on_user_left(self, user, channel, message):
        """Fire the event for another user leaving *channel*.

        *message* is accepted but not included in the event data.
        """
        # NOTE(review): event name restored as 'core.channel.left' - confirm.
        self.emit_new('core.channel.left', {
            'channel': channel,
            'user': user.raw,
        })

    def on_user_quit(self, user, message):
        """Fire ``core.user.quit``."""
        self.emit_new('core.user.quit', {
            'user': user.raw,
            'message': message,
        })

    def on_user_renamed(self, oldnick, newnick):
        """Fire ``core.user.renamed``."""
        self.emit_new('core.user.renamed', {
            'oldnick': oldnick,
            'newnick': newnick,
        })

    def on_topic_changed(self, user, channel, topic):
        """Fire the event for a topic change in *channel*."""
        # NOTE(review): event name restored as 'core.channel.topic' - confirm.
        self.emit_new('core.channel.topic', {
            'channel': channel,
            'author': user.raw,     # might be server name or nick
            'topic': topic,
        })

    # Implement NAMES handling

    def irc_RPL_NAMREPLY(self, msg):
        """Accumulate one chunk of a channel's NAMES list."""
        channel = msg.params[2]
        self.names_accumulator[channel].extend(msg.params[3].split())

    def irc_RPL_ENDOFNAMES(self, msg):
        """Finish a NAMES list and hand it to :meth:`on_names`."""
        # Get channel and raw names list
        channel = msg.params[1]
        raw_names = self.names_accumulator.pop(channel, [])

        # TODO: restore this functionality
        # Get a mapping from status characters to mode flags
        # prefixes = self.supported.getFeature('PREFIX')
        # inverse_prefixes = dict((v[0], k) for k, v in prefixes.items())

        # Get mode characters from name prefix
        # def f(name):
        #     if name[0] in inverse_prefixes:
        #         return (name[1:], set(inverse_prefixes[name[0]]))
        #     else:
        #         return (name, set())

        # Until the above is restored: strip '@'/'+' and report no modes
        names = [(name.lstrip('@+'), set()) for name in raw_names]

        # Fire the event
        self.on_names(channel, names, raw_names)

    def on_names(self, channel, names, raw_names):
        """Called when the NAMES list for a channel has been received.
        """
        # NOTE(review): event name restored as 'core.channel.names' - confirm.
        self.emit_new('core.channel.names', {
            'channel': channel,
            'names': names,
            'raw_names': raw_names,
        })

    # Implement active account discovery via "formatted WHO"

    def identify(self, target):
        """Find the account for a user or all users in a channel."""
        tag, query = self._WHO_IDENTIFY
        self.send_line('WHO {} {}t,{}'.format(target, query, tag))

    def irc_354(self, msg):
        """Handle "formatted WHO" responses."""
        tag = msg.params[1]
        # Only handle replies carrying the tag sent by identify()
        if tag == self._WHO_IDENTIFY[0]:
            user, account = msg.params[2:]
            # An account of '0' is reported as "no account"
            self.on_user_identified(user, None if account == '0' else account)

    def on_user_identified(self, user, account):
        """Fire ``core.user.identified``; *account* is ``None`` when not logged in."""
        self.emit_new('core.user.identified', {
            'user': user,
            'account': account,
        })

    # Implement passive account discovery via "Client Capabilities"

    def irc_ACCOUNT(self, msg):
        """Account change notification from ``account-notify`` capability."""
        account = msg.params[0]
        # An account of '*' is reported as "no account"
        self.on_user_identified(msg.prefix, None if account == '*' else account)

    def irc_JOIN(self, msg):
        """Re-implement ``JOIN`` handler to account for ``extended-join`` info.
        """
        # Only do special handling if extended-join was enabled
        if 'extended-join' not in self.enabled_capabilities:
            return super().irc_JOIN(msg)

        user = IRCUser.parse(msg.prefix)
        nick = user.nick
        channel, account, _ = msg.params

        if nick == self.nick:
            self.on_joined(channel)
        else:
            self.on_user_identified(user.raw, None if account == '*' else account)
            self.on_user_joined(user, channel)

    def reply(self, to, message):
        """Reply to a nick/channel.

        This is not implemented because it should be replaced in the
        constructor with a reference to a real method, e.g.
        ``self.reply = self.msg``.
        """
        raise NotImplementedError

    @classmethod
    def write_example_config(cls, f, plugins=None, commented=False):
        """Write an example configuration for the bot and its plugins to *f*.

        :param f: Writable text file object.
        :param plugins: Plugin classes to include after the bot itself; when
                        ``None``, all discovered plugins sorted by name.
        :param commented: Passed to :class:`config.TomlExampleGenerator`.
        :raises PluginConfigError: if generating a plugin's example fails with
                                   a :class:`config.ConfigError`.
        """
        plugins_ = [cls]
        if plugins is None:
            plugins_.extend(sorted(find_plugins(), key=lambda p: p.plugin_name()))
        else:
            plugins_.extend(plugins)

        generator = config.TomlExampleGenerator(commented=commented)
        for P in plugins_:
            config_cls = getattr(P, 'Config', None)
            if config.is_config(config_cls):
                try:
                    generator.generate(config_cls, f, prefix=[P.plugin_name()])
                except config.ConfigError as e:
                    raise PluginConfigError(
                        f"error in example config for plugin '{P.plugin_name()}': {e}") from e
                f.write("\n\n")