import zlib
_imports = ["lib_markup"]
_parents = []
_version = 1
#The avatar_telnet object handles negotiation of MUD client protocols and formatting of the text between server and
#client to fit those protocols. Once that is done, its derived classes handle input and output between the client
#and server.
cEscProxy = lib_markup.cEscProxy
#Commands return a tuple, the first item being remaining input, the second being what to add to output.
def _Escape_ResetAll(avatar, message):
    """Handle the "--" escape: leave both MXP-only modes and reset text attributes.

    Returns a (remaining input, text to output) tuple; the input is passed
    through untouched and the output is the "-a" entry of ANSIColorMapping.
    """
    #NOTE(review): the reset code is returned without consulting avatar.useANSI - confirm that is intended.
    avatar.MXPOnly = avatar.notMXPOnly = False
    resetCode = ANSIColorMapping["-a"]
    return message, resetCode
def _Escape_ResetMXP(avatar, message):
    """Handle the "-x" escape: leave both MXP-only modes and reset MXP.

    Returns a (remaining input, text to output) tuple. The output is the "xr"
    entry of MXPMapping when the avatar uses MXP, otherwise an empty string.
    """
    avatar.MXPOnly = avatar.notMXPOnly = False
    resetCode = MXPMapping["xr"] if avatar.useMXP else ""
    return message, resetCode
def _Escape_MXPOnlyOn(avatar, message):
    """Handle the "x+" escape: enter MXP-only mode and leave not-MXP-only mode.

    Returns (remaining input, "") - nothing is added to the output.
    """
    avatar.MXPOnly, avatar.notMXPOnly = True, False
    return message, ""
def _Escape_MXPOnlyOff(avatar, message):
    """Handle the "x-" escape: leave MXP-only mode.

    The not-MXP-only flag is not touched. Returns (remaining input, "").
    """
    setattr(avatar, "MXPOnly", False)
    return message, ""
def _Escape_NotMXPOnlyOn(avatar, message):
    """Handle the "o+" escape: enter not-MXP-only mode and leave MXP-only mode.

    Returns (remaining input, "") - nothing is added to the output.
    """
    avatar.MXPOnly, avatar.notMXPOnly = False, True
    return message, ""
def _Escape_NotMXPOnlyOff(avatar, message):
    """Handle the "o-" escape: leave not-MXP-only mode.

    The MXP-only flag is not touched. Returns (remaining input, "").
    """
    setattr(avatar, "notMXPOnly", False)
    return message, ""
def _Escape_MXPTag(avatar, message):
    """Handle the "t+" escape: turn the text up to the closing "t-" escape into an MXP tag.

    message is the input following the "t+" escape. Returns a
    (remaining input, text to output) tuple, like every other command.

    The tag body and its closing escape are always removed from the input.
    The "<...>" tag is only emitted when the avatar uses MXP; other clients
    get an empty string, so the tag text never leaks into their output.
    Previously the lookup of the closing escape was guarded by avatar.useMXP,
    which left non-MXP clients without a usable return value.

    If no closing escape is found, an error is logged and the input is
    returned unchanged with nothing added to the output.
    """
    closeIndex = message.find("".join((cEscProxy, "t-")))
    if closeIndex == -1:
        mudWorld.loggers["mud"].error("Unclosed MXP tag.")
        return (message, "")
    if avatar.useMXP:
        tagText = "".join(("<", message[:closeIndex], ">"))
    else:
        tagText = ""
    #Skip the closing escape itself: the proxy character plus "t-".
    return (message[(closeIndex + 3):], tagText)
def _Escape_MXPEntity(avatar, message):
    """Handle the "e+" escape: turn the text up to the closing "e-" escape into an MXP entity.

    message is the input following the "e+" escape. Returns a
    (remaining input, text to output) tuple, like every other command.

    The entity name and its closing escape are always removed from the input.
    The entity reference (ampersand, name, semicolon) is only emitted when the
    avatar uses MXP; other clients get an empty string. Previously the lookup
    of the closing escape was guarded by avatar.useMXP, which left non-MXP
    clients without a usable return value.

    If no closing escape is found, an error is logged and the input is
    returned unchanged with nothing added to the output.
    """
    closeIndex = message.find("".join((cEscProxy, "e-")))
    if closeIndex == -1:
        mudWorld.loggers["mud"].error("Unclosed MXP entity.")
        return (message, "")
    if avatar.useMXP:
        entityText = "".join(("&", message[:closeIndex], ";"))
    else:
        entityText = ""
    #Skip the closing escape itself: the proxy character plus "e-".
    return (message[(closeIndex + 3):], entityText)
def _Escape_Sound(avatar, message):
    """Handle the "sd" escape: emit a sound trigger for the client.

    The sound arguments are taken from the input between its first character
    (skipped without inspection) and the first ">". They are emitted as an MXP
    SOUND tag when avatar.useMXPSound is set, otherwise as an MSP "!!SOUND()"
    trigger when avatar.useMSP is set, otherwise dropped.

    Returns (remaining input, text to output). If the input contains no ">",
    an error is logged and the input is returned unchanged with no output.
    """
    head, closer, remainder = message.partition(">")
    if not closer:
        mudWorld.loggers["mud"].error("Sound string improperly ended, can not parse.")
        return message, ""
    soundArgs = head[1:]
    #MXP takes precedence.
    if avatar.useMXPSound:
        tagText = "<SOUND " + soundArgs + ">"
    elif avatar.useMSP:
        tagText = "!!SOUND(" + soundArgs + ")"
    else:
        tagText = ""
    return remainder, tagText
def _Escape_Music(avatar, message):
    """Handle the "ms" escape: emit a music trigger for the client.

    The music arguments are taken from the input between its first character
    (skipped without inspection) and the first ">". They are emitted as an MXP
    MUSIC tag when avatar.useMXPSound is set, otherwise as an MSP "!!MUSIC()"
    trigger when avatar.useMSP is set, otherwise dropped.

    Returns (remaining input, text to output). If the input contains no ">",
    an error is logged and the input is returned unchanged with no output.
    """
    head, closer, remainder = message.partition(">")
    if not closer:
        mudWorld.loggers["mud"].error("Music string improperly ended, can not parse.")
        return message, ""
    musicArgs = head[1:]
    #MXP takes precedence.
    if avatar.useMXPSound:
        tagText = "<MUSIC " + musicArgs + ">"
    elif avatar.useMSP:
        tagText = "!!MUSIC(" + musicArgs + ")"
    else:
        tagText = ""
    return remainder, tagText
def _Escape_CursorMove(avatar, message):
    """Handle the "@m" escape: emit an ANSI cursor-position sequence.

    When the input starts with "<", the text up to the first ">" is used as
    the position argument and ESC [ <position> H is emitted. Without a leading
    "<" the bare ESC [ H sequence is emitted and the input is left untouched.

    Returns (remaining input, text to output). If a "<" is opened but never
    closed, an error is logged and the input is returned unchanged with no
    output.
    """
    #startswith also copes with an empty remainder (escape at the very end of the text),
    #where indexing message[0] would raise IndexError.
    if not message.startswith("<"):
        return (message, "\x1B[H")
    closeIndex = message.find(">")
    if closeIndex == -1:
        mudWorld.loggers["mud"].error("Cursor position improperly ended, can not parse.")
        return (message, "")
    cursorPos = message[1:closeIndex]
    return (message[(closeIndex + 1):], "".join(("\x1B[", cursorPos, "H")))
#Escapes that need code to run. Keys are the two characters that follow the escape proxy character;
#each handler takes (avatar, remaining input) and returns (remaining input, text to output).
CommandMapping = {
    "--": _Escape_ResetAll,
    "-x": _Escape_ResetMXP,
    "x+": _Escape_MXPOnlyOn,
    "x-": _Escape_MXPOnlyOff,
    "o+": _Escape_NotMXPOnlyOn,
    "o-": _Escape_NotMXPOnlyOff,
    "t+": _Escape_MXPTag,
    "e+": _Escape_MXPEntity,
    "sd": _Escape_Sound,
    "ms": _Escape_Music,
    "@m": _Escape_CursorMove,
}
#Cursor save/restore and erase sequences, keyed by escape code.
MiscMapping = {
    "@s": "\x1B[s",
    "@r": "\x1B[u",
    "es": "\x1B[2J",
    "el": "\x1B[K",
}
#ANSI SGR sequences, keyed by escape code.
ANSIColorMapping = {
    #Reset.
    "-a": "\x1B[0m",
    #Attribute on/off pairs.
    "b+": "\x1B[1m", "b-": "\x1B[22m",
    "i+": "\x1B[3m", "i-": "\x1B[23m",
    "u+": "\x1B[4m", "u-": "\x1B[24m",
    "n+": "\x1B[7m", "n-": "\x1B[27m",
    "s+": "\x1B[9m", "s-": "\x1B[29m",
    #Foreground colors (codes 30-37, 39).
    "fz": "\x1B[30m", "fr": "\x1B[31m", "fg": "\x1B[32m", "fy": "\x1B[33m",
    "fb": "\x1B[34m", "fp": "\x1B[35m", "fc": "\x1B[36m", "fw": "\x1B[37m",
    "fd": "\x1B[39m",
    #Foreground colors combined with attribute 1 (upper-case key letter).
    "fZ": "\x1B[1m\x1B[30m", "fR": "\x1B[1;31m", "fG": "\x1B[1;32m", "fY": "\x1B[1;33m",
    "fB": "\x1B[1m\x1B[34m", "fP": "\x1B[1;35m", "fC": "\x1B[1;36m", "fW": "\x1B[1;37m",
    #Background colors (codes 40-47, 49).
    "bz": "\x1B[40m", "br": "\x1B[41m", "bg": "\x1B[42m", "by": "\x1B[43m",
    "bb": "\x1B[44m", "bp": "\x1B[45m", "bc": "\x1B[46m", "bw": "\x1B[47m",
    "bd": "\x1B[49m",
    #All of the "off"/default codes in a single sequence.
    "ad": "\x1B[39;49;22;23;24;27;29m",
}
#MXP line/mode tag sequences (ESC [ n z), keyed by escape code.
MXPMapping = {
    "xo": "\x1B[0z",
    "xs": "\x1B[1z",
    "xl": "\x1B[2z",
    "xr": "\x1B[3z",
    "xn": "\x1B[4z",
    "lo": "\x1B[5z",
    "ls": "\x1B[6z",
    "ll": "\x1B[7z",
}
#Terminal type names that are taken to mean the client understands ANSI sequences.
ansiTypes = ["zmud", "vt100", "ansi"]
def _Sys_Create(self, client):
    """Initialise a new avatar for the given client connection.

    Every protocol flag starts switched off, the terminal is assumed to be
    80 columns by 20 rows with the cursor at the origin, prompts are disabled
    and the permissions table is empty.
    """
    self._persist = True
    self.client = client
    #Client protocol support flags.
    self.useMXP = self.useMXPSound = self.useMSP = False
    self.useMCCP = self.useMCCP2 = False
    self.useANSI = False
    #Terminal geometry and tracked cursor position.
    self.breakLines = False
    self.termX, self.termY = 80, 20
    self.curX = self.curY = 0
    #Prompt handling.
    self.usePrompt = False
    self.promptCallback = None
    self.atPrompt = False
    self.appendLineAtPrompt = True
    #Markup state toggled by the MXP-only escapes.
    self.MXPOnly = self.notMXPOnly = False
    self.terminalType = ""
    self.permissions = {}
def _Sys_Destroy(self):
    """Cancel the pending prompt callback, if there is one, when the avatar is destroyed."""
    pending = self.promptCallback
    if pending is None:
        return
    DelCallback(pending)
    self.promptCallback = None
def OnConnect(self):
    """Offer the supported telnet options to a freshly connected client.

    One IAC WILL pair is sent, through RawSend, for each of TERMINALTYPE,
    MXP, MSP, COMPRESS2 and COMPRESS, in that order, as a single string.
    """
    #NOTE(review): TERMINALTYPE is offered with WILL like the others; RFC 1091 has the
    #server ask with DO instead - confirm against the clients this is used with.
    iacWill = "\xFF\xFB"
    optionCodes = (
        "\x18", #TERMINALTYPE
        "\x5B", #MXP
        "\x5A", #MSP
        "\x56", #COMPRESS2
        "\x55", #COMPRESS
    )
    self.RawSend("".join([iacWill + code for code in optionCodes]))
#Abstract: called upon client going linkdead.
def OnLinkdeath(self):
    """Hook for the client going linkdead; does nothing here."""
    return None
#Abstract: called upon client being disconnected by the game, IE by a quit command.
def Disconnect(self):
    """Disconnect the underlying client connection."""
    connection = self.client
    connection.Disconnect()
def _Sys_TelnetDO(self, doType):
    """React to a telnet DO from the client by enabling the requested option.

    doType is the single option byte that followed IAC DO. Unknown options
    are ignored.

    Two defects of the earlier version are corrected here:
    * the DO COMPRESS reply named option 0x56 (COMPRESS2) in its start marker
      although the marker is meant to be IAC SB COMPRESS WILL SE, so it now
      uses 0x55;
    * switching from COMPRESS to COMPRESS2 used to write the uncompressed
      COMPRESS2 marker into the middle of the still-open version 1 stream;
      the old stream is now finished first, the same way DONT COMPRESS ends it.
    """
    #DO MXP
    if doType == "\x5B":
        self.useMXP = True
        self.useMXPSound = True
    #DO MSP
    elif doType == "\x5A":
        self.useMSP = True
    #DO COMPRESS2
    elif doType == "\x56":
        if self.useMCCP:
            #Close the version 1 stream so the marker below reaches the client uncompressed.
            self.client.Send(self.compressObj.flush(zlib.Z_FINISH))
            self.useMCCP = False
        #Send the client our compression stream notice.
        #IAC SB COMPRESS2 IAC SE
        #The flag is raised only after the marker is sent so RawSend does not compress the marker itself.
        self.RawSend("\xFF\xFA\x56\xFF\xF0")
        self.useMCCP2 = True
        self.compressObj = zlib.compressobj()
    #DO COMPRESS
    elif doType == "\x55":
        #If we're already using MCCP2, no need to use MCCP.
        if not self.useMCCP2:
            #Same as above.
            #IAC SB COMPRESS WILL SE - Note that the SE is prefixed by WILL, not IAC
            self.RawSend("\xFF\xFA\x55\xFB\xF0")
            self.useMCCP = True
            self.compressObj = zlib.compressobj()
    #DO TERMINALTYPE
    elif doType == "\x18":
        #IAC SB TERMINALTYPE SEND IAC SE
        self.RawSend("\xFF\xFA\x18\x01\xFF\xF0")
def _Sys_TelnetDONT(self, dontType):
    """React to a telnet DONT from the client by disabling the named option.

    dontType is the single option byte that followed IAC DONT. For the two
    compression options the running zlib stream is finished and sent to the
    client before the flag is cleared; nothing happens if that compression
    variant was not active. Unknown options are ignored.
    """
    #DONT MXP
    if dontType == "\x5B":
        self.useMXP = self.useMXPSound = False
        return
    #DONT MSP
    if dontType == "\x5A":
        self.useMSP = False
        return
    #DONT COMPRESS2 / DONT COMPRESS
    compressionFlags = {"\x56": "useMCCP2", "\x55": "useMCCP"}
    flagName = compressionFlags.get(dontType)
    if flagName is not None and getattr(self, flagName):
        self.client.Send(self.compressObj.flush(zlib.Z_FINISH))
        setattr(self, flagName, False)
        self.compressObj = None
def _Sys_ReceiveCommand(self, command):
    """Accept a command from the client; this implementation ignores it."""
    return None
def _Sys_TelnetSB(self, subCommand):
    """Process a telnet subnegotiation payload received from the client.

    Only TERMINALTYPE IS (option 0x18 followed by 0x00) is understood: the
    reported name is stored in self.terminalType exactly as received and ANSI
    output is switched on when the name is listed in ansiTypes.

    The lookup is case-insensitive. ansiTypes holds lower-case names while
    clients commonly report upper-case ones, and the telnet terminal-type
    option treats upper and lower case as equivalent, so a case-sensitive
    test would never match a client reporting e.g. "ANSI".

    Any other subnegotiation is dumped to stdout byte by byte.
    """
    #TERMINALTYPE IS
    if subCommand.startswith("\x18\x00"):
        terminalType = subCommand[2:]
        self.terminalType = terminalType
        #Diagnostic output kept from the original implementation.
        print(terminalType)
        if terminalType.lower() in ansiTypes:
            self.useANSI = True
    else:
        #Unknown subnegotiation: print each byte value for diagnosis.
        for char in subCommand:
            print(ord(char))
def GetPrompt(self):
    """Return the prompt text, before markup parsing; here always ">"."""
    promptText = ">"
    return promptText
def SendPrompt(self):
    """Send the prompt to the client and record that the client is at a prompt.

    The pending-callback handle is always cleared. Nothing else happens when
    prompts are disabled. Otherwise the text from GetPrompt is run through
    ParseText, compressed when either MCCP variant is active, and sent.
    """
    self.promptCallback = None
    if not self.usePrompt:
        return
    prompt = self.ParseText(self.GetPrompt())
    if self.useMCCP or self.useMCCP2:
        compressor = self.compressObj
        packed = compressor.compress(prompt)
        #Sync-flush so the client can decode the prompt right away.
        prompt = "".join((packed, compressor.flush(zlib.Z_SYNC_FLUSH)))
    self.client.Send(prompt)
    self.atPrompt = True
def Send(self, message):
    """Parse a marked-up message and send it to the client.

    When prompts are enabled, a SendPrompt callback is scheduled, or the
    first field of the already pending one is set back to .25. The message is
    run through ParseText, compressed when either MCCP variant is active, and
    sent; afterwards the client is no longer considered to be at a prompt.
    """
    if self.usePrompt:
        pending = self.promptCallback
        if pending is None:
            self.promptCallback = AddCallback(.25, 0, self, "SendPrompt", {})
        else:
            #presumably restarts the delay of the pending callback - verify against AddCallback
            pending[0] = .25
    outgoing = self.ParseText(message)
    if self.useMCCP or self.useMCCP2:
        compressor = self.compressObj
        packed = compressor.compress(outgoing)
        outgoing = "".join((packed, compressor.flush(zlib.Z_SYNC_FLUSH)))
    self.client.Send(outgoing)
    self.atPrompt = False
def RawSend(self, sendString):
    """Send a string to the client without markup parsing.

    The string is compressed first when either MCCP variant is active.
    """
    if self.useMCCP or self.useMCCP2:
        compressor = self.compressObj
        packed = compressor.compress(sendString)
        sendString = "".join((packed, compressor.flush(zlib.Z_SYNC_FLUSH)))
    self.client.Send(sendString)
#Private function section.
def ParseText(self, message):
    """Translate the markup escapes in a message into what this client understands.

    An escape is the escape proxy character followed by a two-character code.
    The code is looked up, in order, in ANSIColorMapping (emitted only when
    useANSI is set), MXPMapping (emitted only when useMXP is set), MiscMapping
    (always emitted) and CommandMapping (the handler is called with the
    remaining input and returns the new remaining input plus text to emit).
    Unknown codes are logged and dropped.

    Plain text between escapes goes through ParseMXP so that the MXP-only and
    not-MXP-only modes switched by the command escapes decide whether this
    client sees it. Earlier the text went straight to ParseRawText, which
    meant those modes never had any effect.

    If the client is sitting at a prompt and appendLineAtPrompt is set, the
    result starts with CR LF. Returns the translated string.
    """
    newMessageList = []
    if self.atPrompt and self.appendLineAtPrompt:
        newMessageList.append("\r\n")
    escIndex = message.find(cEscProxy)
    while escIndex != -1:
        escape = message[escIndex+1:escIndex+3]
        #Text ahead of the escape is filtered with the modes in force before the escape runs.
        newMessageList.append(self.ParseMXP(message[:escIndex]))
        message = message[(escIndex+3):]
        if escape in ANSIColorMapping:
            if self.useANSI:
                newMessageList.append(ANSIColorMapping[escape])
        elif escape in MXPMapping:
            if self.useMXP:
                newMessageList.append(MXPMapping[escape])
        elif escape in MiscMapping:
            newMessageList.append(MiscMapping[escape])
        elif escape in CommandMapping:
            message, newText = CommandMapping[escape](self, message)
            newMessageList.append(newText)
        else:
            mudWorld.loggers["mud"].error("Unrecognized escape %s" % escape)
        escIndex = message.find(cEscProxy)
    newMessageList.append(self.ParseMXP(message))
    return "".join(newMessageList)
def ParseMXP(self, message):
    """Return the text processed by ParseRawText, or "" if this client must not see it.

    In MXP-only mode the text is kept only for clients using MXP; in
    not-MXP-only mode only for clients not using MXP; with neither mode set
    it is always kept. MXP-only mode is checked first.
    """
    if self.MXPOnly:
        visible = self.useMXP
    elif self.notMXPOnly:
        visible = not self.useMXP
    else:
        visible = True
    if not visible:
        return ""
    return self.ParseRawText(message)
def ParseRawText(self, message):
    """Prepare escape-free text for the client and track the cursor position.

    The text is walked line by line (split on LF) and, within a line, segment
    by segment (split on CR). A CR puts the tracked column self.curX back to
    0 and every LF adds one to self.curY. When self.breakLines is set, words
    are re-joined with single spaces and a CR LF is inserted in front of any
    word that would push the column past self.termX.

    For clients using MXP the characters that are special to MXP - ampersand,
    double quote and the two angle brackets - are replaced by their entity
    references afterwards. That final replacement line had been reduced to
    no-op replacements around a stray triple quote, which is restored here.

    Returns the processed text.
    """
    #First, separate the lines so we can track termX and termY.
    lineList = message.split("\n")
    lastLine = len(lineList) - 1
    for lineIndex, line in enumerate(lineList):
        segmentList = line.split("\r")
        lastSegment = len(segmentList) - 1
        for segmentIndex, segment in enumerate(segmentList):
            if self.breakLines:
                #Auto-break lines on a space.
                wordList = segment.split(" ")
                lastWord = len(wordList) - 1
                for wordIndex, word in enumerate(wordList):
                    wordLength = len(word)
                    self.curX += wordLength
                    if self.curX > self.termX:
                        #The word does not fit: move it to a fresh line.
                        word = "".join(("\r\n", word))
                        self.curX = wordLength
                        self.curY += 1
                    #Put back the space that split() removed, unless the line is exactly full.
                    if (wordIndex != lastWord) and (self.curX != self.termX):
                        word = "".join((word, " "))
                        self.curX += 1
                    wordList[wordIndex] = word
                segmentList[segmentIndex] = "".join(wordList)
                #Every segment but the last was followed by a carriage return.
                if segmentIndex != lastSegment:
                    self.curX = 0
            elif segmentIndex != lastSegment:
                self.curX = 0
            else:
                self.curX += len(segment)
        lineList[lineIndex] = "\r".join(segmentList)
        if lineIndex != lastLine:
            self.curY += 1
    message = "\n".join(lineList)
    if self.useMXP:
        #The ampersand must be handled first so the entities added for the other characters
        #are not escaped a second time. Each entity is assembled from its name so that tools
        #which decode HTML entities in source text cannot turn these back into no-ops.
        for rawChar, entityName in (("&", "amp"), ("\"", "quot"), ("<", "lt"), (">", "gt")):
            message = message.replace(rawChar, "".join(("&", entityName, ";")))
    return message