[Zope3-checkins] CVS: Zope3/src/zodb/zeo/zrpc - __init__.py:1.2 client.py:1.2 connection.py:1.2 error.py:1.2 log.py:1.2 marshal.py:1.2 server.py:1.2 smac.py:1.2 trigger.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:13:55 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/zeo/zrpc
Added Files:
__init__.py client.py connection.py error.py log.py marshal.py
server.py smac.py trigger.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zodb/zeo/zrpc/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/__init__.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,24 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# zrpc is a package with the following modules
+# client -- manages connection creation to remote server
+# connection -- object dispatcher
+# log -- logging helper
+# error -- exceptions raised by zrpc
+# marshal -- internal, handles basic protocol issues
+# server -- manages incoming connections from remote clients
+# smac -- sized message async connections
+# trigger -- medusa's trigger
+
+# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
=== Zope3/src/zodb/zeo/zrpc/client.py 1.1 => 1.2 === (423/523 lines abridged)
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/client.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,520 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import errno
+import select
+import socket
+import sys
+import threading
+import time
+import types
+
+from zodb.interfaces import ReadOnlyError
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc import log
+from zodb.zeo.zrpc.trigger import trigger
+from zodb.zeo.zrpc.connection import ManagedConnection
+
+class ConnectionManager:
+ """Keeps a connection up over time"""
+
+ def __init__(self, addrs, client, tmin=1, tmax=180):
+ self.addrlist = self._parse_addrs(addrs)
+ self.client = client
+ self.tmin = tmin
+ self.tmax = tmax
+ self.cond = threading.Condition(threading.Lock())
+ self.connection = None # Protected by self.cond
+ self.closed = 0
+ # If thread is not None, then there is a helper thread
+ # attempting to connect.
+ self.thread = None # Protected by self.cond
+ self.trigger = None
+ self.thr_async = 0
+ threadedasync.register_loop_callback(self.set_async)
+
+ def __repr__(self):
[-=- -=- -=- 423 lines omitted -=- -=- -=-]
+ self.state = "tested"
+ except ReadOnlyError:
+ log.info("CW: ReadOnlyError in testConnection (%s)",
+ repr(self.addr))
+ self.close()
+ return
+ except:
+ log.error("CW: error in testConnection (%s)", repr(self.addr),
+ exc_info=True)
+ self.close()
+ return
+ if self.preferred:
+ self.notify_client()
+
+ def notify_client(self):
+ """Call the client's notifyConnected().
+
+ If this succeeds, call the manager's connect_done().
+
+ If the client is already connected, we assume it's a fallback
+ connection, and the new connection must be a preferred
+ connection. The client will close the old connection.
+ """
+ try:
+ self.client.notifyConnected(self.conn)
+ except:
+ log.error("CW: error in notifyConnected (%s)", repr(self.addr),
+ exc_info=True)
+ self.close()
+ return
+ self.state = "notified"
+ self.mgr.connect_done(self.conn, self.preferred)
+
+ def close(self):
+ """Close the socket and reset everything."""
+ self.state = "closed"
+ self.mgr = self.client = None
+ self.preferred = 0
+ if self.conn is not None:
+ # Closing the ZRPC connection will eventually close the
+ # socket, somewhere in asyncore.
+ # XXX Why do we care? --Guido
+ self.conn.close()
+ self.conn = None
+ if self.sock is not None:
+ self.sock.close()
+ self.sock = None
+
+ def fileno(self):
+ return self.sock.fileno()
=== Zope3/src/zodb/zeo/zrpc/connection.py 1.1 => 1.2 === (409/509 lines abridged)
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/connection.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,506 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import errno
+import select
+import sys
+import threading
+import types
+import logging
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc import smac
+from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
+from zodb.zeo.zrpc import log
+from zodb.zeo.zrpc.marshal import Marshaller
+from zodb.zeo.zrpc.trigger import trigger
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+class Delay:
+ """Used to delay response to client for synchronous calls
+
+ When a synchronous call is made and the original handler returns
+ without handling the call, it returns a Delay object that prevents
+ the mainloop from sending a response.
+ """
+
+ def set_sender(self, msgid, send_reply, return_error):
+ self.msgid = msgid
+ self.send_reply = send_reply
+ self.return_error = return_error
+
+ def reply(self, obj):
+ self.send_reply(self.msgid, obj)
+
[-=- -=- -=- 409 lines omitted -=- -=- -=-]
+ self.mgr = mgr
+ self.__super_init(sock, addr, obj)
+ self.obj.notifyConnected(self)
+
+ def close(self):
+ self.obj.notifyDisconnected()
+ self.mgr.close_conn(self)
+ self.__super_close()
+
+class ManagedConnection(Connection):
+ """Client-side Connection subclass."""
+ __super_init = Connection.__init__
+ __super_close = Connection.close
+
+ def __init__(self, sock, addr, obj, mgr):
+ self.mgr = mgr
+ self.__super_init(sock, addr, obj)
+ self.check_mgr_async()
+
+ # Defer the ThreadedAsync work to the manager.
+
+ def close_trigger(self):
+ # the manager should actually close the trigger
+ del self.trigger
+
+ def set_async(self, map):
+ pass
+
+ def _prepare_async(self):
+ # Don't do the register_loop_callback that the superclass does
+ pass
+
+ def check_mgr_async(self):
+ if not self.thr_async and self.mgr.thr_async:
+ assert self.mgr.trigger is not None, \
+ "manager (%s) has no trigger" % self.mgr
+ self.thr_async = 1
+ self.trigger = self.mgr.trigger
+ return 1
+ return 0
+
+ def is_async(self):
+ # XXX could the check_mgr_async() be avoided on each test?
+ if self.thr_async:
+ return 1
+ return self.check_mgr_async()
+
+ def close(self):
+ self.mgr.close_conn(self)
+ self.__super_close()
=== Zope3/src/zodb/zeo/zrpc/error.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/error.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,21 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+from zodb import interfaces
+from zodb.zeo.interfaces import Disconnected
+
+class ZRPCError(interfaces.StorageError):
+ pass
+
+class DisconnectedError(ZRPCError, Disconnected):
+ """The database storage is disconnected from the storage server."""
=== Zope3/src/zodb/zeo/zrpc/log.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/log.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import os
+import types
+import threading
+import logging
+
+LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
+
+_label = "zrpc:%s" % os.getpid()
+
+# The code duplication here is for speed (save a layer of function call).
+
+def critical(msg, *args, **kw):
+ label = _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ logging.critical("%s: "+msg, label, *args, **kw)
+
+def error(msg, *args, **kw):
+ label = _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ logging.error("%s: "+msg, label, *args, **kw)
+
+def warn(msg, *args, **kw):
+ label = _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ logging.warn("%s: "+msg, label, *args, **kw)
+
+def info(msg, *args, **kw):
+ label = _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ logging.info("%s: "+msg, label, *args, **kw)
+
+def debug(msg, *args, **kw):
+ label = _label
+ if LOG_THREAD_ID:
+ label = "%s:%s" % (label, threading.currentThread().getName())
+ logging.debug("%s: "+msg, label, *args, **kw)
+
+REPR_LIMIT = 40
+
+def short_repr(obj):
+ "Return an object repr limited to REPR_LIMIT bytes."
+
+ # Some of the objects being repr'd are large strings. It's wastes
+ # a lot of memory to repr them and then truncate, so special case
+ # them in this function.
+ # Also handle short repr of a tuple containing a long string.
+
+ # This strategy works well for arguments to StorageServer methods.
+ # The oid is usually first and will get included in its entirety.
+ # The pickle is near the beginning, too, and you can often fit the
+ # module name in the pickle.
+
+ if isinstance(obj, types.StringType):
+ if len(obj) > REPR_LIMIT:
+ r = repr(obj[:REPR_LIMIT])
+ else:
+ r = repr(obj)
+ if len(r) > REPR_LIMIT:
+ r = r[:REPR_LIMIT-4] + '...' + r[-1]
+ return r
+ elif isinstance(obj, types.TupleType):
+ elts = []
+ size = 0
+ for elt in obj:
+ r = repr(elt)
+ elts.append(r)
+ size += len(r)
+ if size > REPR_LIMIT:
+ break
+ r = "(%s)" % (", ".join(elts))
+ else:
+ r = repr(obj)
+ if len(r) > REPR_LIMIT:
+ return r[:REPR_LIMIT] + '...'
+ else:
+ return r
=== Zope3/src/zodb/zeo/zrpc/marshal.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/marshal.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,65 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import cPickle
+from cStringIO import StringIO
+import types
+
+from zodb.zeo.zrpc.error import ZRPCError
+from zodb.zeo.zrpc import log
+
+class Marshaller:
+ """Marshal requests and replies to second across network"""
+
+ def encode(self, msgid, flags, name, args):
+ """Returns an encoded message"""
+ # (We used to have a global pickler, but that's not thread-safe. :-( )
+ pickler = cPickle.Pickler()
+ pickler.fast = 1
+ return pickler.dump((msgid, flags, name, args), 1)
+
+ def decode(self, msg):
+ """Decodes msg and returns its parts"""
+ unpickler = cPickle.Unpickler(StringIO(msg))
+ unpickler.find_global = find_global
+
+ try:
+ return unpickler.load() # msgid, flags, name, args
+ except:
+ log.error("can't decode message: %s", log.short_repr(msg))
+ raise
+
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+ """Helper for message unpickler"""
+ try:
+ m = __import__(module, _globals, _globals, _silly)
+ except ImportError, msg:
+ raise ZRPCError("import error %s: %s" % (module, msg))
+
+ try:
+ r = getattr(m, name)
+ except AttributeError:
+ raise ZRPCError("module %s has no global %s" % (module, name))
+
+ safe = getattr(r, '__no_side_effects__', 0)
+ if safe:
+ return r
+
+ # XXX what's a better way to do this? esp w/ 2.1 & 2.2
+ if type(r) == types.ClassType and issubclass(r, Exception):
+ return r
+
+ raise ZRPCError("Unsafe global: %s.%s" % (module, name))
=== Zope3/src/zodb/zeo/zrpc/server.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/server.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,64 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import socket
+import types
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc.connection import Connection, Delay
+from zodb.zeo.zrpc import log
+
+# Export the main asyncore loop
+loop = threadedasync.loop
+
+class Dispatcher(asyncore.dispatcher):
+ """A server that accepts incoming RPC connections"""
+ __super_init = asyncore.dispatcher.__init__
+
+ reuse_addr = 1
+
+ def __init__(self, addr, factory=Connection, reuse_addr=None):
+ self.__super_init()
+ self.addr = addr
+ self.factory = factory
+ self.clients = []
+ if reuse_addr is not None:
+ self.reuse_addr = reuse_addr
+ self._open_socket()
+
+ def _open_socket(self):
+ if type(self.addr) == types.TupleType:
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ log.info("listening on %s", str(self.addr))
+ self.bind(self.addr)
+ self.listen(5)
+
+ def writable(self):
+ return 0
+
+ def readable(self):
+ return 1
+
+ def handle_accept(self):
+ try:
+ sock, addr = self.accept()
+ except socket.error, msg:
+ log.info("accepted failed: %s", msg)
+ return
+ c = self.factory(sock, addr)
+ log.info("connect from %s: %s", repr(addr), c)
+ self.clients.append(c)
=== Zope3/src/zodb/zeo/zrpc/smac.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/zrpc/smac.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,225 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Sized Message Async Connections."""
+
+import asyncore, struct
+import threading
+import socket, errno
+from types import StringType
+
+from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc import log
+
+# Use the dictionary to make sure we get the minimum number of errno
+# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
+# or that only one is actually used.
+
+tmp_dict = {errno.EWOULDBLOCK: 0,
+ errno.EAGAIN: 0,
+ errno.EINTR: 0,
+ }
+expected_socket_read_errors = tuple(tmp_dict.keys())
+
+tmp_dict = {errno.EAGAIN: 0,
+ errno.EWOULDBLOCK: 0,
+ errno.ENOBUFS: 0,
+ errno.EINTR: 0,
+ }
+expected_socket_write_errors = tuple(tmp_dict.keys())
+del tmp_dict
+
+# We chose 60000 as the socket limit by looking at the largest strings
+# that we could pass to send() without blocking.
+SEND_SIZE = 60000
+
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+ __super_init = asyncore.dispatcher.__init__
+ __super_close = asyncore.dispatcher.close
+
+ __closed = 1 # Marker indicating that we're closed
+
+ socket = None # to outwit Sam's getattr
+
+ def __init__(self, sock, addr, map=None, debug=None):
+ self.addr = addr
+ if debug is not None:
+ self._debug = debug
+ elif not hasattr(self, '_debug'):
+ self._debug = __debug__
+ # __input_lock protects __inp, __input_len, __state, __msg_size
+ self.__input_lock = threading.Lock()
+ self.__inp = None # None, a single String, or a list
+ self.__input_len = 0
+ # Instance variables __state and __msg_size work together:
+ # when __state == 0:
+ # __msg_size == 4, and the next thing read is a message size;
+ # when __state == 1:
+ # __msg_size is variable, and the next thing read is a message.
+ # The next thing read is always of length __msg_size.
+ # The state alternates between 0 and 1.
+ self.__state = 0
+ self.__msg_size = 4
+ self.__output_lock = threading.Lock() # Protects __output
+ self.__output = []
+ self.__closed = 0
+ self.__super_init(sock, map)
+
+ def get_addr(self):
+ return self.addr
+
+ # XXX avoid expensive getattr calls? Can't remember exactly what
+ # this comment was supposed to mean, but it has something to do
+ # with the way asyncore uses getattr and uses if sock:
+ def __nonzero__(self):
+ return 1
+
+ def handle_read(self):
+ self.__input_lock.acquire()
+ try:
+ # Use a single __inp buffer and integer indexes to make this fast.
+ try:
+ d = self.recv(8192)
+ except socket.error, err:
+ if err[0] in expected_socket_read_errors:
+ return
+ raise
+ if not d:
+ return
+
+ input_len = self.__input_len + len(d)
+ msg_size = self.__msg_size
+ state = self.__state
+
+ inp = self.__inp
+ if msg_size > input_len:
+ if inp is None:
+ self.__inp = d
+ elif type(self.__inp) is StringType:
+ self.__inp = [self.__inp, d]
+ else:
+ self.__inp.append(d)
+ self.__input_len = input_len
+ return # keep waiting for more input
+
+ # load all previous input and d into single string inp
+ if isinstance(inp, StringType):
+ inp = inp + d
+ elif inp is None:
+ inp = d
+ else:
+ inp.append(d)
+ inp = "".join(inp)
+
+ offset = 0
+ while (offset + msg_size) <= input_len:
+ msg = inp[offset:offset + msg_size]
+ offset = offset + msg_size
+ if not state:
+ # waiting for message
+ msg_size = struct.unpack(">i", msg)[0]
+ state = 1
+ else:
+ msg_size = 4
+ state = 0
+ # XXX We call message_input() with __input_lock
+ # held!!! And message_input() may end up calling
+ # message_output(), which has its own lock. But
+ # message_output() cannot call message_input(), so
+ # the locking order is always consistent, which
+ # prevents deadlock. Also, message_input() may
+ # take a long time, because it can cause an
+ # incoming call to be handled. During all this
+ # time, the __input_lock is held. That's a good
+ # thing, because it serializes incoming calls.
+ self.message_input(msg)
+
+ self.__state = state
+ self.__msg_size = msg_size
+ self.__inp = inp[offset:]
+ self.__input_len = input_len - offset
+ finally:
+ self.__input_lock.release()
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ if len(self.__output) == 0:
+ return 0
+ else:
+ return 1
+
+ def handle_write(self):
+ self.__output_lock.acquire()
+ try:
+ output = self.__output
+ while output:
+ # Accumulate output into a single string so that we avoid
+ # multiple send() calls, but avoid accumulating too much
+ # data. If we send a very small string and have more data
+ # to send, we will likely incur delays caused by the
+ # unfortunate interaction between the Nagle algorithm and
+ # delayed acks. If we send a very large string, only a
+ # portion of it will actually be delivered at a time.
+
+ l = 0
+ for i in range(len(output)):
+ l += len(output[i])
+ if l > SEND_SIZE:
+ break
+
+ i += 1
+ # It is very unlikely that i will be 1.
+ v = "".join(output[:i])
+ del output[:i]
+
+ try:
+ n = self.send(v)
+ except socket.error, err:
+ if err[0] in expected_socket_write_errors:
+ break # we couldn't write anything
+ raise
+ if n < len(v):
+ output.insert(0, v[n:])
+ break # we can't write any more
+ finally:
+ self.__output_lock.release()
+
+ def handle_close(self):
+ self.close()
+
+ def message_output(self, message):
+ if __debug__:
+ if self._debug:
+ log.debug('message_output %d bytes: %s',
+ len(message), log.short_repr(message))
+
+ if self.__closed:
+ raise Disconnected("Action is temporarily unavailable")
+ self.__output_lock.acquire()
+ try:
+ # do two separate appends to avoid copying the message string
+ self.__output.append(struct.pack(">i", len(message)))
+ if len(message) <= SEND_SIZE:
+ self.__output.append(message)
+ else:
+ for i in range(0, len(message), SEND_SIZE):
+ self.__output.append(message[i:i+SEND_SIZE])
+ finally:
+ self.__output_lock.release()
+
+ def close(self):
+ if not self.__closed:
+ self.__closed = 1
+ self.__super_close()
=== Zope3/src/zodb/zeo/zrpc/trigger.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/zrpc/trigger.py Wed Dec 25 09:12:23 2002
@@ -0,0 +1,206 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import asyncore
+import os
+import socket
+import thread
+import traceback
+
+if os.name == 'posix':
+
+ class trigger(asyncore.file_dispatcher):
+
+ "Wake up a call to select() running in the main thread"
+
+ # This is useful in a context where you are using Medusa's I/O
+ # subsystem to deliver data, but the data is generated by another
+ # thread. Normally, if Medusa is in the middle of a call to
+ # select(), new output data generated by another thread will have
+ # to sit until the call to select() either times out or returns.
+ # If the trigger is 'pulled' by another thread, it should immediately
+ # generate a READ event on the trigger object, which will force the
+ # select() invocation to return.
+
+ # A common use for this facility: letting Medusa manage I/O for a
+ # large number of connections; but routing each request through a
+ # thread chosen from a fixed-size thread pool. When a thread is
+ # acquired, a transaction is performed, but output data is
+ # accumulated into buffers that will be emptied more efficiently
+ # by Medusa. [picture a server that can process database queries
+ # rapidly, but doesn't want to tie up threads waiting to send data
+ # to low-bandwidth connections]
+
+ # The other major feature provided by this class is the ability to
+ # move work back into the main thread: if you call pull_trigger()
+ # with a thunk argument, when select() wakes up and receives the
+ # event it will call your thunk from within that thread. The main
+ # purpose of this is to remove the need to wrap thread locks around
+ # Medusa's data structures, which normally do not need them. [To see
+ # why this is true, imagine this scenario: A thread tries to push some
+ # new data onto a channel's outgoing data queue at the same time that
+ # the main thread is trying to remove some]
+
+ def __init__(self):
+ r, w = self._fds = os.pipe()
+ self.trigger = w
+ asyncore.file_dispatcher.__init__(self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._closed = 0
+
+ # Override the asyncore close() method, because it seems that
+ # it would only close the r file descriptor and not w. The
+ # constructor calls file_dispatcher.__init__ and passes r,
+ # which would get stored in a file_wrapper and get closed by
+ # the default close. But that would leave w open...
+
+ def close(self):
+ if not self._closed:
+ self._closed = 1
+ self.del_channel()
+ for fd in self._fds:
+ os.close(fd)
+ self._fds = []
+
+ def __repr__(self):
+ return '<select-trigger (pipe) at %x>' % id(self)
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ return 0
+
+ def handle_connect(self):
+ pass
+
+ def handle_close(self):
+ self.close()
+
+ def pull_trigger(self, thunk=None):
+ if thunk:
+ self.lock.acquire()
+ try:
+ self.thunks.append(thunk)
+ finally:
+ self.lock.release()
+ os.write(self.trigger, 'x')
+
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
+ try:
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ L = traceback.format_exception(*sys.exc_info())
+ print 'exception in trigger thunk:\n%s' % "".join(L)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+else:
+
+ # XXX Should define a base class that has the common methods and
+ # then put the platform-specific in a subclass named trigger.
+
+ # win32-safe version
+
+ HOST = '127.0.0.1'
+ MINPORT = 19950
+ NPORTS = 50
+
+ class trigger(asyncore.dispatcher):
+
+ portoffset = 0
+
+ def __init__(self):
+ a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ # set TCP_NODELAY to true to avoid buffering
+ w.setsockopt(socket.IPPROTO_TCP, 1, 1)
+
+ # tricky: get a pair of connected sockets
+ for i in range(NPORTS):
+ trigger.portoffset = (trigger.portoffset + 1) % NPORTS
+ port = MINPORT + trigger.portoffset
+ address = (HOST, port)
+ try:
+ a.bind(address)
+ except socket.error:
+ continue
+ else:
+ break
+ else:
+ raise RuntimeError, 'Cannot bind trigger!'
+
+ a.listen(1)
+ w.setblocking(0)
+ try:
+ w.connect(address)
+ except:
+ pass
+ r, addr = a.accept()
+ a.close()
+ w.setblocking(1)
+ self.trigger = w
+
+ asyncore.dispatcher.__init__(self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._trigger_connected = 0
+
+ def __repr__(self):
+ return '<select-trigger (loopback) at %x>' % id(self)
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ return 0
+
+ def handle_connect(self):
+ pass
+
+ def pull_trigger(self, thunk=None):
+ if thunk:
+ self.lock.acquire()
+ try:
+ self.thunks.append(thunk)
+ finally:
+ self.lock.release()
+ self.trigger.send('x')
+
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
+ try:
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ L = traceback.format_exception(*sys.exc_info())
+ print 'exception in trigger thunk:\n%s' % "".join(L)
+ self.thunks = []
+ finally:
+ self.lock.release()