[Zope3-checkins] CVS: Zope3/src/zodb - db.py:1.3

Jeremy Hylton jeremy@zope.com
Tue, 21 Jan 2003 13:20:27 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv9061/zodb

Modified Files:
	db.py 
Log Message:
Repair pool and invalidation logic in database.

Make sure invalidations are sent to all connections, regardless of
whether they are opened.  Fix logic of connection pools so that
_allocated contains all connections and _pool contains the ones
currently available.

Add tests cases to verify fixes.


=== Zope3/src/zodb/db.py 1.2 => 1.3 ===
--- Zope3/src/zodb/db.py:1.2	Wed Dec 25 09:12:16 2002
+++ Zope3/src/zodb/db.py	Tue Jan 21 13:19:55 2003
@@ -40,31 +40,33 @@
     or more connections, which manage object spaces.  Most of the actual work
     of managing objects is done by the connections.
     """
-    def __init__(self, storage,
-                 pool_size=7,
-                 cache_size=400,
-                 ):
+    def __init__(self, storage, pool_size=7, cache_size=400):
         """Create an object database.
 
         The storage for the object database must be passed in.
         Optional arguments are:
 
         pool_size -- The size of the pool of object spaces.
-
         """
 
+        self.log = logging.getLogger("zodb")
+
         # Allocate locks:
         l=Lock()
         self._a=l.acquire
         self._r=l.release
 
         # Setup connection pools and cache info
+        # _pool is currently available (closed) connections
+        # _allocated is all connections, open and closed
+        # _temps is temporary connections
         self._pool = []
         self._allocated = []
+        self._temps = []
         self._pool_lock = Lock()
         self._pool_lock.acquire()
-        self._temps = []
         self._pool_size = pool_size
+        
         self._cache_size = cache_size
 
         # Setup storage
@@ -89,13 +91,12 @@
         """Return a connection to the pool"""
         self._a()
         try:
-            version = connection._version
-            self._allocated.remove(connection)
             self._pool.append(connection)
             if len(self._pool) == 1:
                 # Pool now usable again, unlock it.
                 self._pool_lock.release()
-        finally: self._r()
+        finally:
+            self._r()
 
     def _connectionMap(self, f):
         self._a()
@@ -108,7 +109,8 @@
                     if sys.getrefcount(cc) > 3:
                         f(cc)
                 self._temps = []
-        finally: self._r()
+        finally:
+            self._r()
 
     def abortVersion(self, version):
         AbortVersion(self, version)
@@ -243,28 +245,39 @@
             # a connection to an empty pool.
 
             if not self._pool:
-                c = None
-                if self._pool_size > len(self._pool) or force:
+                if self._pool_size > len(self._allocated) or force:
+                    # If the number allocated is less than the pool
+                    # size, then we've never reached the limit.
+                    # Allocate a connection and return without
+                    # touching the lock.
                     c = Connection(self, version, cache_size=self._cache_size)
-                    self._pool.append(c)
-
-                if c is None:
+                    self._allocated.append(c)
+                    return c
+                else:
+                    # If the number allocated is larger than the pool
+                    # size, then we have to wait for another thread to
+                    # close its connection.
                     if waitflag:
+                        self.log.debug("waiting for pool lock")
                         self._r()
                         self._pool_lock.acquire()
                         self._a()
+                        self.log.debug("acquired pool lock")
                         if len(self._pool) > 1:
                             # Note that the pool size will normally be 1 here,
                             # but it could be higher due to a race condition.
                             self._pool_lock.release()
                     else:
+                        self.log.debug("open failed because pool is empty")
                         return
             elif len(self._pool) == 1:
                 # Taking last one, lock the pool
-                # Note that another thread might grab the lock
-                # before us, so we might actually block, however,
-                # when we get the lock back, there *will* be a
-                # connection in the pool.
+                
+                # Note that another thread might grab the lock before
+                # us, so we might actually block, however, when we get
+                # the lock back, there *will* be a connection in the
+                # pool.
+                
                 self._r()
                 self._pool_lock.acquire()
                 self._a()
@@ -276,7 +289,6 @@
             # XXX Could look for a connection with the right version
             c = self._pool.pop()
             c.reset(version)
-            self._allocated.append(c)
             for other_conn in self._pool:
                 other_conn.cacheGC()
 
@@ -293,7 +305,7 @@
         try:
             self._storage.pack(t)
         except:
-            logging.getLogger("zodb").exception("packing")
+            self.log.exception("packing")
             raise
 
     def setCacheSize(self, v):
@@ -322,7 +334,7 @@
             self._prepare(txn)
             self._storage.tpc_vote(txn)
         except StorageError, err:
-            logging.getLogger("DB").info("Error during prepare: %s", err)
+            logging.getLogger("zodb").info("Error during prepare: %s", err)
             return False
         else:
             return True