[Zodb-checkins] SVN: ZODB/branches/jim-thready/src/Z rebranch
Jim Fulton
jim at zope.com
Tue Dec 22 16:31:06 EST 2009
Log message for revision 106933:
rebranch
Changed:
U ZODB/branches/jim-thready/src/ZEO/StorageServer.py
U ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py
U ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py
U ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py
U ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py
U ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py
U ZODB/branches/jim-thready/src/ZEO/zrpc/client.py
U ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
U ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py
U ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py
U ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/branches/jim-thready/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/StorageServer.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/StorageServer.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -32,6 +32,7 @@
import transaction
+import ZODB.blob
import ZODB.serialize
import ZODB.TimeStamp
import ZEO.zrpc.error
@@ -48,7 +49,7 @@
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
-from ZODB.utils import u64, p64, oid_repr, mktemp
+from ZODB.utils import u64, p64, oid_repr
from ZODB.loglevels import BLATHER
@@ -87,7 +88,6 @@
def __init__(self, server, read_only=0, auth_realm=None):
self.server = server
# timeout and stats will be initialized in register()
- self.timeout = None
self.stats = None
self.connection = None
self.client = None
@@ -95,14 +95,14 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
- self.locked = 0
+ self.locked = False # Don't have storage lock
+ self.locked_lock = threading.Lock() # mediate locked access
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
- self.blob_log = []
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -138,25 +138,37 @@
label = str(host) + ":" + str(port)
self.log_label = _label + "/" + label
+ def notifyLocked(self):
+ # We don't want to give a lock to a disconnected client, and we
+ # need to avoid a race of giving a lock to a client while it's
+ # disconnecting. We check self.connection and set self.locked while
+ # the locked_lock is held, preventing self.connection from being
+ # set to None between the check and setting self.locked.
+ self.locked_lock.acquire()
+ try:
+ if self.connection is None:
+ return False # We've been disconnected. Don't take the lock
+ self.locked = True
+ # What happens if, before processing the trigger, we disconnect,
+ # reconnect, and start a new transaction?
+ # This isn't possible because we never reconnect!
+ self.connection.trigger.pull_trigger(self._restart)
+ return True
+ finally:
+ self.locked_lock.release()
+
def notifyDisconnected(self):
+ self.locked_lock.acquire()
+ try:
+ self.connection = None
+ finally:
+ self.locked_lock.release()
+
# When this storage closes, we must ensure that it aborts
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
- if not self.locked:
- # Delete (d, zeo_storage) from the _waiting list, if found.
- waiting = self.storage._waiting
- for i in range(len(waiting)):
- d, z = waiting[i]
- if z is self:
- del waiting[i]
- self.log("Closed connection removed from waiting list."
- " Clients waiting: %d." % len(waiting))
- break
-
- if self.transaction:
- self.tpc_abort(self.transaction.id)
-
+ self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -265,9 +277,12 @@
if self.auth_realm and not self.authenticated:
raise AuthError("Client was never authenticated with server!")
+ self.connection.auth_done()
+
if self.storage is not None:
self.log("duplicate register() call")
raise ValueError("duplicate register() call")
+
storage = self.server.storages.get(storage_id)
if storage is None:
self.log("unknown storage_id: %s" % storage_id)
@@ -280,8 +295,7 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.timeout, self.stats = self.server.register_connection(storage_id,
- self)
+ self.stats = self.server.register_connection(storage_id, self)
def get_info(self):
storage = self.storage
@@ -419,6 +433,7 @@
self.serials = []
self.invalidated = []
self.txnlog = CommitLog()
+ self.blob_log = []
self.tid = tid
self.status = status
self.store_failed = 0
@@ -438,109 +453,99 @@
if not self._check_tid(id):
return
assert self.locked
+
self.stats.commits += 1
- self.storage.tpc_finish(self.transaction)
+ self.storage.tpc_finish(self.transaction, self._invalidate)
+ # Note that the tid is still current because we still hold the
+ # commit lock. We'll relinquish it in _clear_transaction.
tid = self.storage.lastTransaction()
- if self.invalidated:
- self.server.invalidate(self, self.storage_id, tid,
- self.invalidated, self.get_size_info())
self._clear_transaction()
# Return the tid, for cache invalidation optimization
return tid
- def tpc_abort(self, id):
- if not self._check_tid(id):
+ def _invalidate(self, tid):
+ if self.invalidated:
+ self.server.invalidate(self, self.storage_id, tid,
+ self.invalidated, self.get_size_info())
+
+ def tpc_abort(self, tid):
+ if not self._check_tid(tid):
return
self.stats.aborts += 1
+
+ # Is there a race here? What if notifyLocked is called after
+ # the check? Well, we still won't have started committing the actual
+ # storage. That wouldn't happen until _restart is called and that
+ # can't happen while this method is executing, as both are only
+ # run by the client thread. So no race.
if self.locked:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
+ self.server.unlock_storage(self)
+ self.locked = 0
+ self.transaction = None
self.stats.active_txns -= 1
- self.transaction = None
- self.txnlog.close()
- if self.locked:
- self.locked = 0
- self.timeout.end(self)
- self.stats.lock_time = None
- self.log("Transaction released storage lock", BLATHER)
+ if self.txnlog is not None:
+ self.txnlog.close()
+ self.txnlog = None
+ for oid, oldserial, data, blobfilename in self.blob_log:
+ ZODB.blob.remove_committed(blobfilename)
+ del self.blob_log
- # Restart any client waiting for the storage lock.
- while self.storage._waiting:
- delay, zeo_storage = self.storage._waiting.pop(0)
- try:
- zeo_storage._restart(delay)
- except:
- self.log("Unexpected error handling waiting transaction",
- level=logging.WARNING, exc_info=True)
- zeo_storage.connection.close()
- continue
-
- if self.storage._waiting:
- n = len(self.storage._waiting)
- self.log("Blocked transaction restarted. "
- "Clients waiting: %d" % n)
- else:
- self.log("Blocked transaction restarted.")
-
- break
-
# The following two methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
# It's a bit vile that undo can cause us to get the lock before vote.
- def undo(self, trans_id, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
+ def undo(self, trans_id, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+
+ if self.txnlog is not None:
+ return self._wait(lambda: self._undo(trans_id))
+ else:
return self._undo(trans_id)
+
+ def vote(self, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+
+ if self.txnlog is not None:
+ return self._wait(lambda: self._vote())
else:
- return self._wait(lambda: self._undo(trans_id))
-
- def vote(self, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
return self._vote()
- else:
- return self._wait(lambda: self._vote())
- # When a delayed transaction is restarted, the dance is
- # complicated. The restart occurs when one ZEOStorage instance
- # finishes as a transaction and finds another instance is in the
- # _waiting list.
- # It might be better to have a mechanism to explicitly send
- # the finishing transaction's reply before restarting the waiting
- # transaction. If the restart takes a long time, the previous
- # client will be blocked until it finishes.
-
+ _thunk = _delay = None
def _wait(self, thunk):
# Wait for the storage lock to be acquired.
+ assert self._thunk == self._delay == None
self._thunk = thunk
- if self.tpc_transaction():
- d = Delay()
- self.storage._waiting.append((d, self))
- self.log("Transaction blocked waiting for storage. "
- "Clients waiting: %d." % len(self.storage._waiting))
- return d
- else:
+ if self.server.lock_storage(self):
+ assert not self.tpc_transaction()
self.log("Transaction acquired storage lock.", BLATHER)
+ self.locked = True
return self._restart()
- def _restart(self, delay=None):
+ self._delay = d = Delay()
+ return d
+
+ def _restart(self):
+ if not self.locked:
+ # Must have been disconnected after locking
+ assert self.connection is None
+ return
+
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
+
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
- self.locked = 1
- self.timeout.begin(self)
- self.stats.lock_time = time.time()
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
@@ -575,12 +580,19 @@
self._clear_transaction()
raise
- resp = self._thunk()
+ thunk = self._thunk
+ delay = self._delay
+ self._thunk = self._delay = None
+
+ resp = thunk()
if delay is not None:
delay.reply(resp)
- else:
- return resp
+ self.txnlog.close()
+ self.txnlog = None
+ del self.blob_log
+ return resp
+
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
@@ -610,14 +622,18 @@
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
+
# Reconstruct the full path from the filename in the OID directory
-
if (os.path.sep in filename
or not (filename.endswith('.tmp')
or filename[:-1].endswith('.tmp')
@@ -735,6 +751,7 @@
return error
def _vote(self):
+ assert self.locked
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
# the serialnos() call will deliver an exception that will be
@@ -929,8 +946,10 @@
for name, storage in storages.items()])
log("%s created %s with storages: %s" %
(self.__class__.__name__, read_only and "RO" or "RW", msg))
- for s in storages.values():
- s._waiting = []
+
+ self.lockers = dict((name, []) for name in storages)
+ self.lockers_lock = threading.Lock()
+
self.read_only = read_only
self.auth_protocol = auth_protocol
self.auth_database = auth_database
@@ -1044,7 +1063,7 @@
Returns the timeout and stats objects for the appropriate storage.
"""
self.connections[storage_id].append(conn)
- return self.timeouts[storage_id], self.stats[storage_id]
+ return self.stats[storage_id]
def _invalidateCache(self, storage_id):
"""We need to invalidate any caches we have.
@@ -1216,7 +1235,51 @@
if conn.obj in cl:
cl.remove(conn.obj)
+ def lock_storage(self, zeostore):
+ self.lockers_lock.acquire()
+ try:
+ storage_id = zeostore.storage_id
+ lockers = self.lockers[storage_id]
+ lockers.append(zeostore)
+ if len(lockers) == 1:
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ return True
+ else:
+ zeostore.log("(%r) queue lock: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ finally:
+ self.lockers_lock.release()
+ def unlock_storage(self, zeostore):
+ self.lockers_lock.acquire()
+ try:
+ storage_id = zeostore.storage_id
+ lockers = self.lockers[storage_id]
+ if zeostore in lockers:
+ if lockers[0] == zeostore:
+ self.timeouts[storage_id].end(zeostore)
+ self.stats[storage_id].lock_time = None
+ lockers.pop(0)
+ while lockers:
+ zeostore.log("(%r) unlock: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ zeostore = lockers[0]
+ if zeostore.notifyLocked():
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ break
+ else:
+ # The queued client was closed, so dequeue it
+ lockers.pop(0)
+ else:
+ lockers.remove(zeostore)
+ if lockers:
+ zeostore.log("(%r) dequeue: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ finally:
+ self.lockers_lock.release()
+
class StubTimeoutThread:
def begin(self, client):
@@ -1238,7 +1301,6 @@
self._client = None
self._deadline = None
self._cond = threading.Condition() # Protects _client and _deadline
- self._trigger = trigger()
def begin(self, client):
# Called from the restart code the "main" thread, whenever the
@@ -1281,7 +1343,8 @@
if howlong <= 0:
client.log("Transaction timeout after %s seconds" %
self._timeout)
- self._trigger.pull_trigger(lambda: client.connection.close())
+ client.connection.trigger.pull_trigger(
+ lambda: client.connection.close())
else:
time.sleep(howlong)
@@ -1337,7 +1400,7 @@
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
- self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
+ self.rpc.callAsync('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
@@ -1363,7 +1426,7 @@
class ClientStub308(ClientStub):
def invalidateTransaction(self, tid, args):
- self.rpc.callAsyncNoPoll(
+ self.rpc.callAsync(
'invalidateTransaction', tid, [(arg, '') for arg in args])
def invalidateVerify(self, oid):
Modified: ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -318,9 +318,9 @@
# tearDown then immediately, but if other threads are still
# running that can lead to a cascade of spurious exceptions.
for t in threads:
- t.join(10)
+ t.join(30)
for t in threads:
- t.cleanup()
+ t.cleanup(10)
def checkConcurrentUpdates2Storages_emulated(self):
self._storage = storage1 = self.openClientStorage()
@@ -378,6 +378,34 @@
db1.close()
db2.close()
+ def checkConcurrentUpdates19Storages(self):
+ n = 19
+ dbs = [DB(self.openClientStorage()) for i in range(n)]
+ self._storage = dbs[0].storage
+ stop = threading.Event()
+
+ cn = dbs[0].open()
+ tree = cn.root()["tree"] = OOBTree()
+ transaction.commit()
+ cn.close()
+
+ # Run threads that update the BTree
+ cd = {}
+ threads = [self.StressThread(dbs[i], stop, i, cd, i, n)
+ for i in range(n)]
+ self.go(stop, cd, *threads)
+
+ while len(set(db.lastTransaction() for db in dbs)) > 1:
+ _ = [db._storage.sync() for db in dbs]
+
+ cn = dbs[0].open()
+ tree = cn.root()["tree"]
+ self._check_tree(cn, tree)
+ self._check_threads(tree, *threads)
+
+ cn.close()
+ _ = [db.close() for db in dbs]
+
def checkConcurrentUpdates1Storage(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
Modified: ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -56,3 +56,13 @@
def callAsync(self, meth, *args):
print self.name, 'callAsync', meth, repr(args)
+
+ @property
+ def trigger(self):
+ return self
+
+ def pull_trigger(self, func):
+ func()
+
+ def auth_done(self):
+ pass
Modified: ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -13,6 +13,8 @@
##############################################################################
import unittest
from zope.testing import doctest
+import ZEO.zrpc.connection
+import ZEO.tests.servertesting
class FakeStorageBase:
@@ -29,6 +31,9 @@
def __len__(self):
return 4
+ def registerDB(self, *args):
+ pass
+
class FakeStorage(FakeStorageBase):
def record_iternext(self, next=None):
@@ -50,15 +55,25 @@
def register_connection(*args):
return None, None
+class FauxConn:
+ addr = 'x'
+ thread_ident = unregistered_thread_ident = None
+ peer_protocol_version = (
+ ZEO.zrpc.connection.Connection.current_protocol)
+
+ def auth_done(self):
+ pass
+
def test_server_record_iternext():
"""
-
+
On the server, record_iternext calls are simply delegated to the
underlying storage.
>>> import ZEO.StorageServer
>>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+ >>> zeo.notifyConnected(FauxConn())
>>> zeo.register('1', False)
>>> next = None
@@ -71,13 +86,14 @@
2
3
4
-
+
The storage info also reflects the fact that record_iternext is supported.
>>> zeo.get_info()['supports_record_iternext']
True
>>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+ >>> zeo.notifyConnected(FauxConn())
>>> zeo.register('2', False)
>>> zeo.get_info()['supports_record_iternext']
@@ -152,7 +168,7 @@
4
"""
-
+
def history_to_version_compatible_storage():
"""
Some storages work under ZODB <= 3.8 and ZODB >= 3.9.
@@ -163,15 +179,19 @@
... return oid,version,size
A ZEOStorage such as the following should support this type of storage:
-
+
>>> class OurFakeServer(FakeServer):
... storages = {'1':VersionCompatibleStorage()}
>>> import ZEO.StorageServer
- >>> zeo = ZEO.StorageServer.ZEOStorage(OurFakeServer(), False)
+ >>> zeo = ZEO.StorageServer.ZEOStorage(
+ ... ZEO.tests.servertesting.StorageServer(
+ ... 'test', {'1':VersionCompatibleStorage()}))
+ >>> zeo.notifyConnected(ZEO.tests.servertesting.Connection())
>>> zeo.register('1', False)
- The ZEOStorage should sort out the following call such that the storage gets
- the correct parameters and so should return the parameters it was called with:
+ The ZEOStorage should sort out the following call such that the
+ storage gets the correct parameters and so should return the
+ parameters it was called with:
>>> zeo.history('oid',99)
('oid', '', 99)
@@ -181,7 +201,7 @@
>>> from ZEO.StorageServer import ZEOStorage308Adapter
>>> zeo = ZEOStorage308Adapter(VersionCompatibleStorage())
-
+
The history method should still return the parameters it was called with:
>>> zeo.history('oid','',99)
Modified: ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -25,7 +25,6 @@
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
-import asyncore
import doctest
import logging
import os
@@ -464,7 +463,11 @@
pass
time.sleep(.1)
- self.failIf(self._storage.is_connected())
+ try:
+ self.failIf(self._storage.is_connected())
+ except:
+ print log
+ raise
self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
del ZEO.zrpc.connection.client_logger.critical
self.assertEqual(log[0][0], 'The ZEO client loop failed.')
@@ -738,6 +741,14 @@
blob_cache_dir = 'blobs'
shared_blob_dir = True
+class FauxConn:
+ addr = 'x'
+ peer_protocol_version = (
+ ZEO.zrpc.connection.Connection.current_protocol)
+
+ def auth_done(self):
+ pass
+
class StorageServerClientWrapper:
def __init__(self):
@@ -754,8 +765,8 @@
def __init__(self, server, storage_id):
self.storage_id = storage_id
self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+ self.server.notifyConnected(FauxConn())
self.server.register(storage_id, False)
- self.server._thunk = lambda : None
self.server.client = StorageServerClientWrapper()
def sortKey(self):
@@ -777,7 +788,6 @@
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction):
- self.server._restart()
self.server.vote(id(transaction))
result = self.server.client.serials[:]
del self.server.client.serials[:]
@@ -860,6 +870,8 @@
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
+
+ >>> s.notifyConnected(FauxConn())
>>> s.register('fs', False)
If we ask for the last transaction, we should get the last transaction
Modified: ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -92,9 +92,9 @@
handled correctly:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+ (511/test-addr) ('1') unlock: transactions waiting: 0
2 callAsync serialnos ...
reply 1 None
- (511/test-addr) Blocked transaction restarted.
>>> fs.tpc_transaction() is not None
True
Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/client.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/client.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -84,7 +84,6 @@
try:
t = self.thread
self.thread = None
- conn = self.connection
finally:
self.cond.release()
if t is not None:
@@ -94,9 +93,6 @@
if t.isAlive():
log("CM.close(): self.thread.join() timed out",
level=logging.WARNING)
- if conn is not None:
- # This will call close_conn() below which clears self.connection
- conn.close()
def attempt_connect(self):
"""Attempt a connection to the server without blocking too long.
Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -16,17 +16,16 @@
import errno
import select
import sys
+import thread
import threading
import logging
-import traceback, time
-
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.log import short_repr, log
-from ZODB.loglevels import BLATHER, TRACE
+from ZODB.loglevels import BLATHER
import ZODB.POSException
REPLY = ".reply" # message name used for replies
@@ -144,7 +143,7 @@
if obj is client_trigger:
continue
try:
- obj.mgr.client.close()
+ obj.mgr.client.close(True)
except:
map.pop(fd, None)
try:
@@ -759,11 +758,6 @@
self.log("wait(%d)" % msgid, level=TRACE)
self.trigger.pull_trigger()
-
- # Delay used when we call asyncore.poll() directly.
- # Start with a 1 msec delay, double until 1 sec.
- delay = 0.001
-
self.replies_cond.acquire()
try:
while 1:
@@ -794,7 +788,6 @@
self.trigger.pull_trigger()
-
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
@@ -802,13 +795,19 @@
unlogged_exception_types = (ZODB.POSException.POSKeyError, )
# Servers use a shared server trigger that uses the asyncore socket map
- trigger = trigger()
+ #trigger = trigger()
def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr
- Connection.__init__(self, sock, addr, obj, 'S')
+ map={}
+ Connection.__init__(self, sock, addr, obj, 'S', map=map)
self.marshal = ServerMarshaller()
+ self.trigger = trigger(map)
+ t = threading.Thread(target=server_loop, args=(map, self))
+ t.setDaemon(True)
+ t.start()
+
def handshake(self):
# Send the server's preferred protocol to the client.
self.message_output(self.current_protocol)
@@ -821,6 +820,27 @@
self.obj.notifyDisconnected()
Connection.close(self)
+ thread_ident = unregistered_thread_ident = None
+ def poll(self):
+ "Invoke asyncore mainloop to get pending message out."
+ ident = self.thread_ident
+ if ident is not None and thread.get_ident() == ident:
+ self.handle_write()
+ else:
+ self.trigger.pull_trigger()
+
+ def auth_done(self):
+ # We're done with the auth dance. We can be fast now.
+ self.thread_ident = self.unregistered_thread_ident
+
+def server_loop(map, conn):
+ conn.unregistered_thread_ident = thread.get_ident()
+
+ while len(map) > 1:
+ asyncore.poll(30.0, map)
+ for o in map.values():
+ o.close()
+
class ManagedClientConnection(Connection):
"""Client-side Connection subclass."""
__super_init = Connection.__init__
Modified: ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -36,6 +36,7 @@
import logging
import os
import sys
+import threading
import time
import ZODB.blob
import ZODB.interfaces
@@ -128,7 +129,7 @@
else:
self._tfile = None
- self._file_name = file_name
+ self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old
@@ -167,6 +168,7 @@
self._file = open(file_name, 'w+b')
self._file.write(packed_version)
+ self._files = FilePool(self._file_name)
r = self._restore_index()
if r is not None:
self._used_index = 1 # Marker for testing
@@ -401,6 +403,7 @@
def close(self):
self._file.close()
+ self._files.close()
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
@@ -426,22 +429,25 @@
"""Return pickle data and serial number."""
assert not version
- self._lock_acquire()
+ _file = self._files.get()
try:
+
pos = self._lookup_pos(oid)
- h = self._read_data_header(pos, oid)
+
+ h = self._read_data_header(pos, oid, _file)
if h.plen:
- data = self._file.read(h.plen)
+ data = _file.read(h.plen)
return data, h.tid
elif h.back:
# Get the data from the backpointer, but tid from
# current txn.
- data = self._loadBack_impl(oid, h.back)[0]
+ data = self._loadBack_impl(oid, h.back, _file=_file)[0]
return data, h.tid
else:
raise POSKeyError(oid)
+
finally:
- self._lock_release()
+ self._files.put(_file)
def loadSerial(self, oid, serial):
self._lock_acquire()
@@ -462,12 +468,13 @@
self._lock_release()
def loadBefore(self, oid, tid):
- self._lock_acquire()
+ _file = self._files.get()
try:
pos = self._lookup_pos(oid)
+
end_tid = None
while True:
- h = self._read_data_header(pos, oid)
+ h = self._read_data_header(pos, oid, _file)
if h.tid < tid:
break
@@ -477,14 +484,14 @@
return None
if h.back:
- data, _, _, _ = self._loadBack_impl(oid, h.back)
+ data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid
else:
- return self._file.read(h.plen), h.tid, end_tid
-
+ return _file.read(h.plen), h.tid, end_tid
finally:
- self._lock_release()
+ self._files.put(_file)
+
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -734,6 +741,31 @@
finally:
self._lock_release()
+ def tpc_finish(self, transaction, f=None):
+
+ # Get write lock
+ self._files.write_lock()
+ try:
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ if f is not None:
+ f(self._tid)
+ u, d, e = self._ude
+ self._finish(self._tid, u, d, e)
+ self._clear_temp()
+ finally:
+ self._ude = None
+ self._transaction = None
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ finally:
+ self._files.write_unlock()
+
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file.
@@ -1130,8 +1162,10 @@
return
have_commit_lock = True
opos, index = pack_result
+ self._files.write_lock()
self._lock_acquire()
try:
+ self._files.empty()
self._file.close()
try:
os.rename(self._file_name, oldpath)
@@ -1145,6 +1179,7 @@
self._initIndex(index, self._tindex)
self._pos = opos
finally:
+ self._files.write_unlock()
self._lock_release()
# We're basically done. Now we need to deal with removed
@@ -2036,3 +2071,72 @@
'description': d}
d.update(e)
return d
+
+class FilePool:
+
+ closed = False
+ writing = False
+
+ def __init__(self, file_name):
+ self.name = file_name
+ self._files = []
+ self._out = []
+ self._cond = threading.Condition()
+
+ def write_lock(self):
+ self._cond.acquire()
+ try:
+ self.writing = True
+ while self._out:
+ self._cond.wait()
+ finally:
+ self._cond.release()
+
+ def write_unlock(self):
+ self._cond.acquire()
+ self.writing = False
+ self._cond.notifyAll()
+ self._cond.release()
+
+ def get(self):
+ self._cond.acquire()
+ try:
+ while self.writing:
+ self._cond.wait()
+ if self.closed:
+ raise ValueError('closed')
+
+ try:
+ f = self._files.pop()
+ except IndexError:
+ f = open(self.name, 'rb')
+ self._out.append(f)
+ return f
+ finally:
+ self._cond.release()
+
+ def put(self, f):
+ self._out.remove(f)
+ self._files.append(f)
+ if not self._out:
+ self._cond.acquire()
+ try:
+ if self.writing and not self._out:
+ self._cond.notifyAll()
+ finally:
+ self._cond.release()
+
+ def empty(self):
+ while self._files:
+ self._files.pop().close()
+
+ def close(self):
+ self._cond.acquire()
+ self.closed = True
+ self._cond.release()
+
+ self.write_lock()
+ try:
+ self.empty()
+ finally:
+ self.write_unlock()
Modified: ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -134,21 +134,24 @@
self._file.seek(pos)
return u64(self._file.read(8))
- def _read_data_header(self, pos, oid=None):
+ def _read_data_header(self, pos, oid=None, _file=None):
"""Return a DataHeader object for data record at pos.
If ois is not None, raise CorruptedDataError if oid passed
does not match oid in file.
"""
- self._file.seek(pos)
- s = self._file.read(DATA_HDR_LEN)
+ if _file is None:
+ _file = self._file
+
+ _file.seek(pos)
+ s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if not h.plen:
- h.back = u64(self._file.read(8))
+ h.back = u64(_file.read(8))
return h
def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
h.ext = self._file.read(h.elen)
return h
- def _loadBack_impl(self, oid, back, fail=True):
+ def _loadBack_impl(self, oid, back, fail=True, _file=None):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record
# with no backpointer.
+ if _file is None:
+ _file = self._file
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
- h = self._read_data_header(back)
+ h = self._read_data_header(back, _file=_file)
if h.plen:
- return self._file.read(h.plen), h.tid, back, h.tloc
+ return _file.read(h.plen), h.tid, back, h.tloc
if h.back == 0 and not fail:
return None, h.tid, back, h.tloc
back = h.back
Modified: ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py 2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py 2009-12-22 21:31:06 UTC (rev 106933)
@@ -587,10 +587,10 @@
>>> handler.uninstall()
- >>> fs.load('\0'*8, '')
+ >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- ValueError: I/O operation on closed file
+ ValueError: ...
>>> db.close()
>>> fs = ZODB.FileStorage.FileStorage('data.fs')
More information about the Zodb-checkins
mailing list