[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.10

Jeremy Hylton jeremy@zope.com
Wed, 5 Mar 2003 15:24:39 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv20045

Modified Files:
	connection.py 
Log Message:
A bunch of almost related improvements.

Replace the _txns dict with a single _txn attribute.  A connection can
only be involved with one transaction at a time, so just keep track of
that one.  If a transaction is active when close() is called, abort the
transaction.  XXX It might be better to raise an error than to abort
the transaction.

Be consistent about resetting the _registered, _modified, and _created
sets at transaction boundaries.  XXX _registered and _modified may
actually be the same thing.

Add a few log messages and rename the logger attribute from _logger to _log.

Replace local variable 'object' with 'obj'.


=== Zope3/src/zodb/connection.py 1.9 => 1.10 ===
--- Zope3/src/zodb/connection.py:1.9	Thu Feb 27 15:16:45 2003
+++ Zope3/src/zodb/connection.py	Wed Mar  5 15:24:38 2003
@@ -26,19 +26,13 @@
 associated with the Connection that loaded them.  When the objects are
 modified, the Connection is registered with the current transaction.
 
-XXX multi-threaded issues
+Synchronization
 
-One or more application threads could call methods on a Connection or
-interact with it indirectly via objects that the Connection loaded.
-The intent is for a single thread to use a Connection, but this is not
-enforced.
-
-Not sure if its possible for other threads to interact with Connection
-in other ways.
-
-XXX undocumented issues
-
-The Connection supports callbacks on close and commit.
+A Connection instance is not thread-safe.  It is designed to support a
+thread model where each thread has its own transaction.  If an
+application has more than one thread that uses the connection or the
+transaction the connection is registered with, the application should
+provide locking.
 
 $Id$
 """
@@ -84,11 +78,13 @@
         self._version = version
         self._cache = cache = Cache(cache_size)
         self._reader = ConnectionObjectReader(self, self._cache)
-        self._logger = logging.getLogger("zodb")
+        self._log = logging.getLogger("zodb")
         # a TmpStore object used by sub-transactions
         self._tmp = None
-        # None, or time.time() when opened
-        self._opened = None
+        # whether the connection is open or closed
+        self._open = True
+        # the connection's current txn
+        self._txn = None
 
         # _invalidated queues invalidate messages delivered from the DB
         # _inv_lock prevents one thread from modifying the set while
@@ -97,35 +93,39 @@
         self._invalidated = Set()
         self._committed = []
 
-        # track which objects are involved with a transaction
-        self._txns = {}
+        # Bookkeeping for objects affected by the current transaction.
+        # These sets are clear()ed at transaction boundaries.
 
-        self._modified = Set()
+        # XXX Is a Set safe?  What if the objects are not hashable?
+        self._registered = Set()
+        self._modified = Set() # XXX is this the same as registered?
         self._created = Set()
 
         # new_oid is used by serialize
         self.newObjectId = self._storage.newObjectId
 
+    def _get_transaction(self):
+        # Return the transaction currently active.
+        # If no transaction is active, call get_transaction().
+        if self._txn is None:
+            self._txn = get_transaction()
+        return self._txn     
+            
     ######################################################################
-    # IAppConnection defines the next two methods
-    # root() and sync()
+    # IAppConnection defines the next three methods
+    # root(), sync(), get()
     
     def root(self):
         return self.get(z64)
 
     def sync(self):
-        get_transaction().abort()
+        if self._txn:
+            self._txn.abort()
         sync = getattr(self._storage, 'sync', None)
         if sync is not None:
             sync()
         self._flush_invalidations()
 
-    def modifiedInVersion(self, oid):
-        try:
-            return self._storage.modifiedInVersion(oid)
-        except KeyError:
-            return self._version
-
     def get(self, oid):
         # assume that a cache cannot store None as a valid object
         object = self._cache.get(oid)
@@ -133,30 +133,30 @@
             return object
 
         p, serial = self._storage.load(oid, self._version)
-        object = self._reader.getGhost(p)
+        obj = self._reader.getGhost(p)
 
-        object._p_oid = oid
-        object._p_jar = self
-        object._p_changed = None # make sure it is a ghost
-        object._p_serial = serial
+        obj._p_oid = oid
+        obj._p_jar = self
+        obj._p_changed = None # make sure it is a ghost
+        obj._p_serial = serial
 
-        self._cache[oid] = object
+        self._cache[oid] = obj
         if oid == z64:
             # Keep a reference to the root so that the pickle cache
             # won't evict it.  XXX Not sure if this is necessary.  If
             # the cache is LRU, it should know best if the root is needed.
-            self._root = object
-        return object
+            self._root = obj
+        return obj
 
     ######################################################################
     # IPersistentDataManager requires the next three methods:
     # setstate(), register(), mtime()
 
-    def setstate(self, object):
+    def setstate(self, obj):
         oid = None
         # XXX Is it possible to reorganize the method-level try/except?
         try:
-            oid = object._p_oid
+            oid = obj._p_oid
 
             # XXX this is quite conservative!
 
@@ -170,19 +170,19 @@
             p, serial = self._storage.load(oid, self._version)
 
             if oid in self._invalidated:
-                if not (hasattr(object, '_p_independent')
-                        and object._p_independent()):
-                    get_transaction().join(self)
-                    raise ConflictError(object=object)
+                if not (hasattr(obj, '_p_independent')
+                        and obj._p_independent()):
+                    self._get_transaction().join(self)
+                    raise ConflictError(object=obj)
                 invalid = 1
             else:
                 invalid = 0
 
-            self._reader.setGhostState(object, p)
-            object._p_serial = serial
+            self._reader.setGhostState(obj, p)
+            obj._p_serial = serial
 
             if invalid:
-                if object._p_independent():
+                if obj._p_independent():
                     self._inv_lock.acquire()
                     try:
                         try:
@@ -192,27 +192,24 @@
                     finally:
                         self._inv_lock.release()
                 else:
-                    get_transaction().join(self)
-                    raise ConflictError(object=object)
+                    self._get_transaction().join(self)
+                    raise ConflictError(object=obj)
 
         except ConflictError:
             raise
         except:
-            self._logger.exception("Couldn't load state for %r", oid)
+            self._log.exception("Couldn't load state for %r", oid)
             raise
         else:
             # Add the object to the cache active list
-            self._cache.setstate(oid, object)
+            self._cache.setstate(oid, obj)
 
-    def register(self, object):
-        txn = get_transaction()
-        L = self._txns.get(txn)
-        if L is None:
-            L = self._txns[txn] = []
-            txn.join(self)
-        L.append(object)
+    def register(self, obj):
+        if not self._registered:
+            self._get_transaction().join(self)
+        self._registered.add(obj)    
 
-    def mtime(self, object):
+    def mtime(self, obj):
         # required by the IPersistentDataManager interface, but unimplemented
         return None
 
@@ -224,6 +221,7 @@
         return self._version
 
     def reset(self, version=""):
+        self._log.info("connection reset")
         if version != self._version:
             # XXX I think it's necessary to clear the cache here, because
             # the objects in the cache don't know that they were in a
@@ -234,7 +232,7 @@
         # when the DB is delivering invalidations.
         self._cache.invalidateMany(self._invalidated)
         self._invalidated.clear()
-        self._opened = time.time()
+        self._open = True
 
     def cacheGC(self):
         self._cache.incrgc()
@@ -254,8 +252,17 @@
             self._inv_lock.release()
 
     def close(self):
+        if self._txn is not None:
+            # What should happen at this point?  It's arguable that
+            # close() should raise an error and force the application
+            # to decide what should become of the transaction.  It's
+            # possible that multiple resources are involved in the
+            # transaction.
+            self._log.warn("connection closed while transaction active")
+            self._txn.abort()
+        self._log.info("connection closed")    
+        self._open = False
         self._cache.incrgc()
-        self._opened = None
         # Return the connection to the pool.
         self._db._closeConnection(self)
 
@@ -272,7 +279,7 @@
         else:
             self._storage.tpcBegin(txn)
 
-        for obj in self._txns.get(txn, ()):
+        for obj in self._registered:
             self._objcommit(obj, txn)
 
         s = self._storage.tpcVote(txn)
@@ -286,14 +293,15 @@
             self._abort_sub()
         self._storage.tpcAbort(txn)
 
-        objs = self._txns.get(txn)
-        if objs is not None:
-            self._cache.invalidateMany([obj._p_oid for obj in objs])
-            del self._txns[txn]
-        self._flush_invalidations()
-        self._cache.invalidateMany(self._modified)
+        if self._registered:
+            self._cache.invalidateMany([obj._p_oid
+                                        for obj in self._registered])
+            self._registered.clear()
         self._invalidate_created(self._created)
-        self._created = Set()
+        self._cache.invalidateMany(self._modified)
+        self._txn = None
+        self._flush_invalidations()
+        self._created.clear()
         self._modified.clear()
 
     def commit(self, txn):
@@ -308,12 +316,11 @@
         # call acquired a lock that will only be released in
         # _invalidate_modified().
         self._storage.tpcFinish(txn, self._invalidate_modified)
-        try:
-            del self._txns[txn]
-        except KeyError:
-            pass
-
+        self._txn = None
         self._flush_invalidations()
+        self._registered.clear()
+        self._created.clear()
+        self._modified.clear()
 
     def savepoint(self, txn):
         if self._tmp is None:
@@ -325,7 +332,7 @@
         self._created = Set()
         self._storage.tpcBegin(txn)
 
-        for obj in self._txns.get(txn, ()):
+        for obj in self._registered:
             self._objcommit(obj, txn)
         self.importHook(txn) # hook for ExportImport
 
@@ -389,7 +396,7 @@
 
     def _objcommit(self, object, transaction):
         oid = object._p_oid
-        self._logger.debug("commit object %s", u64(oid))
+        self._log.debug("commit object %s", u64(oid))
 
         if oid is None or object._p_jar is not self:
             oid = self._storage.newObjectId()
@@ -520,7 +527,6 @@
         self._db = None
 
     def close(self):
-        # XXX Is this necessary?
         self._file.close()
 
     def getName(self):