[Zodb-checkins] SVN: ZODB/branches/ctheune-bushy-directory-3.8/ Merge updates from 3.8 branch

Christian Theune ct at gocept.com
Mon Aug 4 13:47:46 EDT 2008


Log message for revision 89350:
  Merge updates from 3.8 branch 
  

Changed:
  U   ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
  A   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py

-=-
Modified: ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt	2008-08-04 17:47:44 UTC (rev 89350)
@@ -4,6 +4,15 @@
 
 Bugs Fixed:
 
+- (???) Fixed bug #251037: Made packing of blob storages non-blocking.
+
+- (beta 6) Fixed a bug that could cause InvalidObjectReference errors
+  for objects that were explicitly added to a database if the object
+  was modified after a savepoint that added the object.
+
+- (beta 5) Fixed several bugs that caused ZEO cache corruption when connecting
+  to servers. These bugs affected both persistent and non-persistent caches. 
+
 - (beta 5) Improved the ZEO client shutdown support to try to
   avoid spurious errors on exit, especially for scripts, such as zeopack.
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -251,8 +251,6 @@
 
         # _is_read_only stores the constructor argument
         self._is_read_only = read_only
-        # _conn_is_read_only stores the status of the current connection
-        self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
         self._username = username
@@ -340,8 +338,6 @@
         else:
             cache_path = None
         self._cache = self.ClientCacheClass(cache_path, size=cache_size)
-        # TODO:  maybe there's a better time to open the cache?  Unclear.
-        self._cache.open()
 
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
@@ -382,13 +378,18 @@
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
+        if self._rpc_mgr is not None:
+            self._rpc_mgr.close()
+            self._rpc_mgr = None
+        if self._connection is not None:
+            self._connection.register_object(None) # Don't call me!
+            self._connection.close()
+            self._connection = None
+
         self._tbuf.close()
         if self._cache is not None:
             self._cache.close()
             self._cache = None
-        if self._rpc_mgr is not None:
-            self._rpc_mgr.close()
-            self._rpc_mgr = None
 
     def registerDB(self, db):
         """Storage API: register a database for invalidation messages.
@@ -454,7 +455,7 @@
         """
         log2("Testing connection %r" % conn)
         # TODO:  Should we check the protocol version here?
-        self._conn_is_read_only = 0
+        conn._is_read_only = self._is_read_only
         stub = self.StorageServerStubClass(conn)
 
         auth = stub.getAuthProtocol()
@@ -476,7 +477,7 @@
                 raise
             log2("Got ReadOnlyError; trying again with read_only=1")
             stub.register(str(self._storage), read_only=1)
-            self._conn_is_read_only = 1
+            conn._is_read_only = True
             return 0
 
     def notifyConnected(self, conn):
@@ -490,24 +491,26 @@
             # this method before it was stopped.
             return
 
-        # invalidate our db cache
-        if self._db is not None:
-            self._db.invalidateCache()
 
-        # TODO:  report whether we get a read-only connection.
         if self._connection is not None:
+            # If we are upgrading from a read-only fallback connection,
+            # we must close the old connection to prevent it from being
+            # used while the cache is verified against the new connection.
+            self._connection.register_object(None) # Don't call me!
+            self._connection.close()
+            self._connection = None
+            self._ready.clear()
             reconnect = 1
         else:
             reconnect = 0
+
         self.set_server_addr(conn.get_addr())
-
-        # If we are upgrading from a read-only fallback connection,
-        # we must close the old connection to prevent it from being
-        # used while the cache is verified against the new connection.
-        if self._connection is not None:
-            self._connection.close()
         self._connection = conn
 
+        # invalidate our db cache
+        if self._db is not None:
+            self._db.invalidateCache()
+
         if reconnect:
             log2("Reconnected to storage: %s" % self._server_addr)
         else:
@@ -561,54 +564,6 @@
         else:
             return '%s:%s' % (self._storage, self._server_addr)
 
-    def verify_cache(self, server):
-        """Internal routine called to verify the cache.
-
-        The return value (indicating which path we took) is used by
-        the test suite.
-        """
-
-        # If verify_cache() finishes the cache verification process,
-        # it should set self._server.  If it goes through full cache
-        # verification, then endVerify() should self._server.
-
-        last_inval_tid = self._cache.getLastTid()
-        if last_inval_tid is not None:
-            ltid = server.lastTransaction()
-            if ltid == last_inval_tid:
-                log2("No verification necessary (last_inval_tid up-to-date)")
-                self._server = server
-                self._ready.set()
-                return "no verification"
-
-            # log some hints about last transaction
-            log2("last inval tid: %r %s\n"
-                 % (last_inval_tid, tid2time(last_inval_tid)))
-            log2("last transaction: %r %s" %
-                 (ltid, ltid and tid2time(ltid)))
-
-            pair = server.getInvalidations(last_inval_tid)
-            if pair is not None:
-                log2("Recovering %d invalidations" % len(pair[1]))
-                self.invalidateTransaction(*pair)
-                self._server = server
-                self._ready.set()
-                return "quick verification"
-
-        log2("Verifying cache")
-        # setup tempfile to hold zeoVerify results
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
-        # TODO:  should batch these operations for efficiency; would need
-        # to acquire lock ...
-        for oid, tid, version in self._cache.contents():
-            server.verify(oid, version, tid)
-        self._pending_server = server
-        server.endZeoVerify()
-        return "full verification"
-
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In particular, what if we get
     ### notifyDisconnected in the middle of notifyConnected?
@@ -674,12 +629,16 @@
     def isReadOnly(self):
         """Storage API: return whether we are in read-only mode."""
         if self._is_read_only:
-            return 1
+            return True
         else:
             # If the client is configured for a read-write connection
-            # but has a read-only fallback connection, _conn_is_read_only
-            # will be True.
-            return self._conn_is_read_only
+            # but has a read-only fallback connection, conn._is_read_only
+            # will be True.  If self._connection is None, we'll behave as
+            # read_only
+            try:
+                return self._connection._is_read_only
+            except AttributeError:
+                return True
 
     def _check_trans(self, trans):
         """Internal helper to check a transaction argument for sanity."""
@@ -1152,7 +1111,7 @@
             return
 
         for oid, version, data in self._tbuf:
-            self._cache.invalidate(oid, version, tid)
+            self._cache.invalidate(oid, version, tid, False)
             # If data is None, we just invalidate.
             if data is not None:
                 s = self._seriald[oid]
@@ -1210,8 +1169,6 @@
         """Storage API: return a sequence of versions in the storage."""
         return self._server.versions(max)
 
-    # Below are methods invoked by the StorageServer
-
     def serialnos(self, args):
         """Server callback to pass a list of changed (oid, serial) pairs."""
         self._serials.extend(args)
@@ -1220,6 +1177,57 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
+    def verify_cache(self, server):
+        """Internal routine called to verify the cache.
+
+        The return value (indicating which path we took) is used by
+        the test suite.
+        """
+
+        self._pending_server = server
+
+        # setup tempfile to hold zeoVerify results and interim
+        # invalidation results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
+        # allow incoming invalidations:
+        self._connection.register_object(self)
+
+        # If verify_cache() finishes the cache verification process,
+        # it should set self._server.  If it goes through full cache
+        # verification, then endVerify() should set self._server.
+
+        last_inval_tid = self._cache.getLastTid()
+        if last_inval_tid is not None:
+            ltid = server.lastTransaction()
+            if ltid == last_inval_tid:
+                log2("No verification necessary (last_inval_tid up-to-date)")
+                self.finish_verification()
+                return "no verification"
+
+            # log some hints about last transaction
+            log2("last inval tid: %r %s\n"
+                 % (last_inval_tid, tid2time(last_inval_tid)))
+            log2("last transaction: %r %s" %
+                 (ltid, ltid and tid2time(ltid)))
+
+            pair = server.getInvalidations(last_inval_tid)
+            if pair is not None:
+                log2("Recovering %d invalidations" % len(pair[1]))
+                self.finish_verification(pair)
+                return "quick verification"
+
+        log2("Verifying cache")
+
+        # TODO:  should batch these operations for efficiency; would need
+        # to acquire lock ...
+        for oid, tid, version in self._cache.contents():
+            server.verify(oid, version, tid)
+        server.endZeoVerify()
+        return "full verification"
+
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -1231,68 +1239,93 @@
             # This should never happen.  TODO:  assert it doesn't, or log
             # if it does.
             return
-        self._pickler.dump(args)
+        oid, version = args
+        self._pickler.dump((oid, version, None))
 
-    def _process_invalidations(self, invs):
-        # Invalidations are sent by the ZEO server as a sequence of
-        # oid, version pairs.  The DB's invalidate() method expects a
-        # dictionary of oids.
+    def endVerify(self):
+        """Server callback to signal end of cache validation."""
 
+        log2("endVerify finishing")
+        self.finish_verification()
+        log2("endVerify finished")
+
+    def finish_verification(self, catch_up=None):
         self._lock.acquire()
         try:
-            # versions maps version names to dictionary of invalidations
-            versions = {}
-            for oid, version, tid in invs:
-                if oid == self._load_oid:
-                    self._load_status = 0
-                self._cache.invalidate(oid, version, tid)
-                oids = versions.get((version, tid))
-                if not oids:
-                    versions[(version, tid)] = [oid]
-                else:
-                    oids.append(oid)
+            if catch_up:
+                # process catch-up invalidations
+                tid, invalidations = catch_up
+                self._process_invalidations(
+                    (oid, version, tid)
+                    for oid, version in invalidations
+                    )
+            
+            if self._pickler is None:
+                return
+            # write end-of-data marker
+            self._pickler.dump((None, None, None))
+            self._pickler = None
+            self._tfile.seek(0)
+            unpickler = cPickle.Unpickler(self._tfile)
+            min_tid = self._cache.getLastTid()
+            def InvalidationLogIterator():
+                while 1:
+                    oid, version, tid = unpickler.load()
+                    if oid is None:
+                        break
+                    if ((tid is None)
+                        or (min_tid is None)
+                        or (tid > min_tid)
+                        ):
+                        yield oid, version, tid
 
-            if self._db is not None:
-                for (version, tid), d in versions.items():
-                    self._db.invalidate(tid, d, version=version)
+            self._process_invalidations(InvalidationLogIterator())
+            self._tfile.close()
+            self._tfile = None
         finally:
             self._lock.release()
 
-    def endVerify(self):
-        """Server callback to signal end of cache validation."""
-        if self._pickler is None:
-            return
-        # write end-of-data marker
-        self._pickler.dump((None, None))
-        self._pickler = None
-        self._tfile.seek(0)
-        f = self._tfile
-        self._tfile = None
-        self._process_invalidations(InvalidationLogIterator(f))
-        f.close()
-
-        log2("endVerify finishing")
         self._server = self._pending_server
         self._ready.set()
-        self._pending_conn = None
-        log2("endVerify finished")
+        self._pending_server = None
 
+
     def invalidateTransaction(self, tid, args):
-        """Invalidate objects modified by tid."""
+        """Server callback: Invalidate objects modified by tid."""
         self._lock.acquire()
         try:
-            self._cache.setLastTid(tid)
+            if self._pickler is not None:
+                log2("Transactional invalidation during cache verification",
+                     level=BLATHER)
+                for oid, version in args:
+                    self._pickler.dump((oid, version, tid))
+                return
+            self._process_invalidations([(oid, version, tid)
+                                         for oid, version in args])
         finally:
             self._lock.release()
-        if self._pickler is not None:
-            log2("Transactional invalidation during cache verification",
-                 level=BLATHER)
-            for t in args:
-                self._pickler.dump(t)
-            return
-        self._process_invalidations([(oid, version, tid)
-                                     for oid, version in args])
 
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version, tid triples.  The DB's invalidate() method expects a
+        # dictionary of oids.
+
+        # versions maps version names to dictionary of invalidations
+        versions = {}
+        for oid, version, tid in invs:
+            if oid == self._load_oid:
+                self._load_status = 0
+            self._cache.invalidate(oid, version, tid)
+            oids = versions.get((version, tid))
+            if not oids:
+                versions[(version, tid)] = [oid]
+            else:
+                oids.append(oid)
+
+        if self._db is not None:
+            for (version, tid), d in versions.items():
+                self._db.invalidate(tid, d, version=version)
+
     # The following are for compatibility with protocol version 2.0.0
 
     def invalidateTrans(self, args):
@@ -1301,11 +1334,3 @@
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
-
-def InvalidationLogIterator(fileobj):
-    unpickler = cPickle.Unpickler(fileobj)
-    while 1:
-        oid, version = unpickler.load()
-        if oid is None:
-            break
-        yield oid, version, None

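The ClientStorage changes above implement a queue-then-drain pattern:
invalidations that arrive while the cache is being verified are pickled to a
temporary file, and finish_verification() replays them once verification
completes. A minimal standalone sketch of the pattern, with illustrative
names (InvalidationQueue is not a ZEO class):

    import cPickle
    import tempfile

    class InvalidationQueue(object):
        """Buffer invalidations while cache verification is in progress."""

        def __init__(self):
            self._tfile = tempfile.TemporaryFile(suffix=".inv")
            self._pickler = cPickle.Pickler(self._tfile, 1)
            self._pickler.fast = 1  # don't build a pickle memo

        def queue(self, oid, version, tid):
            # Called for each invalidation that arrives during verification.
            self._pickler.dump((oid, version, tid))

        def drain(self, process):
            # Write the end-of-data marker, rewind, and replay in order.
            self._pickler.dump((None, None, None))
            self._pickler = None
            self._tfile.seek(0)
            unpickler = cPickle.Unpickler(self._tfile)
            while 1:
                oid, version, tid = unpickler.load()
                if oid is None:
                    break
                process(oid, version, tid)
            self._tfile.close()
            self._tfile = None
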
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -30,6 +30,7 @@
 import logging
 import os
 import tempfile
+import threading
 import time
 
 import ZODB.fsIndex
@@ -121,7 +122,22 @@
 # to the end of the file that the new object can't fit in one
 # contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
 
+class locked(object):
 
+    def __init__(self, func):
+        self.func = func
+
+    def __get__(self, inst, class_):
+        if inst is None:
+            return self
+        def call(*args, **kw):
+            inst._lock.acquire()
+            try:
+                return self.func(inst, *args, **kw)
+            finally:
+                inst._lock.release()
+        return call
+
 class ClientCache(object):
     """A simple in-memory cache."""
 
@@ -200,6 +216,10 @@
 
         self._setup_trace(path)
 
+        self.open()
+
+        self._lock = threading.RLock()
+
     # Backward compatibility. Client code used to have to use the fc
     # attr to get to the file cache to get cache stats.
     @property
@@ -353,6 +373,7 @@
     # instance, and also written out near the start of the cache file.  The
     # new tid must be strictly greater than our current idea of the most
     # recent tid.
+    @locked
     def setLastTid(self, tid):
         if self.tid is not None and tid <= self.tid:
             raise ValueError("new last tid (%s) must be greater than "
@@ -369,10 +390,11 @@
     # @return a transaction id
     # @defreturn string, or None if no transaction is yet known
     def getLastTid(self):
-        if self.tid == z64:
+        tid = self.tid
+        if tid == z64:
             return None
         else:
-            return self.tid
+            return tid
 
     ##
     # Return the current data record for oid and version.
@@ -382,6 +404,7 @@
     #         in the cache
     # @defreturn 3-tuple: (string, string, string)
 
+    @locked
     def load(self, oid, version=""):
         ofs = self.current.get(oid)
         if ofs is None:
@@ -414,6 +437,7 @@
     # @return data record, serial number, start tid, and end tid
     # @defreturn 4-tuple: (string, string, string, string)
 
+    @locked
     def loadBefore(self, oid, before_tid):
         noncurrent_for_oid = self.noncurrent.get(u64(oid))
         if noncurrent_for_oid is None:
@@ -455,6 +479,7 @@
     # @defreturn string or None
 
     # XXX This approach is wrong, but who cares
+    @locked
     def modifiedInVersion(self, oid):
         ofs = self.current.get(oid)
         if ofs is None:
@@ -482,6 +507,7 @@
     # @param data the actual data
     # @exception ValueError tried to store non-current version data
 
+    @locked
     def store(self, oid, version, start_tid, end_tid, data):
         # It's hard for the client to avoid storing the same object
         # more than once.  One case is when the client requests
@@ -581,14 +607,30 @@
     # data for `oid`, stop believing we have current data, and mark the
     # data we had as being valid only up to `tid`.  In all other cases, do
     # nothing.
-    # @param oid object id
-    # @param version name of version to invalidate.
-    # @param tid the id of the transaction that wrote a new revision of oid,
+    #
+    # Parameters:
+    #
+    # - oid object id
+    # - version name of version to invalidate.
+    # - tid the id of the transaction that wrote a new revision of oid,
     #        or None to forget all cached info about oid (version, current
     #        revision, and non-current revisions)
-    def invalidate(self, oid, version, tid):
-        if tid > self.tid and tid is not None:
-            self.setLastTid(tid)
+    # - server_invalidation, a flag indicating whether the
+    #       invalidation has come from the server. It's possible, due
+    #       to threading issues, that when applying a local
+    #       invalidation after a store, later invalidations from
+    #       the server may already have arrived.
+    
+    @locked
+    def invalidate(self, oid, version, tid, server_invalidation=True):
+        if tid is not None:
+            if tid > self.tid:
+                self.setLastTid(tid)
+            elif tid < self.tid:
+                if server_invalidation:
+                    raise ValueError("invalidation tid (%s) must not be less"
+                                     " than previous one (%s)" %
+                                     (u64(tid), u64(self.tid)))
 
         ofs = self.current.get(oid)
         if ofs is None:
@@ -630,17 +672,25 @@
         seek = self.f.seek
         read = self.f.read
         for oid, ofs in self.current.iteritems():
-            seek(ofs)
-            assert read(1) == 'a', (ofs, self.f.tell(), oid)
-            size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
-            assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
-            assert end_tid == z64, (ofs, self.f.tell(), oid)
-            if lver:
-                version = read(lver)
-            else:
-                version = ''
-            yield oid, tid, version
 
+            self._lock.acquire()
+            try:
+                seek(ofs)
+                assert read(1) == 'a', (ofs, self.f.tell(), oid)
+                size, saved_oid, tid, end_tid, lver = unpack(
+                    ">I8s8s8sh", read(30))
+                assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
+                assert end_tid == z64, (ofs, self.f.tell(), oid)
+                if lver:
+                    version = read(lver)
+                else:
+                    version = ''
+                result = oid, tid, version
+            finally:
+                self._lock.release()
+
+            yield result
+
     def dump(self):
         from ZODB.utils import oid_repr
         print "cache size", len(self)

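The locked class added above is a plain non-data descriptor: attribute access
on an instance returns a wrapper that runs the method while holding
inst._lock. A self-contained usage sketch (Counter is a hypothetical example
class, not part of ZEO):

    import threading

    class locked(object):
        def __init__(self, func):
            self.func = func
        def __get__(self, inst, class_):
            if inst is None:
                return self
            def call(*args, **kw):
                inst._lock.acquire()
                try:
                    return self.func(inst, *args, **kw)
                finally:
                    inst._lock.release()
            return call

    class Counter(object):
        def __init__(self):
            self._lock = threading.RLock()  # as in ClientCache.__init__
            self.value = 0

        @locked
        def increment(self):
            # Runs entirely under self._lock.
            self.value += 1

    c = Counter()
    c.increment()
    assert c.value == 1
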
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -158,8 +158,7 @@
         self.addr.append(self._getAddr())
 
     def _getAddr(self):
-        # port+1 is also used, so only draw even port numbers
-        return 'localhost', random.randrange(25000, 30000, 2)
+        return 'localhost', forker.get_port()
 
     def getConfig(self, path, create, read_only):
         raise NotImplementedError

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -144,24 +144,25 @@
         self.commitdict = commitdict
 
     def _testrun(self):
-        cn = self.db.open()
+        tm = transaction.TransactionManager()
+        cn = self.db.open(transaction_manager=tm)
         while not self.stop.isSet():
             try:
                 tree = cn.root()["tree"]
                 break
             except (ConflictError, KeyError):
-                transaction.abort()
+                tm.abort()
         key = self.startnum
         while not self.stop.isSet():
             try:
                 tree[key] = self.threadnum
-                transaction.get().note("add key %s" % key)
-                transaction.commit()
+                tm.get().note("add key %s" % key)
+                tm.commit()
                 self.commitdict[self] = 1
                 if self.sleep:
                     time.sleep(self.sleep)
             except (ReadConflictError, ConflictError), msg:
-                transaction.abort()
+                tm.abort()
             else:
                 self.added_keys.append(key)
             key += self.step
@@ -338,16 +339,23 @@
     def _check_threads(self, tree, *threads):
         # Make sure the thread's view of the world is consistent with
         # the actual database state.
+
         expected_keys = []
-        errormsgs = []
-        err = errormsgs.append
         for t in threads:
             if not t.added_keys:
                 err("thread %d didn't add any keys" % t.threadnum)
             expected_keys.extend(t.added_keys)
         expected_keys.sort()
-        actual_keys = list(tree.keys())
-        if expected_keys != actual_keys:
+
+        for i in range(100):
+            tree._p_jar.sync()
+            actual_keys = list(tree.keys())
+            if expected_keys == actual_keys:
+                break
+            time.sleep(.1)
+        else:
+            errormsgs = []
+            err = errormsgs.append
             err("expected keys != actual keys")
             for k in expected_keys:
                 if k not in actual_keys:
@@ -355,8 +363,7 @@
             for k in actual_keys:
                 if k not in expected_keys:
                     err("key %s in tree but not expected" % k)
-        if errormsgs:
-            display(tree)
+
             self.fail('\n'.join(errormsgs))
 
     def go(self, stop, commitdict, *threads):
@@ -488,10 +495,9 @@
         self.go(stop, cd, t1, t2, t3)
 
         while db1.lastTransaction() != db2.lastTransaction():
-            db1._storage.sync()
-            db2._storage.sync()
+            time.sleep(.1)
 
-
+        time.sleep(.1)
         cn = db1.open()
         tree = cn.root()["tree"]
         self._check_tree(cn, tree)

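The InvalidationTests changes above give each worker thread an explicit
transaction.TransactionManager, so each connection's commits and aborts are
driven directly rather than through the default manager. A minimal sketch of
two independent managers, using a throwaway DemoStorage purely for
illustration:

    import transaction
    import ZODB
    from ZODB.DemoStorage import DemoStorage

    db = ZODB.DB(DemoStorage())

    tm1 = transaction.TransactionManager()
    tm2 = transaction.TransactionManager()
    cn1 = db.open(transaction_manager=tm1)
    cn2 = db.open(transaction_manager=tm2)

    cn1.root()['a'] = 1
    tm2.abort()    # only affects cn2's transaction ...
    tm1.commit()   # ... so cn1's change still commits

    cn2.sync()     # pick up the committed state
    assert cn2.root()['a'] == 1
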
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -14,6 +14,7 @@
 """Library for forking storage server and connecting client storage"""
 
 import os
+import random
 import sys
 import time
 import errno
@@ -201,3 +202,29 @@
             ack = 'no ack received'
         logger.debug('shutdown_zeo_server(): acked: %s' % ack)
         s.close()
+
+def get_port():
+    """Return a port that is not in use.
+
+    Checks if a port is in use by trying to connect to it.  Assumes it
+    is not in use if connect raises an exception. We actually look for
+    2 consecutive free ports because most of the clients of this
+    function will use the returned port and the next one.
+
+    Raises RuntimeError after 10 tries.
+    """
+    for i in range(10):
+        port = random.randrange(20000, 30000)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            try:
+                s.connect(('localhost', port))
+                s1.connect(('localhost', port+1))
+            except socket.error:
+                # Perhaps we should check value of error too.
+                return port
+        finally:
+            s.close()
+            s1.close()
+    raise RuntimeError("Can't find port")

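As the docstring notes, callers typically need the returned port together
with the next one (the test harness uses port+1 as well, e.g. for the
server's admin socket), so typical use looks like:

    from ZEO.tests import forker

    port = forker.get_port()   # port and port+1 are both free
    addr = 'localhost', port
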
Copied: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test (from rev 89348, ZODB/branches/3.8/src/ZEO/tests/invalidations_while_connecting.test)
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test	                        (rev 0)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test	2008-08-04 17:47:44 UTC (rev 89350)
@@ -0,0 +1,104 @@
+Invalidations while connecting
+==============================
+
+As soon as a client registers with a server, it will receive
+invalidations from the server.  The client must be careful to queue
+these invalidations until it is ready to deal with them.  At the time
+of the writing of this test, clients weren't careful enough about
+queuing invalidations.  This led to cache corruption in the form of
+both low-level file corruption as well as out-of-date records marked
+as current.
+
+This test tries to provoke this bug by:
+
+- starting a server
+
+    >>> import ZEO.tests.testZEO, ZEO.tests.forker
+    >>> addr = 'localhost', ZEO.tests.testZEO.get_port()
+    >>> zconf = ZEO.tests.forker.ZEOConfig(addr)
+    >>> sconf = '<filestorage 1>\npath Data.fs\n</filestorage>\n'
+    >>> _, adminaddr, pid, conf_path = ZEO.tests.forker.start_zeo_server(
+    ...     sconf, zconf, addr[1])
+
+- opening a client to the server that writes some objects, filling
+  its cache at the same time,
+
+    >>> import ZEO.ClientStorage, ZODB.tests.MinPO, transaction
+    >>> db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
+    >>> conn = db.open()
+    >>> nobs = 1000
+    >>> for i in range(nobs):
+    ...     conn.root()[i] = ZODB.tests.MinPO.MinPO(0)
+    >>> transaction.commit()
+
+- disconnecting the first client (closing it with a persistent cache),
+
+    >>> db.close()
+
+- starting a second client that writes objects more or less
+  constantly,
+
+    >>> import random, threading
+    >>> stop = False
+    >>> db2 = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr))
+    >>> tm = transaction.TransactionManager()
+    >>> conn2 = db2.open(transaction_manager=tm)
+    >>> random = random.Random(0)
+    >>> lock = threading.Lock()
+    >>> def run():
+    ...     while 1:
+    ...         i = random.randint(0, nobs-1)
+    ...         if stop:
+    ...             return
+    ...         lock.acquire()
+    ...         try:
+    ...             conn2.root()[i].value += 1
+    ...             tm.commit()
+    ...         finally:
+    ...             lock.release()
+    ...             time.sleep(0)
+    >>> thread = threading.Thread(target=run)
+    >>> thread.start()
+
+- restarting the first client, and 
+- testing for cache validity.
+
+    >>> import zope.testing.loggingsupport, logging
+    >>> handler = zope.testing.loggingsupport.InstalledHandler(
+    ...    'ZEO', level=logging.ERROR)
+
+    >>> import time
+    >>> for c in range(10):
+    ...    time.sleep(.1)
+    ...    db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
+    ...    _ = lock.acquire()
+    ...    try:
+    ...      time.sleep(.1)
+    ...      assert (db._storage.lastTransaction()
+    ...              == db._storage._server.lastTransaction()), (
+    ...                  db._storage.lastTransaction(),
+    ...                  db._storage._server.lastTransaction())
+    ...      conn = db.open()
+    ...      for i in range(1000):
+    ...        if conn.root()[i].value != conn2.root()[i].value:
+    ...            print 'bad', c, i, conn.root()[i].value,
+    ...            print  conn2.root()[i].value
+    ...    finally:
+    ...      _ = lock.release()
+    ...    db.close()
+
+    >>> stop = True
+    >>> thread.join(10)
+    >>> thread.isAlive()
+    False
+
+    >>> for record in handler.records:
+    ...     print record.name, record.levelname
+    ...     print handler.format(record)
+
+    >>> handler.uninstall()
+
+    >>> db.close()
+    >>> db2.close()
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr)
+

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -21,8 +21,8 @@
 import unittest
 # Import the actual test class
 from ZEO.tests import ConnectionTests, InvalidationTests
+from zope.testing import doctest, setupstack
 
-
 class FileStorageConfig:
     def getConfig(self, path, create, read_only):
         return """\
@@ -135,6 +135,10 @@
     for klass in test_classes:
         sub = unittest.makeSuite(klass, 'check')
         suite.addTest(sub)
+    suite.addTest(doctest.DocFileSuite(
+        'invalidations_while_connecting.test',
+        setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
+        ))
     return suite
 
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -18,9 +18,7 @@
 import doctest
 import logging
 import os
-import random
 import signal
-import socket
 import stat
 import tempfile
 import threading
@@ -50,6 +48,7 @@
 import ZEO.zrpc.connection
 
 from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests.forker import get_port
 
 import ZEO.tests.ConnectionTests
 
@@ -128,27 +127,6 @@
         finally:
             storage2.close()
 
-def get_port():
-    """Return a port that is not in use.
-
-    Checks if a port is in use by trying to connect to it.  Assumes it
-    is not in use if connect raises an exception.
-
-    Raises RuntimeError after 10 tries.
-    """
-    for i in range(10):
-        port = random.randrange(20000, 30000)
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            try:
-                s.connect(('localhost', port))
-            except socket.error:
-                # Perhaps we should check value of error too.
-                return port
-        finally:
-            s.close()
-    raise RuntimeError("Can't find port")
-
 class GenericTests(
     # Base class for all ZODB tests
     StorageTestBase.StorageTestBase,

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -35,7 +35,6 @@
         # testSerialization reads the entire file into a string, it's not
         # good to leave it that big.
         self.cache = ZEO.cache.ClientCache(size=1024**2)
-        self.cache.open()
 
     def tearDown(self):
         if self.cache.path:
@@ -45,7 +44,6 @@
         self.assertEqual(self.cache.getLastTid(), None)
         self.cache.setLastTid(n2)
         self.assertEqual(self.cache.getLastTid(), n2)
-        self.cache.invalidate(n1, "", n1)
         self.assertEqual(self.cache.getLastTid(), n2)
         self.cache.invalidate(n1, "", n3)
         self.assertEqual(self.cache.getLastTid(), n3)
@@ -65,8 +63,8 @@
     def testInvalidate(self):
         data1 = "data for n1"
         self.cache.store(n1, "", n3, None, data1)
+        self.cache.invalidate(n2, "", n2)
         self.cache.invalidate(n1, "", n4)
-        self.cache.invalidate(n2, "", n2)
         self.assertEqual(self.cache.load(n1, ""), None)
         self.assertEqual(self.cache.loadBefore(n1, n4),
                          (data1, n3, n4))
@@ -142,7 +140,6 @@
         dst.write(src.read(self.cache.maxsize))
         dst.close()
         copy = ZEO.cache.ClientCache(path)
-        copy.open()
 
         # Verify that internals of both objects are the same.
         # Could also test that external API produces the same results.
@@ -158,7 +155,6 @@
         if self.cache.path:
             os.remove(self.cache.path)
         cache = ZEO.cache.ClientCache(size=50)
-        cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         cache.store(n1, '', n2, None, "x"*64)
@@ -174,7 +170,6 @@
         if self.cache.path:
             os.remove(self.cache.path)
         cache = ZEO.cache.ClientCache(size=50)
-        cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         cache.store(n1, '', n2, n3, "x"*64)
@@ -218,7 +213,6 @@
     ...     _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
     ...     if os.path.exists('cache'):
     ...         cache = ZEO.cache.ClientCache('cache')
-    ...         cache.open()
     ...         cache.close()
     ...         os.remove('cache')
     ...         os.remove('cache.lock')
@@ -238,7 +232,6 @@
     >>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
     >>> cache.close()
     >>> cache = ZEO.cache.ClientCache('cache', 1000)
-    >>> cache.open()
     >>> cache.store(ZODB.utils.p64(2), '', ZODB.utils.p64(2), None, 'XXX')
 
     >>> cache.close()
@@ -255,6 +248,57 @@
 
     >>> cache.close()
     """,
+
+    thread_safe =
+    r"""
+
+    >>> import ZEO.cache, ZODB.utils
+    >>> cache = ZEO.cache.ClientCache('cache', 1000000)
+
+    >>> for i in range(100):
+    ...     cache.store(ZODB.utils.p64(i), '', ZODB.utils.p64(1), None, '0')
+
+    >>> import random, sys, threading
+    >>> random = random.Random(0)
+    >>> stop = False
+    >>> read_failure = None
+
+    >>> def read_thread():
+    ...     def pick_oid():
+    ...         return ZODB.utils.p64(random.randint(0,99))
+    ...
+    ...     try:
+    ...         while not stop:
+    ...             cache.load(pick_oid())
+    ...             cache.loadBefore(pick_oid(), ZODB.utils.p64(2))
+    ...             cache.modifiedInVersion(pick_oid())
+    ...     except:
+    ...         global read_failure
+    ...         read_failure = sys.exc_info()
+
+    >>> thread = threading.Thread(target=read_thread)
+    >>> thread.start()
+
+    >>> for tid in range(2,10):
+    ...     for oid in range(100):
+    ...         oid = ZODB.utils.p64(oid)
+    ...         cache.invalidate(oid, '', ZODB.utils.p64(tid))
+    ...         cache.store(oid, '', ZODB.utils.p64(tid), None, str(tid))
+
+    >>> stop = True
+    >>> thread.join()
+    >>> if read_failure:
+    ...    print 'Read failure:'
+    ...    import traceback
+    ...    traceback.print_exception(*read_failure)
+
+    >>> expected = '9', ZODB.utils.p64(9), ''
+    >>> for oid in range(100):
+    ...     loaded = cache.load(ZODB.utils.p64(oid))
+    ...     if loaded != expected:
+    ...         print oid, loaded
+    
+    """,
     )
 
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -447,8 +447,7 @@
         Call the client's testConnection(), giving the client a chance
         to do app-level check of the connection.
         """
-        self.conn = ManagedClientConnection(self.sock, self.addr,
-                                            self.client, self.mgr)
+        self.conn = ManagedClientConnection(self.sock, self.addr, self.mgr)
         self.sock = None # The socket is now owned by the connection
         try:
             self.preferred = self.client.testConnection(self.conn)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -555,14 +555,23 @@
             self.replies_cond.release()
 
     def handle_request(self, msgid, flags, name, args):
-        if not self.check_method(name):
-            msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
+        obj = self.obj
+        
+        if name.startswith('_') or not hasattr(obj, name):
+            if obj is None:
+                if __debug__:
+                    self.log("no object calling %s%s"
+                             % (name, short_repr(args)),
+                             level=logging.DEBUG)
+                return
+                
+            msg = "Invalid method name: %s on %s" % (name, repr(obj))
             raise ZRPCError(msg)
         if __debug__:
             self.log("calling %s%s" % (name, short_repr(args)),
                      level=logging.DEBUG)
 
-        meth = getattr(self.obj, name)
+        meth = getattr(obj, name)
         try:
             self.waiting_for_reply = True
             try:
@@ -601,12 +610,6 @@
                  level=logging.ERROR, exc_info=True)
         self.close()
 
-    def check_method(self, name):
-        # TODO:  This is hardly "secure".
-        if name.startswith('_'):
-            return None
-        return hasattr(self.obj, name)
-
     def send_reply(self, msgid, ret):
         # encode() can pass on a wide variety of exceptions from cPickle.
         # While a bare `except` is generally poor practice, in this case
@@ -897,7 +900,7 @@
     __super_close = Connection.close
     base_message_output = Connection.message_output
 
-    def __init__(self, sock, addr, obj, mgr):
+    def __init__(self, sock, addr, mgr):
         self.mgr = mgr
 
         # We can't use the base smac's message_output directly because the
@@ -914,7 +917,7 @@
         self.queue_output = True
         self.queued_messages = []
 
-        self.__super_init(sock, addr, obj, tag='C', map=client_map)
+        self.__super_init(sock, addr, None, tag='C', map=client_map)
         self.thr_async = True
         self.trigger = client_trigger
         client_trigger.pull_trigger()

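With the zrpc changes above, a ManagedClientConnection starts with no
registered receiver (obj is None): incoming calls are dropped (and logged in
debug mode) until register_object() installs one, and register_object(None)
at close makes the connection deaf again. A toy sketch of that dispatch rule
(Dispatcher and Receiver are illustrative names only):

    class Dispatcher(object):
        def __init__(self):
            self.obj = None

        def register_object(self, obj):
            self.obj = obj

        def handle_request(self, name, args):
            obj = self.obj
            if name.startswith('_') or not hasattr(obj, name):
                if obj is None:
                    return          # not ready yet: drop the call
                raise ValueError(
                    "Invalid method name: %s on %r" % (name, obj))
            return getattr(obj, name)(*args)

    class Receiver(object):
        def ping(self):
            return 'pong'

    d = Dispatcher()
    d.handle_request('ping', ())        # silently dropped
    d.register_object(Receiver())
    assert d.handle_request('ping', ()) == 'pong'
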
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -594,7 +594,14 @@
             oid = obj._p_oid
             serial = getattr(obj, "_p_serial", z64)
 
-            if serial == z64:
+            if ((serial == z64)
+                and
+                ((self._savepoint_storage is None)
+                 or (oid not in self._savepoint_storage.creating)
+                 or self._savepoint_storage.creating[oid]
+                 )
+                ):
+                
                 # obj is a new object
 
                 # Because obj was added, it is now in _creating, so it

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -530,6 +530,10 @@
 LAYOUTS['lawn'] = LawnLayout()
 
 
+class BlobStorageError(Exception):
+    """The blob storage encountered an invalid state."""
+
+
 class BlobStorage(SpecificationDecoratorBase):
     """A storage to support blobs."""
 
@@ -537,7 +541,8 @@
 
     # Proxies can't have a __dict__ so specifying __slots__ here allows
     # us to have instance attributes explicitly on the proxy.
-    __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
+    __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
+                 '_blobs_pack_is_in_progress', )
 
     def __new__(self, base_directory, storage, layout='automatic'):
         return SpecificationDecoratorBase.__new__(self, storage)
@@ -556,6 +561,7 @@
         else:
             supportsUndo = supportsUndo()
         self.__supportsUndo = supportsUndo
+        self._blobs_pack_is_in_progress = False
 
     @non_overridable
     def temporaryDirectory(self):
@@ -661,21 +667,29 @@
 
     @non_overridable
     def pack(self, packtime, referencesf):
-        """Remove all unused oid/tid combinations."""
-        unproxied = getProxiedObject(self)
+        """Remove all unused OID/TID combinations."""
+        self._lock_acquire()
+        try:
+            if self._blobs_pack_is_in_progress:
+                raise BlobStorageError('Already packing')
+            self._blobs_pack_is_in_progress = True
+        finally:
+            self._lock_release()
 
-        # pack the underlying storage, which will allow us to determine
-        # which serials are current.
-        result = unproxied.pack(packtime, referencesf)
+        try:
+            # Pack the underlying storage, which will allow us to determine
+            # which serials are current.
+            unproxied = getProxiedObject(self)
+            result = unproxied.pack(packtime, referencesf)
 
-        # perform a pack on blob data
-        self._lock_acquire()
-        try:
+            # Perform a pack on the blob data.
             if self.__supportsUndo:
                 self._packUndoing(packtime, referencesf)
             else:
                 self._packNonUndoing(packtime, referencesf)
         finally:
+            self._lock_acquire()
+            self._blobs_pack_is_in_progress = False
             self._lock_release()
 
         return result

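The pack() rewrite above holds the storage lock only while checking and
flipping _blobs_pack_is_in_progress, never across the pack itself; that is
what makes blob packing non-blocking. The guard pattern in isolation (a
sketch; RuntimeError stands in for BlobStorageError):

    import threading

    class PackGuard(object):
        def __init__(self):
            self._lock = threading.Lock()
            self._pack_in_progress = False

        def pack(self, do_pack):
            # Check and set the flag under the lock ...
            self._lock.acquire()
            try:
                if self._pack_in_progress:
                    raise RuntimeError('Already packing')
                self._pack_in_progress = True
            finally:
                self._lock.release()
            # ... but run the (slow) pack without holding it.
            try:
                return do_pack()
            finally:
                self._lock.acquire()
                self._pack_in_progress = False
                self._lock.release()

    guard = PackGuard()
    guard.pack(lambda: None)   # a concurrent second call would raise
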
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_layout.txt	2008-08-04 17:47:44 UTC (rev 89350)
@@ -207,9 +207,9 @@
 >>> bushy = os.path.join(d, 'bushy')
 >>> migrate(old, bushy, 'bushy')  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
 Migrating blob data from `/.../old` (lawn) to `/.../bushy` (bushy)
+    OID: 0x1b7a - 2 files 
+    OID: 0x0a - 2 files 
     OID: 0x1b7f - 2 files 
-    OID: 0x0a - 2 files 
-    OID: 0x1b7a - 2 files 
 
 The new directory now contains the same files in different directories, but
 with the same sizes and permissions:
@@ -226,6 +226,26 @@
 >>> ls(bushy)
      040700  4096  /.../bushy
     0100644  5     /.../bushy/.layout
+     040700  4096  /.../bushy/0x7f
+     040700  4096  /.../bushy/0x7f/0x1b
+     040700  4096  /.../bushy/0x7f/0x1b/0x00
+     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00
+     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
+    0100644  3     /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo2
+    0100644  3     /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo
+     040700  4096  /.../bushy/0x0a
+     040700  4096  /.../bushy/0x0a/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00
+     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00
+    0100644  3     /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo4
+    0100644  3     /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo3
      040700  4096  /.../bushy/0x7a
      040700  4096  /.../bushy/0x7a/0x1b
      040700  4096  /.../bushy/0x7a/0x1b/0x00
@@ -234,50 +254,30 @@
      040700  4096  /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00
      040700  4096  /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00
      040700  4096  /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
+    0100644  4     /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo5
     0100644  5     /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo6
-    0100644  4     /.../bushy/0x7a/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo5
      040700  4096  /.../bushy/tmp
-     040700  4096  /.../bushy/0x0a
-     040700  4096  /.../bushy/0x0a/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00
-    0100644  3     /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo4
-    0100644  3     /.../bushy/0x0a/0x00/0x00/0x00/0x00/0x00/0x00/0x00/foo3
-     040700  4096  /.../bushy/0x7f
-     040700  4096  /.../bushy/0x7f/0x1b
-     040700  4096  /.../bushy/0x7f/0x1b/0x00
-     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00
-     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00
-     040700  4096  /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00
-    0100644  3     /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo
-    0100644  3     /.../bushy/0x7f/0x1b/0x00/0x00/0x00/0x00/0x00/0x00/foo2
 
 We can also migrate the bushy layout back to the lawn layout:
 
 >>> lawn = os.path.join(d, 'lawn')
 >>> migrate(bushy, lawn, 'lawn')
 Migrating blob data from `/.../bushy` (bushy) to `/.../lawn` (lawn)
+    OID: 0x1b7f - 2 files 
+    OID: 0x0a - 2 files 
     OID: 0x1b7a - 2 files 
-    OID: 0x0a - 2 files 
-    OID: 0x1b7f - 2 files 
 >>> ls(lawn)
-    040700  4096    /.../lawn
-   0100644  4       /.../lawn/.layout
-    040700  4096    /.../lawn/0x1b7f
-   0100644  3       /.../lawn/0x1b7f/foo
-   0100644  3       /.../lawn/0x1b7f/foo2
-    040700  4096    /.../lawn/tmp
-    040700  4096    /.../lawn/0x0a
-   0100644  3       /.../lawn/0x0a/foo4
-   0100644  3       /.../lawn/0x0a/foo3
-    040700  4096    /.../lawn/0x1b7a
-   0100644  5       /.../lawn/0x1b7a/foo6
-   0100644  4       /.../lawn/0x1b7a/foo5
+     040700  4096  /.../lawn
+    0100644  4     /.../lawn/.layout
+     040700  4096  /.../lawn/0x1b7a
+    0100644  4     /.../lawn/0x1b7a/foo5
+    0100644  5     /.../lawn/0x1b7a/foo6
+     040700  4096  /.../lawn/0x0a
+    0100644  3     /.../lawn/0x0a/foo4
+    0100644  3     /.../lawn/0x0a/foo3
+     040700  4096  /.../lawn/0x1b7f
+    0100644  3     /.../lawn/0x1b7f/foo2
+    0100644  3     /.../lawn/0x1b7f/foo
+     040700  4096  /.../lawn/tmp
 
 >>> shutil.rmtree(d)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt	2008-08-04 17:47:44 UTC (rev 89350)
@@ -240,6 +240,37 @@
     >>> os.path.exists(os.path.split(fns[0])[0])
     False
 
+Avoiding parallel packs
+=======================
+
+Blob packing (similar to FileStorage) can only be run once at a time. For
+this, a flag (_blobs_pack_is_in_progress) is set. If the pack method is called
+while this flag is set, it will refuse to perform another pack, until the flag
+is reset:
+
+    >>> blob_storage._blobs_pack_is_in_progress
+    False
+    >>> blob_storage._blobs_pack_is_in_progress = True
+    >>> blob_storage.pack(packtime, referencesf)
+    Traceback (most recent call last):
+    BlobStorageError: Already packing
+    >>> blob_storage._blobs_pack_is_in_progress = False
+    >>> blob_storage.pack(packtime, referencesf)
+
+We can also see, that the flag is set during the pack, by leveraging the
+knowledge that the underlying storage's pack method is also called:
+
+    >>> def dummy_pack(time, ref):
+    ...     print "_blobs_pack_is_in_progress =", blob_storage._blobs_pack_is_in_progress
+    ...     return base_pack(time, ref)
+    >>> base_pack = base_storage.pack
+    >>> base_storage.pack = dummy_pack
+    >>> blob_storage.pack(packtime, referencesf)
+    _blobs_pack_is_in_progress = True
+    >>> blob_storage._blobs_pack_is_in_progress
+    False
+    >>> base_storage.pack = base_pack
+
 Clean up our blob directory:
 
     >>> shutil.rmtree(blob_dir)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py	2008-08-04 15:54:33 UTC (rev 89349)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py	2008-08-04 17:47:44 UTC (rev 89350)
@@ -146,6 +146,32 @@
 
 """
 
+def test_explicit_adding_with_savepoint2():
+    """
+
+    >>> import ZODB.tests.util, transaction, persistent
+    >>> databases = {}
+    >>> db1 = ZODB.tests.util.DB(databases=databases, database_name='1')
+    >>> db2 = ZODB.tests.util.DB(databases=databases, database_name='2')
+    >>> tm = transaction.TransactionManager()
+    >>> conn1 = db1.open(transaction_manager=tm)
+    >>> conn2 = conn1.get_connection('2')
+    >>> z = MyClass()
+
+    >>> conn1.root()['z'] = z
+    >>> conn1.add(z)
+    >>> s = tm.savepoint()
+    >>> conn2.root()['z'] = z
+    >>> z.x = 1
+    >>> tm.commit()
+    >>> z._p_jar.db().database_name
+    '1'
+    
+    >>> db1.close()
+    >>> db2.close()
+
+"""
+
 def tearDownDbs(test):
     test.globs['db1'].close()
     test.globs['db2'].close()
