{
Summary:
Connection manager
## $Id: conns.pas,v 1.22 2004/04/15 17:47:22 druid Exp $
}
unit conns;
interface
uses
ZLib,
{$IFDEF WIN32}
Winsock2,
Windows,
{$ENDIF}
{$IFDEF LINUX}
Libc,
{$ENDIF}
Classes,
SysUtils,
dtypes,
socket;
type
GConnectionEvent = procedure() of object;
GConnection = class(TThread)
protected
_socket : GSocket;
_idle : integer;
input_buf : string;
comm_buf : string;
last_line : string;
sendbuffer : string;
empty_busy : boolean;
compress : boolean; { are we using MCCP v2? }
strm: TZStreamRec;
_lastupdate : TDateTime;
{ Called when GConnection.Execute() starts }
FOnOpen : GConnectionEvent;
{ Called when GConnection.Execute() terminates }
FOnClose : GConnectionEvent;
{ Called for every iteration in GConnection.Execute() }
FOnTick : GConnectionEvent;
{ Called when GConnection has one or more lines of input waiting }
FOnInput : GConnectionEvent;
{ Called when GConnection has sent one or more lines of output }
FOnOutput : GConnectionEvent;
protected
procedure Execute(); override;
procedure sendIAC(option : byte; const params : array of byte);
procedure processIAC();
procedure send(s : PChar; len : integer); overload;
public
procedure send(const s : string); overload;
procedure read();
procedure readBuffer();
procedure emptyBuffer();
procedure writeBuffer(const txt : string; in_command : boolean = false);
constructor Create(socket : GSocket);
destructor Destroy; override;
procedure disableCompression();
procedure enableCompression();
procedure negotiateCompression();
published
property socket : GSocket read _socket;
property idle : integer read _idle;
property last_update : TDateTime read _lastupdate;
property OnOpen : GConnectionEvent read FOnOpen write FOnOpen;
property OnClose : GConnectionEvent read FOnClose write FOnClose;
property OnTick : GConnectionEvent read FOnTick write FOnTick;
property OnInput : GConnectionEvent read FOnInput write FOnInput;
property OnOutput : GConnectionEvent read FOnOutput write FOnOutput;
property useCompress : boolean read compress;
end;
var
connection_list : GDLinkedList;
procedure flushConnections();
procedure terminateAndWaitConnections();
implementation
uses
chars,
constants,
console,
player,
debug;
const
IAC_COMPRESS = 85; // MCCP v1
IAC_COMPRESS2 = 86; // MCCP v2
IAC_SE = 240;
IAC_SB = 250;
IAC_WILL = 251;
IAC_WONT = 252;
IAC_DO = 253;
IAC_DONT = 254;
IAC_IAC = 255;
{ GConnection constructor }
constructor GConnection.Create(socket : GSocket);
begin
inherited Create(true);
FreeOnTerminate := true;
connection_list.add(Self);
_socket := socket;
_idle := 0;
comm_buf := '';
input_buf := '';
last_line := '';
sendbuffer := '';
compress := false;
end;
{ GConnection destructor }
destructor GConnection.Destroy();
begin
connection_list.remove(Self);
_socket.Free();
inherited Destroy();
end;
{ GConnection main loop }
procedure GConnection.Execute();
begin
writeConsole('(' + IntToStr(_socket.getDescriptor) + ') New connection (' + _socket.hostString + ')');
try
negotiateCompression();
read();
if (Assigned(FOnOpen)) then
FOnOpen();
while (not Terminated) do
begin
_lastupdate := Now();
sleep(50);
if (Assigned(FOnTick)) then
FOnTick();
if (not Terminated) then
read();
if (not Terminated) then
readBuffer();
if (Assigned(FOnInput)) and (length(comm_buf) > 0) then
FOnInput();
end;
except
on E : Exception do
begin
Terminate();
reportException(E, 'GConnection.Execute');
end;
end;
if (Assigned(FOnClose)) then
FOnClose();
writeConsole('(' + IntToStr(_socket.getDescriptor) + ') Connection closed');
end;
procedure GConnection.enableCompression();
begin
FillChar(strm, sizeof(strm), 0);
strm.zalloc := zlibAllocMem;
strm.zfree := zlibFreeMem;
deflateInit_(strm, Z_DEFAULT_COMPRESSION, zlib_version, sizeof(strm));
sendIAC(IAC_SB, [IAC_COMPRESS2]);
sendIAC(IAC_SE, []);
compress := true;
end;
procedure GConnection.disableCompression();
var
compress_size : integer;
compress_buf : array[0..4095] of char;
begin
if (compress) then
begin
compress := false;
strm.next_in := nil;
strm.avail_in := 0;
strm.next_out := compress_buf;
strm.avail_out := 4096;
deflate(strm, Z_FINISH);
compress_size := 4096 - strm.avail_out;
_socket.send(compress_buf, compress_size);
deflateEnd(strm);
end;
end;
procedure GConnection.negotiateCompression();
begin
sendIAC(IAC_WILL, [IAC_COMPRESS2]);
end;
procedure GConnection.send(s : PChar; len : integer);
var
compress_size : integer;
compress_buf : array[0..4095] of char;
begin
if (Terminated) then
exit;
try
while (not Terminated) and (not _socket.canWrite()) do;
if (compress) then
begin
strm.next_in := s;
strm.avail_in := len;
strm.next_out := compress_buf;
strm.avail_out := 4096;
deflate(strm, Z_SYNC_FLUSH);
compress_size := 4096 - strm.avail_out;
_socket.send(compress_buf, compress_size);
end
else
_socket.send(s^, len);
except
Terminate();
end;
end;
procedure GConnection.send(const s : string);
begin
send(@s[1], length(s));
end;
procedure GConnection.read();
var
read : integer;
buf : array[0..MAX_RECEIVE-1] of char;
begin
if (length(comm_buf) > 0) then
exit;
try
if (not _socket.canRead()) then
exit;
except
on E : Exception do
begin
Terminate();
exit;
end;
end;
_idle := 0;
while (not Terminated) do
begin
if (not _socket.canRead()) then
break;
read := recv(_socket.getDescriptor, buf, MAX_RECEIVE - 10, 0);
if (read > 0) then
begin
buf[read] := #0;
input_buf := input_buf + buf;
processIAC();
end
else
if (read = 0) then
begin
Terminate();
break;
end
else
if (read = SOCKET_ERROR) then
begin
{$IFDEF WIN32}
if (WSAGetLastError() = WSAEWOULDBLOCK) then
break
else
begin
Terminate();
break;
end;
{$ELSE}
break;
{$ENDIF}
end;
end;
end;
procedure GConnection.sendIAC(option : byte; const params : array of byte);
var
buf : array[0..255] of char;
i : integer;
begin
buf[0] := chr(IAC_IAC);
buf[1] := chr(option);
for i := 0 to length(params) - 1 do
buf[2 + i] := chr(params[i]);
send(buf, 2 + length(params));
end;
procedure GConnection.processIAC();
var
i : integer;
iac : boolean;
new_buf : string;
begin
iac := false;
new_buf := '';
i := 1;
while (i <= length(input_buf)) do
begin
if (iac) then
begin
case byte(input_buf[i]) of
IAC_WILL: begin
inc(i);
//writeConsole('(' + IntToStr(socket.getDescriptor) + ') IAC WILL ' + IntToStr(byte(input_buf[i])));
end;
IAC_WONT: begin
inc(i);
//writeConsole('(' + IntToStr(socket.getDescriptor) + ') IAC WON''T ' + IntToStr(byte(input_buf[i])));
end;
IAC_DO: begin
inc(i);
case byte(input_buf[i]) of
IAC_COMPRESS2: begin
writeConsole('(' + IntToStr(_socket.getDescriptor) + ') Client has MCCPv2');
enableCompression();
end;
end;
//writeConsole('(' + IntToStr(socket.getDescriptor) + ') IAC DO ' + IntToStr(byte(input_buf[i])));
end;
IAC_DONT: begin
inc(i);
case byte(input_buf[i]) of
IAC_COMPRESS2: begin
writeConsole('(' + IntToStr(_socket.getDescriptor) + ') Client has disabled MCCPv2');
disableCompression();
end;
end;
//writeConsole('(' + IntToStr(socket.getDescriptor) + ') IAC DON''T ' + IntToStr(byte(input_buf[i])));
end;
else
//writeConsole('(' + IntToStr(socket.getDescriptor) + ') IAC ' + IntToStr(byte(input_buf[i])));
end;
iac := false;
end
else
if (byte(input_buf[i]) = IAC_IAC) then // IAC
begin
iac := true;
end
else
new_buf := new_buf + input_buf[i];
inc(i);
end;
input_buf := new_buf;
end;
procedure GConnection.readBuffer();
var
i : integer;
begin
if (length(comm_buf) <> 0) or ((pos(#10, input_buf) = 0) and (pos(#13, input_buf) = 0)) then
exit;
i := 1;
while (i <= length(input_buf)) and (input_buf[i] <> #13) and (input_buf[i] <> #10) do
begin
if ((input_buf[i] = #8) or (input_buf[i] = #127)) then
delete(comm_buf, length(comm_buf), 1)
else
//if (byte(input_buf[i]) > 31) and (byte(input_buf[i]) < 127) then
begin
comm_buf := comm_buf + input_buf[i];
end;
inc(i);
end;
while (i <= length(input_buf)) and ((input_buf[i] = #13) or (input_buf[i] = #10)) do
begin
comm_buf := comm_buf + input_buf[i];
inc(i);
end;
if (comm_buf = '!'#13#10) then
comm_buf := last_line
else
last_line := comm_buf;
delete(input_buf, 1, i - 1);
end;
procedure GConnection.emptyBuffer();
begin
if (empty_busy) then
exit;
empty_busy := true;
if (length(sendbuffer) > 0) then
begin
send(sendbuffer);
if (Assigned(FOnOutput)) then
FOnOutput();
sendbuffer := '';
end;
empty_busy := false;
end;
procedure GConnection.writeBuffer(const txt : string; in_command : boolean = false);
begin
if ((length(sendbuffer) + length(txt)) > 2048) then
begin
send(sendbuffer);
sendbuffer := '';
end;
if (not in_command) and (length(sendbuffer) = 0) then
sendbuffer := sendbuffer + #13#10;
sendbuffer := sendbuffer + txt;
end;
procedure flushConnections();
var
conn : GPlayerConnection;
iterator : GIterator;
begin
iterator := connection_list.iterator();
while (iterator.hasNext()) do
begin
conn := GPlayerConnection(iterator.next());
if (conn.isPlaying()) and (not conn.ch.IS_NPC) then
GPlayer(conn.ch).quit
else
conn.Terminate();
end;
iterator.Free();
end;
procedure terminateAndWaitConnections();
var
conn : GPlayerConnection;
iterator : GIterator;
begin
iterator := connection_list.iterator();
while (iterator.hasNext()) do
begin
conn := GPlayerConnection(iterator.next());
conn.Terminate();
end;
iterator.Free();
// Wait for connection_list to clean itself
while (connection_list.size() > 0) do
begin
Sleep(25);
end;
end;
initialization
connection_list := GDLinkedList.Create();
connection_list.ownsObjects := false;
finalization
connection_list.clear();
connection_list.Free();
end.