[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Reimplemented an opimization to write result data earlier on the

Jim Fulton jim at zope.com
Tue Sep 22 06:52:28 EDT 2009


Log message for revision 104423:
  Reimplemented an opimization to write result data earlier on the
  server in a way that doesn't break auth.
  
  Improved ClientStorage.close to reduce the chance of errors when a
  client is getting lots of invalidations.
  

Changed:
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/ClientStorage.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -442,11 +442,20 @@
             logger.info("%s Waiting for cache verification to finish",
                         self.__name__)
 
-    def close(self):
-        """Storage API: finalize the storage, releasing external resources."""
+    def close(self, kill=False):
+        "Storage API: finalize the storage, releasing external resources."
         if self._rpc_mgr is not None:
             self._rpc_mgr.close()
             self._rpc_mgr = None
+
+        if (self._connection is not None) and not kill:
+            event = threading.Event()
+            self._connection.trigger.pull_trigger(lambda: self._close(event))
+            event.wait(9)
+        else:
+            self._close()
+
+    def _close(self, event=None):
         if self._connection is not None:
             self._connection.register_object(None) # Don't call me!
             self._connection.close()
@@ -462,6 +471,9 @@
         if self._check_blob_size_thread is not None:
             self._check_blob_size_thread.join()
 
+        if event is not None:
+            event.set()
+
     _check_blob_size_thread = None
     def _check_blob_size(self, bytes=None):
         if self._blob_cache_size is None:

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -88,7 +88,6 @@
     def __init__(self, server, read_only=0, auth_realm=None):
         self.server = server
         # timeout and stats will be initialized in register()
-        self.timeout = None
         self.stats = None
         self.connection = None
         self.client = None
@@ -277,6 +276,7 @@
         if self.storage is not None:
             self.log("duplicate register() call")
             raise ValueError("duplicate register() call")
+
         storage = self.server.storages.get(storage_id)
         if storage is None:
             self.log("unknown storage_id: %s" % storage_id)
@@ -289,8 +289,8 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.timeout, self.stats = self.server.register_connection(storage_id,
-                                                                   self)
+        self.stats = self.server.register_connection(storage_id, self)
+        self.connection.thread_ident = self.connection.unregistered_thread_ident
 
     def get_info(self):
         storage = self.storage
@@ -1048,7 +1048,7 @@
         Returns the timeout and stats objects for the appropriate storage.
         """
         self.connections[storage_id].append(conn)
-        return self.timeouts[storage_id], self.stats[storage_id]
+        return self.stats[storage_id]
 
     def _invalidateCache(self, storage_id):
         """We need to invalidate any caches we have.

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/cache.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -379,10 +379,12 @@
     # Close the underlying file.  No methods accessing the cache should be
     # used after this.
     def close(self):
-        if self.f:
-            sync(self.f)
-            self.f.close()
-            self.f = None
+        f = self.f
+        self.f = None
+        if f is not None:
+            sync(f)
+            f.close()
+
         if hasattr(self,'_lock_file'):
             self._lock_file.close()
 

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -13,6 +13,7 @@
 ##############################################################################
 import unittest
 from zope.testing import doctest
+import ZEO.zrpc.connection
 
 class FakeStorageBase:
 
@@ -50,6 +51,12 @@
     def register_connection(*args):
         return None, None
 
+class FauxConn:
+    addr = 'x'
+    thread_ident = unregistered_thread_ident = None
+    peer_protocol_version = (
+        ZEO.zrpc.connection.Connection.current_protocol)
+
 def test_server_record_iternext():
     """
     
@@ -59,6 +66,7 @@
     >>> import ZEO.StorageServer
 
     >>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+    >>> zeo.notifyConnected(FauxConn())
     >>> zeo.register('1', False)
 
     >>> next = None
@@ -78,6 +86,7 @@
     True
 
     >>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+    >>> zeo.notifyConnected(FauxConn())
     >>> zeo.register('2', False)
 
     >>> zeo.get_info()['supports_record_iternext']

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -25,7 +25,6 @@
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
-import asyncore
 import doctest
 import logging
 import os
@@ -464,7 +463,11 @@
             pass
 
         time.sleep(.1)
-        self.failIf(self._storage.is_connected())
+        try:
+            self.failIf(self._storage.is_connected())
+        except:
+            print log
+            raise
         self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
         del ZEO.zrpc.connection.client_logger.critical
         self.assertEqual(log[0][0], 'The ZEO client loop failed.')
@@ -717,6 +720,12 @@
     blob_cache_dir = 'blobs'
     shared_blob_dir = True
 
+class FauxConn:
+    addr = 'x'
+    thread_ident = unregistered_thread_ident = None
+    peer_protocol_version = (
+        ZEO.zrpc.connection.Connection.current_protocol)
+
 class StorageServerClientWrapper:
 
     def __init__(self):
@@ -733,6 +742,7 @@
     def __init__(self, server, storage_id):
         self.storage_id = storage_id
         self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+        self.server.notifyConnected(FauxConn())
         self.server.register(storage_id, False)
         self.server.client = StorageServerClientWrapper()
 
@@ -837,6 +847,8 @@
     >>> fs = FileStorage('t.fs')
     >>> sv = StorageServer(('', get_port()), dict(fs=fs))
     >>> s = ZEOStorage(sv, sv.read_only)
+
+    >>> s.notifyConnected(FauxConn())
     >>> s.register('fs', False)
 
 If we ask for the last transaction, we should get the last transaction
@@ -928,6 +940,11 @@
     ...         pass
     ...     def close(self):
     ...         print 'connection closed'
+    ...     @property
+    ...     def trigger(self):
+    ...         return self
+    ...     def pull_trigger(self, f):
+    ...         f()
 
     >>> class ConnectionManager:
     ...     def __init__(self, addr, client, tmin, tmax):
@@ -1205,6 +1222,34 @@
     testing exit immediately
     """
 
+def close_client_storage_w_invalidations():
+    r"""
+Invalidations could cause errors when closing client storages,
+
+    >>> addr, _ = start_server()
+    >>> writing = threading.Event()
+    >>> def mad_write_thread():
+    ...     global writing
+    ...     conn = ZEO.connection(addr)
+    ...     writing.set()
+    ...     while writing.isSet():
+    ...         conn.root.x = 1
+    ...         transaction.commit()
+
+    >>> thread = threading.Thread(target=mad_write_thread)
+    >>> thread.setDaemon(True)
+    >>> thread.start()
+    >>> writing.wait()
+    >>> time.sleep(.01)
+    >>> for i in range(10):
+    ...     conn = ZEO.connection(addr)
+    ...     _ = conn._storage.load('\0'*8)
+    ...     conn.close()
+
+    >>> writing.clear()
+    >>> thread.join(1)
+    """
+
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
     DemoStorageTests, FileStorageTests, MappingStorageTests,

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/client.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -83,7 +83,6 @@
         try:
             t = self.thread
             self.thread = None
-            conn = self.connection
         finally:
             self.cond.release()
         if t is not None:
@@ -93,9 +92,6 @@
             if t.isAlive():
                 log("CM.close(): self.thread.join() timed out",
                     level=logging.WARNING)
-        if conn is not None:
-            # This will call close_conn() below which clears self.connection
-            conn.close()
 
     def attempt_connect(self):
         """Attempt a connection to the server without blocking too long.

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py	2009-09-22 10:47:05 UTC (rev 104422)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py	2009-09-22 10:52:27 UTC (rev 104423)
@@ -142,7 +142,7 @@
                     if obj is client_trigger:
                         continue
                     try:
-                        obj.mgr.client.close()
+                        obj.mgr.client.close(True)
                     except:
                         map.pop(fd, None)
                         try:
@@ -382,8 +382,6 @@
     # Exception types that should not be logged:
     unlogged_exception_types = ()
 
-    thread_ident = None
-
     # Client constructor passes 'C' for tag, server constructor 'S'.  This
     # is used in log messages, and to determine whether we can speak with
     # our peer.
@@ -603,12 +601,7 @@
             err = ZRPCError("Couldn't pickle return %.100s" % r)
             msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
         self.message_output(msg)
-        if thread.get_ident() == self.thread_ident:
-            # we're being called by the async loop, we can try to write and
-            # we don't need to poll.
-            self.handle_write()
-        else:
-            self.poll()
+        self.poll()
 
     def return_error(self, msgid, flags, err_type, err_value):
         if flags & ASYNC:
@@ -756,7 +749,6 @@
         self.trigger.pull_trigger()
 
 
-
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
 
@@ -773,9 +765,9 @@
         self.marshal = ServerMarshaller()
         self.trigger = trigger(map)
 
-        thread = threading.Thread(target=server_loop, args=(map, self))
-        thread.setDaemon(True)
-        thread.start()
+        t = threading.Thread(target=server_loop, args=(map, self))
+        t.setDaemon(True)
+        t.start()
 
     def handshake(self):
         # Send the server's preferred protocol to the client.
@@ -789,28 +781,22 @@
         self.obj.notifyDisconnected()
         Connection.close(self)
 
+    thread_ident = unregistered_thread_ident = None
+    def poll(self):
+        "Invoke asyncore mainloop to get pending message out."
+        ident = self.thread_ident
+        if ident is not None and thread.get_ident() == ident:
+            self.handle_write()
+        else:
+            self.trigger.pull_trigger()
+
 def server_loop(map, conn):
-    conn.thread_ident = thread.get_ident()
+    conn.unregistered_thread_ident = thread.get_ident()
     while len(map) > 1:
         asyncore.poll(30.0, map)
     for o in map.values():
         o.close()
 
-# def server_loop(map, conn):
-#     conn.thread_ident = thread.get_ident()
-#     import cProfile
-#     cProfile.runctx('_loop(map)', globals(), locals(),
-#                     "stats/%s" % thread.get_ident())
-# #     while len(map) > 1:
-# #         asyncore.poll(30.0, map)
-#     for o in map.values():
-#         o.close()
-
-def _loop(map):
-    while len(map) > 1:
-        asyncore.poll(30.0, map)
-
-
 class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__



More information about the Zodb-checkins mailing list