[Zodb-checkins] CVS: StandaloneZODB/ZEO - zrpc2.py:1.1.2.24
Jeremy Hylton
jeremy@zope.com
Tue, 8 Jan 2002 23:40:16 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv3178
Modified Files:
Tag: ZEO-ZRPC-Dev
zrpc2.py
Log Message:
Change ConnectionManager to support multiple server addresses.
This is a significant reworking of the connect() logic in
ConnectionManager. It works on an arbitrary list of server
addresses. It opens a single socket for each address and does a
non-blocking connect. It stops when it gets the first successful
connection.
The __m_connect() method is the entry point for the connection thread.
The self.addr attribute has changed its use. It doesn't sort a single
addresss, it stores a list of domain X address 2-tuples. The
constructor accepts a single address or a list and parses each in
advance to determine if it's a Unix domain socket or an Internet
domain socket.
Two other changes:
- The ConnectionManager keeps a reference to its connection and
closes the connection when the manager's close() is executed.
- The notifyDisconnected() callback no longer passes an unused
None.
=== StandaloneZODB/ZEO/zrpc2.py 1.1.2.23 => 1.1.2.24 ===
import asyncore
+import errno
import cPickle
import os
+import select
import socket
import sys
import threading
@@ -385,12 +387,13 @@
# notifyDisconnected. make this optional?
def __init__(self, addr, obj=None, debug=1, tmin=1, tmax=180):
- self.addr = addr
+ self.set_addr(addr)
self.obj = obj
self.tmin = tmin
self.tmax = tmax
self.debug = debug
self.connected = 0
+ self.connection = None
# If _thread is not None, then there is a helper thread
# attempting to connect. _thread is protected by _connect_lock.
self._thread = None
@@ -403,6 +406,38 @@
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addr)
+ def set_addr(self, addr):
+ "Set one or more addresses to use for server."
+
+ # For backwards compatibility (and simplicity?) the
+ # constructor accepts a single address in the addr argument --
+ # a string for a Unix domain socket or a 2-tuple with a
+ # hostname and port. It can also accept a list of such addresses.
+
+ addr_type = self._guess_type(addr)
+ if addr_type is not None:
+ self.addr = [(addr_type, addr)]
+ else:
+ self.addr = []
+ for a in addr:
+ addr_type = self._guess_type(a)
+ if addr_type is None:
+ raise ValueError, "unknown address in list: %s" % repr(a)
+ self.addr.append((addr_type, a))
+
+ def _guess_type(self, addr):
+ if isinstance(addr, types.StringType):
+ return socket.AF_UNIX
+
+ if (len(addr) == 2
+ and isinstance(addr[0], types.StringType)
+ and isinstance(addr[1], types.IntType)):
+ return socket.AF_INET
+
+ # not anything I know about
+
+ return None
+
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
@@ -412,6 +447,8 @@
self._thread.join()
finally:
self._connect_lock.release()
+ if self.connection:
+ self.connection.close()
def register_object(self, obj):
self.obj = obj
@@ -427,8 +464,9 @@
self._connect_lock.acquire()
try:
if self._thread is None:
- self._thread = threading.Thread(target=self.__connect,
- args=(1,))
+ zLOG.LOG(_label, zLOG.BLATHER,
+ "starting thread to connect to server")
+ self._thread = threading.Thread(target=self.__m_connect)
self._thread.start()
if sync:
try:
@@ -440,72 +478,115 @@
self._connect_lock.release()
def attempt_connect(self):
- self.__connect(repeat=0)
+ # XXX will _attempt_connects() take too long? think select().
+ self._attempt_connects()
return self.connected
- def __connect(self, repeat=1):
- """Attempt to connect to StorageServer.
-
- This method should always be called by attempt_connect() or by
- connect().
- """
+ def notify_closed(self, conn):
+ self.connected = 0
+ self.connection = None
+ self.obj.notifyDisconnected()
+ if not self.closed:
+ self.connect()
+ class Connected(Exception):
+ def __init__(self, sock):
+ self.sock = sock
+
+ def __m_connect(self):
+ # a new __connect that handles multiple addresses
try:
- tries = 0
- t = self.tmin
- while not (self.connected or self.closed) \
- and (repeat or (tries == 0)):
- tries = tries + 1
- log("Trying to connect to server")
- s = self._connect_socket()
- if s is None:
- if repeat:
- t = self._wait(t)
- else:
- if self.debug:
- log("Connected to server", level=zLOG.DEBUG)
- self.connected = 1
- if self.connected and not self.closed:
- c = ManagedConnection(s, self.addr, self.obj, self)
- log("Connection created: %s" % c)
- try:
- self.obj.notifyConnected(c)
- except:
- self._thread = None
- c.close()
- # When the connection is closed, we'll trigger
- # another attempt to reconnect.
+ delay = self.tmin
+ while not (self.closed or self._attempt_connects()):
+ time.sleep(delay)
+ delay *= 2
+ if delay > self.tmax:
+ delay = self.tmax
finally:
- # must always clear _thread on the way out
self._thread = None
- def _connect_socket(self):
+ def _attempt_connects(self):
+ "Return true if any connect attempt succeeds."
+ sockets = {}
+
+ zLOG.LOG(_label, zLOG.BLATHER,
+ "attempting connection on %d sockets" % len(self.addr))
try:
- if type(self.addr) is types.StringType:
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- else:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(self.addr)
- except socket.error, msg:
- if self.debug:
- log("Failed to connect to server: %s" % msg,
- level=zLOG.DEBUG)
- s.close()
- return None
- return s
+ for domain, addr in self.addr:
+ if __debug__:
+ zLOG.LOG(_label, zLOG.DEBUG,
+ "attempt connection to %s" % repr(addr))
+ s = socket.socket(domain, socket.SOCK_STREAM)
+ s.setblocking(0)
+ # XXX can still block for a while if addr requires DNS
+ e = self._connect_ex(s, addr)
+ if e is not None:
+ sockets[s] = addr
+
+ # next wait until the actually connect
+ while sockets:
+ if self.closed:
+ for s in sockets.keys():
+ s.close()
+ return 0
+ try:
+ r, w, x = select.select([], sockets.keys(), [], 1.0)
+ except select.error:
+ continue
+ for s in w:
+ e = self._connect_ex(s, sockets[s])
+ if e is None:
+ del sockets[s]
+ except self.Connected, container:
+ s = container.sock
+ del sockets[s]
+ # close all the other sockets
+ for s in sockets.keys():
+ s.close()
+ return 1
+ return 0
+
+ def _connect_ex(self, s, addr):
+ """Call s.connect_ex(addr) and return true if loop should continue.
+
+ We have to handle several possible return values from
+ connect_ex(). If the socket is connected and the initial ZEO
+ setup works, we're done. Report success by raising an
+ exception. Yes, the is odd, but we need to bail out of the
+ select() loop in the caller and an exception is a principled
+ way to do the abort.
- def _wait(self, t):
- time.sleep(t)
- t = t * 2
- if t > self.tmax:
- t = self.tmax
- return t
+ If the socket sonnects and the initial ZEO setup fails or the
+ connect_ex() returns an error, we close the socket and ignore it.
- def notify_closed(self, conn):
- self.connected = 0
- self.obj.notifyDisconnected(None)
- if not self.closed:
- self.connect()
+ If connect_ex() returns EINPROGRESS, we need to try again later.
+ """
+
+ e = s.connect_ex(addr)
+ if e == errno.EINPROGRESS:
+ return 1
+ elif e == 0:
+ c = self._test_connection(s, addr)
+ if c:
+ self.connected = 1
+ raise self.Connected(s)
+ else:
+ if __debug__:
+ zLOG.LOG(_label, zLOG.DEBUG,
+ "error connecting to %s: %s" % (addr,
+ errno.errorcode[e]))
+ s.close()
+
+ def _test_connection(self, s, addr):
+ c = ManagedConnection(s, addr, self.obj, self)
+ try:
+ self.obj.notifyConnected(c)
+ self.connection = c
+ return 1
+ except:
+ # log something here?
+ c.close()
+ return 0
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""