[Zope-Checkins] CVS: Zope3/src/ZODB - Connection.py:1.139

Jeremy Hylton jeremy at zope.com
Sat Mar 13 02:48:42 EST 2004


Update of /cvs-repository/Zope3/src/ZODB
In directory cvs.zope.org:/tmp/cvs-serv29469/src/ZODB

Modified Files:
	Connection.py 
Log Message:
Revise Connection.

Make _added_during_commit a regular instance variable.  Don't use
try/finally to reset it; just clear it at the start of a transaction.
XXX There was a test that needed to be removed, but it seemed to be
just a shallow test that try/finally was used.  Can't see any feature
that depends on specific of error handling: The txn is going to abort.

Remove unused _opened instance variable.
Split commit() into two smaller parts.
Get rid of extra manipulation of _creating.
Don't look for _p_serial of None; z64 is now required.
Undo local variable aliases in subtransaction methods.

Also, trivial change to pickle cache API -- get() works like dict get().


=== Zope3/src/ZODB/Connection.py 1.138 => 1.139 ===
--- Zope3/src/ZODB/Connection.py:1.138	Fri Mar 12 01:37:23 2004
+++ Zope3/src/ZODB/Connection.py	Sat Mar 13 02:48:11 2004
@@ -131,10 +131,9 @@
     _tmp = None
     _code_timestamp = 0
     _transaction = None
-    _added_during_commit = None
 
     def __init__(self, version='', cache_size=400,
-                 cache_deactivate_after=60, mvcc=True):
+                 cache_deactivate_after=None, mvcc=True):
         """Create a new Connection.
 
         A Connection instance should by instantiated by the DB
@@ -143,7 +142,6 @@
 
         self._log = logging.getLogger("zodb.conn")
         self._storage = None
-        self._opened = None
         self._debug_info = ()
 
         self._version = version
@@ -158,6 +156,7 @@
             self._cache.cache_drain_resistance = 100
         self._committed = []
         self._added = {}
+        self._added_during_commit = None
         self._reset_counter = global_reset_counter
         self._load_count = 0   # Number of objects unghosted
         self._store_count = 0  # Number of objects stored
@@ -315,12 +314,10 @@
             raise InvalidObjectReference(obj, obj._p_jar)
 
     def sortKey(self):
-        # XXX will raise an exception if the DB hasn't been set
-        storage_key = self._sortKey()
         # If two connections use the same storage, give them a
         # consistent order using id().  This is unique for the
         # lifetime of a connection, which is good enough.
-        return "%s:%s" % (storage_key, id(self))
+        return "%s:%s" % (self._sortKey(), id(self))
 
     def _setDB(self, odb):
         """Register odb, the DB that this Connection uses.
@@ -348,7 +345,6 @@
             self._flush_invalidations()
         self._reader = ConnectionObjectReader(self, self._cache,
                                               self._db.classFactory)
-        self._opened = time()
 
     def _resetCache(self):
         """Creates a new cache, discarding the old.
@@ -452,148 +448,123 @@
                     self._log.error("Close callback failed for %s", f,
                                     sys.exc_info())
             self.__onCloseCallbacks = None
-        self._storage = self._tmp = self.new_oid = self._opened = None
+        self._storage = self._tmp = self.new_oid = None
         self._debug_info = ()
         # Return the connection to the pool.
         if self._db is not None:
             self._db._closeConnection(self)
             self._db = None
 
-    def commit(self, object, transaction):
-        if object is self:
+    def commit(self, obj, transaction):
+        if obj is self:
             # We registered ourself.  Execute a commit action, if any.
             if self._import:
                 self._importDuringCommit(transaction, *self._import)
                 self._import = None
             return
 
-        oid = object._p_oid
-        if self._conflicts.has_key(oid):
-            self.getTransaction().register(object)
-            raise ReadConflictError(object=object)
-
-        invalid = self._invalid
-
-        # XXX In the case of a new object or an object added using add(),
-        #     the oid is appended to _creating.
-        #     However, this ought to be unnecessary because the _p_serial
-        #     of the object will be z64 or None, so it will be appended
-        #     to _creating about 30 lines down. The removal from _added
-        #     ought likewise to be unnecessary.
-        if oid is None or object._p_jar is not self:
+        oid = obj._p_oid
+        if oid in self._conflicts:
+            self.getTransaction().register(obj)
+            raise ReadConflictError(object=obj)
+
+        if oid is None or obj._p_jar is not self:
             # new object
             oid = self.new_oid()
-            object._p_jar = self
-            object._p_oid = oid
-            self._creating.append(oid) # maybe don't need this
+            obj._p_jar = self
+            obj._p_oid = oid
+            assert obj._p_serial == z64
         elif oid in self._added:
-            # maybe don't need these
-            self._creating.append(oid)
-            del self._added[oid]
-        elif object._p_changed:
-            if invalid(oid):
-                resolve = getattr(object, "_p_resolveConflict", None)
+            assert obj._p_serial == z64
+        elif obj._p_changed:
+            if oid in self._invalidated:
+                resolve = getattr(obj, "_p_resolveConflict", None)
                 if resolve is None:
-                    raise ConflictError(object=object)
+                    raise ConflictError(object=obj)
             self._modified.append(oid)
         else:
             # Nothing to do
             return
 
-        w = ObjectWriter(object)
+        self._store_objects(ObjectWriter(obj), transaction)
+
+    def _store_objects(self, writer, transaction):
         self._added_during_commit = []
-        try:
-            for obj in itertools.chain(w, self._added_during_commit):
-                oid = obj._p_oid
-                serial = getattr(obj, '_p_serial', z64)
-
-                # XXX which one? z64 or None? Why do I have to check both?
-                if serial == z64 or serial is None:
-                    # new object
-                    self._creating.append(oid)
-                    # If this object was added, it is now in _creating, so can
-                    # be removed from _added.
-                    self._added.pop(oid, None)
+        for obj in itertools.chain(writer, self._added_during_commit):
+            oid = obj._p_oid
+            serial = getattr(obj, "_p_serial", z64)
+
+            if serial == z64:
+                # new object
+                self._creating.append(oid)
+                # If this object was added, it is now in _creating, so can
+                # be removed from _added.
+                self._added.pop(oid, None)
+            else:
+                if (oid in self._invalidated
+                    and not hasattr(obj, '_p_resolveConflict')):
+                    raise ConflictError(object=obj)
+                self._modified.append(oid)
+            p = writer.serialize(obj)  # This calls __getstate__ of obj
+            s = self._storage.store(oid, serial, p, self._version, transaction)
+            self._store_count += 1
+            # Put the object in the cache before handling the
+            # response, just in case the response contains the
+            # serial number for a newly created object
+            try:
+                self._cache[oid] = obj
+            except:
+                # Dang, I bet its wrapped:
+                if hasattr(obj, 'aq_base'):
+                    self._cache[oid] = obj.aq_base
                 else:
-                    if (invalid(oid)
-                        and not hasattr(object, '_p_resolveConflict')):
-                        raise ConflictError(object=obj)
-                    self._modified.append(oid)
-                p = w.serialize(obj)  # This calls __getstate__ of obj
-
-                s = self._storage.store(oid, serial, p, self._version,
-                                        transaction)
-                self._store_count = self._store_count + 1
-                # Put the object in the cache before handling the
-                # response, just in case the response contains the
-                # serial number for a newly created object
-                try:
-                    self._cache[oid] = obj
-                except:
-                    # Dang, I bet its wrapped:
-                    if hasattr(obj, 'aq_base'):
-                        self._cache[oid] = obj.aq_base
-                    else:
-                        raise
+                    raise
 
-                self._handle_serial(s, oid)
-        finally:
-            del self._added_during_commit
+            self._handle_serial(s, oid)
+        self._added_during_commit = None
 
     def commit_sub(self, t):
         """Commit all work done in all subtransactions for this transaction"""
-        tmp=self._tmp
-        if tmp is None: return
-        src=self._storage
-
-        self._log.debug("Commiting subtransaction of size %s",
-                        src.getSize())
-
-        self._storage=tmp
-        self._tmp=None
-
-        tmp.tpc_begin(t)
-
-        load=src.load
-        store=tmp.store
-        dest=self._version
-        oids=src._index.keys()
+        if self._tmp is None:
+            return
+        src = self._storage
+        self._storage = self._tmp
+        self._tmp = None
+        
+        self._log.debug("Commiting subtransaction of size %s", src.getSize())
+        oids = src._index.keys()
+        self._storage.tpc_begin(t)
 
         # Copy invalidating and creating info from temporary storage:
-        modified = self._modified
-        modified[len(modified):] = oids
-        creating = self._creating
-        creating[len(creating):]=src._creating
+        self._modified[len(self._modified):] = oids
+        self._creating[len(self._creating):] = src._creating
 
         for oid in oids:
-            data, serial = load(oid, src)
-            s=store(oid, serial, data, dest, t)
-            self._handle_serial(s, oid, change=0)
+            data, serial = src.load(oid, src)
+            s = self._storage.store(oid, serial, data, self._version, t)
+            self._handle_serial(s, oid, change=False)
 
     def abort_sub(self, t):
         """Abort work done in all subtransactions for this transaction"""
-        tmp=self._tmp
-        if tmp is None: return
-        src=self._storage
-        self._tmp=None
-        self._storage=tmp
+        if self._tmp is None:
+            return
+        src = self._storage
+        self._storage = self._tmp
+        self._tmp = None
 
         self._cache.invalidate(src._index.keys())
         self._invalidate_creating(src._creating)
 
     def _invalidate_creating(self, creating=None):
-        """Dissown any objects newly saved in an uncommitted transaction.
-        """
+        """Dissown any objects newly saved in an uncommitted transaction."""
         if creating is None:
-            creating=self._creating
-            self._creating=[]
+            creating = self._creating
+            self._creating = []
 
-        cache=self._cache
-        cache_get=cache.get
         for oid in creating:
-            o=cache_get(oid, None)
+            o = self._cache.get(oid)
             if o is not None:
-                del cache[oid]
+                del self._cache[oid]
                 del o._p_jar
                 del o._p_oid
 
@@ -844,6 +815,9 @@
 
     def tpc_begin(self, transaction, sub=None):
         self._modified = []
+        
+        # _creating is a list of oids of new objects, which is used to
+        # remove them from the cache if a transaction aborts.
         self._creating = []
         if sub:
             # Sub-transaction!




More information about the Zope-Checkins mailing list