[Zope-CVS] CVS: Packages/tcpwatch - tcpwatch:1.4
Shane Hathaway
shane@cvs.zope.org
Wed, 20 Feb 2002 18:48:07 -0500
Update of /cvs-repository/Packages/tcpwatch
In directory cvs.zope.org:/tmp/cvs-serv20461
Modified Files:
tcpwatch
Log Message:
Support for running as a real HTTP proxy server. (Untested)
=== Packages/tcpwatch/tcpwatch 1.3 => 1.4 ===
_dests = ()
- def __init__(self):
+ def __init__(self, conn=None):
self._outbuf = []
- asyncore.dispatcher.__init__(self)
+ asyncore.dispatcher.__init__(self, conn)
def set_dests(self, dests):
"""Sets the destination streams.
@@ -119,11 +119,14 @@
for d in self._dests:
d.write('') # A blank string means the socket just connected.
+ def received(self, data):
+ for d in self._dests:
+ d.write(data)
+
def handle_read(self):
data = self.recv(RECV_BUFFER_SIZE)
if data:
- for d in self._dests:
- d.write(data)
+ self.received(data)
def handle_write(self):
if not self.connected:
@@ -180,7 +183,7 @@
transaction = 1
- def __init__(self, connection_number, client_addr, server_addr):
+ def __init__(self, connection_number, client_addr, server_addr=None):
self.opened = time()
self.connection_number = connection_number
self.client_addr = client_addr
@@ -212,7 +215,7 @@
if info:
# Got a connection.
conn, addr = info
- conn.setblocking (0)
+ conn.setblocking(0)
ep1 = ForwardingEndpoint() # connects client to self
ep2 = ForwardingEndpoint() # connects self to server
@@ -299,9 +302,15 @@
self.flush()
def connection_from(self, fci):
- self._output_message(
- '%s:%s forwarded to %s:%s' %
- (tuple(fci.client_addr) + tuple(fci.server_addr)), 1)
+ if fci.server_addr is not None:
+ self._output_message(
+ '%s:%s forwarded to %s:%s' %
+ (tuple(fci.client_addr) + tuple(fci.server_addr)), 1)
+ else:
+ self._output_message(
+ 'connection from %s:%s' %
+ (tuple(fci.client_addr)), 1)
+
if fci.transaction > 1:
self._output_message(
('HTTP transaction #%d' % fci.transaction), 1)
@@ -885,6 +894,178 @@
self.resp_index = new_index
+#############################################################################
+#
+# HTTP proxy
+#
+#############################################################################
+
+
+class HTTPProxyResponseBuffer:
+ """Ensures that responses on a persistent HTTP connection occur
+ in the correct order."""
+
+ finished = 0
+
+ def __init__(self, proxy_conn, observer_stream=None):
+ self.response_parser = HTTPStreamParser(0)
+ self.proxy_conn = proxy_conn
+ self.observer_stream = observer_stream
+ self.held = []
+
+ def _isMyTurn(self):
+ bufs = self.proxy_conn._response_buffers
+ if bufs:
+ return (bufs[0] is self)
+ return 1
+
+ def write(self, data):
+ # Received data from the server.
+ parser = self.response_parser
+ if not parser.completed:
+ parser.received(data)
+ if parser.completed:
+ self.finished = 1
+ self.flush()
+ return
+ observer_stream = self.observer_stream
+ if observer_stream is not None:
+ observer_stream.write(data)
+ if not self._isMyTurn():
+ # Wait for earlier responses to finish.
+ self.held.append(data)
+ return
+ self.flush()
+ self.proxy_conn.write(data)
+
+ def flush(self):
+ if self.held and self._isMyTurn():
+ data = ''.join(self.held) + data
+ del self.held[:]
+ self.proxy_conn.write(data)
+ if self.finished:
+ bufs = self.proxy_conn._response_buffers
+ if bufs and bufs[0] is self:
+ del bufs[0]
+ if bufs:
+ bufs[0].flush() # kick.
+
+ def close(self):
+ observer_stream = self.observer_stream
+ if observer_stream is not None:
+ observer_stream.close()
+ self.finished = 1
+ self.flush()
+
+
+
+class HTTPProxyConnection (ForwardingEndpoint):
+
+ _req_parser = None
+ _obs = None
+ _transaction = 0
+
+ def __init__(self, conn, factory, counter, addr):
+ asyncore.dispatcher.__init__(self, conn)
+ self._obs_factory = factory
+ self._counter = counter
+ self._client_addr = addr
+ self._response_buffers = []
+ self._newRequest()
+
+ def _newRequest(self):
+ if self._req_parser is None:
+ self._req_parser = HTTPStreamParser(1)
+ factory = self._obs_factory
+ if factory is not None:
+ fci = ForwardedConnectionInfo(self._counter, self._client_addr)
+ obs = factory(fci)
+ self._transaction = self._transaction + 1
+ obs.transaction = self._transaction
+ self._obs = obs
+
+ def received(self, data):
+ while data:
+ parser = self._req_parser
+ if parser is None:
+ # Begin another request.
+ self._newRequest()
+ parser = self._req_parser
+ obs = self._obs
+ if not parser.completed:
+ # Not yet connected to a server.
+ consumed = parser.received(data)
+ if obs is not None:
+ obs.received(data[:consumed], 1)
+ data = data[:consumed]
+ if parser.completed:
+ # Connected to a server.
+ self.openProxyConnection(parser)
+ # Expect a new request or a closed connection.
+ self._obs = None
+ self._req_parser = None
+
+ def openProxyConnection(self, request):
+ url = request.first_line.strip().split(' ', 1)[-1]
+ pos = url.rfind(' HTTP/')
+ if pos >= 0:
+ url = url[:pos].rstrip()
+ if url.startswith('http://'):
+ host = url[7:].split('/', 1)[0]
+ else:
+ host = request.headers.get('HOST')
+ if not host:
+ raise NotImplementedError, 'Not a full HTTP proxy'
+
+ if ':' in host:
+ host, port = host.split(':')
+ port = int(port)
+ else:
+ port = 80
+
+ obs = self._obs
+
+ if obs is not None:
+ eo = EndpointObserver(obs, 0)
+ buf = HTTPProxyResponseBuffer(self, eo)
+ else:
+ buf = HTTPProxyResponseBuffer(self)
+ self._response_buffers.append(buf)
+
+ ep = ForwardingEndpoint() # connects server to buf (to self)
+ ep.set_dests((buf,))
+ ep.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ ep.connect((host, port))
+
+
+
+class HTTPProxyService (asyncore.dispatcher):
+
+ _counter = 0
+
+ def __init__(self, listen_host, listen_port, observer_factory=None):
+ self._obs_factory = observer_factory
+ asyncore.dispatcher.__init__(self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind((listen_host, listen_port))
+ self.listen(5)
+
+ def handle_accept(self):
+ info = self.accept()
+ if info:
+ # Got a connection.
+ conn, addr = info
+ conn.setblocking(0)
+ counter = self._counter + 1
+ self._counter = counter
+ HTTPProxyConnection(conn, self._obs_factory, counter, addr)
+
+ def handle_error(self):
+ # Don't stop the server.
+ import traceback
+ traceback.print_exc()
+
#############################################################################
#
@@ -904,6 +1085,7 @@
Set up a forwarded connection to a specified host, bound to an interface
-h (or --http) Parse as HTTP, splitting up multi-transaction connections
+ -p [<listen_host>:]<listen_port> Run an HTTP proxy (implies -h)
-s Output to stdout instead of a Tkinter window
-n No color in GUI (consumes less memory)
"""