bot.py (4551B)
1 #!/usr/bin/env python3 2 3 import asyncio, random 4 5 from irctokens import build, Line 6 from ircrobots import Bot as BaseBot 7 from ircrobots import Server as BaseServer 8 from ircrobots import ConnectionParams 9 # aaaaaaaaaaaaaAAAAAAAAAAAAAAA 10 # im too lazy to import more stuffs :tm: 11 from ircrobots.server import * 12 13 from config import * 14 15 class Server(BaseServer): 16 17 # overwrite connect so i can put try except blocks there 18 async def connect(self, 19 transport: ITCPTransport, 20 params: ConnectionParams): 21 try: 22 await sts_transmute(params) 23 await resume_transmute(params) 24 25 reader, writer = await transport.connect( 26 params.host, 27 params.port, 28 tls =params.tls, 29 tls_verify=params.tls_verify, 30 bindhost =params.bindhost) 31 32 self._reader = reader 33 self._writer = writer 34 35 self.params = params 36 await self.handshake() 37 except: 38 print('connection with {} failed, disconnecting'.format(self.name)) 39 self.disconnected = True 40 41 async def line_read(self, line: Line): 42 print(f"{self.name} < {line.format()}") 43 if line.command == "001": 44 print(f"connected to {self.name}") 45 self.chan = FNCHANNEL if self.name == "freenode" else CHANNEL 46 await self.send(build("JOIN", [self.chan])) 47 if line.command == "PRIVMSG" and line.params.pop(0) == self.chan: 48 text = line.params[0].replace("\1ACTION","*").replace("\1","") 49 nick = line.source.split('!')[0] 50 if nick == self.nickname or (line.tags and "batch" in line.tags) or "\x0f\x0f\x0f\x0f" in text: 51 return 52 if nick.lower() in self.users and self.users[nick.lower()].account in ADMINS: 53 if text[:len(self.nickname)+2].lower() == f'{self.nickname}: '.lower(): 54 args = text[len(self.nickname)+2:].split(' ') 55 if args[0] == 'connect' and len(args) > 4: 56 await self.bot.add_server(args[1],ConnectionParams(NICKNAME,args[2],args[3],bool(int(args[4])))) 57 await self.send(build("PRIVMSG",[self.chan,"Connected to {} :3".format(args[1])])) 58 return 59 for i in random.sample(list(self.bot.servers),len(self.bot.servers)): 60 asyncio.create_task(self.bot.servers[i].ac(self.name,args)) 61 return 62 for npn in NOPING: 63 offset = 1 64 for loc in find_all_indexes(text.lower(), npn.lower()): 65 text = text[:loc+offset]+"\u200c"+text[loc+offset:] 66 offset += 1 67 for i in random.sample(list(self.bot.servers),len(self.bot.servers)): 68 asyncio.create_task(self.bot.servers[i].bc(self.name,nick,text)) 69 #await self.send(build("PRIVMSG ##xfnw :ine and boat ",[text])) 70 if line.command == "INVITE": 71 await self.send(build("JOIN",[line.params[1]])) 72 async def line_send(self, line: Line): 73 print(f"{self.name} > {line.format()}") 74 async def bc(self,name,nick,msg): 75 if self.disconnected or name == self.name or "chan" not in list(dir(self)): 76 return 77 await self.send(build("PRIVMSG",[self.chan,"\x0f\x0f\x0f\x0f<"+nick[:1]+"\u200c"+nick[1:]+"@"+name+"> "+msg])) 78 async def ac(self,name,args): 79 if self.disconnected or "chan" not in list(dir(self)): 80 return 81 nargs = [] 82 isComb = False 83 for arg in args: 84 if arg[0] == ':': 85 isComb = True 86 nargs.append(arg[1:]) 87 continue 88 if isComb: 89 nargs[-1] += ' '+arg 90 else: 91 nargs.append(arg) 92 await self.send(build(nargs[0],[self.chan]+nargs[1:])) 93 94 class Bot(BaseBot): 95 def create_server(self, name: str): 96 return Server(self, name) 97 98 99 100 def find_all_indexes(input_str, search_str): 101 l1 = [] 102 length = len(input_str) 103 index = 0 104 while index < length: 105 i = input_str.find(search_str, index) 106 if i == -1: 107 return l1 108 l1.append(i) 109 index = i + 1 110 return l1 111 112 113 114 async def main(): 115 bot = Bot() 116 for name, host, port, ssl in SERVERS: 117 params = ConnectionParams(NICKNAME, host, port, ssl) 118 await bot.add_server(name, params) 119 120 await bot.run() 121 122 if __name__ == "__main__": 123 asyncio.run(main())