# -*- coding: utf-8 -*- line endings: unix -*- #------------------------------------------------------------------------------ # miniboa/telnet.py # Copyright 2009 Jim Storch # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain a # copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. #------------------------------------------------------------------------------ # Changes made by pR0Ps.CM[at]gmail[dot]com on 18/07/2012 # -Updated for use with Python 3.x # -Repackaged into a single file to simplify distribution # -Other misc fixes and changes # # Report any bugs in this implementation to me (email above) #------------------------------------------------------------------------------ # Additional changes by Quixadhal on 2014.06.16 # -Re-split code into multiple files, for ease of maintenance # -Rewrote terminal system #------------------------------------------------------------------------------ """ Manage one Telnet client connected via a TCP/IP socket. """ import sys import socket import time import logging from miniboa.terminal import colorize from miniboa.terminal import word_wrap #---[ Telnet Notes ]----------------------------------------------------------- # (See RFC 854 for more information) # # Negotiating a Local Option # -------------------------- # # Side A begins with: # # "IAC WILL/WONT XX" Meaning "I would like to [use|not use] option XX." # # Side B replies with either: # # "IAC DO XX" Meaning "OK, you may use option XX." # "IAC DONT XX" Meaning "No, you cannot use option XX." # # # Negotiating a Remote Option # ---------------------------- # # Side A begins with: # # "IAC DO/DONT XX" Meaning "I would like YOU to [use|not use] option XX." # # Side B replies with either: # # "IAC WILL XX" Meaning "I will begin using option XX" # "IAC WONT XX" Meaning "I will not begin using option XX" # # # The syntax is designed so that if both parties receive simultaneous requests # for the same option, each will see the other's request as a positive # acknowledgement of it's own. # # If a party receives a request to enter a mode that it is already in, the # request should not be acknowledged. #--[ Global Constants ]-------------------------------------------------------- UNKNOWN = -1 ## Cap sockets to 500 on Windows because winsock can only process 512 at time ## Cap sockets to 1000 on Linux because you can only have 1024 file descriptors MAX_CONNECTIONS = 500 if sys.platform == 'win32' else 1000 #--[ Telnet Commands ]--------------------------------------------------------- SE = chr(240) # End of subnegotiation parameters NOP = chr(241) # No operation DATMK = chr(242) # Data stream portion of a sync. BREAK = chr(243) # NVT Character BRK IP = chr(244) # Interrupt Process AO = chr(245) # Abort Output AYT = chr(246) # Are you there EC = chr(247) # Erase Character EL = chr(248) # Erase Line GA = chr(249) # The Go Ahead Signal SB = chr(250) # Sub-option to follow WILL = chr(251) # Will; request or confirm option begin WONT = chr(252) # Wont; deny option request DO = chr(253) # Do = Request or confirm remote option DONT = chr(254) # Don't = Demand or confirm option halt IAC = chr(255) # Interpret as Command SEND = chr(1) # Sub-process negotiation SEND command IS = chr(0) # Sub-process negotiation IS command #--[ Telnet Options ]---------------------------------------------------------- BINARY = chr(0) # Transmit Binary ECHO = chr(1) # Echo characters back to sender RECON = chr(2) # Reconnection SGA = chr(3) # Suppress Go-Ahead TTYPE = chr(24) # Terminal Type NAWS = chr(31) # Negotiate About Window Size LINEMO = chr(34) # Line Mode #--[ Connection Lost ]--------------------------------------------------------- class ConnectionLost(Exception): """ Custom exception to signal a lost connection to the Telnet Server. """ #--[ Telnet Option ]----------------------------------------------------------- class TelnetOption(object): """ Simple class used to track the status of an extended Telnet option. """ def __init__(self): self.local_option = UNKNOWN # Local state of an option self.remote_option = UNKNOWN # Remote state of an option self.reply_pending = False # Are we expecting a reply? #--[ Telnet Client ]----------------------------------------------------------- class TelnetClient(object): """ Represents a client connection via Telnet. First argument is the socket discovered by the Telnet Server. Second argument is the tuple (ip address, port number). """ def __init__(self, sock, addr_tup): self.protocol = 'telnet' self.active = True # Turns False when the connection is lost self.sock = sock # The connection's socket self.fileno = sock.fileno() # The socket's file descriptor self.address = addr_tup[0] # The client's remote TCP/IP address self.port = addr_tup[1] # The client's remote port self.terminal_type = 'ANSI' # set via request_terminal_type() self.use_ansi = True self.columns = 80 self.rows = 24 self.send_pending = False self.send_buffer = '' self.recv_buffer = '' self.bytes_sent = 0 self.bytes_received = 0 self.cmd_ready = False self.command_list = [] self.connect_time = time.time() self.last_input_time = time.time() ## State variables for interpreting incoming telnet commands self.telnet_got_iac = False # Are we inside an IAC sequence? self.telnet_got_cmd = None # Did we get a telnet command? self.telnet_got_sb = False # Are we inside a subnegotiation? self.telnet_opt_dict = {} # Mapping for up to 256 TelnetOptions self.telnet_echo = False # Echo input back to the client? self.telnet_echo_password = False # Echo back '*' for passwords? self.telnet_sb_buffer = '' # Buffer for sub-negotiations def get_command(self): """ Get a line of text that was received from the client. The class's cmd_ready attribute will be true if lines are available. """ cmd = None count = len(self.command_list) if count > 0: cmd = self.command_list.pop(0) ## If that was the last line, turn off lines_pending if count == 1: self.cmd_ready = False return cmd def send(self, text, wrap = None, terminal = 'ansi'): """ Send raw text to the distant end. """ if text: text = text.replace('\n\r', '\n') # DikuMUD got their line endings backwards text = text.replace('\r\n', '\n') # This is correct for TELNET, but we need to ensure # that "\r\n" doesn't become "\r\n\n" later. if wrap: text = '\n'.join(word_wrap(text, wrap)) # Note self.columns is negotiated if terminal: text = colorize(text, terminal) # Note self.terminal_type is negotiated self.send_buffer += text.replace('\n', '\r\n') self.send_pending = True #def send_cc(self, text): # """ # Send text with caret codes converted to ansi. # """ # self.send(colorize(text, self.use_ansi)) #def send_wrapped(self, text): # """ # Send text padded and wrapped to the user's screen width. # """ # lines = word_wrap(text, self.columns) # for line in lines: # self.send_cc(line + '\n') def deactivate(self): """ Set the client to disconnect on the next server poll. """ self.active = False def addrport(self): """ Return the client's IP address and port number as a string. """ return "{}:{}".format(self.address, self.port) def idle(self): """ Returns the number of seconds that have elasped since the client last sent us some input. """ return time.time() - self.last_input_time def duration(self): """ Returns the number of seconds the client has been connected. """ return time.time() - self.connect_time def request_do_sga(self): """ Request client to Suppress Go-Ahead. See RFC 858. """ self._iac_do(SGA) self._note_reply_pending(SGA, True) def request_will_echo(self): """ Tell the client that we would like to echo their text. See RFC 857. """ self._iac_will(ECHO) self._note_reply_pending(ECHO, True) self.telnet_echo = True def request_wont_echo(self): """ Tell the client that we would like to stop echoing their text. See RFC 857. """ self._iac_wont(ECHO) self._note_reply_pending(ECHO, True) self.telnet_echo = False def password_mode_on(self): """ Tell client we will echo (but don't) so typed passwords don't show. """ self._iac_will(ECHO) self._note_reply_pending(ECHO, True) def password_mode_off(self): """ Tell client we are done echoing (we lied) and show typing again. """ self._iac_wont(ECHO) self._note_reply_pending(ECHO, True) def request_naws(self): """ Request to Negotiate About Window Size. See RFC 1073. """ self._iac_do(NAWS) self._note_reply_pending(NAWS, True) def request_terminal_type(self): """ Begins the Telnet negotiations to request the terminal type from the client. See RFC 779. """ self._iac_do(TTYPE) self._note_reply_pending(TTYPE, True) def socket_send(self): """ Called by TelnetServer when send data is ready. """ if len(self.send_buffer): try: #convert to ansi before sending sent = self.sock.send(bytes(self.send_buffer, "cp1252")) except socket.error as err: logging.error("SEND error '{}' from {}".format(err, self.addrport())) self.active = False return self.bytes_sent += sent self.send_buffer = self.send_buffer[sent:] else: self.send_pending = False def socket_recv(self): """ Called by TelnetServer when recv data is ready. """ try: #Encode recieved bytes in ansi data = str(self.sock.recv(2048), "cp1252") except socket.error as err: logging.error("RECIEVE socket error '{}' from {}".format(err, self.addrport())) raise ConnectionLost() ## Did they close the connection? size = len(data) if size == 0: logging.debug("No data recieved, client closed connection") raise ConnectionLost() ## Update some trackers self.last_input_time = time.time() self.bytes_received += size ## Test for telnet commands for byte in data: self._iac_sniffer(byte) ## Look for newline characters to get whole lines from the buffer while True: mark = self.recv_buffer.find('\n') if mark == -1: break cmd = self.recv_buffer[:mark].strip() self.command_list.append(cmd) self.cmd_ready = True self.recv_buffer = self.recv_buffer[mark + 1:] def _recv_byte(self, byte): """ Non-printable filtering currently disabled because it did not play well with extended character sets. """ ## Filter out non-printing characters #if (byte >= ' ' and byte <= '~') or byte == '\n': if self.telnet_echo: self._echo_byte(byte) self.recv_buffer += byte def _echo_byte(self, byte): """ Echo a character back to the client and convert LF into CR\LF. """ if byte == '\n': self.send_buffer += '\r' if self.telnet_echo_password: self.send_buffer += '*' else: self.send_buffer += byte def _iac_sniffer(self, byte): """ Watches incomming data for Telnet IAC sequences. Passes the data, if any, with the IAC commands stripped to _recv_byte(). """ ## Are we not currently in an IAC sequence coming from the client? if self.telnet_got_iac is False: if byte == IAC: ## Well, we are now self.telnet_got_iac = True return ## Are we currenty in a sub-negotion? elif self.telnet_got_sb is True: ## Sanity check on length if len(self.telnet_sb_buffer) < 64: self.telnet_sb_buffer += byte else: self.telnet_got_sb = False self.telnet_sb_buffer = "" return else: ## Just a normal NVT character self._recv_byte(byte) return ## Byte handling when already in an IAC sequence sent from the client else: ## Did we get sent a second IAC? if byte == IAC and self.telnet_got_sb is True: ## Must be an escaped 255 (IAC + IAC) self.telnet_sb_buffer += byte self.telnet_got_iac = False return ## Do we already have an IAC + CMD? elif self.telnet_got_cmd: ## Yes, so handle the option self._three_byte_cmd(byte) return ## We have IAC but no CMD else: ## Is this the middle byte of a three-byte command? if byte == DO: self.telnet_got_cmd = DO return elif byte == DONT: self.telnet_got_cmd = DONT return elif byte == WILL: self.telnet_got_cmd = WILL return elif byte == WONT: self.telnet_got_cmd = WONT return else: ## Nope, must be a two-byte command self._two_byte_cmd(byte) def _two_byte_cmd(self, cmd): """ Handle incoming Telnet commands that are two bytes long. """ logging.debug("Got two byte cmd '{}'".format(ord(cmd))) if cmd == SB: ## Begin capturing a sub-negotiation string self.telnet_got_sb = True self.telnet_sb_buffer = '' elif cmd == SE: ## Stop capturing a sub-negotiation string self.telnet_got_sb = False self._sb_decoder() elif cmd == NOP: pass elif cmd == DATMK: pass elif cmd == IP: pass elif cmd == AO: pass elif cmd == AYT: pass elif cmd == EC: pass elif cmd == EL: pass elif cmd == GA: pass else: logging.warning("Send an invalid 2 byte command") self.telnet_got_iac = False self.telnet_got_cmd = None def _three_byte_cmd(self, option): """ Handle incoming Telnet commmands that are three bytes long. """ cmd = self.telnet_got_cmd logging.debug("Got three byte cmd {}:{}".format(ord(cmd), ord(option))) ## Incoming DO's and DONT's refer to the status of this end if cmd == DO: if option == BINARY or option == SGA or option == ECHO: if self._check_reply_pending(option): self._note_reply_pending(option, False) self._note_local_option(option, True) elif (self._check_local_option(option) is False or self._check_local_option(option) is UNKNOWN): self._note_local_option(option, True) self._iac_will(option) ## Just nod unless setting echo if option == ECHO: self.telnet_echo = True else: ## All other options = Default to refusing once if self._check_local_option(option) is UNKNOWN: self._note_local_option(option, False) self._iac_wont(option) elif cmd == DONT: if option == BINARY or option == SGA or option == ECHO: if self._check_reply_pending(option): self._note_reply_pending(option, False) self._note_local_option(option, False) elif (self._check_local_option(option) is True or self._check_local_option(option) is UNKNOWN): self._note_local_option(option, False) self._iac_wont(option) ## Just nod unless setting echo if option == ECHO: self.telnet_echo = False else: ## All other options = Default to ignoring pass ## Incoming WILL's and WONT's refer to the status of the client elif cmd == WILL: if option == ECHO: ## Nutjob client offering to echo the server... if self._check_remote_option(ECHO) is UNKNOWN: self._note_remote_option(ECHO, False) # No no, bad client! self._iac_dont(ECHO) elif option == NAWS or option == SGA: if self._check_reply_pending(option): self._note_reply_pending(option, False) self._note_remote_option(option, True) elif (self._check_remote_option(option) is False or self._check_remote_option(option) is UNKNOWN): self._note_remote_option(option, True) self._iac_do(option) ## Client should respond with SB (for NAWS) elif option == TTYPE: if self._check_reply_pending(TTYPE): self._note_reply_pending(TTYPE, False) self._note_remote_option(TTYPE, True) ## Tell them to send their terminal type self.send("{}{}{}{}{}{}".format(IAC, SB, TTYPE, SEND, IAC, SE)) elif (self._check_remote_option(TTYPE) is False or self._check_remote_option(TTYPE) is UNKNOWN): self._note_remote_option(TTYPE, True) self._iac_do(TTYPE) elif cmd == WONT: if option == ECHO: ## Client states it wont echo us -- good, they're not supposes ## to. if self._check_remote_option(ECHO) is UNKNOWN: self._note_remote_option(ECHO, False) self._iac_dont(ECHO) elif option == SGA or option == TTYPE: if self._check_reply_pending(option): self._note_reply_pending(option, False) self._note_remote_option(option, False) elif (self._check_remote_option(option) is True or self._check_remote_option(option) is UNKNOWN): self._note_remote_option(option, False) self._iac_dont(option) ## Should TTYPE be below this? else: ## All other options = Default to ignoring pass else: logging.warning("Send an invalid 3 byte command") self.telnet_got_iac = False self.telnet_got_cmd = None def _sb_decoder(self): """ Figures out what to do with a received sub-negotiation block. """ bloc = self.telnet_sb_buffer if len(bloc) > 2: if bloc[0] == TTYPE and bloc[1] == IS: self.terminal_type = bloc[2:] logging.debug("Terminal type = '{}'".format(self.terminal_type)) if bloc[0] == NAWS: if len(bloc) != 5: logging.warning("Bad length on NAWS SB: " + str(len(bloc))) else: self.columns = (256 * ord(bloc[1])) + ord(bloc[2]) self.rows = (256 * ord(bloc[3])) + ord(bloc[4]) logging.info("Screen is {} x {}".format(self.columns, self.rows)) self.telnet_sb_buffer = '' #---[ State Juggling for Telnet Options ]---------------------------------- ## Sometimes verbiage is tricky. I use 'note' rather than 'set' here ## because (to me) set infers something happened. def _check_local_option(self, option): """Test the status of local negotiated Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() return self.telnet_opt_dict[option].local_option def _note_local_option(self, option, state): """Record the status of local negotiated Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() self.telnet_opt_dict[option].local_option = state def _check_remote_option(self, option): """Test the status of remote negotiated Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() return self.telnet_opt_dict[option].remote_option def _note_remote_option(self, option, state): """Record the status of local negotiated Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() self.telnet_opt_dict[option].remote_option = state def _check_reply_pending(self, option): """Test the status of requested Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() return self.telnet_opt_dict[option].reply_pending def _note_reply_pending(self, option, state): """Record the status of requested Telnet options.""" if not option in self.telnet_opt_dict: self.telnet_opt_dict[option] = TelnetOption() self.telnet_opt_dict[option].reply_pending = state #---[ Telnet Command Shortcuts ]------------------------------------------- def _iac_do(self, option): """Send a Telnet IAC "DO" sequence.""" self.send("{}{}{}".format(IAC, DO, option)) def _iac_dont(self, option): """Send a Telnet IAC "DONT" sequence.""" self.send("{}{}{}".format(IAC, DONT, option)) def _iac_will(self, option): """Send a Telnet IAC "WILL" sequence.""" self.send("{}{}{}".format(IAC, WILL, option)) def _iac_wont(self, option): """Send a Telnet IAC "WONT" sequence.""" self.send("{}{}{}".format(IAC, WONT, option))