[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/zrpc/trigger.py Merge rev
30877 from 3.4 branch.
Tim Peters
tim.one at comcast.net
Tue Jun 21 17:44:22 EDT 2005
Log message for revision 30878:
Merge rev 30877 from 3.4 branch.
Massive refactoring, to move the bulk of the trigger code into
an OS-independent base class.
__repr__: Use the positive_id function to embed the machine address.
Addresses with the high bit set trigger warnings before Python 2.4,
and come out as negative numbers in 2.4+.
Windows trigger.__init__: Don't make 50 guesses at a port number
to use, let Windows pick an available port for us. Also documented
the baffling single-thread socket setup dance, which took an hour to
reverse-engineer (in large part because it used a bare "except" w/ no
clue as to why).
Changed:
U ZODB/trunk/src/ZEO/zrpc/trigger.py
-=-
Modified: ZODB/trunk/src/ZEO/zrpc/trigger.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/trigger.py 2005-06-21 21:42:54 UTC (rev 30877)
+++ ZODB/trunk/src/ZEO/zrpc/trigger.py 2005-06-21 21:44:22 UTC (rev 30878)
@@ -17,200 +17,181 @@
import socket
import thread
-if os.name == 'posix':
+from ZODB.utils import positive_id
- class trigger(asyncore.file_dispatcher):
+# Original comments follow; they're hard to follow in the context of
+# ZEO's use of triggers. TODO: rewrite from a ZEO perspective.
- "Wake up a call to select() running in the main thread"
+# Wake up a call to select() running in the main thread.
+#
+# This is useful in a context where you are using Medusa's I/O
+# subsystem to deliver data, but the data is generated by another
+# thread. Normally, if Medusa is in the middle of a call to
+# select(), new output data generated by another thread will have
+# to sit until the call to select() either times out or returns.
+# If the trigger is 'pulled' by another thread, it should immediately
+# generate a READ event on the trigger object, which will force the
+# select() invocation to return.
+#
+# A common use for this facility: letting Medusa manage I/O for a
+# large number of connections; but routing each request through a
+# thread chosen from a fixed-size thread pool. When a thread is
+# acquired, a transaction is performed, but output data is
+# accumulated into buffers that will be emptied more efficiently
+# by Medusa. [picture a server that can process database queries
+# rapidly, but doesn't want to tie up threads waiting to send data
+# to low-bandwidth connections]
+#
+# The other major feature provided by this class is the ability to
+# move work back into the main thread: if you call pull_trigger()
+# with a thunk argument, when select() wakes up and receives the
+# event it will call your thunk from within that thread. The main
+# purpose of this is to remove the need to wrap thread locks around
+# Medusa's data structures, which normally do not need them. [To see
+# why this is true, imagine this scenario: A thread tries to push some
+# new data onto a channel's outgoing data queue at the same time that
+# the main thread is trying to remove some]
- # This is useful in a context where you are using Medusa's I/O
- # subsystem to deliver data, but the data is generated by another
- # thread. Normally, if Medusa is in the middle of a call to
- # select(), new output data generated by another thread will have
- # to sit until the call to select() either times out or returns.
- # If the trigger is 'pulled' by another thread, it should immediately
- # generate a READ event on the trigger object, which will force the
- # select() invocation to return.
+class _triggerbase:
+ """OS-independent base class for OS-dependent trigger class."""
- # A common use for this facility: letting Medusa manage I/O for a
- # large number of connections; but routing each request through a
- # thread chosen from a fixed-size thread pool. When a thread is
- # acquired, a transaction is performed, but output data is
- # accumulated into buffers that will be emptied more efficiently
- # by Medusa. [picture a server that can process database queries
- # rapidly, but doesn't want to tie up threads waiting to send data
- # to low-bandwidth connections]
+ kind = None # subclass must set to "pipe" or "loopback"; used by repr
- # The other major feature provided by this class is the ability to
- # move work back into the main thread: if you call pull_trigger()
- # with a thunk argument, when select() wakes up and receives the
- # event it will call your thunk from within that thread. The main
- # purpose of this is to remove the need to wrap thread locks around
- # Medusa's data structures, which normally do not need them. [To see
- # why this is true, imagine this scenario: A thread tries to push some
- # new data onto a channel's outgoing data queue at the same time that
- # the main thread is trying to remove some]
+ def __init__(self):
+ self._closed = False
- def __init__(self):
- r, w = self._fds = os.pipe()
- self.trigger = w
- asyncore.file_dispatcher.__init__(self, r)
- self.lock = thread.allocate_lock()
- self.thunks = []
- self._closed = 0
+ # `lock` protects the `thunks` list from being traversed and
+ # appended to simultaneously.
+ self.lock = thread.allocate_lock()
- # Override the asyncore close() method, because it seems that
- # it would only close the r file descriptor and not w. The
- # constructor calls file_dispatcher.__init__ and passes r,
- # which would get stored in a file_wrapper and get closed by
- # the default close. But that would leave w open...
+ # List of no-argument callbacks to invoke when the trigger is
+ # pulled. These run in the thread running the asyncore mainloop,
+ # regardless of which thread pulls the trigger.
+ self.thunks = []
- def close(self):
- if not self._closed:
- self._closed = 1
- self.del_channel()
- for fd in self._fds:
- os.close(fd)
- self._fds = []
+ def readable(self):
+ return 1
- def __repr__(self):
- return '<select-trigger (pipe) at %x>' % id(self)
+ def writable(self):
+ return 0
- def readable(self):
- return 1
+ def handle_connect(self):
+ pass
- def writable(self):
- return 0
+ def handle_close(self):
+ self.close()
- def handle_connect(self):
- pass
+ # Override the asyncore close() method, because it doesn't know about
+ # (so can't close) all the gimmicks we have open. Subclass must
+ # supply a _close() method to do platform-specific closing work. _close()
+ # will be called iff we're not already closed.
+ def close(self):
+ if not self._closed:
+ self._closed = True
+ self.del_channel()
+ self._close() # subclass does OS-specific stuff
- def handle_close(self):
- self.close()
+ def _close(self): # see close() above; subclass must supply
+ raise NotImplementedError
- def pull_trigger(self, thunk=None):
- if thunk:
- self.lock.acquire()
- try:
- self.thunks.append(thunk)
- finally:
- self.lock.release()
- os.write(self.trigger, 'x')
-
- def handle_read(self):
- try:
- self.recv(8192)
- except socket.error:
- return
+ def pull_trigger(self, thunk=None):
+ if thunk:
self.lock.acquire()
try:
- for thunk in self.thunks:
- try:
- thunk()
- except:
- nil, t, v, tbinfo = asyncore.compact_traceback()
- print ('exception in trigger thunk:'
- ' (%s:%s %s)' % (t, v, tbinfo))
- self.thunks = []
+ self.thunks.append(thunk)
finally:
self.lock.release()
+ self._physical_pull()
-else:
+ # Subclass must supply _physical_pull, which does whatever the OS
+ # needs to do to provoke the "write" end of the trigger.
+ def _physical_pull(self):
+ raise NotImplementedError
- # TODO: Should define a base class that has the common methods and
- # then put the platform-specific in a subclass named trigger.
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
+ try:
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ nil, t, v, tbinfo = asyncore.compact_traceback()
+ print ('exception in trigger thunk:'
+ ' (%s:%s %s)' % (t, v, tbinfo))
+ self.thunks = []
+ finally:
+ self.lock.release()
- # win32-safe version
+ def __repr__(self):
+ return '<select-trigger (%s) at %x>' % (self.kind, positive_id(self))
- HOST = '127.0.0.1'
- MINPORT = 19950
- NPORTS = 50
+if os.name == 'posix':
- class trigger(asyncore.dispatcher):
+ class trigger(_triggerbase, asyncore.file_dispatcher):
+ kind = "pipe"
- portoffset = 0
-
def __init__(self):
- a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ _triggerbase.__init__(self)
+ r, self.trigger = self._fds = os.pipe()
+ asyncore.file_dispatcher.__init__(self, r)
+ def _close(self):
+ for fd in self._fds:
+ os.close(fd)
+ self._fds = []
+
+ def _physical_pull(self):
+ os.write(self.trigger, 'x')
+
+else:
+ # Windows version; uses just sockets, because a pipe isn't select'able
+ # on Windows.
+
+ class trigger(_triggerbase, asyncore.dispatcher):
+ kind = "loopback"
+
+ def __init__(self):
+ _triggerbase.__init__(self)
+ # Get a pair of connected sockets. The trigger is the 'w'
+ # end of the pair, which is connected to 'r'. 'r' is put
+ # in the asyncore socket map. "pulling the trigger" then
+ # means writing something on w, which will wake up r.
+ a = socket.socket() # temporary, to set up the connection
+ w = socket.socket()
+ self.trigger = w
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
- # tricky: get a pair of connected sockets
- for i in range(NPORTS):
- trigger.portoffset = (trigger.portoffset + 1) % NPORTS
- port = MINPORT + trigger.portoffset
- address = (HOST, port)
- try:
- a.bind(address)
- except socket.error:
- continue
- else:
- break
- else:
- raise RuntimeError, 'Cannot bind trigger!'
-
+ # Specifying port 0 tells Windows to pick a port for us.
+ a.bind(("127.0.0.1", 0))
+ connect_address = a.getsockname() # actual (host, port) pair
a.listen(1)
+
+ # Before connecting, set w non-blocking, because the connect can't
+ # succeed before we call a.accept() -- while a.accept() can't
+ # succeed before we try to connect. Maybe it would be clearer
+ # to spin off a thread to do this, but that's much more expensive
+ # than this hack.
w.setblocking(0)
try:
- w.connect(address)
- except:
+ w.connect(connect_address)
+ except socket.error:
+ # Expected exception, since a.accept() hasn't been called
+ # yet.
pass
- r, addr = a.accept()
- a.close()
w.setblocking(1)
- self.trigger = w
-
+ r, addr = a.accept() # r becomes asyncore's socket
+ a.close()
asyncore.dispatcher.__init__(self, r)
- self.lock = thread.allocate_lock()
- self.thunks = []
- self._trigger_connected = 0
- self._closed = 0
- def close(self):
- if not self._closed:
- self._closed = 1
- self.del_channel()
- # self.socket is a, self.trigger is w from __init__
- self.socket.close()
- self.trigger.close()
+ def _close(self):
+ # self.socket is r, self.trigger is w from __init__
+ self.socket.close()
+ self.trigger.close()
- def __repr__(self):
- return '<select-trigger (loopback) at %x>' % id(self)
-
- def readable(self):
- return 1
-
- def writable(self):
- return 0
-
- def handle_connect(self):
- pass
-
- def pull_trigger(self, thunk=None):
- if thunk:
- self.lock.acquire()
- try:
- self.thunks.append(thunk)
- finally:
- self.lock.release()
+ def _physical_pull(self):
self.trigger.send('x')
-
- def handle_read(self):
- try:
- self.recv(8192)
- except socket.error:
- return
- self.lock.acquire()
- try:
- for thunk in self.thunks:
- try:
- thunk()
- except:
- nil, t, v, tbinfo = asyncore.compact_traceback()
- print ('exception in trigger thunk:'
- ' (%s:%s %s)' % (t, v, tbinfo))
- self.thunks = []
- finally:
- self.lock.release()
More information about the Zodb-checkins
mailing list