[Zodb-checkins] CVS: ZEO/ZEO/zrpc - __init__.py:1.1.2.1.2.1 client.py:1.1.2.2.2.1 connection.py:1.1.2.2.2.1 error.py:1.1.2.1.2.1 log.py:1.1.2.2.2.1 marshal.py:1.1.2.1.2.1 server.py:1.1.2.1.2.1 trigger.py:1.1.2.1.2.1
Jeremy Hylton
jeremy@zope.com
Thu, 25 Apr 2002 16:19:55 -0400
Update of /cvs-repository/ZEO/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv5401/ZEO/zrpc
Modified Files:
Tag: ZEO2-branch
__init__.py client.py connection.py error.py log.py marshal.py
server.py trigger.py
Log Message:
Merge the Standby-branch into the ZEO2-branch.
The Standby-branch is history.
=== ZEO/ZEO/zrpc/__init__.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
# zrpc is a package with the following modules
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
=== ZEO/ZEO/zrpc/client.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
import errno
import select
import socket
@@ -23,13 +36,13 @@
self.tmax = tmax
self.connected = 0
self.connection = None
+ self.closed = 0
# If _thread is not None, then there is a helper thread
# attempting to connect. _thread is protected by _connect_lock.
self._thread = None
self._connect_lock = threading.Lock()
self.trigger = None
self.thr_async = 0
- self.closed = 0
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
@@ -79,18 +92,45 @@
self._connect_lock.release()
if self.connection:
self.connection.close()
+ if self.trigger is not None:
+ self.trigger.close()
def set_async(self, map):
- # XXX need each connection started with async==0 to have a callback
- self.trigger = trigger()
- self.thr_async = 1 # XXX needs to be set on the Connection
+ # This is the callback registered with ThreadedAsync. The
+ # callback might be called multiple times, so it shouldn't
+ # create a trigger every time and should never do anything
+ # after it's closed.
+
+ # It may be that the only case where it is called multiple
+ # times is in the test suite, where ThreadedAsync's loop can
+ # be started in a child process after a fork. Regardless,
+ # it's good to be defensive.
+
+ # XXX need each connection started with async==0 to have a
+ # callback
+ if not self.closed and self.trigger is None:
+ self.trigger = trigger()
+ self.thr_async = 1 # XXX needs to be set on the Connection
def attempt_connect(self):
+ """Attempt a connection to the server without blocking too long.
+
+ There isn't a crisp definition for too long. When a
+ ClientStorage is created, it attempts to connect to the
+ server. If the server isn't immediately available, it can
+ operate from the cache. This method will start the background
+ connection thread and wait a little while to see if it
+ finishes quickly.
+ """
+
# XXX will a single attempt take too long?
self.connect()
try:
event = self._thread.one_attempt
except AttributeError:
+ # An AttributeError means that (1) _thread is None and (2)
+ # as a consquence of (1) that the connect thread has
+ # already exited.
pass
else:
event.wait()
@@ -132,77 +172,122 @@
# helper for non-local exit
def __init__(self, sock):
self.sock = sock
-
+
+# When trying to do a connect on a non-blocking socket, some outcomes
+# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
+# when an initial connect can't complete immediately. Set _CONNECT_OK
+# to the errno value(s) expected if the connect succeeds *or* if it's
+# already connected (our code can attempt redundant connects).
+if hasattr(errno, "WSAEWOULDBLOCK"): # Windows
+ _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
+ _CONNECT_OK = (0, errno.WSAEISCONN)
+else: # Unix
+ _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
+ _CONNECT_OK = (0, errno.EISCONN)
+
class ConnectThread(threading.Thread):
+ """Thread that tries to connect to server given one or more addresses.
+ The thread is passed a ConnectionManager and the manager's client
+ as arguments. It calls notifyConnected() on the client when a
+ socket connects. If notifyConnected() returns without raising an
+ exception, the thread is done; it calls connect_done() on the
+ manager and exits.
+
+ The thread will continue to run, attempting connections, until a
+ successful notifyConnected() or stop() is called.
+ """
__super_init = threading.Thread.__init__
- def __init__(self, mgr, client, addr, tmin, tmax):
- self.__super_init(name="Connect(%s)" % addr)
+ # We don't expect clients to call any methods of this Thread other
+ # than close() and those defined by the Thread API.
+
+ def __init__(self, mgr, client, addrs, tmin, tmax):
+ self.__super_init(name="Connect(%s)" % addrs)
self.mgr = mgr
self.client = client
- self.addr = addr
+ self.addrs = addrs
self.tmin = tmin
self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
+ # A ConnectThread keeps track of whether it has finished a
+ # call to attempt_connects(). This allows the
+ # ConnectionManager to make an attempt to connect right away,
+ # but not block for too long if the server isn't immediately
+ # available.
def stop(self):
self.stopped = 1
+ # Every method from run() to the end is used internally by the Thread.
+
def run(self):
delay = self.tmin
- while not (self.stopped or self.attempt_connects()):
+ while not self.stopped:
+ success = self.attempt_connects()
if not self.one_attempt.isSet():
self.one_attempt.set()
+ if success:
+ break
time.sleep(delay)
delay *= 2
if delay > self.tmax:
delay = self.tmax
log("thread exiting: %s" % self.getName())
-
+
+ def close_sockets(self):
+ for s in self.sockets.keys():
+ s.close()
+
def attempt_connects(self):
- "Return true if any connect attempt succeeds."
- sockets = {}
+ """Try connecting to all self.addrs addresses.
+
+ If at least one succeeds, pick a success arbitrarily, close all other
+ successes (if any), and return true. If none succeed, return false.
+ """
+
+ self.sockets = {} # {open socket: connection address}
- log("attempting connection on %d sockets" % len(self.addr))
+ log("attempting connection on %d sockets" % len(self.addrs))
try:
- for domain, addr in self.addr:
+ for domain, addr in self.addrs:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
- s = socket.socket(domain, socket.SOCK_STREAM)
+ try:
+ s = socket.socket(domain, socket.SOCK_STREAM)
+ except socket.error, err:
+ log("Failed to create socket with domain=%s: %s" % (
+ domain, err), level=zLOG.ERROR)
+ continue
s.setblocking(0)
+ self.sockets[s] = addr
+ # connect() raises Connected iff it succeeds
# XXX can still block for a while if addr requires DNS
- e = self.connect(s, addr)
- if e is not None:
- sockets[s] = addr
+ self.connect(s)
# next wait until they actually connect
- while sockets:
+ while self.sockets:
if self.stopped:
- for s in sockets.keys():
- s.close()
+ self.close_sockets()
return 0
try:
- r, w, x = select.select([], sockets.keys(), [], 1.0)
+ r, w, x = select.select([], self.sockets.keys(), [], 1.0)
except select.error:
continue
for s in w:
- e = self.connect(s, sockets[s])
- if e is None:
- del sockets[s]
+ # connect() raises Connected iff it succeeds
+ self.connect(s)
except Connected, container:
s = container.sock
- del sockets[s]
- # close all the other sockets
- for s in sockets.keys():
- s.close()
+ del self.sockets[s] # don't close the newly connected socket
+ self.close_sockets()
return 1
return 0
- def connect(self, s, addr):
- """Call s.connect_ex(addr) and return true if loop should continue.
+ def connect(self, s):
+ """Call s.connect_ex(addr); raise Connected iff connection succeeds.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
@@ -211,27 +296,42 @@
select() loop in the caller and an exception is a principled
way to do the abort.
- If the socket sonnects and the initial ZEO setup fails or the
- connect_ex() returns an error, we close the socket and ignore it.
+ If the socket sonnects and the initial ZEO setup
+ (notifyConnected()) fails or the connect_ex() returns an
+ error, we close the socket, remove it from self.sockets, and
+ proceed with the other sockets.
If connect_ex() returns EINPROGRESS, we need to try again later.
"""
-
- e = s.connect_ex(addr)
- if e == errno.EINPROGRESS:
- return 1
- elif e == 0:
- c = self.test_connection(s, addr)
- log("connected to %s" % repr(addr), level=zLOG.DEBUG)
- if c:
- raise Connected(s)
+ addr = self.sockets[s]
+ try:
+ e = s.connect_ex(addr)
+ except socket.error, msg:
+ log("failed to connect to %s: %s" % (addr, msg),
+ level=zLOG.ERROR)
else:
- if __debug__:
+ if e in _CONNECT_IN_PROGRESS:
+ return
+ elif e in _CONNECT_OK:
+ c = self.test_connection(s, addr)
+ if c:
+ log("connected to %s" % repr(addr), level=zLOG.DEBUG)
+ raise Connected(s)
+ else:
log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
level=zLOG.DEBUG)
- s.close()
+ # Any execution that doesn't raise Connected() or return
+ # because of CONNECT_IN_PROGRESS is an error. Make sure the
+ # socket is closed and remove it from the dict of pending
+ # sockets.
+ s.close()
+ del self.sockets[s]
def test_connection(self, s, addr):
+ # Establish a connection at the zrpc level and call the
+ # client's notifyConnected(), giving the zrpc application a
+ # chance to do app-level check of whether the connection is
+ # okay.
c = ManagedConnection(s, addr, self.client, self.mgr)
try:
self.client.notifyConnected(c)
@@ -239,6 +339,8 @@
log("error connecting to server: %s" % str(addr),
level=zLOG.ERROR, error=sys.exc_info())
c.close()
+ # Closing the ZRPC connection will eventually close the
+ # socket, somewhere in asyncore.
return 0
self.mgr.connect_done(c)
return 1
=== ZEO/ZEO/zrpc/connection.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
import asyncore
import sys
import threading
@@ -137,7 +150,7 @@
if flags & ASYNC:
if ret is not None:
raise ZRPCError("async method %s returned value %s" %
- (name, repr(ret))
+ (name, repr(ret)))
else:
if __debug__:
log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
@@ -162,7 +175,7 @@
def send_reply(self, msgid, ret):
msg = self.marshal.encode(msgid, 0, REPLY, ret)
self.message_output(msg)
-
+
def return_error(self, msgid, flags, err_type, err_value):
if flags is None:
self.log_error("Exception raised during decoding")
@@ -234,7 +247,7 @@
# XXX The message won't go out right away in this case. It
# will wait for the asyncore loop to get control again. Seems
# okay to comment our for now, but need to understand better.
-## self._do_async_poll()
+ self._do_async_poll()
# handle IO, possibly in async mode
@@ -271,7 +284,7 @@
if self.closed:
raise DisconnectedError()
self.__reply_lock.release()
-
+
def _do_async_poll(self, wait_for_reply=0):
"Invoke asyncore mainloop to get pending message out."
@@ -284,13 +297,9 @@
asyncore.poll(0.0, self._map)
class ServerConnection(Connection):
+ """Connection on the server side"""
+ # XXX Do we need this class anymore?
- def _do_async_poll(self, wait=0):
- """If this is a server, there is no explicit IO to do"""
- pass
-
- # XXX _do_async_loop is never called. Should it be defined as
- # above anyway?
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""
@@ -310,7 +319,7 @@
"""A connection that notifies its ConnectionManager of closing.
A managed connection also defers the ThreadedAsync work to its
- manager.
+ manager.
"""
__super_init = Connection.__init__
__super_close = Connection.close
@@ -324,6 +333,9 @@
# the manager should actually close the trigger
del self.trigger
+ def set_async(self, map):
+ pass
+
def _prepare_async(self):
# Don't do the register_loop_callback that the superclass does
pass
@@ -345,4 +357,3 @@
def close(self):
self.__super_close()
self.__mgr.notify_closed(self)
-
=== ZEO/ZEO/zrpc/error.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
from ZODB import POSException
from ZEO.Exceptions import Disconnected
@@ -9,4 +22,3 @@
class DisconnectedError(ZRPCError, Disconnected):
"""The database storage is disconnected from the storage server."""
-
=== ZEO/ZEO/zrpc/log.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
import os
import types
import zLOG
=== ZEO/ZEO/zrpc/marshal.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
import cPickle
from cStringIO import StringIO
import types
@@ -7,7 +20,7 @@
# It's okay to share a single Pickler as long as it's in fast
# mode, which means that it doesn't have a memo.
-
+
pickler = cPickle.Pickler()
pickler.fast = 1
pickle = pickler.dump
@@ -46,7 +59,7 @@
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
-
+
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
=== ZEO/ZEO/zrpc/server.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
import asyncore
import socket
import types
@@ -14,7 +27,7 @@
reuse_addr = 1
- def __init__(self, addr, factory=Connection, reuse_addr=None):
+ def __init__(self, addr, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.factory = factory
=== ZEO/ZEO/zrpc/trigger.py 1.1.2.1 => 1.1.2.1.2.1 ===
-#
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-#
-# Copyright (c) Digital Creations. All rights reserved.
-#
-# This license has been certified as Open Source(tm).
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# 1. Redistributions in source code must retain the above copyright
-# notice, this list of conditions, and the following disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions, and the following disclaimer in
-# the documentation and/or other materials provided with the
-# distribution.
-#
-# 3. Digital Creations requests that attribution be given to Zope
-# in any manner possible. Zope includes a "Powered by Zope"
-# button that is installed by default. While it is not a license
-# violation to remove this button, it is requested that the
-# attribution remain. A significant investment has been put
-# into Zope, and this effort will continue if the Zope community
-# continues to grow. This is one way to assure that growth.
-#
-# 4. All advertising materials and documentation mentioning
-# features derived from or use of this software must display
-# the following acknowledgement:
-#
-# "This product includes software developed by Digital Creations
-# for use in the Z Object Publishing Environment
-# (http://www.zope.org/)."
-#
-# In the event that the product being advertised includes an
-# intact Zope distribution (with copyright and license included)
-# then this clause is waived.
-#
-# 5. Names associated with Zope or Digital Creations must not be used to
-# endorse or promote products derived from this software without
-# prior written permission from Digital Creations.
-#
-# 6. Modified redistributions of any form whatsoever must retain
-# the following acknowledgment:
-#
-# "This product includes software developed by Digital Creations
-# for use in the Z Object Publishing Environment
-# (http://www.zope.org/)."
-#
-# Intact (re-)distributions of any official Zope release do not
-# require an external acknowledgement.
-#
-# 7. Modifications are encouraged but must be packaged separately as
-# patches to official Zope releases. Distributions that do not
-# clearly separate the patches from the original work must be clearly
-# labeled as unofficial distributions. Modifications which do not
-# carry the name Zope may be packaged in any form, as long as they
-# conform to all of the clauses above.
-#
-#
-# Disclaimer
-#
-# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-#
-#
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations. Specific
-# attributions are listed in the accompanying credits file.
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
#
##############################################################################
-
# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.
-
import asyncore
-#import asynchat
import os
import socket
-import string
import thread
-
+
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
@@ -136,6 +61,11 @@
self.lock = thread.allocate_lock()
self.thunks = []
+ def close(self):
+ self.del_channel()
+ self.socket.close() # the read side of the pipe
+ os.close(self.trigger) # the write side of the pipe
+
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
@@ -199,7 +129,7 @@
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
-
+
a.listen (1)
w.setblocking (0)
try:
@@ -250,7 +180,3 @@
self.thunks = []
finally:
self.lock.release()
-
-
-
-