[Zodb-checkins] CVS: ZEO/ZEO/zrpc - NOTES:1.2 __init__.py:1.2 client.py:1.2 connection.py:1.2 error.py:1.2 log.py:1.2 marshal.py:1.2 server.py:1.2 trigger.py:1.2
Jeremy Hylton
jeremy@zope.com
Tue, 11 Jun 2002 15:22:27 -0400
Update of /cvs-repository/ZEO/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15241/zrpc
Added Files:
NOTES __init__.py client.py connection.py error.py log.py
marshal.py server.py trigger.py
Log Message:
Merge ZEO2-branch to trunk. (Files added on branch.)
=== ZEO/ZEO/zrpc/NOTES 1.1 => 1.2 ===
+handling for outstanding calls. In particular, it should be possible
+to have multiple calls with return values outstanding.
+
+The mechanism described here is based on the promises mechanism in
+Argus, which was influenced by futures in Multilisp.
+
+ Promises: Linguistic Support for Efficient Asynchronous Procedure
+ Calls in Distributed Systems. Barbara Liskov and Liuba Shrira.
+ Proc. of Conf. on Programming Language Design and Implementation
+ (PLDI), June 1988.
+
+We want to support two different kinds of calls:
+
+ - send : invoke a method that returns no value
+ - call : invoke a method that returns a value
+
+On the client, a call immediately returns a promise. A promise is an
+object that can be used to claim the return value when it becomes
+available.
+
+ - ready(): returns true if the return value is ready or an exception
+ occurred
+ - claim(): returns the call's return value or raises an exception,
+ blocking if necessary
+
+The server side of a zrpc connection can be implemented using
+asyncore. In that case, a method call blocks other RPC activity until
+it returns. If a call needs to return a value, but can't return
+immediately, it returns a delay object (ZEO.zrpc.server.Delay).
+
+When the zrpc connection receives a Delay object, it does not
+immediately return to the caller. Instead, it returns when the
+reply() method is called. A Delay has two methods:
+
+ - set_sender()
+ - reply(obj): returns obj to the sender
+
+-----------------------------------------
+
+Open issues:
+
+Delayed exception
+
+There is currently no mechanism to raise an exception from a delayed
+pcall.
+
+Synchronization
+
+The following item is part of Argus, but the motivation isn't entirely
+clear.
+
+ For any two calls, C1 and C2, C1 always starts on the server
+ first. For the promises, C2 is ready() iff C1 is also ready().
+ The promises can be claimed in any order.
+
+A related notion:
+
+ The connection should also support a synch() method that returns
+ only when all outstanding calls have completed. If any of these
+ calls raised an exception, the synch() call raises an exception.
+
+XXX synch() sounds potentially useful, but it's not clear if it would
+be useful for ZEO. In ZEO a single connection object handles multiple
+threads, each thread is going to make independent calls. When a
+particular tpc_begin() returns and a thread commits its transaction,
+it makes more calls. These calls will before any of the other
+tpc_begin() calls.
+
+I think the Argus approach would be to use separate handlers for each
+thread (not sure Argus had threads), so that a single thread could
+rely on ordering guarantees.
+
+Multithreaded server
+
+There are lots of issues to work out here.
+
+Delays may not be necessary if the connecftion handler runs in a
+different thread than the object the handles the calls.
\ No newline at end of file
=== ZEO/ZEO/zrpc/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# zrpc is a package with the following modules
+# error -- exceptions raised by zrpc
+# marshal -- internal, handles basic protocol issues
+# connection -- object dispatcher
+# client -- manages connection creation to remote server
+# server -- manages incoming connections from remote clients
+# trigger -- medusa's trigger
=== ZEO/ZEO/zrpc/client.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import errno
+import select
+import socket
+import sys
+import threading
+import time
+import types
+
+import ThreadedAsync
+import zLOG
+
+from ZEO.zrpc.log import log
+from ZEO.zrpc.trigger import trigger
+from ZEO.zrpc.connection import ManagedConnection
+
+class ConnectionManager:
+ """Keeps a connection up over time"""
+
+ def __init__(self, addr, client, tmin=1, tmax=180):
+ self.set_addr(addr)
+ self.client = client
+ self.tmin = tmin
+ self.tmax = tmax
+ self.connected = 0
+ self.connection = None
+ self.closed = 0
+ # If _thread is not None, then there is a helper thread
+ # attempting to connect. _thread is protected by _connect_lock.
+ self._thread = None
+ self._connect_lock = threading.Lock()
+ self.trigger = None
+ self.thr_async = 0
+ ThreadedAsync.register_loop_callback(self.set_async)
+
+ def __repr__(self):
+ return "<%s for %s>" % (self.__class__.__name__, self.addr)
+
+ def set_addr(self, addr):
+ "Set one or more addresses to use for server."
+
+ # For backwards compatibility (and simplicity?) the
+ # constructor accepts a single address in the addr argument --
+ # a string for a Unix domain socket or a 2-tuple with a
+ # hostname and port. It can also accept a list of such addresses.
+
+ addr_type = self._guess_type(addr)
+ if addr_type is not None:
+ self.addr = [(addr_type, addr)]
+ else:
+ self.addr = []
+ for a in addr:
+ addr_type = self._guess_type(a)
+ if addr_type is None:
+ raise ValueError, "unknown address in list: %s" % repr(a)
+ self.addr.append((addr_type, a))
+
+ def _guess_type(self, addr):
+ if isinstance(addr, types.StringType):
+ return socket.AF_UNIX
+
+ if (len(addr) == 2
+ and isinstance(addr[0], types.StringType)
+ and isinstance(addr[1], types.IntType)):
+ return socket.AF_INET
+
+ # not anything I know about
+ return None
+
+ def close(self):
+ """Prevent ConnectionManager from opening new connections"""
+ self.closed = 1
+ self._connect_lock.acquire()
+ try:
+ if self._thread is not None:
+ # XXX race on _thread
+ self._thread.stop()
+ self._thread.join()
+ finally:
+ self._connect_lock.release()
+ if self.connection:
+ self.connection.close()
+ if self.trigger is not None:
+ self.trigger.close()
+
+ def set_async(self, map):
+ # This is the callback registered with ThreadedAsync. The
+ # callback might be called multiple times, so it shouldn't
+ # create a trigger every time and should never do anything
+ # after it's closed.
+
+ # It may be that the only case where it is called multiple
+ # times is in the test suite, where ThreadedAsync's loop can
+ # be started in a child process after a fork. Regardless,
+ # it's good to be defensive.
+
+ # XXX need each connection started with async==0 to have a
+ # callback
+ if not self.closed and self.trigger is None:
+ self.trigger = trigger()
+ self.thr_async = 1 # XXX needs to be set on the Connection
+
+ def attempt_connect(self):
+ """Attempt a connection to the server without blocking too long.
+
+ There isn't a crisp definition for too long. When a
+ ClientStorage is created, it attempts to connect to the
+ server. If the server isn't immediately available, it can
+ operate from the cache. This method will start the background
+ connection thread and wait a little while to see if it
+ finishes quickly.
+ """
+
+ # XXX will a single attempt take too long?
+ self.connect()
+ try:
+ event = self._thread.one_attempt
+ except AttributeError:
+ # An AttributeError means that (1) _thread is None and (2)
+ # as a consquence of (1) that the connect thread has
+ # already exited.
+ pass
+ else:
+ event.wait()
+ return self.connected
+
+ def connect(self, sync=0):
+ if self.connected == 1:
+ return
+ self._connect_lock.acquire()
+ try:
+ if self._thread is None:
+ log("starting thread to connect to server")
+ self._thread = ConnectThread(self, self.client, self.addr,
+ self.tmin, self.tmax)
+ self._thread.start()
+ if sync:
+ try:
+ self._thread.join()
+ except AttributeError:
+ # probably means the thread exited quickly
+ pass
+ finally:
+ self._connect_lock.release()
+
+ def connect_done(self, c):
+ log("connect_done()")
+ self.connected = 1
+ self.connection = c
+ self._thread = None
+
+ def notify_closed(self, conn):
+ self.connected = 0
+ self.connection = None
+ self.client.notifyDisconnected()
+ if not self.closed:
+ self.connect()
+
+class Connected(Exception):
+ # helper for non-local exit
+ def __init__(self, sock):
+ self.sock = sock
+
+# When trying to do a connect on a non-blocking socket, some outcomes
+# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
+# when an initial connect can't complete immediately. Set _CONNECT_OK
+# to the errno value(s) expected if the connect succeeds *or* if it's
+# already connected (our code can attempt redundant connects).
+if hasattr(errno, "WSAEWOULDBLOCK"): # Windows
+ _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
+ _CONNECT_OK = (0, errno.WSAEISCONN)
+else: # Unix
+ _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
+ _CONNECT_OK = (0, errno.EISCONN)
+
+class ConnectThread(threading.Thread):
+ """Thread that tries to connect to server given one or more addresses.
+ The thread is passed a ConnectionManager and the manager's client
+ as arguments. It calls notifyConnected() on the client when a
+ socket connects. If notifyConnected() returns without raising an
+ exception, the thread is done; it calls connect_done() on the
+ manager and exits.
+
+ The thread will continue to run, attempting connections, until a
+ successful notifyConnected() or stop() is called.
+ """
+
+ __super_init = threading.Thread.__init__
+
+ # We don't expect clients to call any methods of this Thread other
+ # than close() and those defined by the Thread API.
+
+ def __init__(self, mgr, client, addrs, tmin, tmax):
+ self.__super_init(name="Connect(%s)" % addrs)
+ self.mgr = mgr
+ self.client = client
+ self.addrs = addrs
+ self.tmin = tmin
+ self.tmax = tmax
+ self.stopped = 0
+ self.one_attempt = threading.Event()
+ # A ConnectThread keeps track of whether it has finished a
+ # call to attempt_connects(). This allows the
+ # ConnectionManager to make an attempt to connect right away,
+ # but not block for too long if the server isn't immediately
+ # available.
+
+ def stop(self):
+ self.stopped = 1
+
+ # Every method from run() to the end is used internally by the Thread.
+
+ def run(self):
+ delay = self.tmin
+ while not self.stopped:
+ success = self.attempt_connects()
+ if not self.one_attempt.isSet():
+ self.one_attempt.set()
+ if success:
+ break
+ time.sleep(delay)
+ delay *= 2
+ if delay > self.tmax:
+ delay = self.tmax
+ log("thread exiting: %s" % self.getName())
+
+ def close_sockets(self):
+ for s in self.sockets.keys():
+ s.close()
+
+ def attempt_connects(self):
+ """Try connecting to all self.addrs addresses.
+
+ If at least one succeeds, pick a success arbitrarily, close all other
+ successes (if any), and return true. If none succeed, return false.
+ """
+
+ self.sockets = {} # {open socket: connection address}
+
+ log("attempting connection on %d sockets" % len(self.addrs))
+ try:
+ for domain, addr in self.addrs:
+ if __debug__:
+ log("attempt connection to %s" % repr(addr),
+ level=zLOG.DEBUG)
+ try:
+ s = socket.socket(domain, socket.SOCK_STREAM)
+ except socket.error, err:
+ log("Failed to create socket with domain=%s: %s" % (
+ domain, err), level=zLOG.ERROR)
+ continue
+ s.setblocking(0)
+ self.sockets[s] = addr
+ # connect() raises Connected iff it succeeds
+ # XXX can still block for a while if addr requires DNS
+ self.connect(s)
+
+ # next wait until they actually connect
+ while self.sockets:
+ if self.stopped:
+ self.close_sockets()
+ return 0
+ try:
+ sockets = self.sockets.keys()
+ r, w, x = select.select([], sockets, sockets, 1.0)
+ except select.error:
+ continue
+ for s in x:
+ del self.sockets[s]
+ s.close()
+ for s in w:
+ # connect() raises Connected iff it succeeds
+ self.connect(s)
+ except Connected, container:
+ s = container.sock
+ del self.sockets[s] # don't close the newly connected socket
+ self.close_sockets()
+ return 1
+ return 0
+
+ def connect(self, s):
+ """Call s.connect_ex(addr); raise Connected iff connection succeeds.
+
+ We have to handle several possible return values from
+ connect_ex(). If the socket is connected and the initial ZEO
+ setup works, we're done. Report success by raising an
+ exception. Yes, the is odd, but we need to bail out of the
+ select() loop in the caller and an exception is a principled
+ way to do the abort.
+
+ If the socket sonnects and the initial ZEO setup
+ (notifyConnected()) fails or the connect_ex() returns an
+ error, we close the socket, remove it from self.sockets, and
+ proceed with the other sockets.
+
+ If connect_ex() returns EINPROGRESS, we need to try again later.
+ """
+ addr = self.sockets[s]
+ try:
+ e = s.connect_ex(addr)
+ except socket.error, msg:
+ log("failed to connect to %s: %s" % (addr, msg),
+ level=zLOG.ERROR)
+ else:
+ log("connect_ex(%s) == %s" % (addr, e))
+ if e in _CONNECT_IN_PROGRESS:
+ return
+ elif e in _CONNECT_OK:
+ # special cases to deal with winsock oddities
+ if sys.platform.startswith("win") and e == 0:
+
+ # It appears that winsock isn't behaving as
+ # expected on Win2k. It's possible for connect()
+ # to return 0, but the connection to have failed.
+ # In particular, in situations where I expect to
+ # get a Connection refused (10061), I'm seeing
+ # connect_ex() return 0. OTOH, it looks like
+ # select() is a more reliable indicator on
+ # Windows.
+
+ r, w, x = select.select([s], [s], [s], 0.1)
+ if not (r or w or x):
+ return
+ if x:
+ # see comment at the end of the function
+ s.close()
+ del self.socket[s]
+ c = self.test_connection(s, addr)
+ if c:
+ log("connected to %s" % repr(addr), level=zLOG.DEBUG)
+ raise Connected(s)
+ else:
+ log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
+ level=zLOG.DEBUG)
+ # Any execution that doesn't raise Connected() or return
+ # because of CONNECT_IN_PROGRESS is an error. Make sure the
+ # socket is closed and remove it from the dict of pending
+ # sockets.
+ s.close()
+ del self.sockets[s]
+
+ def test_connection(self, s, addr):
+ # Establish a connection at the zrpc level and call the
+ # client's notifyConnected(), giving the zrpc application a
+ # chance to do app-level check of whether the connection is
+ # okay.
+ c = ManagedConnection(s, addr, self.client, self.mgr)
+ try:
+ self.client.notifyConnected(c)
+ except:
+ log("error connecting to server: %s" % str(addr),
+ level=zLOG.ERROR, error=sys.exc_info())
+ c.close()
+ # Closing the ZRPC connection will eventually close the
+ # socket, somewhere in asyncore.
+ return 0
+ self.mgr.connect_done(c)
+ return 1
=== ZEO/ZEO/zrpc/connection.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import sys
+import threading
+import types
+
+import ThreadedAsync
+from ZEO import smac # XXX put smac in zrpc?
+from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
+from ZEO.zrpc.log import log, short_repr
+from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.trigger import trigger
+import zLOG
+from ZODB import POSException
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+class Delay:
+ """Used to delay response to client for synchronous calls
+
+ When a synchronous call is made and the original handler returns
+ without handling the call, it returns a Delay object that prevents
+ the mainloop from sending a response.
+ """
+
+ def set_sender(self, msgid, send_reply):
+ self.msgid = msgid
+ self.send_reply = send_reply
+
+ def reply(self, obj):
+ self.send_reply(self.msgid, obj)
+
+class Connection(smac.SizedMessageAsyncConnection):
+ """Dispatcher for RPC on object on both sides of socket.
+
+ The connection supports synchronous calls, which expect a return,
+ and asynchronous calls that do not.
+
+ It uses the Marshaller class to handle encoding and decoding of
+ method calls are arguments.
+
+ A Connection is designed for use in a multithreaded application,
+ where a synchronous call must block until a response is ready.
+ The current design only allows a single synchronous call to be
+ outstanding.
+
+ A socket connection between a client and a server allows either
+ side to invoke methods on the other side. The processes on each
+ end of the socket use a Connection object to manage communication.
+ """
+
+ __super_init = smac.SizedMessageAsyncConnection.__init__
+ __super_close = smac.SizedMessageAsyncConnection.close
+ __super_writable = smac.SizedMessageAsyncConnection.writable
+ __super_message_output = smac.SizedMessageAsyncConnection.message_output
+
+ protocol_version = "Z200"
+
+ def __init__(self, sock, addr, obj=None):
+ self.obj = None
+ self.marshal = Marshaller()
+ self.closed = 0
+ self.msgid = 0
+ self.__super_init(sock, addr)
+ # A Connection either uses asyncore directly or relies on an
+ # asyncore mainloop running in a separate thread. If
+ # thr_async is true, then the mainloop is running in a
+ # separate thread. If thr_async is true, then the asyncore
+ # trigger (self.trigger) is used to notify that thread of
+ # activity on the current thread.
+ self.thr_async = 0
+ self.trigger = None
+ self._prepare_async()
+ self._map = {self._fileno: self}
+ self.__call_lock = threading.Lock()
+ # The reply lock is used to block when a synchronous call is
+ # waiting for a response
+ self.__reply_lock = threading.Lock()
+ self.__reply_lock.acquire()
+ self.register_object(obj)
+ self.handshake()
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.addr)
+
+ def close(self):
+ if self.closed:
+ return
+ self.closed = 1
+ self.close_trigger()
+ self.__super_close()
+
+ def close_trigger(self):
+ # overridden by ManagedConnection
+ if self.trigger is not None:
+ self.trigger.close()
+
+ def register_object(self, obj):
+ """Register obj as the true object to invoke methods on"""
+ self.obj = obj
+
+ def handshake(self):
+ # When a connection is created the first message sent is a
+ # 4-byte protocol version. This mechanism should allow the
+ # protocol to evolve over time, and let servers handle clients
+ # using multiple versions of the protocol.
+
+ # The mechanism replace the message_input() method for the
+ # first message received.
+
+ # The client sends the protocol version it is using.
+ self._message_input = self.message_input
+ self.message_input = self.recv_handshake
+ self.message_output(self.protocol_version)
+
+ def recv_handshake(self, message):
+ if message == self.protocol_version:
+ self.message_input = self._message_input
+ # otherwise do something else...
+
+ def message_input(self, message):
+ """Decoding an incoming message and dispatch it"""
+ # XXX Not sure what to do with errors that reach this level.
+ # Need to catch ZRPCErrors in handle_reply() and
+ # handle_request() so that they get back to the client.
+ try:
+ msgid, flags, name, args = self.marshal.decode(message)
+ except DecodingError, msg:
+ return self.return_error(None, None, DecodingError, msg)
+
+ if __debug__:
+ log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
+ short_repr(args)),
+ level=zLOG.DEBUG)
+ if name == REPLY:
+ self.handle_reply(msgid, flags, args)
+ else:
+ self.handle_request(msgid, flags, name, args)
+
+ def handle_reply(self, msgid, flags, args):
+ if __debug__:
+ log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
+ level=zLOG.DEBUG)
+ self.__reply = msgid, flags, args
+ self.__reply_lock.release() # will fail if lock is unlocked
+
+ def handle_request(self, msgid, flags, name, args):
+ if not self.check_method(name):
+ msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
+ raise ZRPCError(msg)
+ if __debug__:
+ log("%s%s" % (name, args), level=zLOG.BLATHER)
+
+ meth = getattr(self.obj, name)
+ try:
+ ret = meth(*args)
+ except Exception, msg:
+ error = sys.exc_info()[:2]
+ log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
+ return self.return_error(msgid, flags, error[0], error[1])
+
+ if flags & ASYNC:
+ if ret is not None:
+ raise ZRPCError("async method %s returned value %s" %
+ (name, repr(ret)))
+ else:
+ if __debug__:
+ log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
+ if isinstance(ret, Delay):
+ ret.set_sender(msgid, self.send_reply)
+ else:
+ self.send_reply(msgid, ret)
+
+ def handle_error(self):
+ self.log_error()
+ self.close()
+
+ def log_error(self, msg="No error message supplied"):
+ log(msg, zLOG.ERROR, error=sys.exc_info())
+
+ def check_method(self, name):
+ # XXX Is this sufficient "security" for now?
+ if name.startswith('_'):
+ return None
+ return hasattr(self.obj, name)
+
+ def send_reply(self, msgid, ret):
+ msg = self.marshal.encode(msgid, 0, REPLY, ret)
+ self.message_output(msg)
+
+ def return_error(self, msgid, flags, err_type, err_value):
+ if flags is None:
+ self.log_error("Exception raised during decoding")
+ return
+ if flags & ASYNC:
+ self.log_error("Asynchronous call raised exception: %s" % self)
+ return
+ if type(err_value) is not types.InstanceType:
+ err_value = err_type, err_value
+
+ try:
+ msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
+ except self.marshal.errors:
+ err = ZRPCError("Couldn't pickle error %s" % `err_value`)
+ msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+ self.message_output(msg)
+ self._do_async_poll()
+
+ # The next two public methods (call and callAsync) are used by
+ # clients to invoke methods on remote objects
+
+ def call(self, method, *args):
+ self.__call_lock.acquire()
+ try:
+ return self._call(method, args)
+ finally:
+ self.__call_lock.release()
+
+ def _call(self, method, args):
+ if self.closed:
+ raise DisconnectedError("This action is temporarily unavailable")
+ msgid = self.msgid
+ self.msgid = self.msgid + 1
+ if __debug__:
+ log("send msg: %d, 0, %s, ..." % (msgid, method))
+ self.message_output(self.marshal.encode(msgid, 0, method, args))
+
+ # XXX implementation of promises starts here
+
+ self.__reply = None
+ # reply lock is currently held
+ self._do_async_loop()
+ # reply lock is held again...
+ r_msgid, r_flags, r_args = self.__reply
+ self.__reply_lock.acquire()
+ assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
+
+ if type(r_args) == types.TupleType \
+ and type(r_args[0]) == types.ClassType \
+ and issubclass(r_args[0], Exception):
+ raise r_args[1] # error raised by server
+ return r_args
+
+ def callAsync(self, method, *args):
+ self.__call_lock.acquire()
+ try:
+ self._callAsync(method, args)
+ finally:
+ self.__call_lock.release()
+
+ def _callAsync(self, method, args):
+ if self.closed:
+ raise DisconnectedError("This action is temporarily unavailable")
+ msgid = self.msgid
+ self.msgid += 1
+ if __debug__:
+ log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
+ self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
+ # XXX The message won't go out right away in this case. It
+ # will wait for the asyncore loop to get control again. Seems
+ # okay to comment our for now, but need to understand better.
+ self._do_async_poll()
+
+ # handle IO, possibly in async mode
+
+ def _prepare_async(self):
+ self.thr_async = 0
+ ThreadedAsync.register_loop_callback(self.set_async)
+ # XXX If we are not in async mode, this will cause dead
+ # Connections to be leaked.
+
+ def set_async(self, map):
+ self.trigger = trigger()
+ self.thr_async = 1
+
+ def is_async(self):
+ if self.thr_async:
+ return 1
+ else:
+ return 0
+
+ def _do_async_loop(self):
+ "Invoke asyncore mainloop and wait for reply."
+ if __debug__:
+ log("_do_async_loop() async=%d" % self.is_async(),
+ level=zLOG.DEBUG)
+ if self.is_async():
+ self.trigger.pull_trigger()
+ self.__reply_lock.acquire()
+ # wait until reply...
+ else:
+ # Do loop only if lock is already acquired. XXX But can't
+ # we already guarantee that the lock is already acquired?
+ while not self.__reply_lock.acquire(0):
+ asyncore.poll(10.0, self._map)
+ if self.closed:
+ raise DisconnectedError()
+ self.__reply_lock.release()
+
+ def _do_async_poll(self, wait_for_reply=0):
+ "Invoke asyncore mainloop to get pending message out."
+
+ if __debug__:
+ log("_do_async_poll(), async=%d" % self.is_async(),
+ level=zLOG.DEBUG)
+ if self.is_async():
+ self.trigger.pull_trigger()
+ else:
+ asyncore.poll(0.0, self._map)
+
+class ServerConnection(Connection):
+ """Connection on the server side"""
+
+ # The server side does not send a protocol message. Instead, it
+ # adapts to whatever the client sends it.
+
+class ManagedServerConnection(ServerConnection):
+ """A connection that notifies its ConnectionManager of closing"""
+ __super_init = Connection.__init__
+ __super_close = Connection.close
+
+ def __init__(self, sock, addr, obj, mgr):
+ self.__mgr = mgr
+ self.__super_init(sock, addr, obj)
+ obj.notifyConnected(self)
+
+ def close(self):
+ self.__super_close()
+ self.__mgr.close(self)
+
+class ManagedConnection(Connection):
+ """A connection that notifies its ConnectionManager of closing.
+
+ A managed connection also defers the ThreadedAsync work to its
+ manager.
+ """
+ __super_init = Connection.__init__
+ __super_close = Connection.close
+
+ def __init__(self, sock, addr, obj, mgr):
+ self.__mgr = mgr
+ self.__super_init(sock, addr, obj)
+ self.check_mgr_async()
+
+ def close_trigger(self):
+ # the manager should actually close the trigger
+ del self.trigger
+
+ def set_async(self, map):
+ pass
+
+ def _prepare_async(self):
+ # Don't do the register_loop_callback that the superclass does
+ pass
+
+ def check_mgr_async(self):
+ if not self.thr_async and self.__mgr.thr_async:
+ assert self.__mgr.trigger is not None, \
+ "manager (%s) has no trigger" % self.__mgr
+ self.thr_async = 1
+ self.trigger = self.__mgr.trigger
+ return 1
+ return 0
+
+ def is_async(self):
+ if self.thr_async:
+ return 1
+ return self.check_mgr_async()
+
+ def close(self):
+ self.__super_close()
+ self.__mgr.notify_closed(self)
=== ZEO/ZEO/zrpc/error.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+from ZODB import POSException
+from ZEO.Exceptions import Disconnected
+
+class ZRPCError(POSException.StorageError):
+ pass
+
+class DecodingError(ZRPCError):
+ """A ZRPC message could not be decoded."""
+
+class DisconnectedError(ZRPCError, Disconnected):
+ """The database storage is disconnected from the storage server."""
=== ZEO/ZEO/zrpc/log.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import os
+import types
+import zLOG
+
+_label = "zrpc:%s" % os.getpid()
+
+def new_label():
+ global _label
+ _label = "zrpc:%s" % os.getpid()
+
+def log(message, level=zLOG.BLATHER, label=None, error=None):
+ zLOG.LOG(label or _label, level, message, error=error)
+
+REPR_LIMIT = 40
+
+def short_repr(obj):
+ "Return an object repr limited to REPR_LIMIT bytes."
+ # Some of the objects being repr'd are large strings. It's wastes
+ # a lot of memory to repr them and then truncate, so special case
+ # them in this function.
+ # Also handle short repr of a tuple containing a long string.
+ if isinstance(obj, types.StringType):
+ obj = obj[:REPR_LIMIT]
+ elif isinstance(obj, types.TupleType):
+ elts = []
+ size = 0
+ for elt in obj:
+ r = repr(elt)
+ elts.append(r)
+ size += len(r)
+ if size > REPR_LIMIT:
+ break
+ obj = tuple(elts)
+ return repr(obj)[:REPR_LIMIT]
=== ZEO/ZEO/zrpc/marshal.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import cPickle
+from cStringIO import StringIO
+import struct
+import types
+
+from ZEO.zrpc.error import ZRPCError
+
+class Marshaller:
+ """Marshal requests and replies to second across network"""
+
+ # It's okay to share a single Pickler as long as it's in fast
+ # mode, which means that it doesn't have a memo.
+
+ pickler = cPickle.Pickler()
+ pickler.fast = 1
+ pickle = pickler.dump
+
+ errors = (cPickle.UnpickleableError,
+ cPickle.UnpicklingError,
+ cPickle.PickleError,
+ cPickle.PicklingError)
+
+ VERSION = 1
+
+ def encode(self, msgid, flags, name, args):
+ """Returns an encoded message"""
+ return self.pickle((msgid, flags, name, args), 1)
+
+ def decode(self, msg):
+ """Decodes msg and returns its parts"""
+ unpickler = cPickle.Unpickler(StringIO(msg))
+ unpickler.find_global = find_global
+
+ try:
+ return unpickler.load() # msgid, flags, name, args
+ except (self.errors, IndexError), err_msg:
+ log("can't decode %s" % repr(msg), level=zLOG.ERROR)
+ raise DecodingError(msg)
+
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+ """Helper for message unpickler"""
+ try:
+ m = __import__(module, _globals, _globals, _silly)
+ except ImportError, msg:
+ raise ZRPCError("import error %s: %s" % (module, msg))
+
+ try:
+ r = getattr(m, name)
+ except AttributeError:
+ raise ZRPCError("module %s has no global %s" % (module, name))
+
+ safe = getattr(r, '__no_side_effects__', 0)
+ if safe:
+ return r
+
+ # XXX what's a better way to do this? esp w/ 2.1 & 2.2
+ if type(r) == types.ClassType and issubclass(r, Exception):
+ return r
+
+ raise ZRPCError("Unsafe global: %s.%s" % (module, name))
=== ZEO/ZEO/zrpc/server.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import socket
+import types
+
+from ZEO.zrpc.connection import Connection, Delay
+from ZEO.zrpc.log import log
+
+# Export the main asyncore loop
+loop = asyncore.loop
+
+class Dispatcher(asyncore.dispatcher):
+ """A server that accepts incoming RPC connections"""
+ __super_init = asyncore.dispatcher.__init__
+
+ reuse_addr = 1
+
+ def __init__(self, addr, factory=Connection, reuse_addr=None):
+ self.__super_init()
+ self.addr = addr
+ self.factory = factory
+ self.clients = []
+ if reuse_addr is not None:
+ self.reuse_addr = reuse_addr
+ self._open_socket()
+
+ def _open_socket(self):
+ if type(self.addr) == types.TupleType:
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ log("listening on %s" % str(self.addr))
+ self.bind(self.addr)
+ self.listen(5)
+
+ def writable(self):
+ return 0
+
+ def readable(self):
+ return 1
+
+ def handle_accept(self):
+ try:
+ sock, addr = self.accept()
+ except socket.error, msg:
+ log("accepted failed: %s" % msg)
+ return
+ c = self.factory(sock, addr)
+ log("connect from %s: %s" % (repr(addr), c))
+ self.clients.append(c)
=== ZEO/ZEO/zrpc/trigger.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# This module is a simplified version of the select_trigger module
+# from Sam Rushing's Medusa server.
+
+import asyncore
+
+import os
+import socket
+import thread
+
+if os.name == 'posix':
+
+ class trigger (asyncore.file_dispatcher):
+
+ "Wake up a call to select() running in the main thread"
+
+ # This is useful in a context where you are using Medusa's I/O
+ # subsystem to deliver data, but the data is generated by another
+ # thread. Normally, if Medusa is in the middle of a call to
+ # select(), new output data generated by another thread will have
+ # to sit until the call to select() either times out or returns.
+ # If the trigger is 'pulled' by another thread, it should immediately
+ # generate a READ event on the trigger object, which will force the
+ # select() invocation to return.
+
+ # A common use for this facility: letting Medusa manage I/O for a
+ # large number of connections; but routing each request through a
+ # thread chosen from a fixed-size thread pool. When a thread is
+ # acquired, a transaction is performed, but output data is
+ # accumulated into buffers that will be emptied more efficiently
+ # by Medusa. [picture a server that can process database queries
+ # rapidly, but doesn't want to tie up threads waiting to send data
+ # to low-bandwidth connections]
+
+ # The other major feature provided by this class is the ability to
+ # move work back into the main thread: if you call pull_trigger()
+ # with a thunk argument, when select() wakes up and receives the
+ # event it will call your thunk from within that thread. The main
+ # purpose of this is to remove the need to wrap thread locks around
+ # Medusa's data structures, which normally do not need them. [To see
+ # why this is true, imagine this scenario: A thread tries to push some
+ # new data onto a channel's outgoing data queue at the same time that
+ # the main thread is trying to remove some]
+
+ def __init__ (self):
+ r, w = os.pipe()
+ self.trigger = w
+ asyncore.file_dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+
+ def close(self):
+ self.del_channel()
+ self.socket.close() # the read side of the pipe
+ os.close(self.trigger) # the write side of the pipe
+
+ def __repr__ (self):
+ return '<select-trigger (pipe) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ # print 'PULL_TRIGGER: ', len(self.thunks)
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ os.write (self.trigger, 'x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+else:
+
+ # win32-safe version
+
+ class trigger (asyncore.dispatcher):
+
+ address = ('127.9.9.9', 19999)
+
+ def __init__ (self):
+ a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+
+ # set TCP_NODELAY to true to avoid buffering
+ w.setsockopt(socket.IPPROTO_TCP, 1, 1)
+
+ # tricky: get a pair of connected sockets
+ host='127.0.0.1'
+ port=19999
+ while 1:
+ try:
+ self.address=(host, port)
+ a.bind(self.address)
+ break
+ except:
+ if port <= 19950:
+ raise 'Bind Error', 'Cannot bind trigger!'
+ port=port - 1
+
+ a.listen (1)
+ w.setblocking (0)
+ try:
+ w.connect (self.address)
+ except:
+ pass
+ r, addr = a.accept()
+ a.close()
+ w.setblocking (1)
+ self.trigger = w
+
+ asyncore.dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._trigger_connected = 0
+
+ def __repr__ (self):
+ return '<select-trigger (loopback) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ self.trigger.send ('x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()