[Zodb-checkins] SVN: ZODB/trunk/src/ ZEO clients (``ClientStorage`` instances) now work in forked processes,
Jim Fulton
jim at zope.com
Fri Jan 29 17:47:23 EST 2010
Log message for revision 108653:
ZEO clients (``ClientStorage`` instances) now work in forked processes,
including those created via ``multiprocessing.Process`` instances.
This entailed giving each client storage it's own networking thread.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/zrpc/client.py
U ZODB/trunk/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/CHANGES.txt 2010-01-29 22:47:22 UTC (rev 108653)
@@ -18,6 +18,9 @@
raise a StorageTransactionError when invalid transactions are passed
to tpc_begin, tpc_vote, or tpc_finish.
+- ZEO clients (``ClientStorage`` instances) now work in forked processes,
+ including those created via ``multiprocessing.Process`` instances.
+
- Broken objects now provide the IBroken interface.
Bugs Fixed
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2010-01-29 22:47:22 UTC (rev 108653)
@@ -52,8 +52,6 @@
logger = logging.getLogger('ZEO.tests.testZEO')
-ZEO.zrpc.connection.start_client_thread()
-
class DummyDB:
def invalidate(self, *args):
pass
@@ -389,14 +387,17 @@
def setUp(self):
# Crank down the select frequency
- self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
- ZEO.zrpc.connection.client_timeout = 0.1
- ZEO.zrpc.connection.client_trigger.pull_trigger()
+ self.__old_client_timeout = ZEO.zrpc.client.client_timeout
+ ZEO.zrpc.client.client_timeout = self.__client_timeout
ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
+ __client_timeouts = 0
+ def __client_timeout(self):
+ self.__client_timeouts += 1
+ return .1
+
def tearDown(self):
- ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
- ZEO.zrpc.connection.client_trigger.pull_trigger()
+ ZEO.zrpc.client.client_timeout = self.__old_client_timeout
ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
def getConfig(self, path, create, read_only):
@@ -405,11 +406,11 @@
def checkHeartbeatWithServerClose(self):
# This is a minimal test that mainly tests that the heartbeat
# function does no harm.
- client_timeout_count = ZEO.zrpc.connection.client_timeout_count
self._storage = self.openClientStorage()
- time.sleep(1) # allow some time for the select loop to fire a few times
- self.assert_(ZEO.zrpc.connection.client_timeout_count
- > client_timeout_count)
+ client_timeouts = self.__client_timeouts
+ forker.wait_until('got a timeout',
+ lambda : self.__client_timeouts > client_timeouts
+ )
self._dostore()
if hasattr(os, 'kill'):
@@ -419,23 +420,10 @@
else:
self.shutdownServer()
- for i in range(91):
- # wait for disconnection
- if not self._storage.is_connected():
- break
- time.sleep(0.1)
- else:
- raise AssertionError("Didn't detect server shutdown in 5 seconds")
-
- def checkHeartbeatWithClientClose(self):
- # This is a minimal test that mainly tests that the heartbeat
- # function does no harm.
- client_timeout_count = ZEO.zrpc.connection.client_timeout_count
- self._storage = self.openClientStorage()
+ forker.wait_until('disconnected',
+ lambda : not self._storage.is_connected()
+ )
self._storage.close()
- time.sleep(1) # allow some time for the select loop to fire a few times
- self.assert_(ZEO.zrpc.connection.client_timeout_count
- > client_timeout_count)
class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
@@ -451,27 +439,27 @@
def writable(self):
raise SystemError("I'm evil")
- log = []
- ZEO.zrpc.connection.client_logger.critical = (
- lambda m, *a, **kw: log.append((m % a, kw))
- )
+ import zope.testing.loggingsupport
+ handler = zope.testing.loggingsupport.InstalledHandler(
+ 'ZEO.zrpc.client')
- ZEO.zrpc.connection.client_map[None] = Evil()
+ self._storage._rpc_mgr.map[None] = Evil()
try:
- ZEO.zrpc.connection.client_trigger.pull_trigger()
+ self._storage._rpc_mgr.trigger.pull_trigger()
except DisconnectedError:
pass
- time.sleep(.1)
- self.failIf(self._storage.is_connected())
- self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
- del ZEO.zrpc.connection.client_logger.critical
- self.assertEqual(log[0][0], 'The ZEO client loop failed.')
- self.assert_('exc_info' in log[0][1])
- self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
- self.assert_('exc_info' in log[1][1])
+ forker.wait_until(
+ 'disconnected',
+ lambda : not self._storage.is_connected()
+ )
+ log = str(handler)
+ handler.uninstall()
+ self.assert_("ZEO client loop failed" in log)
+ self.assert_("Couldn't close a dispatcher." in log)
+
def checkExceptionLogsAtError(self):
# Test the exceptions are logged at error
self._storage = self.openClientStorage()
@@ -1201,9 +1189,12 @@
def client_asyncore_thread_has_name():
"""
+ >>> addr, _ = start_server()
+ >>> db = ZEO.DB(addr)
>>> len([t for t in threading.enumerate()
- ... if t.getName() == 'ZEO.zrpc.connection'])
+ ... if ' zeo client networking thread' in t.getName()])
1
+ >>> db.close()
"""
def runzeo_without_configfile():
@@ -1260,6 +1251,37 @@
>>> thread.join(1)
"""
+if sys.version_info >= (2, 6):
+ import multiprocessing
+
+ def work_with_multiprocessing_process(name, addr, q):
+ conn = ZEO.connection(addr)
+ q.put((name, conn.root.x))
+ conn.close()
+
+ def work_with_multiprocessing():
+ """Client storage should work with multi-processing.
+
+ >>> import StringIO
+ >>> sys.stdin = StringIO.StringIO()
+ >>> addr, _ = start_server()
+ >>> conn = ZEO.connection(addr)
+ >>> conn.root.x = 1
+ >>> transaction.commit()
+ >>> q = multiprocessing.Queue()
+ >>> processes = [multiprocessing.Process(
+ ... target=work_with_multiprocessing_process,
+ ... args=(i, addr, q))
+ ... for i in range(3)]
+ >>> _ = [p.start() for p in processes]
+ >>> sorted(q.get(timeout=60) for p in processes)
+ [(0, 1), (1, 1), (2, 1)]
+
+ >>> _ = [p.join(30) for p in processes]
+ >>> conn.close()
+ """
+
+
slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests,
Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py 2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/zrpc/client.py 2010-01-29 22:47:22 UTC (rev 108653)
@@ -11,11 +11,15 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
+import asyncore
import errno
+import logging
import select
import socket
+import sys
import threading
import time
+import traceback
import types
import logging
@@ -24,15 +28,113 @@
from ZEO.zrpc.log import log
import ZEO.zrpc.trigger
-from ZEO.zrpc.connection import ManagedClientConnection, start_client_thread
+from ZEO.zrpc.connection import ManagedClientConnection
+def client_timeout():
+ return 30.0
+
+def client_loop(map):
+ read = asyncore.read
+ write = asyncore.write
+ _exception = asyncore._exception
+
+ while map:
+ try:
+
+ # The next two lines intentionally don't use
+ # iterators. Other threads can close dispatchers, causeing
+ # the socket map to shrink.
+ r = e = map.keys()
+ w = [fd for (fd, obj) in map.items() if obj.writable()]
+
+ try:
+ r, w, e = select.select(r, w, e, client_timeout())
+ except select.error, err:
+ if err[0] != errno.EINTR:
+ if err[0] == errno.EBADF:
+
+ # If a connection is closed while we are
+ # calling select on it, we can get a bad
+ # file-descriptor error. We'll check for this
+ # case by looking for entries in r and w that
+ # are not in the socket map.
+
+ if [fd for fd in r if fd not in map]:
+ continue
+ if [fd for fd in w if fd not in map]:
+ continue
+
+ raise
+ else:
+ continue
+
+ if not map:
+ break
+
+ if not (r or w or e):
+ # The line intentionally doesn't use iterators. Other
+ # threads can close dispatchers, causeing the socket
+ # map to shrink.
+ for obj in map.values():
+ if isinstance(obj, ManagedClientConnection):
+ # Send a heartbeat message as a reply to a
+ # non-existent message id.
+ try:
+ obj.send_reply(-1, None)
+ except DisconnectedError:
+ pass
+ continue
+
+ for fd in r:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ read(obj)
+
+ for fd in w:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ write(obj)
+
+ for fd in e:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ _exception(obj)
+
+ except:
+ if map:
+ try:
+ logging.getLogger(__name__+'.client_loop').critical(
+ 'A ZEO client loop failed.',
+ exc_info=sys.exc_info())
+ except:
+ pass
+
+ for fd, obj in map.items():
+ if not hasattr(obj, 'mgr'):
+ continue
+ try:
+ obj.mgr.client.close()
+ except:
+ map.pop(fd, None)
+ try:
+ logging.getLogger(__name__+'.client_loop'
+ ).critical(
+ "Couldn't close a dispatcher.",
+ exc_info=sys.exc_info())
+ except:
+ pass
+
+
class ConnectionManager(object):
"""Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180):
- start_client_thread()
+ self.client = client
+ self._start_asyncore_loop()
self.addrlist = self._parse_addrs(addrs)
- self.client = client
self.tmin = min(tmin, tmax)
self.tmax = tmax
self.cond = threading.Condition(threading.Lock())
@@ -42,6 +144,15 @@
# attempting to connect.
self.thread = None # Protected by self.cond
+ def _start_asyncore_loop(self):
+ self.map = {}
+ self.trigger = ZEO.zrpc.trigger.trigger(self.map)
+ self.loop_thread = threading.Thread(
+ name="%s zeo client networking thread" % self.client.__name__,
+ target=client_loop, args=(self.map,))
+ self.loop_thread.setDaemon(True)
+ self.loop_thread.start()
+
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
@@ -84,7 +195,6 @@
try:
t = self.thread
self.thread = None
- conn = self.connection
finally:
self.cond.release()
if t is not None:
@@ -94,10 +204,22 @@
if t.isAlive():
log("CM.close(): self.thread.join() timed out",
level=logging.WARNING)
- if conn is not None:
- # This will call close_conn() below which clears self.connection
- conn.close()
+ for fd, obj in self.map.items():
+ if obj is not self.trigger:
+ try:
+ obj.close()
+ except:
+ logging.getLogger(__name__+'.'+self.__class__.__name__
+ ).critical(
+ "Couldn't close a dispatcher.",
+ exc_info=sys.exc_info())
+
+ self.map.clear()
+ self.trigger.pull_trigger()
+ self.loop_thread.join(9)
+ self.trigger.close()
+
def attempt_connect(self):
"""Attempt a connection to the server without blocking too long.
Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py 2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py 2010-01-29 22:47:22 UTC (rev 108653)
@@ -21,10 +21,11 @@
import traceback, time
+import ZEO.zrpc.trigger
+
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
-from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.log import short_repr, log
from ZODB.loglevels import BLATHER, TRACE
import ZODB.POSException
@@ -35,142 +36,6 @@
debug_zrpc = False
-##############################################################################
-# Dedicated Client select loop:
-client_timeout = 30.0
-client_timeout_count = 0 # for testing
-client_map = {}
-client_trigger = trigger(client_map)
-client_logger = logging.getLogger('ZEO.zrpc.client_loop')
-client_exit_event = threading.Event()
-client_running = False
-def client_exit():
- global client_running
- if client_running:
- client_running = False
- client_trigger.pull_trigger()
- client_exit_event.wait(99)
-
-atexit.register(client_exit)
-
-def client_loop():
- global client_running
- client_running = True
- client_exit_event.clear()
-
- map = client_map
- read = asyncore.read
- write = asyncore.write
- _exception = asyncore._exception
- loop_failures = 0
-
- while client_running and map:
- try:
-
- # The next two lines intentionally don't use
- # iterators. Other threads can close dispatchers, causeing
- # the socket map to shrink.
- r = e = client_map.keys()
- w = [fd for (fd, obj) in map.items() if obj.writable()]
-
- try:
- r, w, e = select.select(r, w, e, client_timeout)
- except select.error, err:
- if err[0] != errno.EINTR:
- if err[0] == errno.EBADF:
-
- # If a connection is closed while we are
- # calling select on it, we can get a bad
- # file-descriptor error. We'll check for this
- # case by looking for entries in r and w that
- # are not in the socket map.
-
- if [fd for fd in r if fd not in map]:
- continue
- if [fd for fd in w if fd not in map]:
- continue
-
- raise
- else:
- continue
-
- if not client_running:
- break
-
- if not (r or w or e):
- # The line intentionally doesn't use iterators. Other
- # threads can close dispatchers, causeing the socket
- # map to shrink.
- for obj in map.values():
- if isinstance(obj, Connection):
- # Send a heartbeat message as a reply to a
- # non-existent message id.
- try:
- obj.send_reply(-1, None)
- except DisconnectedError:
- pass
- global client_timeout_count
- client_timeout_count += 1
- continue
-
- for fd in r:
- obj = map.get(fd)
- if obj is None:
- continue
- read(obj)
-
- for fd in w:
- obj = map.get(fd)
- if obj is None:
- continue
- write(obj)
-
- for fd in e:
- obj = map.get(fd)
- if obj is None:
- continue
- _exception(obj)
-
- except:
- if map:
- try:
- client_logger.critical('The ZEO client loop failed.',
- exc_info=sys.exc_info())
- except:
- pass
-
- for fd, obj in map.items():
- if obj is client_trigger:
- continue
- try:
- obj.mgr.client.close()
- except:
- map.pop(fd, None)
- try:
- client_logger.critical(
- "Couldn't close a dispatcher.",
- exc_info=sys.exc_info())
- except:
- pass
-
- client_exit_event.set()
-
-client_thread_lock = threading.Lock()
-client_thread = None
-def start_client_thread():
- client_thread_lock.acquire()
- try:
- global client_thread
- if client_thread is None:
- client_thread = threading.Thread(target=client_loop, name=__name__)
- client_thread.setDaemon(True)
- client_thread.start()
- finally:
- client_thread_lock.release()
-
-#
-##############################################################################
-
class Delay:
"""Used to delay response to client for synchronous calls.
@@ -679,7 +544,7 @@
unlogged_exception_types = (ZODB.POSException.POSKeyError, )
# Servers use a shared server trigger that uses the asyncore socket map
- trigger = trigger()
+ trigger = ZEO.zrpc.trigger.trigger()
call_from_thread = trigger.pull_trigger
def __init__(self, sock, addr, obj, mgr):
@@ -724,9 +589,6 @@
__super_init = Connection.__init__
base_message_output = Connection.message_output
- trigger = client_trigger
- call_from_thread = trigger.pull_trigger
-
def __init__(self, sock, addr, mgr):
self.mgr = mgr
@@ -753,8 +615,10 @@
self.replies_cond = threading.Condition()
self.replies = {}
- self.__super_init(sock, addr, None, tag='C', map=client_map)
- client_trigger.pull_trigger()
+ self.__super_init(sock, addr, None, tag='C', map=mgr.map)
+ self.trigger = mgr.trigger
+ self.call_from_thread = self.trigger.pull_trigger
+ self.call_from_thread()
def close(self):
Connection.close(self)
More information about the Zodb-checkins
mailing list