[Zodb-checkins] CVS: StandaloneZODB/ZEO - zrpc2.py:1.1.2.17
Jeremy Hylton
jeremy@zope.com
Thu, 3 Jan 2002 19:18:54 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv23687
Modified Files:
Tag: ZEO-ZRPC-Dev
zrpc2.py
Log Message:
Three sets of changes.
In a block _do_io() call, raise Disconnected() if the connection is closed.
Refactor __connect() to put some of the socket logic in a separate
method. _connect_socket() returns either a connected socket or None.
Put __connect() in a try/finally that always clears _thread.
Add readable() and writable() implementations to Connection() that
prevent a closed connection from getting in a socket map.
=== StandaloneZODB/ZEO/zrpc2.py 1.1.2.16 => 1.1.2.17 ===
import zeolog
import ThreadedAsync
+from Exceptions import Disconnected
REPLY = ".reply" # message name used for replies
ASYNC = 1
@@ -140,6 +141,7 @@
"""
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
+ __super_writable = smac.SizedMessageAsyncConnection.writable
def __init__(self, sock, addr, obj=None, pickle=None):
self.msgid = 0
@@ -163,6 +165,16 @@
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
+ # XXX are the readable() and writable() methods necessary?
+
+ def readable(self):
+ return not self.closed
+
+ def writable(self):
+ if self.closed:
+ return 0
+ return self.__super_writable()
+
def close(self):
if self.closed:
return
@@ -366,7 +378,9 @@
if wait:
# do loop only if lock is already acquired
while not self.__reply_lock.acquire(0):
- asyncore.poll(60.0, self._map)
+ asyncore.poll(10.0, self._map)
+ if self.closed:
+ raise Disconnected()
self.__reply_lock.release()
else:
asyncore.poll(0.0, self._map)
@@ -394,6 +408,8 @@
self.tmax = tmax
self.debug = debug
self.connected = 0
+ # If _thread is not None, then there is a helper thread
+ # attempting to connect. _thread is protected by _connect_lock.
self._thread = None
self._connect_lock = threading.Lock()
self.trigger = None
@@ -446,34 +462,51 @@
This method should always be called by attempt_connect() or by
connect().
"""
-
- tries = 0
- t = self.tmin
- while not (self.connected or self.closed) \
- and (repeat or (tries == 0)):
- tries = tries + 1
- log("Trying to connect to server")
- try:
- if type(self.addr) is types.StringType:
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+ try:
+ tries = 0
+ t = self.tmin
+ while not (self.connected or self.closed) \
+ and (repeat or (tries == 0)):
+ tries = tries + 1
+ print self, tries, self.closed
+ log("Trying to connect to server")
+ s = self._connect_socket()
+ if s is None:
+ if repeat:
+ t = self._wait(t)
else:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(self.addr)
- except socket.error, msg:
- if self.debug:
- log("Failed to connect to server: %s" % msg,
- level=zeolog.DEBUG)
- if repeat:
- t = self._wait(t)
+ if self.debug:
+ log("Connected to server", level=zeolog.DEBUG)
+ self.connected = 1
+ if self.connected and not self.closed:
+ print "connected"
+ c = ManagedConnection(s, self.addr, self.obj, self)
+ log("Connection created: %s" % c)
+ try:
+ self.obj.notifyConnected(c)
+ except:
+ # XXX
+ c.close()
+ raise
+ finally:
+ # must always clear _thread on the way out
+ self._thread = None
+
+ def _connect_socket(self):
+ try:
+ if type(self.addr) is types.StringType:
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
- if self.debug:
- log("Connected to server", level=zeolog.DEBUG)
- self.connected = 1
- if self.connected and not self.closed:
- c = ManagedConnection(s, self.addr, self.obj, self)
- log("Connection created: %s" % c)
- self.obj.notifyConnected(c)
- self._thread = None
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(self.addr)
+ except socket.error, msg:
+ if self.debug:
+ log("Failed to connect to server: %s" % msg,
+ level=zeolog.DEBUG)
+ s.close()
+ return None
+ return s
def _wait(self, t):
time.sleep(t)