[Zodb-checkins] SVN: ZODB/trunk/src/ Made locking logic more robust.
Jim Fulton
jim at zope.com
Tue May 4 10:41:36 EDT 2010
Log message for revision 111924:
Made locking logic more robust.
- Improved ZEO server commit lock logging. Now, locking activity is
logged at the debug level until the number of waiting lock requests
gets above 3, at the warning level once it gets above 3, and at the
critical level once it gets above 9.
- ZEO servers no longer log their pids in every log message. It's just
not interesting. :)
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/tests/testZEO2.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/CHANGES.txt 2010-05-04 14:41:35 UTC (rev 111924)
@@ -17,12 +17,25 @@
help in situations where object ids are used as BTree keys and the
sequential allocation of object ids leads to conflict errors.
+- ZEO servers now support a server_status method for getting
+ information about the number of clients, lock requests and general
+ statistics.
+
- The mkzeoinst script has been moved to a separate project:
http://pypi.python.org/pypi/zope.mkzeoinstance
and is no-longer included with ZODB.
+- Improved ZEO server commit lock logging. Now, locking activity is
+ logged at the debug level until the number of waiting lock requests
+ gets above 3, at the warning level once it gets above 3, and at the
+ critical level once it gets above 9.
+
+- ZEO servers no longer log their pids in every log message. It's just
+ not interesting. :)
+
+
Bugs Fixed
----------
@@ -47,6 +60,8 @@
https://bugs.launchpad.net/zodb/+bug/435547
+- Fixed some problems in ZEO server commit lock management.
+
3.10.0a1 (2010-02-08)
=====================
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2010-05-04 14:41:35 UTC (rev 111924)
@@ -57,20 +57,8 @@
logger = logging.getLogger('ZEO.StorageServer')
-# TODO: This used to say "ZSS", which is now implied in the logger name.
-# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
-_label = "" # default label used for logging.
-
-
-def set_label():
- """Internal helper to reset the logging label (e.g. after fork())."""
- global _label
- _label = "%s" % os.getpid()
-
-
-def log(message, level=logging.INFO, label=None, exc_info=False):
+def log(message, level=logging.INFO, label='', exc_info=False):
"""Internal helper to log a message."""
- label = label or _label
if label:
message = "(%s) %s" % (label, message)
logger.log(level, message, exc_info=exc_info)
@@ -97,10 +85,10 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
+ self.log_label = 'unconnected'
self.locked = False # Don't have storage lock
self.verifying = 0
self.store_failed = 0
- self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
@@ -131,13 +119,7 @@
conn.register_object(ZEOStorage308Adapter(self))
else:
self.client = ClientStub(conn)
- addr = conn.addr
- if isinstance(addr, type("")):
- label = addr
- else:
- host, port = addr
- label = str(host) + ":" + str(port)
- self.log_label = _label + "/" + label
+ self.log_label = _addr_label(conn.addr)
def notifyDisconnected(self):
self.connection = None
@@ -146,7 +128,7 @@
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during %s transaction"
- % self.locked and 'locked' or 'unlocked')
+ % (self.locked and 'locked' or 'unlocked'))
self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -451,7 +433,9 @@
if self.locked:
self.server.unlock_storage(self)
self.locked = 0
- self.transaction = None
+ if self.transaction is not None:
+ self.server.stop_waiting(self)
+ self.transaction = None
self.stats.active_txns -= 1
if self.txnlog is not None:
self.txnlog.close()
@@ -462,12 +446,22 @@
def vote(self, tid):
self._check_tid(tid, exc=StorageTransactionError)
+ if self.locked or self.server.already_waiting(self):
+ raise StorageTransactionError(
+ 'Already voting (%s)' % (self.locked and 'locked' or 'waiting')
+ )
return self._try_to_vote()
def _try_to_vote(self, delay=None):
if self.connection is None:
return # We're disconnected
- self.locked = self.server.lock_storage(self)
+ if delay is not None and delay.sent:
+ # as a consequence of the unlocking strategy, _try_to_vote
+ # may be called multiple times for delayed
+ # transactions. The first call will mark the delay as
+ # sent. We should skip if the delay was already sent.
+ return
+ self.locked, delay = self.server.lock_storage(self, delay)
if self.locked:
try:
self.log(
@@ -511,17 +505,17 @@
else:
if delay is not None:
delay.reply(None)
+ else:
+ return None
+
else:
- if delay == None:
- self.log("(%r) queue lock: transactions waiting: %s"
- % (self.storage_id, self.server.waiting(self)+1))
- delay = Delay()
- self.server.unlock_callback(self, delay)
return delay
def _unlock_callback(self, delay):
connection = self.connection
- if connection is not None:
+ if connection is None:
+ self.server.stop_waiting(self)
+ else:
connection.call_from_thread(self._try_to_vote, delay)
# The public methods of the ZEO client API do not do the real work.
@@ -764,6 +758,8 @@
for iid in iids:
self._iterators.pop(iid, None)
+ def server_status(self):
+ return self.server.server_status(self)
class StorageServerDB:
@@ -873,7 +869,6 @@
self.addr = addr
self.storages = storages
- set_label()
msg = ", ".join(
["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
storage.getName())
@@ -884,7 +879,7 @@
self._lock = threading.Lock()
self._commit_locks = {}
- self._unlock_callbacks = dict((name, []) for name in storages)
+ self._waiting = dict((name, []) for name in storages)
self.read_only = read_only
self.auth_protocol = auth_protocol
@@ -1171,29 +1166,59 @@
if conn.obj in cl:
cl.remove(conn.obj)
- def lock_storage(self, zeostore):
+ def lock_storage(self, zeostore, delay):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
+
if storage_id in self._commit_locks:
- return False
- self._commit_locks[storage_id] = zeostore
- self.timeouts[storage_id].begin(zeostore)
- self.stats[storage_id].lock_time = time.time()
- return True
+ # The lock is held by another zeostore
+ assert self._commit_locks[storage_id] is not zeostore, (
+ storage_id, delay)
+
+ if delay is None:
+ # New request, queue it
+ assert not [i for i in waiting if i[0] is zeostore
+ ], "already waiting"
+ delay = Delay()
+ waiting.append((zeostore, delay))
+ zeostore.log("(%r) queue lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+
+ return False, delay
+ else:
+ self._commit_locks[storage_id] = zeostore
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ if delay is not None:
+ # we were waiting, stop
+ waiting[:] = [i for i in waiting if i[0] is not zeostore]
+ zeostore.log("(%r) lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+ return True, delay
+
def unlock_storage(self, zeostore):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
assert self._commit_locks[storage_id] is zeostore
del self._commit_locks[storage_id]
self.timeouts[storage_id].end(zeostore)
self.stats[storage_id].lock_time = None
- callbacks = self._unlock_callbacks[storage_id][:]
- del self._unlock_callbacks[storage_id][:]
+ callbacks = waiting[:]
if callbacks:
+ assert not [i for i in waiting if i[0] is zeostore
+ ], "waiting while unlocking"
zeostore.log("(%r) unlock: transactions waiting: %s"
- % (storage_id, len(callbacks)-1))
+ % (storage_id, len(callbacks)),
+ _level_for_waiting(callbacks)
+ )
for zeostore, delay in callbacks:
try:
@@ -1203,14 +1228,42 @@
except Exception:
logger.exception("Calling unlock callback")
- def unlock_callback(self, zeostore, delay):
+
+ def stop_waiting(self, zeostore):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
- self._unlock_callbacks[storage_id].append((zeostore, delay))
+ new_waiting = [i for i in waiting if i[0] is not zeostore]
+ if len(new_waiting) == len(waiting):
+ return
+ waiting[:] = new_waiting
- def waiting(self, zeostore):
- return len(self._unlock_callbacks[zeostore.storage_id])
+ zeostore.log("(%r) dequeue lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+ def already_waiting(self, zeostore):
+ storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
+ with self._lock:
+ return bool([i for i in waiting if i[0] is zeostore])
+
+ def server_status(self, zeostore):
+ storage_id = zeostore.storage_id
+ status = self.stats[storage_id].__dict__.copy()
+ status['connections'] = len(status['connections'])
+ status['waiting'] = len(self._waiting[storage_id])
+ return status
+
+def _level_for_waiting(waiting):
+ if len(waiting) > 9:
+ return logging.CRITICAL
+ if len(waiting) > 3:
+ return logging.WARNING
+ else:
+ return logging.DEBUG
+
class StubTimeoutThread:
def begin(self, client):
@@ -1454,4 +1507,10 @@
def __getattr__(self, name):
return getattr(self.storage, name)
+def _addr_label(addr):
+ if isinstance(addr, type("")):
+ return addr
+ else:
+ host, port = addr
+ return str(host) + ":" + str(port)
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2010-05-04 14:41:35 UTC (rev 111924)
@@ -1219,7 +1219,7 @@
------
--T INFO ZEO.runzeo () opening storage '1' using FileStorage
------
- --T INFO ZEO.StorageServer () StorageServer created RW with storages 1RWt
+ --T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
------
--T INFO ZEO.zrpc () listening on ...
------
Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO2.py 2010-05-04 14:41:33 UTC (rev 111923)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py 2010-05-04 14:41:35 UTC (rev 111924)
@@ -13,6 +13,7 @@
##############################################################################
from zope.testing import doctest, setupstack, renormalizing
import logging
+import pprint
import re
import sys
import transaction
@@ -93,7 +94,6 @@
handled correctly:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
- (511/test-addr) ('1') unlock: transactions waiting: 0
2 callAsync serialnos ...
reply 1 None
@@ -200,12 +200,210 @@
"""
+def some_basic_locking_tests():
+ r"""
+
+ >>> itid = 0
+ >>> def start_trans(zs):
+ ... global itid
+ ... itid += 1
+ ... tid = str(itid)
+ ... zs.tpc_begin(tid, '', '', {})
+ ... zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', tid)
+ ... return tid
+
+ >>> server = ZEO.tests.servertesting.StorageServer()
+
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> handler.setFormatter(logging.Formatter(
+ ... '%(name)s %(levelname)s\n%(message)s'))
+ >>> logging.getLogger('ZEO').addHandler(handler)
+ >>> logging.getLogger('ZEO').setLevel(logging.DEBUG)
+
+We start a transaction and vote; this acquires the lock.
+
+ >>> zs1 = ZEO.tests.servertesting.client(server, '1')
+ >>> tid1 = start_trans(zs1)
+ >>> zs1.vote(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') lock: transactions waiting: 0
+ ZEO.StorageServer BLATHER
+ (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+ 1 callAsync serialnos ...
+
+If another client tries to vote, its lock request will be queued and
+a delay will be returned:
+
+ >>> zs2 = ZEO.tests.servertesting.client(server, '2')
+ >>> tid2 = start_trans(zs2)
+ >>> delay = zs2.vote(tid2)
+ ZEO.StorageServer DEBUG
+ (test-addr-2) ('1') queue lock: transactions waiting: 1
+
+ >>> delay.set_sender(0, zs2.connection)
+
+When we end the first transaction, the queued vote gets the lock.
+
+ >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') unlock: transactions waiting: 1
+ ZEO.StorageServer DEBUG
+ (test-addr-2) ('1') lock: transactions waiting: 0
+ ZEO.StorageServer BLATHER
+ (test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes
+ 2 callAsync serialnos ...
+
+Let's try again with the first client. The vote will be queued:
+
+ >>> tid1 = start_trans(zs1)
+ >>> delay = zs1.vote(tid1)
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+If the queued transaction is aborted, it will be dequeued:
+
+ >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') dequeue lock: transactions waiting: 0
+
+BTW, voting more than once raises an error:
+
+ >>> zs2.vote(tid2)
+ Traceback (most recent call last):
+ ...
+ StorageTransactionError: Already voting (locked)
+
+ >>> tid1 = start_trans(zs1)
+ >>> delay = zs1.vote(tid1)
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+ >>> delay.set_sender(0, zs1.connection)
+
+ >>> zs1.vote(tid1)
+ Traceback (most recent call last):
+ ...
+ StorageTransactionError: Already voting (waiting)
+
+Note that the locking activity is logged at debug level to avoid
+cluttering log files. However, as the number of waiting votes
+increases, so does the logging level:
+
+ >>> clients = []
+ >>> for i in range(9):
+ ... client = ZEO.tests.servertesting.client(server, str(i+10))
+ ... tid = start_trans(client)
+ ... delay = client.vote(tid)
+ ... clients.append(client)
+ ZEO.StorageServer DEBUG
+ (test-addr-10) ('1') queue lock: transactions waiting: 2
+ ZEO.StorageServer DEBUG
+ (test-addr-11) ('1') queue lock: transactions waiting: 3
+ ZEO.StorageServer WARNING
+ (test-addr-12) ('1') queue lock: transactions waiting: 4
+ ZEO.StorageServer WARNING
+ (test-addr-13) ('1') queue lock: transactions waiting: 5
+ ZEO.StorageServer WARNING
+ (test-addr-14) ('1') queue lock: transactions waiting: 6
+ ZEO.StorageServer WARNING
+ (test-addr-15) ('1') queue lock: transactions waiting: 7
+ ZEO.StorageServer WARNING
+ (test-addr-16) ('1') queue lock: transactions waiting: 8
+ ZEO.StorageServer WARNING
+ (test-addr-17) ('1') queue lock: transactions waiting: 9
+ ZEO.StorageServer CRITICAL
+ (test-addr-18) ('1') queue lock: transactions waiting: 10
+
+If a client with the transaction lock disconnects, it will abort and
+release the lock and one of the waiting clients will get the lock.
+
+ >>> zs2.notifyDisconnected() # doctest: +ELLIPSIS
+ ZEO.StorageServer INFO
+ (test-addr-2) disconnected during locked transaction
+ ZEO.StorageServer CRITICAL
+ (test-addr-2) ('1') unlock: transactions waiting: 10
+ ZEO.StorageServer WARNING
+ (test-addr-1) ('1') lock: transactions waiting: 9
+ ZEO.StorageServer BLATHER
+ (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+ 1 callAsync serialnos ...
+
+(In practice, waiting clients won't necessarily get the lock in order.)
+
+We can find out about the current lock state, and get other server
+statistics using the server_status method:
+
+ >>> pprint.pprint(zs1.server_status(), width=1)
+ {'aborts': 3,
+ 'active_txns': 10,
+ 'commits': 0,
+ 'conflicts': 0,
+ 'conflicts_resolved': 0,
+ 'connections': 11,
+ 'loads': 0,
+ 'lock_time': 1272653598.693882,
+ 'start': 'Fri Apr 30 14:53:18 2010',
+ 'stores': 13,
+ 'verifying_clients': 0,
+ 'waiting': 9}
+
+(Note that the connections count above is off by 1 due to the way the
+test infrastructure works.)
+
+If clients disconnect while waiting, they will be dequeued:
+
+ >>> for client in clients:
+ ... client.notifyDisconnected()
+ ZEO.StorageServer INFO
+ (test-addr-10) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-10) ('1') dequeue lock: transactions waiting: 8
+ ZEO.StorageServer INFO
+ (test-addr-11) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-11) ('1') dequeue lock: transactions waiting: 7
+ ZEO.StorageServer INFO
+ (test-addr-12) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-12) ('1') dequeue lock: transactions waiting: 6
+ ZEO.StorageServer INFO
+ (test-addr-13) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-13) ('1') dequeue lock: transactions waiting: 5
+ ZEO.StorageServer INFO
+ (test-addr-14) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-14) ('1') dequeue lock: transactions waiting: 4
+ ZEO.StorageServer INFO
+ (test-addr-15) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-15) ('1') dequeue lock: transactions waiting: 3
+ ZEO.StorageServer INFO
+ (test-addr-16) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-16) ('1') dequeue lock: transactions waiting: 2
+ ZEO.StorageServer INFO
+ (test-addr-17) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-17) ('1') dequeue lock: transactions waiting: 1
+ ZEO.StorageServer INFO
+ (test-addr-18) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-18) ('1') dequeue lock: transactions waiting: 0
+
+ >>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
+ >>> logging.getLogger('ZEO').removeHandler(handler)
+ """
+
+
def test_suite():
return unittest.TestSuite((
doctest.DocTestSuite(
setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
checker=renormalizing.RENormalizing([
(re.compile('\d+/test-addr'), ''),
+ (re.compile("'lock_time': \d+.\d+"), 'lock_time'),
+ (re.compile(r"'start': '[^\n]+'"), 'start'),
]),
),
))
More information about the Zodb-checkins
mailing list