[Zodb-checkins] CVS: ZODB4/ZEO/zrpc - smac.py:1.1 __init__.py:1.3 client.py:1.6 connection.py:1.7 error.py:1.3 log.py:1.3 marshal.py:1.3 server.py:1.5 trigger.py:1.3 NOTES:NONE
Jeremy Hylton
jeremy@zope.com
Fri, 22 Nov 2002 16:24:54 -0500
Update of /cvs-repository/ZODB4/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv7160/ZEO/zrpc
Modified Files:
__init__.py client.py connection.py error.py log.py marshal.py
server.py trigger.py
Added Files:
smac.py
Removed Files:
NOTES
Log Message:
Merge ZEO2 into ZODB4.
=== Added File ZODB4/ZEO/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""
import asyncore, struct
import threading
from ZEO.Exceptions import Disconnected
import zLOG
from types import StringType
from ZEO.zrpc.log import log, short_repr
import socket, errno
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
# or that only one is actually used.
tmp_dict = {errno.EWOULDBLOCK: 0,
errno.EAGAIN: 0,
errno.EINTR: 0,
}
expected_socket_read_errors = tuple(tmp_dict.keys())
tmp_dict = {errno.EAGAIN: 0,
errno.EWOULDBLOCK: 0,
errno.ENOBUFS: 0,
errno.EINTR: 0,
}
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
# We chose 60000 as the socket limit by looking at the largest strings
# that we could pass to send() without blocking.
SEND_SIZE = 60000
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
def __init__(self, sock, addr, map=None, debug=None):
self.addr = addr
if debug is not None:
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug = __debug__
# __input_lock protects __inp, __input_len, __state, __msg_size
self.__input_lock = threading.Lock()
self.__inp = None # None, a single String, or a list
self.__input_len = 0
# Instance variables __state and __msg_size work together:
# when __state == 0:
# __msg_size == 4, and the next thing read is a message size;
# when __state == 1:
# __msg_size is variable, and the next thing read is a message.
# The next thing read is always of length __msg_size.
# The state alternates between 0 and 1.
self.__state = 0
self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output
self.__output = []
self.__closed = 0
self.__super_init(sock, map)
def get_addr(self):
return self.addr
# XXX avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
return 1
def handle_read(self):
self.__input_lock.acquire()
try:
# Use a single __inp buffer and integer indexes to make this fast.
try:
d = self.recv(8192)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if not state:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
msg_size = 4
state = 0
# XXX We call message_input() with __input_lock
# held!!! And message_input() may end up calling
# message_output(), which has its own lock. But
# message_output() cannot call message_input(), so
# the locking order is always consistent, which
# prevents deadlock. Also, message_input() may
# take a long time, because it can cause an
# incoming call to be handled. During all this
# time, the __input_lock is held. That's a good
# thing, because it serializes incoming calls.
self.message_input(msg)
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
finally:
self.__input_lock.release()
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
self.__output_lock.acquire()
try:
output = self.__output
while output:
# Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data
# to send, we will likely incur delays caused by the
# unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time.
l = 0
for i in range(len(output)):
l += len(output[i])
if l > SEND_SIZE:
break
i += 1
# It is very unlikely that i will be 1.
v = "".join(output[:i])
del output[:i]
try:
n = self.send(v)
except socket.error, err:
if err[0] in expected_socket_write_errors:
break # we couldn't write anything
raise
if n < len(v):
output.insert(0, v[n:])
break # we can't write any more
finally:
self.__output_lock.release()
def handle_close(self):
self.close()
def message_output(self, message):
if __debug__:
if self._debug:
log('message_output %d bytes: %s' %
(len(message), short_repr(message)),
level=zLOG.TRACE)
if self.__closed:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
self.__output_lock.acquire()
try:
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
if len(message) <= SEND_SIZE:
self.__output.append(message)
else:
for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release()
def close(self):
if not self.__closed:
self.__closed = 1
self.__super_close()
=== ZODB4/ZEO/zrpc/__init__.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/__init__.py:1.2 Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/__init__.py Fri Nov 22 16:24:53 2002
@@ -2,19 +2,23 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
# zrpc is a package with the following modules
+# client -- manages connection creation to remote server
+# connection -- object dispatcher
+# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
-# connection -- object dispatcher
-# client -- manages connection creation to remote server
# server -- manages incoming connections from remote clients
+# smac -- sized message async connections
# trigger -- medusa's trigger
+
+# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
=== ZODB4/ZEO/zrpc/client.py 1.5 => 1.6 === (568/668 lines abridged)
--- ZODB4/ZEO/zrpc/client.py:1.5 Tue Aug 6 19:09:20 2002
+++ ZODB4/ZEO/zrpc/client.py Fri Nov 22 16:24:53 2002
@@ -2,14 +2,14 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
import errno
import select
@@ -22,6 +22,8 @@
import ThreadedAsync
import zLOG
+from ZODB.POSException import ReadOnlyError
+
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
@@ -29,43 +31,44 @@
class ConnectionManager:
"""Keeps a connection up over time"""
- def __init__(self, addr, client, tmin=1, tmax=180):
- self.set_addr(addr)
+ def __init__(self, addrs, client, tmin=1, tmax=180):
+ self.addrlist = self._parse_addrs(addrs)
self.client = client
self.tmin = tmin
self.tmax = tmax
- self.connected = 0
- self.connection = None
+ self.cond = threading.Condition(threading.Lock())
+ self.connection = None # Protected by self.cond
self.closed = 0
- # If _thread is not None, then there is a helper thread
- # attempting to connect. _thread is protected by _connect_lock.
- self._thread = None
- self._connect_lock = threading.Lock()
+ # If thread is not None, then there is a helper thread
+ # attempting to connect.
[-=- -=- -=- 568 lines omitted -=- -=- -=-]
+ return
except:
- log("error connecting to server: %s" % str(addr),
+ log("CW: error in testConnection (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
- c.close()
+ self.close()
+ return
+ if self.preferred:
+ self.notify_client()
+
+ def notify_client(self):
+ """Call the client's notifyConnected().
+
+ If this succeeds, call the manager's connect_done().
+
+ If the client is already connected, we assume it's a fallback
+ connection, and the new connection must be a preferred
+ connection. The client will close the old connection.
+ """
+ try:
+ self.client.notifyConnected(self.conn)
+ except:
+ log("CW: error in notifyConnected (%s)" % repr(self.addr),
+ level=zLOG.ERROR, error=sys.exc_info())
+ self.close()
+ return
+ self.state = "notified"
+ self.mgr.connect_done(self.conn, self.preferred)
+
+ def close(self):
+ """Close the socket and reset everything."""
+ self.state = "closed"
+ self.mgr = self.client = None
+ self.preferred = 0
+ if self.conn is not None:
# Closing the ZRPC connection will eventually close the
# socket, somewhere in asyncore.
- return 0
- self.mgr.connect_done(c)
- return 1
+ # XXX Why do we care? --Guido
+ self.conn.close()
+ self.conn = None
+ if self.sock is not None:
+ self.sock.close()
+ self.sock = None
+
+ def fileno(self):
+ return self.sock.fileno()
=== ZODB4/ZEO/zrpc/connection.py 1.6 => 1.7 === (459/559 lines abridged)
--- ZODB4/ZEO/zrpc/connection.py:1.6 Tue Aug 6 19:10:31 2002
+++ ZODB4/ZEO/zrpc/connection.py Fri Nov 22 16:24:53 2002
@@ -2,23 +2,25 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
import asyncore
+import errno
+import select
import sys
import threading
import types
import ThreadedAsync
-from ZEO import smac # XXX put smac in zrpc?
-from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
+from ZEO.zrpc import smac
+from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
@@ -36,49 +38,82 @@
the mainloop from sending a response.
"""
- def set_sender(self, msgid, send_reply):
+ def set_sender(self, msgid, send_reply, return_error):
self.msgid = msgid
self.send_reply = send_reply
+ self.return_error = return_error
def reply(self, obj):
self.send_reply(self.msgid, obj)
+ def error(self, exc_info):
+ log("Error raised in delayed method", zLOG.ERROR, error=exc_info)
+ self.return_error(self.msgid, 0, *exc_info[:2])
+
class MTDelay(Delay):
[-=- -=- -=- 459 lines omitted -=- -=- -=-]
self.__super_close()
- self.__mgr.close(self)
class ManagedConnection(Connection):
- """A connection that notifies its ConnectionManager of closing.
-
- A managed connection also defers the ThreadedAsync work to its
- manager.
- """
+ """Client-side Connection subclass."""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
- self.__mgr = mgr
+ self.mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
+ # Defer the ThreadedAsync work to the manager.
+
def close_trigger(self):
# the manager should actually close the trigger
del self.trigger
@@ -387,19 +479,20 @@
pass
def check_mgr_async(self):
- if not self.thr_async and self.__mgr.thr_async:
- assert self.__mgr.trigger is not None, \
- "manager (%s) has no trigger" % self.__mgr
+ if not self.thr_async and self.mgr.thr_async:
+ assert self.mgr.trigger is not None, \
+ "manager (%s) has no trigger" % self.mgr
self.thr_async = 1
- self.trigger = self.__mgr.trigger
+ self.trigger = self.mgr.trigger
return 1
return 0
def is_async(self):
+ # XXX could the check_mgr_async() be avoided on each test?
if self.thr_async:
return 1
return self.check_mgr_async()
def close(self):
+ self.mgr.close_conn(self)
self.__super_close()
- self.__mgr.notify_closed(self)
=== ZODB4/ZEO/zrpc/error.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/error.py:1.2 Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/error.py Fri Nov 22 16:24:53 2002
@@ -2,23 +2,20 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import Disconnected
class ZRPCError(POSException.StorageError):
pass
-
-class DecodingError(ZRPCError):
- """A ZRPC message could not be decoded."""
class DisconnectedError(ZRPCError, Disconnected):
"""The database storage is disconnected from the storage server."""
=== ZODB4/ZEO/zrpc/log.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/log.py:1.2 Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/log.py Fri Nov 22 16:24:53 2002
@@ -2,18 +2,21 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
import os
import types
import zLOG
+import threading
+
+LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
_label = "zrpc:%s" % os.getpid()
@@ -22,18 +25,34 @@
_label = "zrpc:%s" % os.getpid()
def log(message, level=zLOG.BLATHER, label=None, error=None):
- zLOG.LOG(label or _label, level, message, error=error)
+ label = label or _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ zLOG.LOG(label, level, message, error=error)
REPR_LIMIT = 40
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
+
# Some of the objects being repr'd are large strings. It's wastes
# a lot of memory to repr them and then truncate, so special case
# them in this function.
# Also handle short repr of a tuple containing a long string.
+
+ # This strategy works well for arguments to StorageServer methods.
+ # The oid is usually first and will get included in its entirety.
+ # The pickle is near the beginning, too, and you can often fit the
+ # module name in the pickle.
+
if isinstance(obj, types.StringType):
- obj = obj[:REPR_LIMIT]
+ if len(obj) > REPR_LIMIT:
+ r = repr(obj[:REPR_LIMIT])
+ else:
+ r = repr(obj)
+ if len(r) > REPR_LIMIT:
+ r = r[:REPR_LIMIT-4] + '...' + r[-1]
+ return r
elif isinstance(obj, types.TupleType):
elts = []
size = 0
@@ -43,5 +62,10 @@
size += len(r)
if size > REPR_LIMIT:
break
- obj = tuple(elts)
- return repr(obj)[:REPR_LIMIT]
+ r = "(%s)" % (", ".join(elts))
+ else:
+ r = repr(obj)
+ if len(r) > REPR_LIMIT:
+ return r[:REPR_LIMIT] + '...'
+ else:
+ return r
=== ZODB4/ZEO/zrpc/marshal.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/marshal.py:1.2 Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/marshal.py Fri Nov 22 16:24:53 2002
@@ -2,42 +2,33 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
import cPickle
from cStringIO import StringIO
-import struct
import types
+import zLOG
+
from ZEO.zrpc.error import ZRPCError
+from ZEO.zrpc.log import log, short_repr
class Marshaller:
"""Marshal requests and replies to second across network"""
- # It's okay to share a single Pickler as long as it's in fast
- # mode, which means that it doesn't have a memo.
-
- pickler = cPickle.Pickler()
- pickler.fast = 1
- pickle = pickler.dump
-
- errors = (cPickle.UnpickleableError,
- cPickle.UnpicklingError,
- cPickle.PickleError,
- cPickle.PicklingError)
-
- VERSION = 1
-
def encode(self, msgid, flags, name, args):
"""Returns an encoded message"""
- return self.pickle((msgid, flags, name, args), 1)
+ # (We used to have a global pickler, but that's not thread-safe. :-( )
+ pickler = cPickle.Pickler()
+ pickler.fast = 1
+ return pickler.dump((msgid, flags, name, args), 1)
def decode(self, msg):
"""Decodes msg and returns its parts"""
@@ -46,10 +37,10 @@
try:
return unpickler.load() # msgid, flags, name, args
- except (self.errors, IndexError), err_msg:
- log("can't decode %s" % repr(msg), level=zLOG.ERROR)
- raise DecodingError(msg)
-
+ except:
+ log("can't decode message: %s" % short_repr(msg), level=zLOG.ERROR)
+ raise
+
_globals = globals()
_silly = ('__doc__',)
=== ZODB4/ZEO/zrpc/server.py 1.4 => 1.5 ===
--- ZODB4/ZEO/zrpc/server.py:1.4 Thu Jul 25 12:47:55 2002
+++ ZODB4/ZEO/zrpc/server.py Fri Nov 22 16:24:53 2002
@@ -2,14 +2,14 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
import asyncore
import socket
@@ -17,6 +17,7 @@
from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log
+import zLOG
# Export the main asyncore loop
loop = asyncore.loop
@@ -42,7 +43,7 @@
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
- log("listening on %s" % str(self.addr))
+ log("listening on %s" % str(self.addr), zLOG.INFO)
self.bind(self.addr)
self.listen(5)
=== ZODB4/ZEO/zrpc/trigger.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/trigger.py:1.2 Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/trigger.py Fri Nov 22 16:24:53 2002
@@ -2,27 +2,25 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
-#
+#
##############################################################################
-# This module is a simplified version of the select_trigger module
-# from Sam Rushing's Medusa server.
import asyncore
-
import os
import socket
import thread
+import errno
if os.name == 'posix':
- class trigger (asyncore.file_dispatcher):
+ class trigger(asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
@@ -54,129 +52,157 @@
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
- def __init__ (self):
- r, w = os.pipe()
+ def __init__(self):
+ r, w = self._fds = os.pipe()
self.trigger = w
- asyncore.file_dispatcher.__init__ (self, r)
+ asyncore.file_dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
+ self._closed = 0
+
+ # Override the asyncore close() method, because it seems that
+ # it would only close the r file descriptor and not w. The
+ # constructor calls file_dispatcher.__init__ and passes r,
+ # which would get stored in a file_wrapper and get closed by
+ # the default close. But that would leave w open...
def close(self):
- self.del_channel()
- self.socket.close() # the read side of the pipe
- os.close(self.trigger) # the write side of the pipe
+ if not self._closed:
+ self._closed = 1
+ self.del_channel()
+ for fd in self._fds:
+ os.close(fd)
+ self._fds = []
- def __repr__ (self):
+ def __repr__(self):
return '<select-trigger (pipe) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
- # print 'PULL_TRIGGER: ', len(self.thunks)
+ def handle_close(self):
+ self.close()
+
+ def pull_trigger(self, thunk=None):
if thunk:
+ self.lock.acquire()
try:
- self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- os.write (self.trigger, 'x')
+ os.write(self.trigger, 'x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
try:
- self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
- (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
- print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ nil, t, v, tbinfo = asyncore.compact_traceback()
+ print ('exception in trigger thunk:'
+ ' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
else:
+ # XXX Should define a base class that has the common methods and
+ # then put the platform-specific in a subclass named trigger.
+
# win32-safe version
- class trigger (asyncore.dispatcher):
+ HOST = '127.0.0.1'
+ MINPORT = 19950
+ NPORTS = 50
+
+ class trigger(asyncore.dispatcher):
- address = ('127.9.9.9', 19999)
+ portoffset = 0
- def __init__ (self):
- a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ def __init__(self):
+ a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
- host='127.0.0.1'
- port=19999
- while 1:
+ for i in range(NPORTS):
+ trigger.portoffset = (trigger.portoffset + 1) % NPORTS
+ port = MINPORT + trigger.portoffset
+ address = (HOST, port)
try:
- self.address=(host, port)
- a.bind(self.address)
+ a.bind(address)
+ except socket.error:
+ continue
+ else:
break
- except:
- if port <= 19950:
- raise 'Bind Error', 'Cannot bind trigger!'
- port=port - 1
+ else:
+ raise RuntimeError, 'Cannot bind trigger!'
- a.listen (1)
- w.setblocking (0)
+ a.listen(1)
+ w.setblocking(0)
try:
- w.connect (self.address)
+ w.connect(address)
except:
pass
r, addr = a.accept()
a.close()
- w.setblocking (1)
+ w.setblocking(1)
self.trigger = w
- asyncore.dispatcher.__init__ (self, r)
+ asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
- def __repr__ (self):
+ def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
+ def pull_trigger(self, thunk=None):
if thunk:
+ self.lock.acquire()
try:
- self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- self.trigger.send ('x')
+ self.trigger.send('x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
try:
- self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
- (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
- print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ nil, t, v, tbinfo = asyncore.compact_traceback()
+ print ('exception in trigger thunk:'
+ ' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
=== Removed File ZODB4/ZEO/zrpc/NOTES ===