[Zodb-checkins] CVS: ZODB3/ZODB - FileStorage.py:1.135.6.3.2.1 DB.py:1.53.2.2.2.1 Connection.py:1.98.4.2.2.1 BaseStorage.py:1.34.4.1.2.1

Jeremy Hylton jeremy at zope.com
Mon Sep 29 14:23:40 EDT 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv17037/ZODB

Modified Files:
      Tag: ZODB3-mvcc-branch
	FileStorage.py DB.py Connection.py BaseStorage.py 
Log Message:
First cut at MVCC implementation for FileStorage.

Two tests fail sporadically: checkNZODBThreads() and checkPackWhileWriting().
The latter probably fails because it occasionally returns data from a
packed transaction, which shouldn't be allowed.

loadNonCurrent() is implemented as a hack in BaseStorage.  It can't be
implemented correctly using history(), because history() doesn't return
transaction ids and gives no indication of whether the transaction
containing the object was packed.

The DB API had to change so that tids can be passed with all
invalidations.  (Up until now, only ZEO passed a tid with an
invalidation.)  Currently, the storage doesn't expose the current
transaction id to the database.  As a result, abortVersion(),
commitVersion(), and transactionalUndo() need to return a tid as well
as oids.  The callback passed to tpc_finish() needs to take a tid as
well as oids as arguments.

Many tests needed to be updated to track the changes to the storage
API.

I expect ZEO and BDBStorage will fail in a variety of ways.



=== ZODB3/ZODB/FileStorage.py 1.135.6.3 => 1.135.6.3.2.1 ===
--- ZODB3/ZODB/FileStorage.py:1.135.6.3	Mon Sep 15 17:26:56 2003
+++ ZODB3/ZODB/FileStorage.py	Mon Sep 29 14:23:38 2003
@@ -643,7 +643,7 @@
             spos = h[-8:]
             srcpos = u64(spos)
         self._toid2serial_delete.update(current_oids)
-        return oids
+        return self._serial, oids
 
     def getSize(self):
         return self._pos
@@ -1315,7 +1315,7 @@
         # It's too painful to try to update them to correct current
         # values instead.
         self._toid2serial_delete.update(tindex)
-        return tindex.keys()
+        return self._serial, tindex.keys()
 
     def _txn_find(self, tid, stop_at_pack):
         pos = self._pos
@@ -1460,9 +1460,6 @@
         self._lock_acquire()
         try:
             r=[]
-            file=self._file
-            seek=file.seek
-            read=file.read
             try:
                 pos=self._index[oid]
             except KeyError:
@@ -1473,14 +1470,14 @@
 
             while 1:
                 if len(r) >= size: return r
-                seek(pos)
-                h=read(DATA_HDR_LEN)
+                self._file.seek(pos)
+                h=self._file.read(DATA_HDR_LEN)
                 doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
                 prev=u64(prev)
 
                 if vlen:
-                    read(16)
-                    version=read(vlen)
+                    self._file.read(16)
+                    version = self._file.read(vlen)
                     if wantver is not None and version != wantver:
                         if prev:
                             pos=prev
@@ -1491,13 +1488,15 @@
                     version=''
                     wantver=None
 
-                seek(u64(tloc))
-                h=read(TRANS_HDR_LEN)
+                self._file.seek(u64(tloc))
+                h = self._file.read(TRANS_HDR_LEN)
                 tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
-                user_name=read(ul)
-                description=read(dl)
-                if el: d=loads(read(el))
-                else: d={}
+                user_name = self._file.read(ul)
+                description = self._file.read(dl)
+                if el:
+                    d=loads(self._file.read(el))
+                else:
+                    d={}
 
                 d['time']=TimeStamp(serial).timeTime()
                 d['user_name']=user_name


=== ZODB3/ZODB/DB.py 1.53.2.2 => 1.53.2.2.2.1 ===
--- ZODB3/ZODB/DB.py:1.53.2.2	Mon Sep 15 17:26:56 2003
+++ ZODB3/ZODB/DB.py	Mon Sep 29 14:23:38 2003
@@ -74,7 +74,7 @@
         self._version_cache_size=version_cache_size
         self._version_cache_deactivate_after = version_cache_deactivate_after
 
-        self._miv_cache={}
+        self._miv_cache = {}
 
         # Setup storage
         self._storage=storage
@@ -303,8 +303,7 @@
     def importFile(self, file):
         raise 'Not yet implemented'
 
-    def invalidate(self, oids, connection=None, version='',
-                   rc=sys.getrefcount):
+    def invalidate(self, tid, oids, connection=None, version=''):
         """Invalidate references to a given oid.
 
         This is used to indicate that one of the connections has committed a
@@ -326,21 +325,21 @@
             for cc in allocated:
                 if (cc is not connection and
                     (not version or cc._version==version)):
-                    if rc(cc) <= 3:
+                    if sys.getrefcount(cc) <= 3:
                         cc.close()
-                    cc.invalidate(oids)
+                    cc.invalidate(tid, oids)
 
-        temps=self._temps
-        if temps:
+        if self._temps:
             t=[]
-            for cc in temps:
-                if rc(cc) > 3:
+            for cc in self._temps:
+                if sys.getrefcount(cc) > 3:
                     if (cc is not connection and
-                        (not version or cc._version==version)):
-                        cc.invalidate(oids)
+                        (not version or cc._version == version)):
+                        cc.invalidate(tid, oids)
                     t.append(cc)
-                else: cc.close()
-            self._temps=t
+                else:
+                    cc.close()
+            self._temps = t
 
     def modifiedInVersion(self, oid):
         h=hash(oid)%131
@@ -356,7 +355,7 @@
         return len(self._storage)
 
     def open(self, version='', transaction=None, temporary=0, force=None,
-             waitflag=1):
+             waitflag=1, mvcc=True):
         """Return a object space (AKA connection) to work in
 
         The optional version argument can be used to specify that a
@@ -374,25 +373,25 @@
         try:
 
             if transaction is not None:
-                connections=transaction._connections
+                connections = transaction._connections
                 if connections:
                     if connections.has_key(version) and not temporary:
                         return connections[version]
                 else:
-                    transaction._connections=connections={}
-                transaction=transaction._connections
-
+                    transaction._connections = connections = {}
+                transaction = transaction._connections
 
             if temporary:
                 # This is a temporary connection.
                 # We won't bother with the pools.  This will be
                 # a one-use connection.
-                c=self.klass(
-                    version=version,
-                    cache_size=self._version_cache_size)
+                c = self.klass(version=version,
+                               cache_size=self._version_cache_size,
+                               mvcc=mvcc)
                 c._setDB(self)
                 self._temps.append(c)
-                if transaction is not None: transaction[id(c)]=c
+                if transaction is not None:
+                    transaction[id(c)] = c
                 return c
 
 
@@ -433,18 +432,18 @@
 
 
             if not pool:
-                c=None
+                c = None
                 if version:
                     if self._version_pool_size > len(allocated) or force:
-                        c=self.klass(
-                            version=version,
-                            cache_size=self._version_cache_size)
+                        c = self.klass(version=version,
+                                       cache_size=self._version_cache_size,
+                                       mvcc=mvcc)
                         allocated.append(c)
                         pool.append(c)
                 elif self._pool_size > len(allocated) or force:
-                    c=self.klass(
-                        version=version,
-                        cache_size=self._cache_size)
+                    c = self.klass(version=version,
+                                   cache_size=self._cache_size,
+                                   mvcc=mvcc)
                     allocated.append(c)
                     pool.append(c)
 
@@ -459,7 +458,7 @@
                             pool_lock.release()
                     else: return
 
-            elif len(pool)==1:
+            elif len(pool) == 1:
                 # Taking last one, lock the pool
                 # Note that another thread might grab the lock
                 # before us, so we might actually block, however,
@@ -473,14 +472,15 @@
                     # but it could be higher due to a race condition.
                     pool_lock.release()
 
-            c=pool[-1]
+            c = pool[-1]
             del pool[-1]
             c._setDB(self)
             for pool, allocated in pooll:
                 for cc in pool:
                     cc._incrgc()
 
-            if transaction is not None: transaction[version]=c
+            if transaction is not None:
+                transaction[version] = c
             return c
 
         finally: self._r()
@@ -591,7 +591,8 @@
             d = {}
             for oid in storage.undo(id):
                 d[oid] = 1
-            self.invalidate(d)
+            # XXX I think we need to remove old undo to use mvcc
+            self.invalidate(None, d)
 
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)
@@ -619,13 +620,13 @@
 
     def commit(self, reallyme, t):
         dest=self._dest
-        oids = self._db._storage.commitVersion(self._version, dest, t)
+        tid, oids = self._db._storage.commitVersion(self._version, dest, t)
         oids = list2dict(oids)
-        self._db.invalidate(oids, version=dest)
+        self._db.invalidate(tid, oids, version=dest)
         if dest:
             # the code above just invalidated the dest version.
             # now we need to invalidate the source!
-            self._db.invalidate(oids, version=self._version)
+            self._db.invalidate(tid, oids, version=self._version)
 
 class AbortVersion(CommitVersion):
     """An object that will see to version abortion
@@ -634,9 +635,9 @@
     """
 
     def commit(self, reallyme, t):
-        version=self._version
-        oids = self._db._storage.abortVersion(version, t)
-        self._db.invalidate(list2dict(oids), version=version)
+        version = self._version
+        tid, oids = self._db._storage.abortVersion(version, t)
+        self._db.invalidate(tid, list2dict(oids), version=version)
 
 
 class TransactionalUndo(CommitVersion):
@@ -650,5 +651,5 @@
     # similarity of rhythm that I think it's justified.
 
     def commit(self, reallyme, t):
-        oids = self._db._storage.transactionalUndo(self._version, t)
-        self._db.invalidate(list2dict(oids))
+        tid, oids = self._db._storage.transactionalUndo(self._version, t)
+        self._db.invalidate(tid, list2dict(oids))


=== ZODB3/ZODB/Connection.py 1.98.4.2 => 1.98.4.2.2.1 ===
--- ZODB3/ZODB/Connection.py:1.98.4.2	Mon Sep 15 17:26:56 2003
+++ ZODB3/ZODB/Connection.py	Mon Sep 29 14:23:38 2003
@@ -21,7 +21,6 @@
 from POSException import ConflictError, ReadConflictError, TransactionError
 from ExtensionClass import Base
 import ExportImport, TmpStore
-from zLOG import LOG, ERROR, BLATHER, WARNING
 from coptimizations import new_persistent_id
 from ConflictResolution import ResolvedSerial
 from Transaction import Transaction, get_transaction
@@ -29,10 +28,17 @@
 
 from cPickle import Unpickler, Pickler
 from cStringIO import StringIO
+import logging
 import sys
 import threading
 from time import time
-from types import StringType, ClassType
+from types import ClassType
+
+_marker = object()
+
+def myhasattr(obj, attr):
+    # builtin hasattr() swallows exceptions
+    return getattr(obj, attr, _marker) is not _marker
 
 global_code_timestamp = 0
 
@@ -55,9 +61,9 @@
 
     The Connection manages movement of objects in and out of object storage.
     """
-    _tmp=None
-    _debug_info=()
-    _opened=None
+    _tmp = None
+    _debug_info = ()
+    _opened = None
     _code_timestamp = 0
     _transaction = None
 
@@ -65,9 +71,12 @@
     # when we close by putting something here.
 
     def __init__(self, version='', cache_size=400,
-                 cache_deactivate_after=60):
+                 cache_deactivate_after=60, mvcc=True):
         """Create a new Connection"""
-        self._version=version
+
+        self._log = logging.getLogger("zodb.conn")
+        
+        self._version = version
         self._cache = cache = PickleCache(self, cache_size)
         if version:
             # Caches for versions end up empty if the version
@@ -99,6 +108,14 @@
         self._invalidated = d = {}
         self._invalid = d.has_key
         self._conflicts = {}
+        self._noncurrent = {}
+
+        # If MVCC is enabled, then _mvcc is True and _txn_time stores
+        # the upper bound on transactions visible to this connection.
+        # That is, all object revisions must be written before _txn_time.
+        # If it is None, then the current revisions are acceptable.
+        self._mvcc = mvcc
+        self._txn_time = None
 
     def getTransaction(self):
         t = self._transaction
@@ -141,7 +158,7 @@
         self._incrgc = None
         self.cacheGC = None
 
-    def __getitem__(self, oid, tt=type(())):
+    def __getitem__(self, oid):
         obj = self._cache.get(oid, None)
         if obj is not None:
             return obj
@@ -157,9 +174,9 @@
 
         klass, args = object
 
-        if type(klass) is tt:
+        if isinstance(klass, tuple):
             module, name = klass
-            klass=self._db._classFactory(self, module, name)
+            klass = self._db._classFactory(self, module, name)
 
         if (args is None or
             not args and not hasattr(klass,'__getinitargs__')):
@@ -177,12 +194,10 @@
         self._cache[oid] = object
         return object
 
-    def _persistent_load(self,oid,
-                        tt=type(())):
-
+    def _persistent_load(self, oid):
         __traceback_info__=oid
 
-        if type(oid) is tt:
+        if isinstance(oid, tuple):
             # Quick instance reference.  We know all we need to know
             # to create the instance wo hitting the db, so go for it!
             oid, klass = oid
@@ -190,9 +205,10 @@
             if obj is not None:
                 return obj
 
-            if type(klass) is tt:
+            if isinstance(klass, tuple):
                 module, name = klass
-                try: klass=self._db._classFactory(self, module, name)
+                try:
+                    klass=self._db._classFactory(self, module, name)
                 except:
                     # Eek, we couldn't get the class. Hm.
                     # Maybe their's more current data in the
@@ -282,11 +298,12 @@
         # Call the close callbacks.
         if self.__onCloseCallbacks is not None:
             for f in self.__onCloseCallbacks:
-                try: f()
-                except:
-                    f=getattr(f, 'im_self', f)
-                    LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
-                        error=sys.exc_info())
+                try:
+                    f()
+                except: # except what?
+                    f = getattr(f, 'im_self', f)
+                    self._log.error("Close callback failed for %s", f,
+                                    sys.exc_info())
             self.__onCloseCallbacks = None
         self._storage = self._tmp = self.new_oid = self._opened = None
         self._debug_info = ()
@@ -438,8 +455,8 @@
         if tmp is None: return
         src=self._storage
 
-        LOG('ZODB', BLATHER,
-            'Commiting subtransaction of size %s' % src.getSize())
+        self._log.debug("Commiting subtransaction of size %s",
+                        src.getSize())
 
         self._storage=tmp
         self._tmp=None
@@ -490,8 +507,6 @@
                 del o._p_jar
                 del o._p_oid
 
-    #XXX
-
     def db(self): return self._db
 
     def getVersion(self): return self._version
@@ -499,7 +514,7 @@
     def isReadOnly(self):
         return self._storage.isReadOnly()
 
-    def invalidate(self, oids):
+    def invalidate(self, tid, oids):
         """Invalidate a set of oids.
 
         This marks the oid as invalid, but doesn't actually invalidate
@@ -508,6 +523,8 @@
         """
         self._inv_lock.acquire()
         try:
+            if self._txn_time is None:
+                self._txn_time = tid
             self._invalidated.update(oids)
         finally:
             self._inv_lock.release()
@@ -517,13 +534,15 @@
         try:
             self._cache.invalidate(self._invalidated)
             self._invalidated.clear()
+            self._txn_time = None
         finally:
             self._inv_lock.release()
         # Now is a good time to collect some garbage
         self._cache.incrgc()
 
     def modifiedInVersion(self, oid):
-        try: return self._db.modifiedInVersion(oid)
+        try:
+            return self._db.modifiedInVersion(oid)
         except KeyError:
             return self._version
 
@@ -547,55 +566,85 @@
         if self._storage is None:
             msg = ("Shouldn't load state for %s "
                    "when the connection is closed" % oid_repr(oid))
-            LOG('ZODB', ERROR, msg)
+            self._log.error(msg)
             raise RuntimeError(msg)
 
         try:
-            # Avoid reading data from a transaction that committed
-            # after the current transaction started, as that might
-            # lead to mixing of cached data from earlier transactions
-            # and new inconsistent data.
-            #
-            # Wait for check until after data is loaded from storage
-            # to avoid time-of-check to time-of-use race.
-            p, serial = self._storage.load(oid, self._version)
-            self._load_count = self._load_count + 1
-            invalid = self._is_invalidated(obj)
-            self._set_ghost_state(obj, p)
-            obj._p_serial = serial
-            if invalid:
-                self._handle_independent(obj)
+            self._setstate(obj)
         except ConflictError:
             raise
         except:
-            LOG('ZODB', ERROR,
-                "Couldn't load state for %s" % oid_repr(oid),
-                error=sys.exc_info())
+            self._log.error("Couldn't load state for %s", oid_repr(oid),
+                            exc_info=sys.exc_info())
             raise
 
-    def _is_invalidated(self, obj):
-        # Helper method for setstate() covers three cases:
-        # returns false if obj is valid
-        # returns true if obj was invalidation, but is independent
-        # otherwise, raises ConflictError for invalidated objects
+    def _setstate(self, obj):
+        # Helper for setstate(), which provides logging of failures.
+
+        # The control flow is complicated here to avoid loading an
+        # object revision that we are sure we aren't going to use.  As
+        # a result, invalidation tests occur before and after the
+        # load.  We can only be sure about invalidations after the
+        # load.
+
+        # If an object has been invalidated, there are several cases
+        # to consider:
+        # 1. Check _p_independent()
+        # 2. Try MVCC
+        # 3. Raise ConflictError.
+
+        # Does anything actually use _p_independent()?  It would simplify
+        # the code if we could drop support for it.
+
+        # There is a harmless data race with self._invalidated.  A
+        # dict update could go on in another thread, but we don't care
+        # because we have to check again after the load anyway.
+        if (obj._p_oid in self._invalidated
+            and not myhasattr(obj, "_p_independent")):
+            # If the object has _p_independent(), we will handle it below.
+            if not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+
+        p, serial = self._storage.load(obj._p_oid, self._version)
+        self._load_count += 1
+
         self._inv_lock.acquire()
         try:
-            if self._invalidated.has_key(obj._p_oid):
-                # Defer _p_independent() call until state is loaded.
-                ind = getattr(obj, "_p_independent", None)
-                if ind is not None:
-                    # Defer _p_independent() call until state is loaded.
-                    return 1
-                else:
-                    self.getTransaction().register(obj)
-                    self._conflicts[obj._p_oid] = 1
-                    raise ReadConflictError(object=obj)
-            else:
-                return 0
+            invalid = obj._p_oid in self._invalidated
         finally:
             self._inv_lock.release()
+            
+        if invalid:
+            if myhasattr(obj, "_p_independent"):
+                # This call will raise a ReadConflictError if something
+                # goes wrong
+                self._handle_independent(obj)
+            elif not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+                
+        self._set_ghost_state(obj, p, serial)
+
+    def _setstate_noncurrent(self, obj):
+        """Set state using non-current data.
+
+        Return True if state was available, False if not.
+        """
+        try:
+            t = self._storage.loadNonCurrent(obj._p_oid, self._txn_time)
+        except KeyError:
+            return False
+        if t is None:
+            return False
+        data, serial, start, end = t
+        assert start < end == self._txn_time, (start, end, self._txn_time)
+        self._noncurrent[obj._p_oid] = True
+        self._set_ghost_state(obj, data, serial)
 
-    def _set_ghost_state(self, obj, p):
+    def _set_ghost_state(self, obj, p, serial):
         file = StringIO(p)
         unpickler = Unpickler(file)
         unpickler.persistent_load = self._persistent_load
@@ -607,6 +656,7 @@
             obj.update(state)
         else:
             setstate(state)
+        obj._p_serial = serial
 
     def _handle_independent(self, obj):
         # Helper method for setstate() handles possibly independent objects
@@ -649,9 +699,9 @@
             klass, args = copy
 
             if klass is not ExtensionKlass:
-                LOG('ZODB',ERROR,
-                    "Unexpected klass when setting class state on %s"
-                    % getattr(object,'__name__','(?)'))
+                self._log.error(
+                    "Unexpected klass when setting class state on %s",
+                    getattr(object, "__name__", "(?)"))
                 return
 
             copy = klass(*args)
@@ -663,7 +713,7 @@
             object._p_changed=0
             object._p_serial=serial
         except:
-            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
+            self._log.error("setklassstate failed", exc_info=sys.exc_info())
             raise
 
     def tpc_abort(self, transaction):
@@ -720,7 +770,7 @@
 
         if not store_return:
             return
-        if isinstance(store_return, StringType):
+        if isinstance(store_return, str):
             assert oid is not None
             self._handle_one_serial(oid, store_return, change)
         else:
@@ -728,7 +778,7 @@
                 self._handle_one_serial(oid, serial, change)
 
     def _handle_one_serial(self, oid, serial, change):
-        if not isinstance(serial, StringType):
+        if not isinstance(serial, str):
             raise serial
         obj = self._cache.get(oid, None)
         if obj is None:
@@ -754,11 +804,11 @@
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
-            def callback():
+            def callback(tid):
                 d = {}
                 for oid in self._modified:
                     d[oid] = 1
-                self._db.invalidate(d, self)
+                self._db.invalidate(tid, d, self)
             self._storage.tpc_finish(transaction, callback)
 
         self._conflicts.clear()


=== ZODB3/ZODB/BaseStorage.py 1.34.4.1 => 1.34.4.1.2.1 ===
--- ZODB3/ZODB/BaseStorage.py:1.34.4.1	Mon Sep 15 14:02:58 2003
+++ ZODB3/ZODB/BaseStorage.py	Mon Sep 29 14:23:38 2003
@@ -56,12 +56,12 @@
     def abortVersion(self, src, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._serial, []
 
     def commitVersion(self, src, dest, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._serial, []
 
     def close(self):
         pass
@@ -199,7 +199,7 @@
                 return
             try:
                 if f is not None:
-                    f()
+                    f(self._serial)
                 u, d, e = self._ude
                 self._finish(self._serial, u, d, e)
                 self._clear_temp()
@@ -245,6 +245,33 @@
     def loadSerial(self, oid, serial):
         raise POSException.Unsupported, (
             "Retrieval of historical revisions is not supported")
+
+    def loadNonCurrent(self, oid, tid):
+        """Return most recent revision of oid before tid committed."""
+
+        n = 2
+        start_time = None
+        while start_time is None:
+            # The history() approach is a hack, because the dict
+            # returned by history() doesn't contain a tid.  It
+            # contains a serialno, which is often the same, but isn't
+            # required to be.  We'll pretend it is for now.
+
+            # A second problem is that history() doesn't say anything
+            # about the transaction status.  If the transaction falls
+            # before the pack time, we can't honor the MVCC request.
+
+            # Note: history() returns the most recent record first.
+            L = self.history(oid, "", lambda d: not d["version"])
+            for d in L:
+                if d["serial"] < tid:
+                    start_time = d["serial"]
+                    break
+                else:
+                    end_time = d["serial"]
+            n *= 2
+        data = self.loadSerial(oid, start_time)
+        return data, start_time, start_time, end_time
 
     def getExtensionMethods(self):
         """getExtensionMethods




More information about the Zodb-checkins mailing list