[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - client.py:1.16 connection.py:1.32
Guido van Rossum
guido@python.org
Fri, 20 Sep 2002 13:37:35 -0400
Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv26301/ZEO/zrpc
Modified Files:
client.py connection.py
Log Message:
I set out to make wait=1 work for fallback connections, i.e. the
ClientStorage constructor called with both wait=1 and
read_only_fallback=1 should return, indicating its readiness, when a
read-only connection was made. This is done by calling
connect(sync=1). Previously this waited for the ConnectThread to
finish, but that thread doesn't finish until it's made a read-write
connection, so a different mechanism is needed.
I ended up doing a major overhaul of the interfaces between
ClientStorage, ConnectionManager, ConnectThread/ConnectWrapper, and
even ManagedConnection. Changes:
ClientStorage.py:
ClientStorage:
- testConnection() now returns just the preferred flag; stubs are
cheap and I like to have the notifyConnected() signature be the
same for clients and servers.
- notifyConnected() now takes a connection (to match the signature
of this method in StorageServer), and creates a new stub. It also
takes care of the reconnect business if the client was already
connected, rather than the ClientManager. It stores the
connection as self._connection so it can close the previous one.
This is also reset by notifyDisconnected().
zrpc/client.py:
ConnectionManager:
- Changed self.thread_lock into a condition variable. It now also
protects self.connection. The condition is notified when
self.connection is set to a non-None value in connect_done();
connect(sync=1) waits for it. The self.connected variable is no
more; we test "self.connection is not None" instead.
- Tried to make close() reentrant. (There's a trick: you can't set
self.connection to None; conn.close() ends up calling close_conn(),
which does this.)
- Renamed notify_closed() to close_conn(), for symmetry with the
StorageServer API.
- Added an is_connected() method so ConnectThread.try_connect()
doesn't have to dig inside the manager's guts to find out if the
manager is connected (important for the disposition of fallback
wrappers).
ConnectThread and ConnectWrapper:
- Follow above changes in the ClientStorage and ConnectionManager
APIs: don't close the manager's connection when reconnecting, but
leave that up to notifyConnected(); ConnectWrapper no longer
manages the stub.
- ConnectWrapper sets self.sock to None once it's created a
ManagedConnection -- from there on the connection is in charge of
closing the socket.
zrpc/connection.py:
ManagedServerConnection:
- Changed the order in which close() calls things; super_close()
should be last.
ManagedConnection:
- Ditto, and call the manager's close_conn() instead of
notify_closed().
tests/testZEO.py:
- In checkReconnectSwitch(), we can now open the client storage with
wait=1 and read_only_fallback=1.
=== ZODB3/ZEO/zrpc/client.py 1.15 => 1.16 ===
--- ZODB3/ZEO/zrpc/client.py:1.15 Wed Sep 18 23:51:23 2002
+++ ZODB3/ZEO/zrpc/client.py Fri Sep 20 13:37:34 2002
@@ -36,13 +36,12 @@
self.client = client
self.tmin = tmin
self.tmax = tmax
- self.connected = 0
- self.connection = None
+ self.cond = threading.Condition(threading.Lock())
+ self.connection = None # Protected by self.cond
self.closed = 0
# If thread is not None, then there is a helper thread
- # attempting to connect. thread is protected by thread_lock.
- self.thread = None
- self.thread_lock = threading.Lock()
+ # attempting to connect.
+ self.thread = None # Protected by self.cond
self.trigger = None
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
@@ -85,21 +84,26 @@
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
- self.thread_lock.acquire()
+ self.cond.acquire()
try:
t = self.thread
+ self.thread = None
+ conn = self.connection
finally:
- self.thread_lock.release()
+ self.cond.release()
if t is not None:
log("CM.close(): stopping and joining thread")
t.stop()
t.join(30)
if t.isAlive():
- log("CM.close(): self.thread.join() timed out")
- if self.connection:
- self.connection.close()
+ log("CM.close(): self.thread.join() timed out",
+ level=zLOG.WARNING)
+ if conn is not None:
+ # This will call close_conn() below which clears self.connection
+ conn.close()
if self.trigger is not None:
self.trigger.close()
+ self.trigger = None
def set_async(self, map):
# This is the callback registered with ThreadedAsync. The
@@ -131,23 +135,36 @@
finishes quickly.
"""
- # XXX will a single attempt take too long?
+ # XXX Will a single attempt take too long?
+ # XXX Answer: it depends -- normally, you'll connect or get a
+ # connection refused error very quickly. Packet-eating
+ # firewalls and other mishaps may cause the connect to take a
+ # long time to time out though. It's also possible that you
+ # connect quickly to a slow server, and the attempt includes
+ # at least one roundtrip to the server (the register() call).
+ # But that's as fast as you can expect it to be.
self.connect()
- self.thread_lock.acquire()
+ self.cond.acquire()
try:
t = self.thread
+ conn = self.connection
finally:
- self.thread_lock.release()
- if t is not None:
+ self.cond.release()
+ if t is not None and conn is None:
event = t.one_attempt
event.wait()
- return self.connected
+ self.cond.acquire()
+ try:
+ conn = self.connection
+ finally:
+ self.cond.release()
+ return conn is not None
def connect(self, sync=0):
- if self.connected == 1:
- return
- self.thread_lock.acquire()
+ self.cond.acquire()
try:
+ if self.connection is not None:
+ return
t = self.thread
if t is None:
log("CM.connect(): starting ConnectThread")
@@ -155,37 +172,51 @@
self.addrlist,
self.tmin, self.tmax)
t.start()
+ if sync:
+ while self.connection is None:
+ self.cond.wait(30)
+ if self.connection is None:
+ log("CM.connect(sync=1): still waiting...")
finally:
- self.thread_lock.release()
+ self.cond.release()
if sync:
- t.join(30)
- while t.isAlive():
- log("CM.connect(sync=1): thread join timed out")
- t.join(30)
+ assert self.connection is not None
def connect_done(self, conn, preferred):
+ # Called by ConnectWrapper.notify_client() after notifying the client
log("CM.connect_done(preferred=%s)" % preferred)
- self.connected = 1
- self.connection = conn
- if preferred:
- self.thread_lock.acquire()
- try:
+ self.cond.acquire()
+ try:
+ self.connection = conn
+ if preferred:
self.thread = None
- finally:
- self.thread_lock.release()
+ self.cond.notifyAll() # Wake up connect(sync=1)
+ finally:
+ self.cond.release()
- def notify_closed(self, conn):
- if conn is not self.connection:
- # Closing a non-current connection
- log("CM.notify_closed() non-current", level=zLOG.BLATHER)
- return
- log("CM.notify_closed()")
- self.connected = 0
- self.connection = None
+ def close_conn(self, conn):
+ # Called by the connection when it is closed
+ self.cond.acquire()
+ try:
+ if conn is not self.connection:
+ # Closing a non-current connection
+ log("CM.close_conn() non-current", level=zLOG.BLATHER)
+ return
+ log("CM.close_conn()")
+ self.connection = None
+ finally:
+ self.cond.release()
self.client.notifyDisconnected()
if not self.closed:
self.connect()
+ def is_connected(self):
+ self.cond.acquire()
+ try:
+ return self.connection is not None
+ finally:
+ self.cond.release()
+
# When trying to do a connect on a non-blocking socket, some outcomes
# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately. Set _CONNECT_OK
@@ -207,20 +238,20 @@
The thread is passed a ConnectionManager and the manager's client
as arguments. It calls testConnection() on the client when a
- socket connects; that should return a tuple (stub, score) where
- stub is an RPC stub, and score is 1 or 0 depending on whether this
+ socket connects; that should return 1 or 0 indicating whether this
is a preferred or a fallback connection. It may also raise an
exception, in which case the connection is abandoned.
The thread will continue to run, attempting connections, until a
- preferred stub is seen or until all sockets have been tried.
+ preferred connection is seen or until all sockets have been tried.
- As soon as testConnection() returns a preferred stub, or after all
- sockets have been tried and at least one fallback stub has been
- seen, notifyConnected(stub) is called on the client and
- connect_done() on the manager. If this was a preferred stub, the
- thread then exits; otherwise, it keeps trying until it gets a
- preferred stub, and then reconnects the client using that stub.
+ As soon as testConnection() finds a preferred connection, or after
+ all sockets have been tried and at least one fallback connection
+ has been seen, notifyConnected(connection) is called on the client
+ and connect_done() on the manager. If this was a preferred
+ connection, the thread then exits; otherwise, it keeps trying
+ until it gets a preferred connection, and then reconnects the
+ client using that connection.
"""
@@ -248,6 +279,7 @@
def run(self):
delay = self.tmin
+ success = 0
while not self.stopped:
success = self.try_connecting()
if not self.one_attempt.isSet():
@@ -315,10 +347,10 @@
del wrappers[wrap]
# If we've got wrappers left at this point, they're fallback
- # connections. Try notifying then until one succeeds.
+ # connections. Try notifying them until one succeeds.
for wrap in wrappers.keys():
assert wrap.state == "tested" and wrap.preferred == 0
- if self.mgr.connected:
+ if self.mgr.is_connected():
wrap.close()
else:
wrap.notify_client()
@@ -356,7 +388,6 @@
self.state = "closed"
self.sock = None
self.conn = None
- self.stub = None
self.preferred = 0
log("CW: attempt to connect to %s" % repr(addr))
try:
@@ -402,8 +433,9 @@
"""
self.conn = ManagedConnection(self.sock, self.addr,
self.client, self.mgr)
+ self.sock = None # The socket is now owned by the connection
try:
- (self.stub, self.preferred) = self.client.testConnection(self.conn)
+ self.preferred = self.client.testConnection(self.conn)
self.state = "tested"
except ReadOnlyError:
log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
@@ -422,16 +454,12 @@
If this succeeds, call the manager's connect_done().
- If the client is already connected, we assume it's a fallbac
- connection, the new stub must be a preferred stub, and we
- first disconnect the client.
+ If the client is already connected, we assume it's a fallback
+ connection, and the new connection must be a preferred
+ connection. The client will close the old connection.
"""
- if self.mgr.connected:
- assert self.preferred
- log("CW: reconnecting client to preferred stub")
- self.mgr.connection.close()
try:
- self.client.notifyConnected(self.stub)
+ self.client.notifyConnected(self.conn)
except:
log("CW: error in notifyConnected (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
@@ -443,7 +471,7 @@
def close(self):
"""Close the socket and reset everything."""
self.state = "closed"
- self.stub = self.mgr = self.client = None
+ self.mgr = self.client = None
self.preferred = 0
if self.conn is not None:
# Closing the ZRPC connection will eventually close the
=== ZODB3/ZEO/zrpc/connection.py 1.31 => 1.32 ===
--- ZODB3/ZEO/zrpc/connection.py:1.31 Wed Sep 18 23:51:23 2002
+++ ZODB3/ZEO/zrpc/connection.py Fri Sep 20 13:37:34 2002
@@ -427,8 +427,8 @@
def close(self):
self.obj.notifyDisconnected()
- self.__super_close()
self.__mgr.close_conn(self)
+ self.__super_close()
class ManagedConnection(Connection):
"""Client-side Connection subclass."""
@@ -469,5 +469,5 @@
return self.check_mgr_async()
def close(self):
+ self.__mgr.close_conn(self)
self.__super_close()
- self.__mgr.notify_closed(self)