[Zodb-checkins] SVN: ZODB/branches/jim-dev/src/ZEO/ checkpoint
Jim Fulton
jim at zope.com
Sat May 1 11:49:27 EDT 2010
Log message for revision 111839:
checkpoint
Changed:
U ZODB/branches/jim-dev/src/ZEO/StorageServer.py
U ZODB/branches/jim-dev/src/ZEO/tests/forker.py
U ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py
U ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py
-=-
Modified: ZODB/branches/jim-dev/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/StorageServer.py 2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/StorageServer.py 2010-05-01 15:49:27 UTC (rev 111839)
@@ -57,20 +57,8 @@
logger = logging.getLogger('ZEO.StorageServer')
-# TODO: This used to say "ZSS", which is now implied in the logger name.
-# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
-_label = "" # default label used for logging.
-
-
-def set_label():
- """Internal helper to reset the logging label (e.g. after fork())."""
- global _label
- _label = "%s" % os.getpid()
-
-
-def log(message, level=logging.INFO, label=None, exc_info=False):
+def log(message, level=logging.INFO, label='', exc_info=False):
"""Internal helper to log a message."""
- label = label or _label
if label:
message = "(%s) %s" % (label, message)
logger.log(level, message, exc_info=exc_info)
@@ -97,10 +85,10 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
+ self.log_label = 'unconnected'
self.locked = False # Don't have storage lock
self.verifying = 0
self.store_failed = 0
- self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
@@ -131,13 +119,7 @@
conn.register_object(ZEOStorage308Adapter(self))
else:
self.client = ClientStub(conn)
- addr = conn.addr
- if isinstance(addr, type("")):
- label = addr
- else:
- host, port = addr
- label = str(host) + ":" + str(port)
- self.log_label = _label + "/" + label
+ self.log_label = _addr_label(conn.addr)
def notifyDisconnected(self):
self.connection = None
@@ -146,7 +128,7 @@
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during %s transaction"
- % self.locked and 'locked' or 'unlocked')
+ % (self.locked and 'locked' or 'unlocked'))
self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -451,7 +433,9 @@
if self.locked:
self.server.unlock_storage(self)
self.locked = 0
- self.transaction = None
+ if self.transaction is not None:
+ self.server.stop_waiting(self)
+ self.transaction = None
self.stats.active_txns -= 1
if self.txnlog is not None:
self.txnlog.close()
@@ -462,13 +446,20 @@
def vote(self, tid):
self._check_tid(tid, exc=StorageTransactionError)
+ if self.locked or self.server.already_waiting(self):
+ raise StorageTransactionError('Already voting %s' % self.locked)
return self._try_to_vote()
def _try_to_vote(self, delay=None):
if self.connection is None:
return # We're disconnected
- self.locked = self.server.lock_storage(self)
if self.locked:
+ # as a consequence of the unlocking strategy,
+ # _try_to_vote may be called multiple times.
+ # Once we're locked, we should stop trying. :)
+ return
+ self.locked, delay = self.server.lock_storage(self, delay)
+ if self.locked:
try:
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
@@ -511,17 +502,16 @@
else:
if delay is not None:
delay.reply(None)
+ else:
+ return None
else:
- if delay == None:
- self.log("(%r) queue lock: transactions waiting: %s"
- % (self.storage_id, self.server.waiting(self)+1))
- delay = Delay()
- self.server.unlock_callback(self, delay)
return delay
def _unlock_callback(self, delay):
connection = self.connection
- if connection is not None:
+ if connection is None:
+ self.server.stop_waiting(self)
+ else:
connection.call_from_thread(self._try_to_vote, delay)
# The public methods of the ZEO client API do not do the real work.
@@ -764,6 +754,8 @@
for iid in iids:
self._iterators.pop(iid, None)
+ def server_status(self):
+ return self.server.server_status(self)
class StorageServerDB:
@@ -873,7 +865,6 @@
self.addr = addr
self.storages = storages
- set_label()
msg = ", ".join(
["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
storage.getName())
@@ -884,7 +875,7 @@
self._lock = threading.Lock()
self._commit_locks = {}
- self._unlock_callbacks = dict((name, []) for name in storages)
+ self._waiting = dict((name, []) for name in storages)
self.read_only = read_only
self.auth_protocol = auth_protocol
@@ -1171,29 +1162,54 @@
if conn.obj in cl:
cl.remove(conn.obj)
- def lock_storage(self, zeostore):
+ def lock_storage(self, zeostore, delay):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
+
if storage_id in self._commit_locks:
- return False
- self._commit_locks[storage_id] = zeostore
- self.timeouts[storage_id].begin(zeostore)
- self.stats[storage_id].lock_time = time.time()
- return True
+ # The lock is held by another zeostore
+ assert self._commit_locks[storage_id] is not zeostore
+
+ if delay is None:
+ # New request, queue it
+ delay = Delay()
+ waiting.append((zeostore, delay))
+ zeostore.log("(%r) queue lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+
+ return False, delay
+ else:
+ self._commit_locks[storage_id] = zeostore
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ if delay:
+ # we were waiting, stop
+ waiting[:] = [i for i in waiting if i[0] is not zeostore]
+ zeostore.log("(%r) lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+ return True, delay
+
def unlock_storage(self, zeostore):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
assert self._commit_locks[storage_id] is zeostore
del self._commit_locks[storage_id]
self.timeouts[storage_id].end(zeostore)
self.stats[storage_id].lock_time = None
- callbacks = self._unlock_callbacks[storage_id][:]
- del self._unlock_callbacks[storage_id][:]
+ callbacks = waiting[:]
if callbacks:
zeostore.log("(%r) unlock: transactions waiting: %s"
- % (storage_id, len(callbacks)-1))
+ % (storage_id, len(callbacks)),
+ _level_for_waiting(callbacks)
+ )
for zeostore, delay in callbacks:
try:
@@ -1203,14 +1219,41 @@
except Exception:
logger.exception("Calling unlock callback")
- def unlock_callback(self, zeostore, delay):
+ def stop_waiting(self, zeostore):
storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
with self._lock:
- self._unlock_callbacks[storage_id].append((zeostore, delay))
+ new_waiting = [i for i in waiting if i[0] is not zeostore]
+ if len(new_waiting) == len(waiting):
+ return
+ waiting[:] = new_waiting
- def waiting(self, zeostore):
- return len(self._unlock_callbacks[zeostore.storage_id])
+ zeostore.log("(%r) dequeue lock: transactions waiting: %s"
+ % (storage_id, len(waiting)),
+ _level_for_waiting(waiting)
+ )
+ def already_waiting(self, zeostore):
+ storage_id = zeostore.storage_id
+ waiting = self._waiting[storage_id]
+ with self._lock:
+ return bool([i for i in waiting if i[0] is zeostore])
+
+ def server_status(self, zeostore):
+ storage_id = zeostore.storage_id
+ status = self.stats[storage_id].__dict__.copy()
+ status['connections'] = len(status['connections'])
+ status['waiting'] = len(self._waiting[storage_id])
+ return status
+
+def _level_for_waiting(waiting):
+ if len(waiting) > 9:
+ return logging.CRITICAL
+ if len(waiting) > 3:
+ return logging.WARNING
+ else:
+ return logging.DEBUG
+
class StubTimeoutThread:
def begin(self, client):
@@ -1454,4 +1497,10 @@
def __getattr__(self, name):
return getattr(self.storage, name)
+def _addr_label(addr):
+ if isinstance(addr, type("")):
+ return addr
+ else:
+ host, port = addr
+ return str(host) + ":" + str(port)
Modified: ZODB/branches/jim-dev/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/forker.py 2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/forker.py 2010-05-01 15:49:27 UTC (rev 111839)
@@ -42,7 +42,7 @@
self.authentication_protocol = None
self.authentication_database = None
self.authentication_realm = None
- self.loglevel = 'INFO'
+ self.loglevel = 'DEBUG'
def dump(self, f):
print >> f, "<zeo>"
Modified: ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py 2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py 2010-05-01 15:49:27 UTC (rev 111839)
@@ -31,9 +31,15 @@
import ZEO.StorageServer
import ZEO.zrpc.connection
import ZEO.zrpc.error
+import ZODB.MappingStorage
class StorageServer(ZEO.StorageServer.StorageServer):
+ def __init__(self, addr='test_addr', storages=None, **kw):
+ if storages is None:
+ storages = {'1': ZODB.MappingStorage.MappingStorage()}
+ ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw)
+
def DispatcherClass(*args, **kw):
pass
@@ -42,9 +48,10 @@
peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol
connected = True
- def __init__(self, name='connection', addr='test-addr'):
+ def __init__(self, name='connection', addr=''):
+ name = str(name)
self.name = name
- self.addr = addr
+ self.addr = addr or 'test-addr-'+name
def close(self):
print self.name, 'closed'
@@ -65,3 +72,9 @@
def send_reply(self, *args):
pass
+
+def client(server, name='client', addr=''):
+ zs = ZEO.StorageServer.ZEOStorage(server)
+ zs.notifyConnected(Connection(name, addr))
+ zs.register('1', 0)
+ return zs
Modified: ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py 2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py 2010-05-01 15:49:27 UTC (rev 111839)
@@ -13,6 +13,7 @@
##############################################################################
from zope.testing import doctest, setupstack, renormalizing
import logging
+import pprint
import re
import sys
import transaction
@@ -93,7 +94,6 @@
handled correctly:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
- (511/test-addr) ('1') unlock: transactions waiting: 0
2 callAsync serialnos ...
reply 1 None
@@ -200,12 +200,210 @@
"""
+def some_basic_locking_tests():
+ r"""
+
+ >>> itid = 0
+ >>> def start_trans(zs):
+ ... global itid
+ ... itid += 1
+ ... tid = str(itid)
+ ... zs.tpc_begin(tid, '', '', {})
+ ... zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', tid)
+ ... return tid
+
+ >>> server = ZEO.tests.servertesting.StorageServer()
+
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> handler.setFormatter(logging.Formatter(
+ ... '%(name)s %(levelname)s\n%(message)s'))
+ >>> logging.getLogger('ZEO').addHandler(handler)
+ >>> logging.getLogger('ZEO').setLevel(logging.DEBUG)
+
+We start a transaction and vote; this leads to getting the lock.
+
+ >>> zs1 = ZEO.tests.servertesting.client(server, '1')
+ >>> tid1 = start_trans(zs1)
+ >>> zs1.vote(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') lock: transactions waiting: 0
+ ZEO.StorageServer BLATHER
+ (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+ 1 callAsync serialnos ...
+
+If another client tries to vote, its lock request will be queued and
+a delay will be returned:
+
+ >>> zs2 = ZEO.tests.servertesting.client(server, '2')
+ >>> tid2 = start_trans(zs2)
+ >>> delay = zs2.vote(tid2)
+ ZEO.StorageServer DEBUG
+ (test-addr-2) ('1') queue lock: transactions waiting: 1
+
+ >>> delay.set_sender(0, zs2.connection)
+
+When we end the first transaction, the queued vote gets the lock.
+
+ >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') unlock: transactions waiting: 1
+ ZEO.StorageServer DEBUG
+ (test-addr-2) ('1') lock: transactions waiting: 0
+ ZEO.StorageServer BLATHER
+ (test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes
+ 2 callAsync serialnos ...
+
+Let's try again with the first client. The vote will be queued:
+
+ >>> tid1 = start_trans(zs1)
+ >>> delay = zs1.vote(tid1)
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+If the queued transaction is aborted, it will be dequeued:
+
+ >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') dequeue lock: transactions waiting: 0
+
+BTW, voting multiple times will raise an error:
+
+ >>> zs2.vote(tid2)
+ Traceback (most recent call last):
+ ...
+ StorageTransactionError: Already voting
+
+ >>> tid1 = start_trans(zs1)
+ >>> delay = zs1.vote(tid1)
+ ZEO.StorageServer DEBUG
+ (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+ >>> delay.set_sender(0, zs1.connection)
+
+ >>> zs1.vote(tid1)
+ Traceback (most recent call last):
+ ...
+ StorageTransactionError: Already voting
+
+Note that the locking activity is logged at debug level to avoid
+cluttering log files; however, as the number of waiting votes
+increases, so does the logging level:
+
+ >>> clients = []
+ >>> for i in range(9):
+ ... client = ZEO.tests.servertesting.client(server, str(i+10))
+ ... tid = start_trans(client)
+ ... delay = client.vote(tid)
+ ... clients.append(client)
+ ZEO.StorageServer DEBUG
+ (test-addr-10) ('1') queue lock: transactions waiting: 2
+ ZEO.StorageServer DEBUG
+ (test-addr-11) ('1') queue lock: transactions waiting: 3
+ ZEO.StorageServer WARNING
+ (test-addr-12) ('1') queue lock: transactions waiting: 4
+ ZEO.StorageServer WARNING
+ (test-addr-13) ('1') queue lock: transactions waiting: 5
+ ZEO.StorageServer WARNING
+ (test-addr-14) ('1') queue lock: transactions waiting: 6
+ ZEO.StorageServer WARNING
+ (test-addr-15) ('1') queue lock: transactions waiting: 7
+ ZEO.StorageServer WARNING
+ (test-addr-16) ('1') queue lock: transactions waiting: 8
+ ZEO.StorageServer WARNING
+ (test-addr-17) ('1') queue lock: transactions waiting: 9
+ ZEO.StorageServer CRITICAL
+ (test-addr-18) ('1') queue lock: transactions waiting: 10
+
+If a client with the transaction lock disconnects, it will abort and
+release the lock and one of the waiting clients will get the lock.
+
+ >>> zs2.notifyDisconnected() # doctest: +ELLIPSIS
+ ZEO.StorageServer INFO
+ (test-addr-2) disconnected during locked transaction
+ ZEO.StorageServer CRITICAL
+ (test-addr-2) ('1') unlock: transactions waiting: 10
+ ZEO.StorageServer WARNING
+ (test-addr-1) ('1') lock: transactions waiting: 9
+ ZEO.StorageServer BLATHER
+ (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+ 1 callAsync serialnos ...
+
+(In practice, waiting clients won't necessarily get the lock in order.)
+
+We can find out about the current lock state, and get other server
+statistics using the server_status method:
+
+ >>> pprint.pprint(zs1.server_status(), width=1)
+ {'aborts': 3,
+ 'active_txns': 10,
+ 'commits': 0,
+ 'conflicts': 0,
+ 'conflicts_resolved': 0,
+ 'connections': 11,
+ 'loads': 0,
+ 'lock_time': 1272653598.693882,
+ 'start': 'Fri Apr 30 14:53:18 2010',
+ 'stores': 13,
+ 'verifying_clients': 0,
+ 'waiting': 9}
+
+(Note that the connections count above is off by 1 due to the way the
+test infrastructure works.)
+
+If clients disconnect while waiting, they will be dequeued:
+
+ >>> for client in clients:
+ ... client.notifyDisconnected()
+ ZEO.StorageServer INFO
+ (test-addr-10) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-10) ('1') dequeue lock: transactions waiting: 8
+ ZEO.StorageServer INFO
+ (test-addr-11) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-11) ('1') dequeue lock: transactions waiting: 7
+ ZEO.StorageServer INFO
+ (test-addr-12) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-12) ('1') dequeue lock: transactions waiting: 6
+ ZEO.StorageServer INFO
+ (test-addr-13) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-13) ('1') dequeue lock: transactions waiting: 5
+ ZEO.StorageServer INFO
+ (test-addr-14) disconnected during unlocked transaction
+ ZEO.StorageServer WARNING
+ (test-addr-14) ('1') dequeue lock: transactions waiting: 4
+ ZEO.StorageServer INFO
+ (test-addr-15) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-15) ('1') dequeue lock: transactions waiting: 3
+ ZEO.StorageServer INFO
+ (test-addr-16) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-16) ('1') dequeue lock: transactions waiting: 2
+ ZEO.StorageServer INFO
+ (test-addr-17) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-17) ('1') dequeue lock: transactions waiting: 1
+ ZEO.StorageServer INFO
+ (test-addr-18) disconnected during unlocked transaction
+ ZEO.StorageServer DEBUG
+ (test-addr-18) ('1') dequeue lock: transactions waiting: 0
+
+ >>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
+ >>> logging.getLogger('ZEO').removeHandler(handler)
+ """
+
+
def test_suite():
return unittest.TestSuite((
doctest.DocTestSuite(
setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
checker=renormalizing.RENormalizing([
(re.compile('\d+/test-addr'), ''),
+ (re.compile("'lock_time': \d+.\d+"), 'lock_time'),
+ (re.compile(r"'start': '[^\n]+'"), 'start'),
]),
),
))
More information about the Zodb-checkins
mailing list