[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Reimplemented an opimization to write result data earlier on the
Jim Fulton
jim at zope.com
Tue Sep 22 06:52:28 EDT 2009
Log message for revision 104423:
Reimplemented an opimization to write result data earlier on the
server in a way that doesn't break auth.
Improved ClientStorage.close to reduce the chance of errors when a
client is getting lots of invalidations.
Changed:
U ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -442,11 +442,20 @@
logger.info("%s Waiting for cache verification to finish",
self.__name__)
- def close(self):
- """Storage API: finalize the storage, releasing external resources."""
+ def close(self, kill=False):
+ "Storage API: finalize the storage, releasing external resources."
if self._rpc_mgr is not None:
self._rpc_mgr.close()
self._rpc_mgr = None
+
+ if (self._connection is not None) and not kill:
+ event = threading.Event()
+ self._connection.trigger.pull_trigger(lambda: self._close(event))
+ event.wait(9)
+ else:
+ self._close()
+
+ def _close(self, event=None):
if self._connection is not None:
self._connection.register_object(None) # Don't call me!
self._connection.close()
@@ -462,6 +471,9 @@
if self._check_blob_size_thread is not None:
self._check_blob_size_thread.join()
+ if event is not None:
+ event.set()
+
_check_blob_size_thread = None
def _check_blob_size(self, bytes=None):
if self._blob_cache_size is None:
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -88,7 +88,6 @@
def __init__(self, server, read_only=0, auth_realm=None):
self.server = server
# timeout and stats will be initialized in register()
- self.timeout = None
self.stats = None
self.connection = None
self.client = None
@@ -277,6 +276,7 @@
if self.storage is not None:
self.log("duplicate register() call")
raise ValueError("duplicate register() call")
+
storage = self.server.storages.get(storage_id)
if storage is None:
self.log("unknown storage_id: %s" % storage_id)
@@ -289,8 +289,8 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.timeout, self.stats = self.server.register_connection(storage_id,
- self)
+ self.stats = self.server.register_connection(storage_id, self)
+ self.connection.thread_ident = self.connection.unregistered_thread_ident
def get_info(self):
storage = self.storage
@@ -1048,7 +1048,7 @@
Returns the timeout and stats objects for the appropriate storage.
"""
self.connections[storage_id].append(conn)
- return self.timeouts[storage_id], self.stats[storage_id]
+ return self.stats[storage_id]
def _invalidateCache(self, storage_id):
"""We need to invalidate any caches we have.
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -379,10 +379,12 @@
# Close the underlying file. No methods accessing the cache should be
# used after this.
def close(self):
- if self.f:
- sync(self.f)
- self.f.close()
- self.f = None
+ f = self.f
+ self.f = None
+ if f is not None:
+ sync(f)
+ f.close()
+
if hasattr(self,'_lock_file'):
self._lock_file.close()
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -13,6 +13,7 @@
##############################################################################
import unittest
from zope.testing import doctest
+import ZEO.zrpc.connection
class FakeStorageBase:
@@ -50,6 +51,12 @@
def register_connection(*args):
return None, None
+class FauxConn:
+ addr = 'x'
+ thread_ident = unregistered_thread_ident = None
+ peer_protocol_version = (
+ ZEO.zrpc.connection.Connection.current_protocol)
+
def test_server_record_iternext():
"""
@@ -59,6 +66,7 @@
>>> import ZEO.StorageServer
>>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+ >>> zeo.notifyConnected(FauxConn())
>>> zeo.register('1', False)
>>> next = None
@@ -78,6 +86,7 @@
True
>>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+ >>> zeo.notifyConnected(FauxConn())
>>> zeo.register('2', False)
>>> zeo.get_info()['supports_record_iternext']
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -25,7 +25,6 @@
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
-import asyncore
import doctest
import logging
import os
@@ -464,7 +463,11 @@
pass
time.sleep(.1)
- self.failIf(self._storage.is_connected())
+ try:
+ self.failIf(self._storage.is_connected())
+ except:
+ print log
+ raise
self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
del ZEO.zrpc.connection.client_logger.critical
self.assertEqual(log[0][0], 'The ZEO client loop failed.')
@@ -717,6 +720,12 @@
blob_cache_dir = 'blobs'
shared_blob_dir = True
+class FauxConn:
+ addr = 'x'
+ thread_ident = unregistered_thread_ident = None
+ peer_protocol_version = (
+ ZEO.zrpc.connection.Connection.current_protocol)
+
class StorageServerClientWrapper:
def __init__(self):
@@ -733,6 +742,7 @@
def __init__(self, server, storage_id):
self.storage_id = storage_id
self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+ self.server.notifyConnected(FauxConn())
self.server.register(storage_id, False)
self.server.client = StorageServerClientWrapper()
@@ -837,6 +847,8 @@
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
+
+ >>> s.notifyConnected(FauxConn())
>>> s.register('fs', False)
If we ask for the last transaction, we should get the last transaction
@@ -928,6 +940,11 @@
... pass
... def close(self):
... print 'connection closed'
+ ... @property
+ ... def trigger(self):
+ ... return self
+ ... def pull_trigger(self, f):
+ ... f()
>>> class ConnectionManager:
... def __init__(self, addr, client, tmin, tmax):
@@ -1205,6 +1222,34 @@
testing exit immediately
"""
+def close_client_storage_w_invalidations():
+ r"""
+Invalidations could cause errors when closing client storages,
+
+ >>> addr, _ = start_server()
+ >>> writing = threading.Event()
+ >>> def mad_write_thread():
+ ... global writing
+ ... conn = ZEO.connection(addr)
+ ... writing.set()
+ ... while writing.isSet():
+ ... conn.root.x = 1
+ ... transaction.commit()
+
+ >>> thread = threading.Thread(target=mad_write_thread)
+ >>> thread.setDaemon(True)
+ >>> thread.start()
+ >>> writing.wait()
+ >>> time.sleep(.01)
+ >>> for i in range(10):
+ ... conn = ZEO.connection(addr)
+ ... _ = conn._storage.load('\0'*8)
+ ... conn.close()
+
+ >>> writing.clear()
+ >>> thread.join(1)
+ """
+
slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests,
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -83,7 +83,6 @@
try:
t = self.thread
self.thread = None
- conn = self.connection
finally:
self.cond.release()
if t is not None:
@@ -93,9 +92,6 @@
if t.isAlive():
log("CM.close(): self.thread.join() timed out",
level=logging.WARNING)
- if conn is not None:
- # This will call close_conn() below which clears self.connection
- conn.close()
def attempt_connect(self):
"""Attempt a connection to the server without blocking too long.
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py 2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py 2009-09-22 10:52:27 UTC (rev 104423)
@@ -142,7 +142,7 @@
if obj is client_trigger:
continue
try:
- obj.mgr.client.close()
+ obj.mgr.client.close(True)
except:
map.pop(fd, None)
try:
@@ -382,8 +382,6 @@
# Exception types that should not be logged:
unlogged_exception_types = ()
- thread_ident = None
-
# Client constructor passes 'C' for tag, server constructor 'S'. This
# is used in log messages, and to determine whether we can speak with
# our peer.
@@ -603,12 +601,7 @@
err = ZRPCError("Couldn't pickle return %.100s" % r)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
- if thread.get_ident() == self.thread_ident:
- # we're being called by the async loop, we can try to write and
- # we don't need to poll.
- self.handle_write()
- else:
- self.poll()
+ self.poll()
def return_error(self, msgid, flags, err_type, err_value):
if flags & ASYNC:
@@ -756,7 +749,6 @@
self.trigger.pull_trigger()
-
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
@@ -773,9 +765,9 @@
self.marshal = ServerMarshaller()
self.trigger = trigger(map)
- thread = threading.Thread(target=server_loop, args=(map, self))
- thread.setDaemon(True)
- thread.start()
+ t = threading.Thread(target=server_loop, args=(map, self))
+ t.setDaemon(True)
+ t.start()
def handshake(self):
# Send the server's preferred protocol to the client.
@@ -789,28 +781,22 @@
self.obj.notifyDisconnected()
Connection.close(self)
+ thread_ident = unregistered_thread_ident = None
+ def poll(self):
+ "Invoke asyncore mainloop to get pending message out."
+ ident = self.thread_ident
+ if ident is not None and thread.get_ident() == ident:
+ self.handle_write()
+ else:
+ self.trigger.pull_trigger()
+
def server_loop(map, conn):
- conn.thread_ident = thread.get_ident()
+ conn.unregistered_thread_ident = thread.get_ident()
while len(map) > 1:
asyncore.poll(30.0, map)
for o in map.values():
o.close()
-# def server_loop(map, conn):
-# conn.thread_ident = thread.get_ident()
-# import cProfile
-# cProfile.runctx('_loop(map)', globals(), locals(),
-# "stats/%s" % thread.get_ident())
-# # while len(map) > 1:
-# # asyncore.poll(30.0, map)
-# for o in map.values():
-# o.close()
-
-def _loop(map):
- while len(map) > 1:
- asyncore.poll(30.0, map)
-
-
class ManagedClientConnection(Connection):
"""Client-side Connection subclass."""
__super_init = Connection.__init__
More information about the Zodb-checkins
mailing list