[Zodb-checkins] SVN: ZODB/trunk/ Removed the "sync" mode for
ClientStorage. Previously, a
Jim Fulton
jim at zope.com
Tue Jul 18 14:15:58 EDT 2006
Log message for revision 69179:
Removed the "sync" mode for ClientStorage. Previously, a
ClientStorage could be in either "sync" mode or "async" mode. Now
there is just "async" mode. There is now a dedicicated asyncore main
loop dedicated to ZEO clients.
This addresses a test failure on Mac OS X,
http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
to a bug in sync mode. Some asyncore-based code was being called from
multiple threads that didn't expect to be.
Converting to always-async mode revealed some bugs that weren't caught
before because the tests ran in sync mode. These problems could
explain some problems we've seen at times with clients taking a long
time to reconnect after a disconnect.
Added a partial heart beat to try to detect lost connections that
aren't otherwise caught,
http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
perioidically writing to all connections during periods of inactivity.
Changed:
U ZODB/trunk/NEWS.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/tests/CommitLockTests.py
U ZODB/trunk/src/ZEO/tests/ConnectionTests.py
U ZODB/trunk/src/ZEO/tests/testAuth.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/zrpc/client.py
U ZODB/trunk/src/ZEO/zrpc/connection.py
U ZODB/trunk/src/ZEO/zrpc/trigger.py
-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/NEWS.txt 2006-07-18 18:15:57 UTC (rev 69179)
@@ -1,13 +1,35 @@
-What's new in ZODB3 3.7a1?
-==========================
-Release date: DD-MMM-200Y
+What's new on ZODB 3.7b2?
+=========================
+ClientStorage
+-------------
-Following is combined news from internal releases (to support ongoing
-Zope development). These are the dates of the internal releases:
+- (3.7b2) Removed the "sync" mode for ClientStorage.
-- 3.7a1 DD-MMM-200Y
+ Previously, a ClientStorage could be in either "sync" mode or "async"
+ mode. Now there is just "async" mode. There is now a dedicicated
+ asyncore main loop dedicated to ZEO clients.
+ Applications no-longer need to run an asyncore main loop to cause
+ client storages to run in async mode. Even if an application runs an
+ asyncore main loop, it is independent of the loop used by client
+ storages.
+
+ This addresses a test failure on Mac OS X,
+ http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
+ to a bug in sync mode. Some asyncore-based code was being called from
+ multiple threads that didn't expect to be.
+
+ Converting to always-async mode revealed some bugs that weren't caught
+ before because the tests ran in sync mode. These problems could
+ explain some problems we've seen at times with clients taking a long
+ time to reconnect after a disconnect.
+
+ Added a partial heart beat to try to detect lost connections that
+ aren't otherwise caught,
+ http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
+ perioidically writing to all connections during periods of inactivity.
+
Connection management
---------------------
@@ -34,6 +56,14 @@
- (3.7a1) The documentation for ``_p_oid`` now specifies the concrete
type of oids (in short, an oid is either None or a non-empty string).
+Testing
+-------
+
+- (3.7b2) Fixed test-runner output truncation.
+
+ A bug was fixed in the test runner that caused result summaries to be
+ omitted when running on Windows.
+
Tools
-----
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -339,43 +339,16 @@
# still be going on. This code must wait until validation
# finishes, but if the connection isn't a zrpc async
# connection it also needs to poll for input.
- if self._connection.is_async():
- while 1:
- self._ready.wait(30)
- if self._ready.isSet():
- break
- if timeout and time.time() > deadline:
- log2("Timed out waiting for connection",
- level=logging.WARNING)
- break
- log2("Waiting for cache verification to finish")
- else:
- self._wait_sync(deadline)
-
- def _wait_sync(self, deadline=None):
- # Log no more than one "waiting" message per LOG_THROTTLE seconds.
- LOG_THROTTLE = 300 # 5 minutes
- next_log_time = time.time()
-
- while not self._ready.isSet():
- now = time.time()
- if deadline and now > deadline:
- log2("Timed out waiting for connection", level=logging.WARNING)
+ assert self._connection.is_async()
+ while 1:
+ self._ready.wait(30)
+ if self._ready.isSet():
break
- if now >= next_log_time:
- log2("Waiting for cache verification to finish")
- next_log_time = now + LOG_THROTTLE
- if self._connection is None:
- # If the connection was closed while we were
- # waiting for it to become ready, start over.
- if deadline is None:
- timeout = None
- else:
- timeout = deadline - now
- return self._wait(timeout)
- # No mainloop ia running, so we need to call something fancy to
- # handle asyncore events.
- self._connection.pending(30)
+ if timeout and time.time() > deadline:
+ log2("Timed out waiting for connection",
+ level=logging.WARNING)
+ break
+ log2("Waiting for cache verification to finish")
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -403,18 +376,9 @@
return self._ready.isSet()
def sync(self):
- """Handle any pending invalidation messages.
+ # The separate async thread should keep us up to date
+ pass
- This is called by the sync method in ZODB.Connection.
- """
- # If there is no connection, return immediately. Technically,
- # there are no pending invalidations so they are all handled.
- # There doesn't seem to be much benefit to raising an exception.
-
- cn = self._connection
- if cn is not None:
- cn.pending()
-
def doAuth(self, protocol, stub):
if not (self._username and self._password):
raise AuthError("empty username or password")
@@ -517,11 +481,17 @@
stub = self.StorageServerStubClass(conn)
self._oids = []
- self._info.update(stub.get_info())
self.verify_cache(stub)
- if not conn.is_async():
- log2("Waiting for cache verification to finish")
- self._wait_sync()
+
+ # It's important to call get_info after calling verify_cache.
+ # If we end up doing a full-verification, we need to wait till
+ # it's done. By doing a synchonous call, we are guarenteed
+ # that the verification will be done because operations are
+ # handled in order.
+ self._info.update(stub.get_info())
+
+ assert conn.is_async()
+
self._handle_extensions()
def _handle_extensions(self):
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -13,6 +13,8 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
+import time
+
##
# ZEO storage server.
# <p>
@@ -44,9 +46,11 @@
zrpc.connection.Connection class.
"""
self.rpc = rpc
+
# Wait until we know what version the other side is using.
while rpc.peer_protocol_version is None:
- rpc.pending()
+ time.sleep(0.1)
+
if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
Modified: ZODB/trunk/src/ZEO/tests/CommitLockTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/CommitLockTests.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/CommitLockTests.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -35,10 +35,9 @@
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
- def __init__(self, storage, trans, method="tpc_finish"):
+ def __init__(self, storage, trans):
self.storage = storage
self.trans = trans
- self.method = method
self.ready = threading.Event()
TestThread.__init__(self)
@@ -52,10 +51,7 @@
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
self.myvote()
- if self.method == "tpc_finish":
- self.storage.tpc_finish(self.trans)
- else:
- self.storage.tpc_abort(self.trans)
+ self.storage.tpc_finish(self.trans)
except ClientDisconnected:
pass
@@ -120,7 +116,7 @@
t.start()
t.ready.wait()
- # Close on the connections abnormally to test server response
+ # Close one of the connections abnormally to test server response
if i == 0:
storage.close()
else:
@@ -237,7 +233,6 @@
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id, txn)
-
self._begin_threads()
self._finish_undo(msgid)
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -55,6 +55,12 @@
StorageServerStubClass = TestServerStub
+ connection_count_for_tests = 0
+
+ def notifyConnected(self, conn):
+ ClientStorage.notifyConnected(self, conn)
+ self.connection_count_for_tests += 1
+
def verify_cache(self, stub):
self.end_verify = threading.Event()
self.verify_result = ClientStorage.verify_cache(self, stub)
@@ -959,40 +965,39 @@
storage.close()
def checkTimeoutAfterVote(self):
- raises = self.assertRaises
- unless = self.failUnless
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
- unless(not list(storage._cache.contents()))
+ self.assert_(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
# Now do a store, sleeping before the finish so as to cause a timeout
t = Transaction()
+ old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t)
revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
storage.tpc_vote(t)
# Now sleep long enough for the storage to time out
time.sleep(3)
- storage.sync()
- unless(not storage.is_connected())
+ self.assert_(
+ (not storage.is_connected())
+ or
+ (storage.connection_count_for_tests > old_connection_count)
+ )
storage._wait()
- unless(storage.is_connected())
+ self.assert_(storage.is_connected())
# We expect finish to fail
- raises(ClientDisconnected, storage.tpc_finish, t)
+ self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
# The cache should still be empty
- unless(not list(storage._cache.contents()))
+ self.assert_(not list(storage._cache.contents()))
# Load should fail since the object should not be in either the cache
# or the server.
- raises(KeyError, storage.load, oid, '')
+ self.assertRaises(KeyError, storage.load, oid, '')
def checkTimeoutProvokingConflicts(self):
- eq = self.assertEqual
- raises = self.assertRaises
- require = self.assert_
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty.
- require(not list(storage._cache.contents()))
+ self.assert_(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
@@ -1007,6 +1012,7 @@
# Now do a store, sleeping before the finish so as to cause a timeout.
obj.value = 8
t = Transaction()
+ old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t)
@@ -1020,17 +1026,21 @@
# of 3).
deadline = time.time() + 60 # wait up to a minute
while time.time() < deadline:
- if storage.is_connected():
+ if (storage.is_connected() and
+ (storage.connection_count_for_tests == old_connection_count)
+ ):
time.sleep(self.timeout / 1.8)
- storage.sync()
else:
break
- storage.sync()
- require(not storage.is_connected())
+ self.assert_(
+ (not storage.is_connected())
+ or
+ (storage.connection_count_for_tests > old_connection_count)
+ )
storage._wait()
- require(storage.is_connected())
+ self.assert_(storage.is_connected())
# We expect finish to fail.
- raises(ClientDisconnected, storage.tpc_finish, t)
+ self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
# Now we think we've committed the second transaction, but we really
# haven't. A third one should produce a POSKeyError on the server,
# which manifests as a ConflictError on the client.
@@ -1038,7 +1048,7 @@
t = Transaction()
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
- raises(ConflictError, storage.tpc_vote, t)
+ self.assertRaises(ConflictError, storage.tpc_vote, t)
# Even aborting won't help.
storage.tpc_abort(t)
storage.tpc_finish(t)
@@ -1048,7 +1058,7 @@
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
# Even aborting won't help.
- raises(ConflictError, storage.tpc_vote, t)
+ self.assertRaises(ConflictError, storage.tpc_vote, t)
# Abort this one and try a transaction that should succeed.
storage.tpc_abort(t)
storage.tpc_finish(t)
@@ -1062,8 +1072,8 @@
storage.tpc_finish(t)
# Now load the object and verify that it has a value of 11.
data, revid = storage.load(oid, '')
- eq(zodb_unpickle(data), MinPO(11))
- eq(revid, revid2)
+ self.assertEqual(zodb_unpickle(data), MinPO(11))
+ self.assertEqual(revid, revid2)
class MSTThread(threading.Thread):
Modified: ZODB/trunk/src/ZEO/tests/testAuth.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testAuth.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/testAuth.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -95,10 +95,11 @@
def testUnauthenticatedMessage(self):
# Test that an unauthenticated message is rejected by the server
# if it was sent after the connection was authenticated.
+
+ self._storage = self.openClientStorage(wait=0, username="foo",
+ password="bar", realm=self.realm)
# Sleep for 0.2 seconds to give the server some time to start up
# seems to be needed before and after creating the storage
- self._storage = self.openClientStorage(wait=0, username="foo",
- password="bar", realm=self.realm)
self.wait()
self._storage.versions()
# Manually clear the state of the hmac connection
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -14,13 +14,15 @@
"""Test suite for ZEO based on ZODB.tests."""
# System imports
+import asyncore
+import logging
import os
import random
+import signal
import socket
-import asyncore
import tempfile
+import time
import unittest
-import logging
# ZODB test support
import ZODB
@@ -36,8 +38,13 @@
from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
from ZEO.ClientStorage import ClientStorage
+
+import ZEO.zrpc.connection
+
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+import ZEO.tests.ConnectionTests
+
logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB:
@@ -70,14 +77,19 @@
self.assertEqual(zodb_unpickle(data), MinPO('first'))
self.assertEqual(serial, revid1)
revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
- for n in range(3):
- # Let the server and client talk for a moment.
- # Is there a better way to do this?
- asyncore.poll(0.1)
- data, serial = storage2.load(oid, '')
- self.assertEqual(zodb_unpickle(data), MinPO('second'),
- 'Invalidation message was not sent!')
- self.assertEqual(serial, revid2)
+
+ # Now, storage 2 should eventually get the new data. It
+ # will take some time, although hopefully not much.
+ # We'll poll till we get it and whine if we time out:
+ for n in range(30):
+ time.sleep(.1)
+ data, serial = storage2.load(oid, '')
+ if (serial == revid2 and
+ zodb_unpickle(data) == MinPO('second')
+ ):
+ break
+ else:
+ raise AssertionError('Invalidation message was not sent!')
finally:
storage2.close()
@@ -198,6 +210,67 @@
def getConfig(self):
return """<mappingstorage 1/>"""
+
+class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
+ """Make sure a heartbeat is being sent and that it does no harm
+
+ This is really hard to test properly because we can't see the data
+ flow between the client and server and we can't really tell what's
+ going on in the server very well. :(
+
+ """
+
+ def setUp(self):
+ # Crank down the select frequency
+ self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
+ ZEO.zrpc.connection.client_timeout = 0.1
+ ZEO.zrpc.connection.client_trigger.pull_trigger()
+ ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
+
+ def tearDown(self):
+ ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
+ ZEO.zrpc.connection.client_trigger.pull_trigger()
+ ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
+
+ def getConfig(self, path, create, read_only):
+ return """<mappingstorage 1/>"""
+
+ def checkHeartbeatWithServerClose(self):
+ # This is a minimal test that mainly tests that the heartbeat
+ # function does no harm.
+ client_timeout_count = ZEO.zrpc.connection.client_timeout_count
+ self._storage = self.openClientStorage()
+ time.sleep(1) # allow some time for the select loop to fire a few times
+ self.assert_(ZEO.zrpc.connection.client_timeout_count
+ > client_timeout_count)
+ self._dostore()
+
+ if hasattr(os, 'kill'):
+ # Kill server violently, in hopes of provoking problem
+ os.kill(self._pids[0], signal.SIGKILL)
+ self._servers[0] = None
+ else:
+ self.shutdownServer()
+
+ for i in range(91):
+ # wait for disconnection
+ if not self._storage.is_connected():
+ break
+ time.sleep(0.1)
+ else:
+ raise AssertionError("Didn't detect server shutdown in 5 seconds")
+
+ def checkHeartbeatWithClientClose(self):
+ # This is a minimal test that mainly tests that the heartbeat
+ # function does no harm.
+ client_timeout_count = ZEO.zrpc.connection.client_timeout_count
+ self._storage = self.openClientStorage()
+ self._storage.close()
+ time.sleep(1) # allow some time for the select loop to fire a few times
+ self.assert_(ZEO.zrpc.connection.client_timeout_count
+ > client_timeout_count)
+
+
class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
def getConfig(self):
@@ -233,6 +306,7 @@
FileStorageTests,
MappingStorageTests,
DemoStorageWrappedAroundClientStorage,
+ HeartbeatTests,
]
def test_suite():
Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/client.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -11,6 +11,7 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
+import asyncore
import errno
import select
import socket
@@ -20,13 +21,11 @@
import types
import logging
-import ThreadedAsync
-
from ZODB.POSException import ReadOnlyError
from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log
-from ZEO.zrpc.trigger import trigger
+import ZEO.zrpc.trigger
from ZEO.zrpc.connection import ManagedClientConnection
class ConnectionManager(object):
@@ -43,9 +42,6 @@
# If thread is not None, then there is a helper thread
# attempting to connect.
self.thread = None # Protected by self.cond
- self.trigger = None
- self.thr_async = 0
- ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
@@ -85,7 +81,6 @@
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
- ThreadedAsync.remove_loop_callback(self.set_async)
self.cond.acquire()
try:
t = self.thread
@@ -103,30 +98,7 @@
if conn is not None:
# This will call close_conn() below which clears self.connection
conn.close()
- if self.trigger is not None:
- self.trigger.close()
- self.trigger = None
- ThreadedAsync.remove_loop_callback(self.set_async)
- def set_async(self, map):
- # This is the callback registered with ThreadedAsync. The
- # callback might be called multiple times, so it shouldn't
- # create a trigger every time and should never do anything
- # after it's closed.
-
- # It may be that the only case where it is called multiple
- # times is in the test suite, where ThreadedAsync's loop can
- # be started in a child process after a fork. Regardless,
- # it's good to be defensive.
-
- # We need each connection started with async==0 to have a
- # callback.
- log("CM.set_async(%s)" % repr(map), level=logging.DEBUG)
- if not self.closed and self.trigger is None:
- log("CM.set_async(): first call")
- self.trigger = trigger()
- self.thr_async = 1 # needs to be set on the Connection
-
def attempt_connect(self):
"""Attempt a connection to the server without blocking too long.
Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -19,6 +19,8 @@
import types
import logging
+import traceback, time
+
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
@@ -30,6 +32,89 @@
REPLY = ".reply" # message name used for replies
ASYNC = 1
+##############################################################################
+# Dedicated Client select loop:
+client_map = {}
+client_trigger = trigger(client_map)
+client_timeout = 30.0
+client_timeout_count = 0 # for testing
+
+def client_loop():
+ map = client_map
+ logger = logging.getLogger('ZEO.zrpc.client_loop')
+ logger.addHandler(logging.StreamHandler())
+
+ read = asyncore.read
+ write = asyncore.write
+ _exception = asyncore._exception
+
+ while map:
+ try:
+ r = e = list(client_map)
+ w = [fd for (fd, obj) in map.iteritems() if obj.writable()]
+
+ try:
+ r, w, e = select.select(r, w, e, client_timeout)
+ except select.error, err:
+ if err[0] != errno.EINTR:
+ if err[0] == errno.EBADF:
+
+ # If a connection is closed while we are
+ # calling select on it, we can get a bad
+ # file-descriptor error. We'll check for this
+ # case by looking for entries in r and w that
+ # are not in the socket map.
+
+ if [fd for fd in r if fd not in client_map]:
+ continue
+ if [fd for fd in w if fd not in client_map]:
+ continue
+
+ raise
+ else:
+ continue
+
+ if not (r or w or e):
+ for obj in client_map.itervalues():
+ if isinstance(obj, Connection):
+ # Send a heartbeat message as a reply to a
+ # non-existent message id.
+ try:
+ obj.send_reply(-1, None)
+ except DisconnectedError:
+ pass
+ global client_timeout_count
+ client_timeout_count += 1
+ continue
+
+ for fd in r:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ read(obj)
+
+ for fd in w:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ write(obj)
+
+ for fd in e:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ _exception(obj)
+
+ except:
+ logger.exception('poll failure')
+ raise
+
+client_thread = threading.Thread(target=client_loop)
+client_thread.setDaemon(True)
+client_thread.start()
+#
+##############################################################################
+
class Delay:
"""Used to delay response to client for synchronous calls.
@@ -235,7 +320,7 @@
# Client constructor passes 'C' for tag, server constructor 'S'. This
# is used in log messages, and to determine whether we can speak with
# our peer.
- def __init__(self, sock, addr, obj, tag):
+ def __init__(self, sock, addr, obj, tag, map=None):
self.obj = None
self.marshal = Marshaller()
self.closed = False
@@ -315,8 +400,10 @@
# isn't necessary before Python 2.4, but doesn't hurt then (it just
# gives us an unused attribute in 2.3); updating the global socket
# map is necessary regardless of Python version.
- self._map = asyncore.socket_map
- asyncore.socket_map.update(ourmap)
+ if map is None:
+ map = asyncore.socket_map
+ self._map = map
+ map.update(ourmap)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
@@ -331,12 +418,13 @@
return
self._singleton.clear()
self.closed = True
+ self.__super_close()
self.close_trigger()
- self.__super_close()
def close_trigger(self):
# Overridden by ManagedClientConnection.
if self.trigger is not None:
+ self.trigger.pull_trigger()
self.trigger.close()
def register_object(self, obj):
@@ -538,16 +626,16 @@
return r_args
# For testing purposes, it is useful to begin a synchronous call
- # but not block waiting for its response. Since these methods are
- # used for testing they can assume they are not in async mode and
- # call asyncore.poll() directly to get the message out without
- # also waiting for the reply.
+ # but not block waiting for its response.
def _deferred_call(self, method, *args):
if self.closed:
raise DisconnectedError()
msgid = self.send_call(method, args, 0)
- asyncore.poll(0.01, self._singleton)
+ if self.is_async():
+ self.trigger.pull_trigger()
+ else:
+ asyncore.poll(0.01, self._singleton)
return msgid
def _deferred_wait(self, msgid):
@@ -663,7 +751,7 @@
else:
asyncore.poll(0.0, self._singleton)
- def pending(self, timeout=0):
+ def _pending(self, timeout=0):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
self.log("pending(), async=%d" % self.is_async(), level=TRACE)
@@ -758,8 +846,10 @@
self.queue_output = True
self.queued_messages = []
- self.__super_init(sock, addr, obj, tag='C')
- self.check_mgr_async()
+ self.__super_init(sock, addr, obj, tag='C', map=client_map)
+ self.thr_async = True
+ self.trigger = client_trigger
+ client_trigger.pull_trigger()
# Our message_ouput() queues messages until recv_handshake() gets the
# protocol handshake from the server.
@@ -806,10 +896,13 @@
# Defer the ThreadedAsync work to the manager.
def close_trigger(self):
- # the manager should actually close the trigger
- # TODO: what is that comment trying to say? What 'manager'?
- del self.trigger
+ # We are using a shared trigger for all client connections.
+ # We never want to close it.
+ # We do want to pull it to make sure the select loop detects that
+ # we're closed.
+ self.trigger.pull_trigger()
+
def set_async(self, map):
pass
@@ -817,20 +910,8 @@
# Don't do the register_loop_callback that the superclass does
pass
- def check_mgr_async(self):
- if not self.thr_async and self.mgr.thr_async:
- assert self.mgr.trigger is not None, \
- "manager (%s) has no trigger" % self.mgr
- self.thr_async = True
- self.trigger = self.mgr.trigger
- return 1
- return 0
-
def is_async(self):
- # TODO: could the check_mgr_async() be avoided on each test?
- if self.thr_async:
- return 1
- return self.check_mgr_async()
+ return True
def close(self):
self.mgr.close_conn(self)
Modified: ZODB/trunk/src/ZEO/zrpc/trigger.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/trigger.py 2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/trigger.py 2006-07-18 18:15:57 UTC (rev 69179)
@@ -135,10 +135,10 @@
class trigger(_triggerbase, asyncore.file_dispatcher):
kind = "pipe"
- def __init__(self):
+ def __init__(self, map=None):
_triggerbase.__init__(self)
r, self.trigger = self._fds = os.pipe()
- asyncore.file_dispatcher.__init__(self, r)
+ asyncore.file_dispatcher.__init__(self, r, map)
def _close(self):
for fd in self._fds:
@@ -155,7 +155,7 @@
class trigger(_triggerbase, asyncore.dispatcher):
kind = "loopback"
- def __init__(self):
+ def __init__(self, map=None):
_triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w'
@@ -208,7 +208,7 @@
r, addr = a.accept() # r becomes asyncore's (self.)socket
a.close()
self.trigger = w
- asyncore.dispatcher.__init__(self, r)
+ asyncore.dispatcher.__init__(self, r, map)
def _close(self):
# self.socket is r, and self.trigger is w, from __init__
More information about the Zodb-checkins
mailing list