import zlib _imports = ["lib_markup"] _parents = [] _version = 1 #The avatar_telnet object handles negotiation of mud client protocols and formatting of the the text between server and #client to fit those protocols. Once that is done, it's derived classes handle input and output between the client #and server. cEscProxy = lib_markup.cEscProxy #Commands return a tuple, the first item being remaining input, the second being what to add to output. def _Escape_ResetAll(avatar, message): avatar.MXPOnly = False avatar.notMXPOnly = False return (message, ANSIColorMapping["-a"]) def _Escape_ResetMXP(avatar, message): avatar.MXPOnly = False avatar.notMXPOnly = False if avatar.useMXP: return (message, MXPMapping["xr"]) else: return (message, "") def _Escape_MXPOnlyOn(avatar, message): avatar.MXPOnly = True avatar.notMXPOnly = False return (message, "") def _Escape_MXPOnlyOff(avatar, message): avatar.MXPOnly = False return (message, "") def _Escape_NotMXPOnlyOn(avatar, message): avatar.MXPOnly = False avatar.notMXPOnly = True return (message, "") def _Escape_NotMXPOnlyOff(avatar, message): avatar.notMXPOnly = False return (message, "") def _Escape_MXPTag(avatar, message): if avatar.useMXP: searchIndex = message.find("".join((cEscProxy, "t-"))) if searchIndex != -1: if avatar.useMXP: tagText = "".join(("<", message[:searchIndex], ">")) else: tagText = "" message = message[(searchIndex + 3):] return (message, tagText) else: mudWorld.loggers["mud"].error("Unclosed MXP tag.") return (message, "") def _Escape_MXPEntity(avatar, message): if avatar.useMXP: searchIndex = message.find("".join((cEscProxy, "e-"))) if searchIndex != -1: if avatar.useMXP: entityText = "".join(("&", message[:searchIndex], ";")) else: entityText = "" message = message[(searchIndex + 3):] return (message, entityText) else: mudWorld.loggers["mud"].error("Unclosed MXP entity.") return (message, "") def _Escape_Sound(avatar, message): searchIndex = message.find(">") if searchIndex != -1: #MXP takes precedence. if avatar.useMXPSound: tagText = "".join(("<SOUND ", message[1:searchIndex], ">")) elif avatar.useMSP: tagText = "".join(("!!SOUND(", message[1:searchIndex], ")")) else: tagText = "" message = message[(searchIndex + 1):] return (message, tagText) else: mudWorld.loggers["mud"].error("Sound string improperly ended, can not parse.") return (message, "") def _Escape_Music(avatar, message): searchIndex = message.find(">") if searchIndex != -1: #MXP takes precedence. if avatar.useMXPSound: tagText = "".join(("<MUSIC ", message[1:searchIndex], ">")) elif avatar.useMSP: tagText = "".join(("!!MUSIC(", message[1:searchIndex], ")")) else: tagText = "" message = message[(searchIndex + 1):] return (message, tagText) else: mudWorld.loggers["mud"].error("Music string improperly ended, can not parse.") return (message, "") def _Escape_CursorMove(avatar, message): if message[0] == "<": searchIndex = message.find(">") if searchIndex != -1: cursorPos = message[1:searchIndex] message = message[(searchIndex + 1):] return (message, "".join(("\x1B[", cursorPos, "H"))) else: mudWorld.loggers["mud"].error("Cursor position improperly ended, can not parse.") return (message, "") else: return (message, "\x1B[H") CommandMapping = {"--":_Escape_ResetAll, "-x":_Escape_ResetMXP, "x+":_Escape_MXPOnlyOn, "x-":_Escape_MXPOnlyOff, "o+":_Escape_NotMXPOnlyOn, "o-":_Escape_NotMXPOnlyOff, "t+":_Escape_MXPTag, "e+":_Escape_MXPEntity, "sd":_Escape_Sound, "ms":_Escape_Music, "@m":_Escape_CursorMove} MiscMapping = {"@s":"\x1B[s", "@r":"\x1B[u", "es":"\x1B[2J", "el":"\x1B[K"} ANSIColorMapping = {"-a":"\x1B[0m", "b+":"\x1B[1m", "b-":"\x1B[22m", "i+":"\x1B[3m", "i-":"\x1B[23m", "u+":"\x1B[4m", "u-":"\x1B[24m", "n+":"\x1B[7m", "n-":"\x1B[27m", "s+":"\x1B[9m", "s-":"\x1B[29m", "fz":"\x1B[30m", "fr":"\x1B[31m", "fg":"\x1B[32m", "fy":"\x1B[33m", "fb":"\x1B[34m", "fp":"\x1B[35m", "fc":"\x1B[36m", "fw":"\x1B[37m", "fd":"\x1B[39m", "fZ":"\x1B[1m\x1B[30m", "fR":"\x1B[1;31m", "fG":"\x1B[1;32m", "fY":"\x1B[1;33m", "fB":"\x1B[1m\x1B[34m", "fP":"\x1B[1;35m", "fC":"\x1B[1;36m", "fW":"\x1B[1;37m", "bz":"\x1B[40m", "br":"\x1B[41m", "bg":"\x1B[42m", "by":"\x1B[43m", "bb":"\x1B[44m", "bp":"\x1B[45m", "bc":"\x1B[46m", "bw":"\x1B[47m", "bd":"\x1B[49m", "ad":"\x1B[39;49;22;23;24;27;29m"} MXPMapping = {"xo":"\x1B[0z","xs":"\x1B[1z","xl":"\x1B[2z", "xr":"\x1B[3z", "xn":"\x1B[4z", "lo":"\x1B[5z", "ls":"\x1B[6z", "ll":"\x1B[7z"} ansiTypes = ["zmud", "vt100", "ansi"] def _Sys_Create(self, client): self._persist = True self.client = client self.useMXP = False self.useMXPSound = False self.useMSP = False self.useMCCP = False self.useMCCP2 = False self.useANSI = False self.breakLines = False self.termX = 80 self.termY = 20 self.curX = 0 self.curY = 0 self.usePrompt = False self.promptCallback = None self.atPrompt = False self.appendLineAtPrompt = True self.MXPOnly = False self.notMXPOnly = False self.terminalType = "" self.permissions = {} def _Sys_Destroy(self): if self.promptCallback != None: DelCallback(self.promptCallback) self.promptCallback = None def OnConnect(self): #IAC WILL TERMINALTYPE, IAC WILL MXP, IAC WILL MSP, IAC WILL COMPRESS2, IAC WILL COMPRESS sendString = "\xFF\xFB\x18\xFF\xFB\x5B\xFF\xFB\x5A\xFF\xFB\x56\xFF\xFB\x55" self.RawSend(sendString) #Abstract: called upon client going linkdead. def OnLinkdeath(self): pass #Abstract: called upon client being disconnected by the game, IE by a quit command. def Disconnect(self): self.client.Disconnect() def _Sys_TelnetDO(self, doType): #DO MXP if doType == "\x5B": self.useMXP = True self.useMXPSound = True #DO MSP elif doType == "\x5A": self.useMSP = True #DO COMPRESS2 elif doType == "\x56": if self.useMCCP == True: self.useMCCP = False #Send the client our compression stream notice. #IAC SB COMPRESS2 IAC SE self.RawSend("\xFF\xFA\x56\xFF\xF0") self.useMCCP2 = True self.compressObj = zlib.compressobj() #DO COMPRESS elif doType == "\x55": #If we're already using MCCP2, no need to use MCCP. if self.useMCCP2 == False: #Same as above. #IAC SB COMPRESS WILL SE - Note that the SE is not prefixed by WILL, not IAC self.RawSend("\xFF\xFA\x56\xFB\xF0") self.useMCCP = True self.compressObj = zlib.compressobj() #DO TERMINALTYPE elif doType == "\x18": #IAC SB TERMINALTYPE SEND IAC SE self.RawSend("\xFF\xFA\x18\x01\xFF\xF0") def _Sys_TelnetDONT(self, dontType): #DONT MXP if dontType == "\x5B": self.useMXP = False self.useMXPSound = False #DONT MSP elif dontType == "\x5A": self.useMSP = False #DONT COMPRESS2 elif dontType == "\x56": if self.useMCCP2: self.client.Send(self.compressObj.flush(zlib.Z_FINISH)) self.useMCCP2 = False self.compressObj = None #DONT COMPRESS elif dontType == "\x55": if self.useMCCP: self.client.Send(self.compressObj.flush(zlib.Z_FINISH)) self.useMCCP = False self.compressObj = None def _Sys_ReceiveCommand(self, command): pass def _Sys_TelnetSB(self, subCommand): #TERMINALTYPE IS if subCommand.startswith("\x18\x00"): terminalType = subCommand[2:] self.terminalType = terminalType print terminalType if terminalType in ansiTypes: self.useANSI = True else: for char in subCommand: print ord(char) def GetPrompt(self): return ">" def SendPrompt(self): self.promptCallback = None if self.usePrompt: prompt = self.GetPrompt() prompt = self.ParseText(prompt) if self.useMCCP or self.useMCCP2: prompt = self.compressObj.compress(prompt) prompt = "".join((prompt, self.compressObj.flush(zlib.Z_SYNC_FLUSH))) self.client.Send(prompt) self.atPrompt = True def Send(self, message): if self.usePrompt: if self.promptCallback == None: self.promptCallback = AddCallback(.25, 0, self, "SendPrompt", {}) else: self.promptCallback[0] = .25 message = self.ParseText(message) if self.useMCCP or self.useMCCP2: message = self.compressObj.compress(message) message = "".join((message, self.compressObj.flush(zlib.Z_SYNC_FLUSH))) self.client.Send(message) self.atPrompt = False def RawSend(self, sendString): if self.useMCCP or self.useMCCP2: sendString = self.compressObj.compress(sendString) sendString = "".join((sendString, self.compressObj.flush(zlib.Z_SYNC_FLUSH))) self.client.Send(sendString) #Private function section. def ParseText(self, message): escIndex = message.find(cEscProxy) newMessageList = [] if self.atPrompt and self.appendLineAtPrompt: newMessageList.append("\r\n") while escIndex != -1: escape = message[escIndex+1:escIndex+3] newMessageList.append(self.ParseRawText(message[:escIndex])) message = message[(escIndex+3):] if ANSIColorMapping.has_key(escape): if self.useANSI: newMessageList.append(ANSIColorMapping[escape]) elif MXPMapping.has_key(escape): if self.useMXP: newMessageList.append(MXPMapping[escape]) elif MiscMapping.has_key(escape): newMessageList.append(MiscMapping[escape]) elif CommandMapping.has_key(escape): message, newText = CommandMapping[escape](self, message) newMessageList.append(newText) else: mudWorld.loggers["mud"].error("Unrecognized escape %s" % escape) escIndex = message.find(cEscProxy) newMessageList.append(self.ParseRawText(message)) return "".join(newMessageList) def ParseMXP(self, message): if self.MXPOnly: if self.useMXP: return self.ParseRawText(message) else: return "" elif self.notMXPOnly: if not self.useMXP: return self.ParseRawText(message) else: return "" else: return self.ParseRawText(message) def ParseRawText(self, message): #First, separate the lines so we can track termX and termY. lineList = message.split("\n") lineIndex = 0 lineCount = len(lineList) while lineIndex < lineCount: segmentList = lineList[lineIndex].split("\r") segmentIndex = 0 segmentCount = len(segmentList) while segmentIndex < segmentCount: segment = segmentList[segmentIndex] segmentLength = len(segment) if self.breakLines: #Auto-break lines on a space. wordList = segment.split(" ") wordIndex = 0 wordCount = len(wordList) while wordIndex < wordCount: word = wordList[wordIndex] wordLength = len(word) self.curX += wordLength if self.curX > self.termX: wordList[wordIndex] = "".join(("\r\n", word)) self.curX = wordLength self.curY += 1 if (wordIndex != (wordCount - 1)) and (self.curX != self.termX): wordList[wordIndex] = "".join((wordList[wordIndex], " ")) self.curX += 1 wordIndex += 1 segmentList[segmentIndex] = "".join(wordList) if segmentIndex != (segmentCount - 1): self.curX = 0 else: if segmentIndex != (segmentCount - 1): self.curX = 0 else: self.curX += segmentLength segmentIndex += 1 lineList[lineIndex] = "\r".join(segmentList) if lineIndex != (len(lineList) - 1): self.curY += 1 lineIndex += 1 message = "\n".join(lineList) if self.useMXP: return message.replace("&", "&").replace("\"", """).replace("<", "<").replace(">", ">") return message