[Zodb-checkins] SVN: ZODB/trunk/src/ Made locking logic more robust.

Jim Fulton jim at zope.com
Tue May 4 10:41:36 EDT 2010


Log message for revision 111924:
  Made locking logic more robust.
  
  - Improved ZEO server commit lock logging.  Now, locking activity is
    logged at the debug level until the number of waiting lock requests
    gets above 3, at the warning level once it is above 3, and at the
    critical level when the number of waiting lock requests gets above 9.
  
  - ZEO servers no longer log their pids in every log message. It's just
    not interesting. :)
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/tests/testZEO2.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/CHANGES.txt	2010-05-04 14:41:35 UTC (rev 111924)
@@ -17,12 +17,25 @@
   help in situations where object ids are used as BTree keys and the
   sequential allocation of object ids leads to conflict errors.
 
+- ZEO servers now support a server_status method for getting
+  information on the number of clients, lock requests and general
+  statistics.
+
 - The mkzeoinst script has been moved to a separate project:
 
     http://pypi.python.org/pypi/zope.mkzeoinstance
 
   and is no-longer included with ZODB.
 
+- Improved ZEO server commit lock logging.  Now, locking activity is
+  logged at the debug level until the number of waiting lock requests
+  gets above 3.  Log at the critical level when the number of waiting
+  lock requests gets above 9.
+
+- ZEO servers no longer log their pids in every log message. It's just
+  not interesting. :)
+
+
 Bugs Fixed
 ----------
 
@@ -47,6 +60,8 @@
 
   https://bugs.launchpad.net/zodb/+bug/435547
 
+- Fixed some problems in ZEO server commit lock management.
+
 3.10.0a1 (2010-02-08)
 =====================
 

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-05-04 14:41:35 UTC (rev 111924)
@@ -57,20 +57,8 @@
 
 logger = logging.getLogger('ZEO.StorageServer')
 
-# TODO:  This used to say "ZSS", which is now implied in the logger name.
-# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
-_label = "" # default label used for logging.
-
-
-def set_label():
-    """Internal helper to reset the logging label (e.g. after fork())."""
-    global _label
-    _label = "%s" % os.getpid()
-
-
-def log(message, level=logging.INFO, label=None, exc_info=False):
+def log(message, level=logging.INFO, label='', exc_info=False):
     """Internal helper to log a message."""
-    label = label or _label
     if label:
         message = "(%s) %s" % (label, message)
     logger.log(level, message, exc_info=exc_info)
@@ -97,10 +85,10 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.log_label = 'unconnected'
         self.locked = False             # Don't have storage lock
         self.verifying = 0
         self.store_failed = 0
-        self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
         self.blob_tempfile = None
@@ -131,13 +119,7 @@
             conn.register_object(ZEOStorage308Adapter(self))
         else:
             self.client = ClientStub(conn)
-        addr = conn.addr
-        if isinstance(addr, type("")):
-            label = addr
-        else:
-            host, port = addr
-            label = str(host) + ":" + str(port)
-        self.log_label = _label + "/" + label
+        self.log_label = _addr_label(conn.addr)
 
     def notifyDisconnected(self):
         self.connection = None
@@ -146,7 +128,7 @@
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during %s transaction"
-                     % self.locked and 'locked' or 'unlocked')
+                     % (self.locked and 'locked' or 'unlocked'))
             self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
@@ -451,7 +433,9 @@
         if self.locked:
             self.server.unlock_storage(self)
             self.locked = 0
-        self.transaction = None
+        if self.transaction is not None:
+            self.server.stop_waiting(self)
+            self.transaction = None
         self.stats.active_txns -= 1
         if self.txnlog is not None:
             self.txnlog.close()
@@ -462,12 +446,22 @@
 
     def vote(self, tid):
         self._check_tid(tid, exc=StorageTransactionError)
+        if self.locked or self.server.already_waiting(self):
+            raise StorageTransactionError(
+                'Already voting (%s)' % (self.locked and 'locked' or 'waiting')
+                )
         return self._try_to_vote()
 
     def _try_to_vote(self, delay=None):
         if self.connection is None:
             return # We're disconnected
-        self.locked = self.server.lock_storage(self)
+        if delay is not None and delay.sent:
+            # as a consequence of the unlocking strategy, _try_to_vote
+            # may be called multiple times for delayed
+            # transactions. The first call will mark the delay as
+            # sent. We should skip if the delay was already sent.
+            return
+        self.locked, delay = self.server.lock_storage(self, delay)
         if self.locked:
             try:
                 self.log(
@@ -511,17 +505,17 @@
             else:
                 if delay is not None:
                     delay.reply(None)
+                else:
+                    return None
+
         else:
-            if delay == None:
-                self.log("(%r) queue lock: transactions waiting: %s"
-                         % (self.storage_id, self.server.waiting(self)+1))
-                delay = Delay()
-            self.server.unlock_callback(self, delay)
             return delay
 
     def _unlock_callback(self, delay):
         connection = self.connection
-        if connection is not None:
+        if connection is None:
+            self.server.stop_waiting(self)
+        else:
             connection.call_from_thread(self._try_to_vote, delay)
 
     # The public methods of the ZEO client API do not do the real work.
@@ -764,6 +758,8 @@
         for iid in iids:
             self._iterators.pop(iid, None)
 
+    def server_status(self):
+        return self.server.server_status(self)
 
 class StorageServerDB:
 
@@ -873,7 +869,6 @@
 
         self.addr = addr
         self.storages = storages
-        set_label()
         msg = ", ".join(
             ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
                            storage.getName())
@@ -884,7 +879,7 @@
 
         self._lock = threading.Lock()
         self._commit_locks = {}
-        self._unlock_callbacks = dict((name, []) for name in storages)
+        self._waiting = dict((name, []) for name in storages)
 
         self.read_only = read_only
         self.auth_protocol = auth_protocol
@@ -1171,29 +1166,59 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
-    def lock_storage(self, zeostore):
+    def lock_storage(self, zeostore, delay):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
+
             if storage_id in self._commit_locks:
-                return False
-            self._commit_locks[storage_id] = zeostore
-            self.timeouts[storage_id].begin(zeostore)
-            self.stats[storage_id].lock_time = time.time()
-        return True
+                # The lock is held by another zeostore
 
+                assert self._commit_locks[storage_id] is not zeostore, (
+                    storage_id, delay)
+
+                if delay is None:
+                    # New request, queue it
+                    assert not [i for i in waiting if i[0] is zeostore
+                                ], "already waiting"
+                    delay = Delay()
+                    waiting.append((zeostore, delay))
+                    zeostore.log("(%r) queue lock: transactions waiting: %s"
+                                 % (storage_id, len(waiting)),
+                                 _level_for_waiting(waiting)
+                                 )
+
+                return False, delay
+            else:
+                self._commit_locks[storage_id] = zeostore
+                self.timeouts[storage_id].begin(zeostore)
+                self.stats[storage_id].lock_time = time.time()
+                if delay is not None:
+                    # we were waiting, stop
+                    waiting[:] = [i for i in waiting if i[0] is not zeostore]
+                zeostore.log("(%r) lock: transactions waiting: %s"
+                             % (storage_id, len(waiting)),
+                             _level_for_waiting(waiting)
+                             )
+                return True, delay
+
     def unlock_storage(self, zeostore):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
             assert self._commit_locks[storage_id] is zeostore
             del self._commit_locks[storage_id]
             self.timeouts[storage_id].end(zeostore)
             self.stats[storage_id].lock_time = None
-            callbacks = self._unlock_callbacks[storage_id][:]
-            del self._unlock_callbacks[storage_id][:]
+            callbacks = waiting[:]
 
         if callbacks:
+            assert not [i for i in waiting if i[0] is zeostore
+                        ], "waiting while unlocking"
             zeostore.log("(%r) unlock: transactions waiting: %s"
-                         % (storage_id, len(callbacks)-1))
+                         % (storage_id, len(callbacks)),
+                         _level_for_waiting(callbacks)
+                         )
 
             for zeostore, delay in callbacks:
                 try:
@@ -1203,14 +1228,42 @@
                 except Exception:
                     logger.exception("Calling unlock callback")
 
-    def unlock_callback(self, zeostore, delay):
+
+    def stop_waiting(self, zeostore):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
-            self._unlock_callbacks[storage_id].append((zeostore, delay))
+            new_waiting = [i for i in waiting if i[0] is not zeostore]
+            if len(new_waiting) == len(waiting):
+                return
+            waiting[:] = new_waiting
 
-    def waiting(self, zeostore):
-        return len(self._unlock_callbacks[zeostore.storage_id])
+        zeostore.log("(%r) dequeue lock: transactions waiting: %s"
+                     % (storage_id, len(waiting)),
+                     _level_for_waiting(waiting)
+                     )
 
+    def already_waiting(self, zeostore):
+        storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
+        with self._lock:
+            return bool([i for i in waiting if i[0] is zeostore])
+
+    def server_status(self, zeostore):
+        storage_id = zeostore.storage_id
+        status = self.stats[storage_id].__dict__.copy()
+        status['connections'] = len(status['connections'])
+        status['waiting'] = len(self._waiting[storage_id])
+        return status
+
+def _level_for_waiting(waiting):
+    if len(waiting) > 9:
+        return logging.CRITICAL
+    if len(waiting) > 3:
+        return logging.WARNING
+    else:
+        return logging.DEBUG
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1454,4 +1507,10 @@
     def __getattr__(self, name):
         return getattr(self.storage, name)
 
+def _addr_label(addr):
+    if isinstance(addr, type("")):
+        return addr
+    else:
+        host, port = addr
+        return str(host) + ":" + str(port)
 

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2010-05-04 14:41:35 UTC (rev 111924)
@@ -1219,7 +1219,7 @@
     ------
     --T INFO ZEO.runzeo () opening storage '1' using FileStorage
     ------
-    --T INFO ZEO.StorageServer () StorageServer created RW with storages 1RWt
+    --T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
     ------
     --T INFO ZEO.zrpc () listening on ...
     ------

Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-05-04 14:41:35 UTC (rev 111924)
@@ -13,6 +13,7 @@
 ##############################################################################
 from zope.testing import doctest, setupstack, renormalizing
 import logging
+import pprint
 import re
 import sys
 import transaction
@@ -93,7 +94,6 @@
 handled correctly:
 
     >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
-    (511/test-addr) ('1') unlock: transactions waiting: 0
     2 callAsync serialnos ...
     reply 1 None
 
@@ -200,12 +200,210 @@
     """
 
 
+def some_basic_locking_tests():
+    r"""
+
+    >>> itid = 0
+    >>> def start_trans(zs):
+    ...     global itid
+    ...     itid += 1
+    ...     tid = str(itid)
+    ...     zs.tpc_begin(tid, '', '', {})
+    ...     zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', tid)
+    ...     return tid
+
+    >>> server = ZEO.tests.servertesting.StorageServer()
+
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> handler.setFormatter(logging.Formatter(
+    ...     '%(name)s %(levelname)s\n%(message)s'))
+    >>> logging.getLogger('ZEO').addHandler(handler)
+    >>> logging.getLogger('ZEO').setLevel(logging.DEBUG)
+
+We start a transaction and vote, this leads to getting the lock.
+
+    >>> zs1 = ZEO.tests.servertesting.client(server, '1')
+    >>> tid1 = start_trans(zs1)
+    >>> zs1.vote(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+    1 callAsync serialnos ...
+
+If another client tries to vote, its lock request will be queued and
+a delay will be returned:
+
+    >>> zs2 = ZEO.tests.servertesting.client(server, '2')
+    >>> tid2 = start_trans(zs2)
+    >>> delay = zs2.vote(tid2)
+    ZEO.StorageServer DEBUG
+    (test-addr-2) ('1') queue lock: transactions waiting: 1
+
+    >>> delay.set_sender(0, zs2.connection)
+
+When we end the first transaction, the queued vote gets the lock.
+
+    >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') unlock: transactions waiting: 1
+    ZEO.StorageServer DEBUG
+    (test-addr-2) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes
+    2 callAsync serialnos ...
+
+Let's try again with the first client. The vote will be queued:
+
+    >>> tid1 = start_trans(zs1)
+    >>> delay = zs1.vote(tid1)
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+If the queued transaction is aborted, it will be dequeued:
+
+    >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') dequeue lock: transactions waiting: 0
+
+BTW, voting multiple times will error:
+
+    >>> zs2.vote(tid2)
+    Traceback (most recent call last):
+    ...
+    StorageTransactionError: Already voting (locked)
+
+    >>> tid1 = start_trans(zs1)
+    >>> delay = zs1.vote(tid1)
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+    >>> delay.set_sender(0, zs1.connection)
+
+    >>> zs1.vote(tid1)
+    Traceback (most recent call last):
+    ...
+    StorageTransactionError: Already voting (waiting)
+
+Note that the locking activity is logged at debug level to avoid
+cluttering log files; however, as the number of waiting votes
+increases, so does the logging level:
+
+    >>> clients = []
+    >>> for i in range(9):
+    ...     client = ZEO.tests.servertesting.client(server, str(i+10))
+    ...     tid = start_trans(client)
+    ...     delay = client.vote(tid)
+    ...     clients.append(client)
+    ZEO.StorageServer DEBUG
+    (test-addr-10) ('1') queue lock: transactions waiting: 2
+    ZEO.StorageServer DEBUG
+    (test-addr-11) ('1') queue lock: transactions waiting: 3
+    ZEO.StorageServer WARNING
+    (test-addr-12) ('1') queue lock: transactions waiting: 4
+    ZEO.StorageServer WARNING
+    (test-addr-13) ('1') queue lock: transactions waiting: 5
+    ZEO.StorageServer WARNING
+    (test-addr-14) ('1') queue lock: transactions waiting: 6
+    ZEO.StorageServer WARNING
+    (test-addr-15) ('1') queue lock: transactions waiting: 7
+    ZEO.StorageServer WARNING
+    (test-addr-16) ('1') queue lock: transactions waiting: 8
+    ZEO.StorageServer WARNING
+    (test-addr-17) ('1') queue lock: transactions waiting: 9
+    ZEO.StorageServer CRITICAL
+    (test-addr-18) ('1') queue lock: transactions waiting: 10
+
+If a client with the transaction lock disconnects, it will abort and
+release the lock and one of the waiting clients will get the lock.
+
+    >>> zs2.notifyDisconnected() # doctest: +ELLIPSIS
+    ZEO.StorageServer INFO
+    (test-addr-2) disconnected during locked transaction
+    ZEO.StorageServer CRITICAL
+    (test-addr-2) ('1') unlock: transactions waiting: 10
+    ZEO.StorageServer WARNING
+    (test-addr-1) ('1') lock: transactions waiting: 9
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+    1 callAsync serialnos ...
+
+(In practice, waiting clients won't necessarily get the lock in order.)
+
+We can find out about the current lock state, and get other server
+statistics using the server_status method:
+
+    >>> pprint.pprint(zs1.server_status(), width=1)
+    {'aborts': 3,
+     'active_txns': 10,
+     'commits': 0,
+     'conflicts': 0,
+     'conflicts_resolved': 0,
+     'connections': 11,
+     'loads': 0,
+     'lock_time': 1272653598.693882,
+     'start': 'Fri Apr 30 14:53:18 2010',
+     'stores': 13,
+     'verifying_clients': 0,
+     'waiting': 9}
+
+(Note that the connections count above is off by 1 due to the way the
+test infrastructure works.)
+
+If clients disconnect while waiting, they will be dequeued:
+
+    >>> for client in clients:
+    ...     client.notifyDisconnected()
+    ZEO.StorageServer INFO
+    (test-addr-10) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-10) ('1') dequeue lock: transactions waiting: 8
+    ZEO.StorageServer INFO
+    (test-addr-11) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-11) ('1') dequeue lock: transactions waiting: 7
+    ZEO.StorageServer INFO
+    (test-addr-12) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-12) ('1') dequeue lock: transactions waiting: 6
+    ZEO.StorageServer INFO
+    (test-addr-13) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-13) ('1') dequeue lock: transactions waiting: 5
+    ZEO.StorageServer INFO
+    (test-addr-14) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-14) ('1') dequeue lock: transactions waiting: 4
+    ZEO.StorageServer INFO
+    (test-addr-15) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-15) ('1') dequeue lock: transactions waiting: 3
+    ZEO.StorageServer INFO
+    (test-addr-16) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-16) ('1') dequeue lock: transactions waiting: 2
+    ZEO.StorageServer INFO
+    (test-addr-17) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-17) ('1') dequeue lock: transactions waiting: 1
+    ZEO.StorageServer INFO
+    (test-addr-18) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-18) ('1') dequeue lock: transactions waiting: 0
+
+    >>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
+    >>> logging.getLogger('ZEO').removeHandler(handler)
+    """
+
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocTestSuite(
             setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
             checker=renormalizing.RENormalizing([
                 (re.compile('\d+/test-addr'), ''),
+                (re.compile("'lock_time': \d+.\d+"), 'lock_time'),
+                (re.compile(r"'start': '[^\n]+'"), 'start'),
                 ]),
             ),
         ))



More information about the Zodb-checkins mailing list