[Zodb-checkins] SVN: ZODB/branches/ctheune-bushy-directory-3.8/
Merge updates from 3.8 branch
Christian Theune
ct at gocept.com
Mon Aug 4 13:47:46 EDT 2008
Log message for revision 89350:
Merge updates from 3.8 branch
Changed:
U ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
A ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
-=-
Modified: ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt 2008-08-04 17:47:44 UTC (rev 89350)
@@ -4,6 +4,15 @@
Bugs Fixed:
+- (???) Fixed bug #251037: Made packing of blob storages non-blocking.
+
+- (beta 6) Fixed a bug that could cause InvalidObjectReference errors
+ for objects that were explicitly added to a database if the object
+ was modified after a savepoint that added the object.
+
+- (beta 5) Fixed several bugs that caused ZEO cache corruption when connecting
+ to servers. These bugs affected both persistent and non-persistent caches.
+
- (beta 5) Improved the the ZEO client shutdown support to try to
avoid spurious errors on exit, especially for scripts, such as zeopack.
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -251,8 +251,6 @@
# _is_read_only stores the constructor argument
self._is_read_only = read_only
- # _conn_is_read_only stores the status of the current connection
- self._conn_is_read_only = 0
self._storage = storage
self._read_only_fallback = read_only_fallback
self._username = username
@@ -340,8 +338,6 @@
else:
cache_path = None
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
- # TODO: maybe there's a better time to open the cache? Unclear.
- self._cache.open()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
@@ -382,13 +378,18 @@
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
+ if self._rpc_mgr is not None:
+ self._rpc_mgr.close()
+ self._rpc_mgr = None
+ if self._connection is not None:
+ self._connection.register_object(None) # Don't call me!
+ self._connection.close()
+ self._connection = None
+
self._tbuf.close()
if self._cache is not None:
self._cache.close()
self._cache = None
- if self._rpc_mgr is not None:
- self._rpc_mgr.close()
- self._rpc_mgr = None
def registerDB(self, db):
"""Storage API: register a database for invalidation messages.
@@ -454,7 +455,7 @@
"""
log2("Testing connection %r" % conn)
# TODO: Should we check the protocol version here?
- self._conn_is_read_only = 0
+ conn._is_read_only = self._is_read_only
stub = self.StorageServerStubClass(conn)
auth = stub.getAuthProtocol()
@@ -476,7 +477,7 @@
raise
log2("Got ReadOnlyError; trying again with read_only=1")
stub.register(str(self._storage), read_only=1)
- self._conn_is_read_only = 1
+ conn._is_read_only = True
return 0
def notifyConnected(self, conn):
@@ -490,24 +491,26 @@
# this method before it was stopped.
return
- # invalidate our db cache
- if self._db is not None:
- self._db.invalidateCache()
- # TODO: report whether we get a read-only connection.
if self._connection is not None:
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
+ self._connection.register_object(None) # Don't call me!
+ self._connection.close()
+ self._connection = None
+ self._ready.clear()
reconnect = 1
else:
reconnect = 0
+
self.set_server_addr(conn.get_addr())
-
- # If we are upgrading from a read-only fallback connection,
- # we must close the old connection to prevent it from being
- # used while the cache is verified against the new connection.
- if self._connection is not None:
- self._connection.close()
self._connection = conn
+ # invalidate our db cache
+ if self._db is not None:
+ self._db.invalidateCache()
+
if reconnect:
log2("Reconnected to storage: %s" % self._server_addr)
else:
@@ -561,54 +564,6 @@
else:
return '%s:%s' % (self._storage, self._server_addr)
- def verify_cache(self, server):
- """Internal routine called to verify the cache.
-
- The return value (indicating which path we took) is used by
- the test suite.
- """
-
- # If verify_cache() finishes the cache verification process,
- # it should set self._server. If it goes through full cache
- # verification, then endVerify() should self._server.
-
- last_inval_tid = self._cache.getLastTid()
- if last_inval_tid is not None:
- ltid = server.lastTransaction()
- if ltid == last_inval_tid:
- log2("No verification necessary (last_inval_tid up-to-date)")
- self._server = server
- self._ready.set()
- return "no verification"
-
- # log some hints about last transaction
- log2("last inval tid: %r %s\n"
- % (last_inval_tid, tid2time(last_inval_tid)))
- log2("last transaction: %r %s" %
- (ltid, ltid and tid2time(ltid)))
-
- pair = server.getInvalidations(last_inval_tid)
- if pair is not None:
- log2("Recovering %d invalidations" % len(pair[1]))
- self.invalidateTransaction(*pair)
- self._server = server
- self._ready.set()
- return "quick verification"
-
- log2("Verifying cache")
- # setup tempfile to hold zeoVerify results
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
- # TODO: should batch these operations for efficiency; would need
- # to acquire lock ...
- for oid, tid, version in self._cache.contents():
- server.verify(oid, version, tid)
- self._pending_server = server
- server.endZeoVerify()
- return "full verification"
-
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
### notifyDisconnected in the middle of notifyConnected?
@@ -674,12 +629,16 @@
def isReadOnly(self):
"""Storage API: return whether we are in read-only mode."""
if self._is_read_only:
- return 1
+ return True
else:
# If the client is configured for a read-write connection
- # but has a read-only fallback connection, _conn_is_read_only
- # will be True.
- return self._conn_is_read_only
+ # but has a read-only fallback connection, conn._is_read_only
+ # will be True. If self._connection is None, we'll behave as
+ # read_only
+ try:
+ return self._connection._is_read_only
+ except AttributeError:
+ return True
def _check_trans(self, trans):
"""Internal helper to check a transaction argument for sanity."""
@@ -1152,7 +1111,7 @@
return
for oid, version, data in self._tbuf:
- self._cache.invalidate(oid, version, tid)
+ self._cache.invalidate(oid, version, tid, False)
# If data is None, we just invalidate.
if data is not None:
s = self._seriald[oid]
@@ -1210,8 +1169,6 @@
"""Storage API: return a sequence of versions in the storage."""
return self._server.versions(max)
- # Below are methods invoked by the StorageServer
-
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs."""
self._serials.extend(args)
@@ -1220,6 +1177,57 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
+ def verify_cache(self, server):
+ """Internal routine called to verify the cache.
+
+ The return value (indicating which path we took) is used by
+ the test suite.
+ """
+
+ self._pending_server = server
+
+ # setup tempfile to hold zeoVerify results and interim
+ # invalidation results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
+ # allow incoming invalidations:
+ self._connection.register_object(self)
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+ # verification, then endVerify() should set self._server.
+
+ last_inval_tid = self._cache.getLastTid()
+ if last_inval_tid is not None:
+ ltid = server.lastTransaction()
+ if ltid == last_inval_tid:
+ log2("No verification necessary (last_inval_tid up-to-date)")
+ self.finish_verification()
+ return "no verification"
+
+ # log some hints about last transaction
+ log2("last inval tid: %r %s\n"
+ % (last_inval_tid, tid2time(last_inval_tid)))
+ log2("last transaction: %r %s" %
+ (ltid, ltid and tid2time(ltid)))
+
+ pair = server.getInvalidations(last_inval_tid)
+ if pair is not None:
+ log2("Recovering %d invalidations" % len(pair[1]))
+ self.finish_verification(pair)
+ return "quick verification"
+
+ log2("Verifying cache")
+
+ # TODO: should batch these operations for efficiency; would need
+ # to acquire lock ...
+ for oid, tid, version in self._cache.contents():
+ server.verify(oid, version, tid)
+ server.endZeoVerify()
+ return "full verification"
+
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -1231,68 +1239,93 @@
# This should never happen. TODO: assert it doesn't, or log
# if it does.
return
- self._pickler.dump(args)
+ oid, version = args
+ self._pickler.dump((oid, version, None))
- def _process_invalidations(self, invs):
- # Invalidations are sent by the ZEO server as a sequence of
- # oid, version pairs. The DB's invalidate() method expects a
- # dictionary of oids.
+ def endVerify(self):
+ """Server callback to signal end of cache validation."""
+ log2("endVerify finishing")
+ self.finish_verification()
+ log2("endVerify finished")
+
+ def finish_verification(self, catch_up=None):
self._lock.acquire()
try:
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version, tid in invs:
- if oid == self._load_oid:
- self._load_status = 0
- self._cache.invalidate(oid, version, tid)
- oids = versions.get((version, tid))
- if not oids:
- versions[(version, tid)] = [oid]
- else:
- oids.append(oid)
+ if catch_up:
+ # process catch-up invalidations
+ tid, invalidations = catch_up
+ self._process_invalidations(
+ (oid, version, tid)
+ for oid, version in invalidations
+ )
+
+ if self._pickler is None:
+ return
+ # write end-of-data marker
+ self._pickler.dump((None, None, None))
+ self._pickler = None
+ self._tfile.seek(0)
+ unpickler = cPickle.Unpickler(self._tfile)
+ min_tid = self._cache.getLastTid()
+ def InvalidationLogIterator():
+ while 1:
+ oid, version, tid = unpickler.load()
+ if oid is None:
+ break
+ if ((tid is None)
+ or (min_tid is None)
+ or (tid > min_tid)
+ ):
+ yield oid, version, tid
- if self._db is not None:
- for (version, tid), d in versions.items():
- self._db.invalidate(tid, d, version=version)
+ self._process_invalidations(InvalidationLogIterator())
+ self._tfile.close()
+ self._tfile = None
finally:
self._lock.release()
- def endVerify(self):
- """Server callback to signal end of cache validation."""
- if self._pickler is None:
- return
- # write end-of-data marker
- self._pickler.dump((None, None))
- self._pickler = None
- self._tfile.seek(0)
- f = self._tfile
- self._tfile = None
- self._process_invalidations(InvalidationLogIterator(f))
- f.close()
-
- log2("endVerify finishing")
self._server = self._pending_server
self._ready.set()
- self._pending_conn = None
- log2("endVerify finished")
+ self._pending_server = None
+
def invalidateTransaction(self, tid, args):
- """Invalidate objects modified by tid."""
+ """Server callback: Invalidate objects modified by tid."""
self._lock.acquire()
try:
- self._cache.setLastTid(tid)
+ if self._pickler is not None:
+ log2("Transactional invalidation during cache verification",
+ level=BLATHER)
+ for oid, version in args:
+ self._pickler.dump((oid, version, tid))
+ return
+ self._process_invalidations([(oid, version, tid)
+ for oid, version in args])
finally:
self._lock.release()
- if self._pickler is not None:
- log2("Transactional invalidation during cache verification",
- level=BLATHER)
- for t in args:
- self._pickler.dump(t)
- return
- self._process_invalidations([(oid, version, tid)
- for oid, version in args])
+ def _process_invalidations(self, invs):
+ # Invalidations are sent by the ZEO server as a sequence of
+ # oid, version, tid triples. The DB's invalidate() method expects a
+ # dictionary of oids.
+
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version, tid in invs:
+ if oid == self._load_oid:
+ self._load_status = 0
+ self._cache.invalidate(oid, version, tid)
+ oids = versions.get((version, tid))
+ if not oids:
+ versions[(version, tid)] = [oid]
+ else:
+ oids.append(oid)
+
+ if self._db is not None:
+ for (version, tid), d in versions.items():
+ self._db.invalidate(tid, d, version=version)
+
# The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args):
@@ -1301,11 +1334,3 @@
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
-
-def InvalidationLogIterator(fileobj):
- unpickler = cPickle.Unpickler(fileobj)
- while 1:
- oid, version = unpickler.load()
- if oid is None:
- break
- yield oid, version, None
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -30,6 +30,7 @@
import logging
import os
import tempfile
+import threading
import time
import ZODB.fsIndex
@@ -121,7 +122,22 @@
# to the end of the file that the new object can't fit in one
# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
+class locked(object):
+ def __init__(self, func):
+ self.func = func
+
+ def __get__(self, inst, class_):
+ if inst is None:
+ return self
+ def call(*args, **kw):
+ inst._lock.acquire()
+ try:
+ return self.func(inst, *args, **kw)
+ finally:
+ inst._lock.release()
+ return call
+
class ClientCache(object):
"""A simple in-memory cache."""
@@ -200,6 +216,10 @@
self._setup_trace(path)
+ self.open()
+
+ self._lock = threading.RLock()
+
# Backward compatibility. Client code used to have to use the fc
# attr to get to the file cache to get cache stats.
@property
@@ -353,6 +373,7 @@
# instance, and also written out near the start of the cache file. The
# new tid must be strictly greater than our current idea of the most
# recent tid.
+ @locked
def setLastTid(self, tid):
if self.tid is not None and tid <= self.tid:
raise ValueError("new last tid (%s) must be greater than "
@@ -369,10 +390,11 @@
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
def getLastTid(self):
- if self.tid == z64:
+ tid = self.tid
+ if tid == z64:
return None
else:
- return self.tid
+ return tid
##
# Return the current data record for oid and version.
@@ -382,6 +404,7 @@
# in the cache
# @defreturn 3-tuple: (string, string, string)
+ @locked
def load(self, oid, version=""):
ofs = self.current.get(oid)
if ofs is None:
@@ -414,6 +437,7 @@
# @return data record, serial number, start tid, and end tid
# @defreturn 4-tuple: (string, string, string, string)
+ @locked
def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
@@ -455,6 +479,7 @@
# @defreturn string or None
# XXX This approac is wrong, but who cares
+ @locked
def modifiedInVersion(self, oid):
ofs = self.current.get(oid)
if ofs is None:
@@ -482,6 +507,7 @@
# @param data the actual data
# @exception ValueError tried to store non-current version data
+ @locked
def store(self, oid, version, start_tid, end_tid, data):
# It's hard for the client to avoid storing the same object
# more than once. One case is when the client requests
@@ -581,14 +607,30 @@
# data for `oid`, stop believing we have current data, and mark the
# data we had as being valid only up to `tid`. In all other cases, do
# nothing.
- # @param oid object id
- # @param version name of version to invalidate.
- # @param tid the id of the transaction that wrote a new revision of oid,
+ #
+ # Parameters:
+ #
+ # - oid object id
+ # - version name of version to invalidate.
+ # - tid the id of the transaction that wrote a new revision of oid,
# or None to forget all cached info about oid (version, current
# revision, and non-current revisions)
- def invalidate(self, oid, version, tid):
- if tid > self.tid and tid is not None:
- self.setLastTid(tid)
+ # - server_invalidation, a flag indicating whether the
+ # invalidation has come from the server. It's possible, due
+ # to threading issues, that when applying a local
+ # invalidation after a store, later invalidations from
+ # the server may already have arrived.
+
+ @locked
+ def invalidate(self, oid, version, tid, server_invalidation=True):
+ if tid is not None:
+ if tid > self.tid:
+ self.setLastTid(tid)
+ elif tid < self.tid:
+ if server_invalidation:
+ raise ValueError("invalidation tid (%s) must not be less"
+ " than previous one (%s)" %
+ (u64(tid), u64(self.tid)))
ofs = self.current.get(oid)
if ofs is None:
@@ -630,17 +672,25 @@
seek = self.f.seek
read = self.f.read
for oid, ofs in self.current.iteritems():
- seek(ofs)
- assert read(1) == 'a', (ofs, self.f.tell(), oid)
- size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
- assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
- assert end_tid == z64, (ofs, self.f.tell(), oid)
- if lver:
- version = read(lver)
- else:
- version = ''
- yield oid, tid, version
+ self._lock.acquire()
+ try:
+ seek(ofs)
+ assert read(1) == 'a', (ofs, self.f.tell(), oid)
+ size, saved_oid, tid, end_tid, lver = unpack(
+ ">I8s8s8sh", read(30))
+ assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
+ assert end_tid == z64, (ofs, self.f.tell(), oid)
+ if lver:
+ version = read(lver)
+ else:
+ version = ''
+ result = oid, tid, version
+ finally:
+ self._lock.release()
+
+ yield result
+
def dump(self):
from ZODB.utils import oid_repr
print "cache size", len(self)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -158,8 +158,7 @@
self.addr.append(self._getAddr())
def _getAddr(self):
- # port+1 is also used, so only draw even port numbers
- return 'localhost', random.randrange(25000, 30000, 2)
+ return 'localhost', forker.get_port()
def getConfig(self, path, create, read_only):
raise NotImplementedError
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -144,24 +144,25 @@
self.commitdict = commitdict
def _testrun(self):
- cn = self.db.open()
+ tm = transaction.TransactionManager()
+ cn = self.db.open(transaction_manager=tm)
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
- transaction.abort()
+ tm.abort()
key = self.startnum
while not self.stop.isSet():
try:
tree[key] = self.threadnum
- transaction.get().note("add key %s" % key)
- transaction.commit()
+ tm.get().note("add key %s" % key)
+ tm.commit()
self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
- transaction.abort()
+ tm.abort()
else:
self.added_keys.append(key)
key += self.step
@@ -338,16 +339,23 @@
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
+
expected_keys = []
- errormsgs = []
- err = errormsgs.append
for t in threads:
if not t.added_keys:
err("thread %d didn't add any keys" % t.threadnum)
expected_keys.extend(t.added_keys)
expected_keys.sort()
- actual_keys = list(tree.keys())
- if expected_keys != actual_keys:
+
+ for i in range(100):
+ tree._p_jar.sync()
+ actual_keys = list(tree.keys())
+ if expected_keys == actual_keys:
+ break
+ time.sleep(.1)
+ else:
+ errormsgs = []
+ err = errormsgs.append
err("expected keys != actual keys")
for k in expected_keys:
if k not in actual_keys:
@@ -355,8 +363,7 @@
for k in actual_keys:
if k not in expected_keys:
err("key %s in tree but not expected" % k)
- if errormsgs:
- display(tree)
+
self.fail('\n'.join(errormsgs))
def go(self, stop, commitdict, *threads):
@@ -488,10 +495,9 @@
self.go(stop, cd, t1, t2, t3)
while db1.lastTransaction() != db2.lastTransaction():
- db1._storage.sync()
- db2._storage.sync()
+ time.sleep(.1)
-
+ time.sleep(.1)
cn = db1.open()
tree = cn.root()["tree"]
self._check_tree(cn, tree)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -14,6 +14,7 @@
"""Library for forking storage server and connecting client storage"""
import os
+import random
import sys
import time
import errno
@@ -201,3 +202,29 @@
ack = 'no ack received'
logger.debug('shutdown_zeo_server(): acked: %s' % ack)
s.close()
+
+def get_port():
+ """Return a port that is not in use.
+
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception. We actually look for
+ 2 consecutive free ports because most of the clients of this
+ function will use the returned port and the next one.
+
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ s1.connect(('localhost', port+1))
+ except socket.error:
+ # Perhaps we should check value of error too.
+ return port
+ finally:
+ s.close()
+ s1.close()
+ raise RuntimeError("Can't find port")
Copied: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test (from rev 89348, ZODB/branches/3.8/src/ZEO/tests/invalidations_while_connecting.test)
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test (rev 0)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test 2008-08-04 17:47:44 UTC (rev 89350)
@@ -0,0 +1,104 @@
+Invalidations while connecting
+==============================
+
+As soon as a client registers with a server, it will receive
+invalidations from the server. The client must be careful to queue
+these invalidations until it is ready to deal with them. At the time
+of the writing of this test, clients weren't careful enough about
+queueing invalidations. This led to cache corruption in the form of
+both low-level file corruption and out-of-date records marked
+as current.
+
+This test tries to provoke this bug by:
+
+- starting a server
+
+ >>> import ZEO.tests.testZEO, ZEO.tests.forker
+ >>> addr = 'localhost', ZEO.tests.testZEO.get_port()
+ >>> zconf = ZEO.tests.forker.ZEOConfig(addr)
+ >>> sconf = '<filestorage 1>\npath Data.fs\n</filestorage>\n'
+ >>> _, adminaddr, pid, conf_path = ZEO.tests.forker.start_zeo_server(
+ ... sconf, zconf, addr[1])
+
+- opening a client to the server that writes some objects, filling
+ its cache at the same time,
+
+ >>> import ZEO.ClientStorage, ZODB.tests.MinPO, transaction
+ >>> db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
+ >>> conn = db.open()
+ >>> nobs = 1000
+ >>> for i in range(nobs):
+ ... conn.root()[i] = ZODB.tests.MinPO.MinPO(0)
+ >>> transaction.commit()
+
+- disconnecting the first client (closing it with a persistent cache),
+
+ >>> db.close()
+
+- starting a second client that writes objects more or less
+ constantly,
+
+ >>> import random, threading
+ >>> stop = False
+ >>> db2 = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr))
+ >>> tm = transaction.TransactionManager()
+ >>> conn2 = db2.open(transaction_manager=tm)
+ >>> random = random.Random(0)
+ >>> lock = threading.Lock()
+ >>> def run():
+ ... while 1:
+ ... i = random.randint(0, nobs-1)
+ ... if stop:
+ ... return
+ ... lock.acquire()
+ ... try:
+ ... conn2.root()[i].value += 1
+ ... tm.commit()
+ ... finally:
+ ... lock.release()
+ ... time.sleep(0)
+ >>> thread = threading.Thread(target=run)
+ >>> thread.start()
+
+- restarting the first client, and
+- testing for cache validity.
+
+ >>> import zope.testing.loggingsupport, logging
+ >>> handler = zope.testing.loggingsupport.InstalledHandler(
+ ... 'ZEO', level=logging.ERROR)
+
+ >>> import time
+ >>> for c in range(10):
+ ... time.sleep(.1)
+ ... db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
+ ... _ = lock.acquire()
+ ... try:
+ ... time.sleep(.1)
+ ... assert (db._storage.lastTransaction()
+ ... == db._storage._server.lastTransaction()), (
+ ... db._storage.lastTransaction(),
+ ... db._storage._server.lastTransaction())
+ ... conn = db.open()
+ ... for i in range(1000):
+ ... if conn.root()[i].value != conn2.root()[i].value:
+ ... print 'bad', c, i, conn.root()[i].value,
+ ... print conn2.root()[i].value
+ ... finally:
+ ... _ = lock.release()
+ ... db.close()
+
+ >>> stop = True
+ >>> thread.join(10)
+ >>> thread.isAlive()
+ False
+
+ >>> for record in handler.records:
+ ... print record.name, record.levelname
+ ... print handler.format(record)
+
+ >>> handler.uninstall()
+
+ >>> db.close()
+ >>> db2.close()
+ >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr)
+
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -21,8 +21,8 @@
import unittest
# Import the actual test class
from ZEO.tests import ConnectionTests, InvalidationTests
+from zope.testing import doctest, setupstack
-
class FileStorageConfig:
def getConfig(self, path, create, read_only):
return """\
@@ -135,6 +135,10 @@
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
+ suite.addTest(doctest.DocFileSuite(
+ 'invalidations_while_connecting.test',
+ setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
+ ))
return suite
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -18,9 +18,7 @@
import doctest
import logging
import os
-import random
import signal
-import socket
import stat
import tempfile
import threading
@@ -50,6 +48,7 @@
import ZEO.zrpc.connection
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests.forker import get_port
import ZEO.tests.ConnectionTests
@@ -128,27 +127,6 @@
finally:
storage2.close()
-def get_port():
- """Return a port that is not in use.
-
- Checks if a port is in use by trying to connect to it. Assumes it
- is not in use if connect raises an exception.
-
- Raises RuntimeError after 10 tries.
- """
- for i in range(10):
- port = random.randrange(20000, 30000)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- try:
- s.connect(('localhost', port))
- except socket.error:
- # Perhaps we should check value of error too.
- return port
- finally:
- s.close()
- raise RuntimeError("Can't find port")
-
class GenericTests(
# Base class for all ZODB tests
StorageTestBase.StorageTestBase,
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -35,7 +35,6 @@
# testSerialization reads the entire file into a string, it's not
# good to leave it that big.
self.cache = ZEO.cache.ClientCache(size=1024**2)
- self.cache.open()
def tearDown(self):
if self.cache.path:
@@ -45,7 +44,6 @@
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
- self.cache.invalidate(n1, "", n1)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(n1, "", n3)
self.assertEqual(self.cache.getLastTid(), n3)
@@ -65,8 +63,8 @@
def testInvalidate(self):
data1 = "data for n1"
self.cache.store(n1, "", n3, None, data1)
+ self.cache.invalidate(n2, "", n2)
self.cache.invalidate(n1, "", n4)
- self.cache.invalidate(n2, "", n2)
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.loadBefore(n1, n4),
(data1, n3, n4))
@@ -142,7 +140,6 @@
dst.write(src.read(self.cache.maxsize))
dst.close()
copy = ZEO.cache.ClientCache(path)
- copy.open()
# Verify that internals of both objects are the same.
# Could also test that external API produces the same results.
@@ -158,7 +155,6 @@
if self.cache.path:
os.remove(self.cache.path)
cache = ZEO.cache.ClientCache(size=50)
- cache.open()
# We store an object that is a bit larger than the cache can handle.
cache.store(n1, '', n2, None, "x"*64)
@@ -174,7 +170,6 @@
if self.cache.path:
os.remove(self.cache.path)
cache = ZEO.cache.ClientCache(size=50)
- cache.open()
# We store an object that is a bit larger than the cache can handle.
cache.store(n1, '', n2, n3, "x"*64)
@@ -218,7 +213,6 @@
... _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
... if os.path.exists('cache'):
... cache = ZEO.cache.ClientCache('cache')
- ... cache.open()
... cache.close()
... os.remove('cache')
... os.remove('cache.lock')
@@ -238,7 +232,6 @@
>>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
>>> cache.close()
>>> cache = ZEO.cache.ClientCache('cache', 1000)
- >>> cache.open()
>>> cache.store(ZODB.utils.p64(2), '', ZODB.utils.p64(2), None, 'XXX')
>>> cache.close()
@@ -255,6 +248,57 @@
>>> cache.close()
""",
+
+ thread_safe =
+ r"""
+
+ >>> import ZEO.cache, ZODB.utils
+ >>> cache = ZEO.cache.ClientCache('cache', 1000000)
+
+ >>> for i in range(100):
+ ... cache.store(ZODB.utils.p64(i), '', ZODB.utils.p64(1), None, '0')
+
+ >>> import random, sys, threading
+ >>> random = random.Random(0)
+ >>> stop = False
+ >>> read_failure = None
+
+ >>> def read_thread():
+ ... def pick_oid():
+ ... return ZODB.utils.p64(random.randint(0,99))
+ ...
+ ... try:
+ ... while not stop:
+ ... cache.load(pick_oid())
+ ... cache.loadBefore(pick_oid(), ZODB.utils.p64(2))
+ ... cache.modifiedInVersion(pick_oid())
+ ... except:
+ ... global read_failure
+ ... read_failure = sys.exc_info()
+
+ >>> thread = threading.Thread(target=read_thread)
+ >>> thread.start()
+
+ >>> for tid in range(2,10):
+ ... for oid in range(100):
+ ... oid = ZODB.utils.p64(oid)
+ ... cache.invalidate(oid, '', ZODB.utils.p64(tid))
+ ... cache.store(oid, '', ZODB.utils.p64(tid), None, str(tid))
+
+ >>> stop = True
+ >>> thread.join()
+ >>> if read_failure:
+ ... print 'Read failure:'
+ ... import traceback
+ ... traceback.print_exception(*read_failure)
+
+ >>> expected = '9', ZODB.utils.p64(9), ''
+ >>> for oid in range(100):
+ ... loaded = cache.load(ZODB.utils.p64(oid))
+ ... if loaded != expected:
+ ... print oid, loaded
+
+ """,
)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -447,8 +447,7 @@
Call the client's testConnection(), giving the client a chance
to do app-level check of the connection.
"""
- self.conn = ManagedClientConnection(self.sock, self.addr,
- self.client, self.mgr)
+ self.conn = ManagedClientConnection(self.sock, self.addr, self.mgr)
self.sock = None # The socket is now owned by the connection
try:
self.preferred = self.client.testConnection(self.conn)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -555,14 +555,23 @@
self.replies_cond.release()
def handle_request(self, msgid, flags, name, args):
- if not self.check_method(name):
- msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
+ obj = self.obj
+
+ if name.startswith('_') or not hasattr(obj, name):
+ if obj is None:
+ if __debug__:
+ self.log("no object calling %s%s"
+ % (name, short_repr(args)),
+ level=logging.DEBUG)
+ return
+
+ msg = "Invalid method name: %s on %s" % (name, repr(obj))
raise ZRPCError(msg)
if __debug__:
self.log("calling %s%s" % (name, short_repr(args)),
level=logging.DEBUG)
- meth = getattr(self.obj, name)
+ meth = getattr(obj, name)
try:
self.waiting_for_reply = True
try:
@@ -601,12 +610,6 @@
level=logging.ERROR, exc_info=True)
self.close()
- def check_method(self, name):
- # TODO: This is hardly "secure".
- if name.startswith('_'):
- return None
- return hasattr(self.obj, name)
-
def send_reply(self, msgid, ret):
# encode() can pass on a wide variety of exceptions from cPickle.
# While a bare `except` is generally poor practice, in this case
@@ -897,7 +900,7 @@
__super_close = Connection.close
base_message_output = Connection.message_output
- def __init__(self, sock, addr, obj, mgr):
+ def __init__(self, sock, addr, mgr):
self.mgr = mgr
# We can't use the base smac's message_output directly because the
@@ -914,7 +917,7 @@
self.queue_output = True
self.queued_messages = []
- self.__super_init(sock, addr, obj, tag='C', map=client_map)
+ self.__super_init(sock, addr, None, tag='C', map=client_map)
self.thr_async = True
self.trigger = client_trigger
client_trigger.pull_trigger()
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -594,7 +594,14 @@
oid = obj._p_oid
serial = getattr(obj, "_p_serial", z64)
- if serial == z64:
+ if ((serial == z64)
+ and
+ ((self._savepoint_storage is None)
+ or (oid not in self._savepoint_storage.creating)
+ or self._savepoint_storage.creating[oid]
+ )
+ ):
+
# obj is a new object
# Because obj was added, it is now in _creating, so it
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -530,6 +530,10 @@
LAYOUTS['lawn'] = LawnLayout()
+class BlobStorageError(Exception):
+ """The blob storage encountered an invalid state."""
+
+
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
@@ -537,7 +541,8 @@
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
- __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
+ __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
+ '_blobs_pack_is_in_progress', )
def __new__(self, base_directory, storage, layout='automatic'):
return SpecificationDecoratorBase.__new__(self, storage)
@@ -556,6 +561,7 @@
else:
supportsUndo = supportsUndo()
self.__supportsUndo = supportsUndo
+ self._blobs_pack_is_in_progress = False
@non_overridable
def temporaryDirectory(self):
@@ -661,21 +667,29 @@
@non_overridable
def pack(self, packtime, referencesf):
- """Remove all unused oid/tid combinations."""
- unproxied = getProxiedObject(self)
+ """Remove all unused OID/TID combinations."""
+ self._lock_acquire()
+ try:
+ if self._blobs_pack_is_in_progress:
+ raise BlobStorageError('Already packing')
+ self._blobs_pack_is_in_progress = True
+ finally:
+ self._lock_release()
- # pack the underlying storage, which will allow us to determine
- # which serials are current.
- result = unproxied.pack(packtime, referencesf)
+ try:
+ # Pack the underlying storage, which will allow us to determine
+ # which serials are current.
+ unproxied = getProxiedObject(self)
+ result = unproxied.pack(packtime, referencesf)
- # perform a pack on blob data
- self._lock_acquire()
- try:
+ # Perform a pack on the blob data.
if self.__supportsUndo:
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
+ self._lock_acquire()
+ self._blobs_pack_is_in_progress = False
self._lock_release()
return result
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt 2008-08-04 17:47:44 UTC (rev 89350)
@@ -207,9 +207,9 @@
>>> bushy = os.path.join(d, 'bushy')
>>> migrate(old, bushy, 'bushy') # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Migrating blob data from `/.../old` (lawn) to `/.../bushy` (bushy)
+ OID: 0x1b7a - 2 files
+ OID: 0x0a - 2 files
OID: 0x1b7f - 2 files
- OID: 0x0a - 2 files
- OID: 0x1b7a - 2 files
The new directory now contains the same files in different directories, but
with the same sizes and permissions:
@@ -226,6 +226,26 @@
>>> ls(bushy)
040700 4096 /.../bushy
0100644 5 /.../bushy/.layout
+ 040700 4096 /.../bushy/0x7f
+ 040700 4096 /.../bushy/0x7f/0x1b
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
+ 0100644 3 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo2
+ 0100644 3 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo
+ 040700 4096 /.../bushy/0x0a
+ 040700 4096 /.../bushy/0x0a/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00
+ 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00
+ 0100644 3 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo4
+ 0100644 3 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo3
040700 4096 /.../bushy/0x7a
040700 4096 /.../bushy/0x7a/0x1b
040700 4096 /.../bushy/0x7a/0x1b/0x00
@@ -234,50 +254,30 @@
040700 4096 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00
040700 4096 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00
040700 4096 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
+ 0100644 4 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo5
0100644 5 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo6
- 0100644 4 /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo5
040700 4096 /.../bushy/tmp
- 040700 4096 /.../bushy/0x0a
- 040700 4096 /.../bushy/0x0a/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00
- 0100644 3 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo4
- 0100644 3 /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo3
- 040700 4096 /.../bushy/0x7f
- 040700 4096 /.../bushy/0x7f/0x1b
- 040700 4096 /.../bushy/0x7f/0x1b/0x00
- 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00
- 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00
- 040700 4096 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
- 0100644 3 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo
- 0100644 3 /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo2
We can also migrate the bushy layout back to the lawn layout:
>>> lawn = os.path.join(d, 'lawn')
>>> migrate(bushy, lawn, 'lawn')
Migrating blob data from `/.../bushy` (bushy) to `/.../lawn` (lawn)
+ OID: 0x1b7f - 2 files
+ OID: 0x0a - 2 files
OID: 0x1b7a - 2 files
- OID: 0x0a - 2 files
- OID: 0x1b7f - 2 files
>>> ls(lawn)
- 040700 4096 /.../lawn
- 0100644 4 /.../lawn/.layout
- 040700 4096 /.../lawn/0x1b7f
- 0100644 3 /.../lawn/0x1b7f/foo
- 0100644 3 /.../lawn/0x1b7f/foo2
- 040700 4096 /.../lawn/tmp
- 040700 4096 /.../lawn/0x0a
- 0100644 3 /.../lawn/0x0a/foo4
- 0100644 3 /.../lawn/0x0a/foo3
- 040700 4096 /.../lawn/0x1b7a
- 0100644 5 /.../lawn/0x1b7a/foo6
- 0100644 4 /.../lawn/0x1b7a/foo5
+ 040700 4096 /.../lawn
+ 0100644 4 /.../lawn/.layout
+ 040700 4096 /.../lawn/0x1b7a
+ 0100644 4 /.../lawn/0x1b7a/foo5
+ 0100644 5 /.../lawn/0x1b7a/foo6
+ 040700 4096 /.../lawn/0x0a
+ 0100644 3 /.../lawn/0x0a/foo4
+ 0100644 3 /.../lawn/0x0a/foo3
+ 040700 4096 /.../lawn/0x1b7f
+ 0100644 3 /.../lawn/0x1b7f/foo2
+ 0100644 3 /.../lawn/0x1b7f/foo
+ 040700 4096 /.../lawn/tmp
>>> shutil.rmtree(d)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt 2008-08-04 17:47:44 UTC (rev 89350)
@@ -240,6 +240,37 @@
>>> os.path.exists(os.path.split(fns[0])[0])
False
+Avoiding parallel packs
+=======================
+
+Blob packing (similar to FileStorage packing) can only run one pack at a
+time. To enforce this, a flag (_blobs_pack_is_in_progress) is set while a
+pack is running. If the pack method is called while this flag is set, it
+refuses to perform another pack until the flag is reset:
+
+ >>> blob_storage._blobs_pack_is_in_progress
+ False
+ >>> blob_storage._blobs_pack_is_in_progress = True
+ >>> blob_storage.pack(packtime, referencesf)
+ Traceback (most recent call last):
+ BlobStorageError: Already packing
+ >>> blob_storage._blobs_pack_is_in_progress = False
+ >>> blob_storage.pack(packtime, referencesf)
+
+We can also see that the flag is set during the pack by leveraging the
+knowledge that the underlying storage's pack method is also called:
+
+ >>> def dummy_pack(time, ref):
+ ... print "_blobs_pack_is_in_progress =", blob_storage._blobs_pack_is_in_progress
+ ... return base_pack(time, ref)
+ >>> base_pack = base_storage.pack
+ >>> base_storage.pack = dummy_pack
+ >>> blob_storage.pack(packtime, referencesf)
+ _blobs_pack_is_in_progress = True
+ >>> blob_storage._blobs_pack_is_in_progress
+ False
+ >>> base_storage.pack = base_pack
+
Clean up our blob directory:
>>> shutil.rmtree(blob_dir)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py 2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py 2008-08-04 17:47:44 UTC (rev 89350)
@@ -146,6 +146,32 @@
"""
+def test_explicit_adding_with_savepoint2():
+ """
+
+ >>> import ZODB.tests.util, transaction, persistent
+ >>> databases = {}
+ >>> db1 = ZODB.tests.util.DB(databases=databases, database_name='1')
+ >>> db2 = ZODB.tests.util.DB(databases=databases, database_name='2')
+ >>> tm = transaction.TransactionManager()
+ >>> conn1 = db1.open(transaction_manager=tm)
+ >>> conn2 = conn1.get_connection('2')
+ >>> z = MyClass()
+
+ >>> conn1.root()['z'] = z
+ >>> conn1.add(z)
+ >>> s = tm.savepoint()
+ >>> conn2.root()['z'] = z
+ >>> z.x = 1
+ >>> tm.commit()
+ >>> z._p_jar.db().database_name
+ '1'
+
+ >>> db1.close()
+ >>> db2.close()
+
+"""
+
def tearDownDbs(test):
test.globs['db1'].close()
test.globs['db2'].close()
More information about the Zodb-checkins
mailing list