[Zodb-checkins] SVN: ZODB/branches/tim-simpler_connection/src/ZODB/ Rewrote DB to use new _ConnectionPool class.

Tim Peters tim.one at comcast.net
Mon Oct 25 23:02:46 EDT 2004


Log message for revision 28247:
  Rewrote DB to use new _ConnectionPool class.
  
  Most relevant code is much simpler now; there's no longer
  a hard limit on the # of connections per DB.  All the
  tests pass, but test coverage of the sprawling DB API isn't
  very good.
  

Changed:
  U   ZODB/branches/tim-simpler_connection/src/ZODB/DB.py
  U   ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py

-=-
Modified: ZODB/branches/tim-simpler_connection/src/ZODB/DB.py
===================================================================
--- ZODB/branches/tim-simpler_connection/src/ZODB/DB.py	2004-10-25 23:33:07 UTC (rev 28246)
+++ ZODB/branches/tim-simpler_connection/src/ZODB/DB.py	2004-10-26 03:02:45 UTC (rev 28247)
@@ -50,11 +50,11 @@
     When a connection is explicitly closed, tell the pool via repush().
     That adds the connection to a stack of connections available for
     reuse, and throws away the oldest stack entries if the pool is too large.
-    get() pops this stack.
+    pop() pops this stack.
 
-    If a connection is obtained via get(), the pool holds only a weak
+    When a connection is obtained via pop(), the pool holds only a weak
     reference to it thereafter.  It's not necessary to inform the pool
-    if the connection goes away.  A connection handed out by get() counts
+    if the connection goes away.  A connection handed out by pop() counts
     against pool_size only so long as it exists, and provided it isn't
     repush()'ed.
     """
@@ -94,26 +94,37 @@
             reporter("DB.open() has %s open connections with a pool_size "
                      "of %s", n, limit)
 
-    # Reregister an available connection formerly obtained via get().
+    # Reregister an available connection formerly obtained via pop().
     def repush(self, c):
         assert c in self.all
         assert c not in self.available
         self._reduce_size()
         self.available.append(c)
 
-    # Prior to pushing a connection onto self.available, throw away the
-    # oldest available connections until we're under our target size.
-    # It may not be possible be achieve this.
+    # Throw away the oldest available connections until we're under our
+    # target size.  It may not be possible to achieve this.
     def _reduce_size(self):
         while self.available and len(self.all) >= self.pool_size:
             c = self.available.pop(0)
             self.all.remove(c)
 
-    def get(self):
+    # The number of available connections.
+    def num_available(self):
+        return len(self.available)
+
+    # Pop an available connection and return it.  A caller must ensure
+    # that num_available() > 0 before calling pop(), and if it's not,
+    # create a connection and register it via push() first.
+    def pop(self):
         # Leave it in self.all, so we can still get at it for statistics
         # while it's alive.
-        return self.available.pop()
+        c = self.available.pop()
+        assert c in self.all
+        return c
 
+    # Return a list of all connections we currently know about.
+    def all_as_list(self):
+        return self.all.as_list()
 
 
 class DB(object):
@@ -178,11 +189,11 @@
           - `storage`: the storage used by the database, e.g. FileStorage
           - `pool_size`: expected maximum number of open connections
           - `cache_size`: target size of Connection object cache
-          - `cache_deactivate_after`: ignored
           - `version_pool_size`: expected maximum number of connections (per
             version)
           - `version_cache_size`: target size of Connection object cache for
             version connections
+          - `cache_deactivate_after`: ignored
           - `version_cache_deactivate_after`: ignored
         """
         # Allocate locks:
@@ -191,8 +202,8 @@
         self._r = l.release
 
         # Setup connection pools and cache info
-        self._pools = {},[]
-        self._temps = []
+        # _pools maps a version string to a _ConnectionPool object.
+        self._pools = {}
         self._pool_size = pool_size
         self._cache_size = cache_size
         self._version_pool_size = version_pool_size
@@ -252,10 +263,10 @@
             am = self._activity_monitor
             if am is not None:
                 am.closedConnection(connection)
+
             version = connection._version
-            pools, pooll = self._pools
             try:
-                pool, allocated, pool_lock = pools[version]
+                pool = self._pools[version]
             except KeyError:
                 # No such version. We must have deleted the pool.
                 # Just let the connection go.
@@ -264,30 +275,18 @@
                 # XXX What objects are involved in the cycle?
                 connection.__dict__.clear()
                 return
+            pool.repush(connection)
 
-            pool.append(connection)
-            if len(pool) == 1:
-                # Pool now usable again, unlock it.
-                pool_lock.release()
         finally:
             self._r()
 
+    # Call f(c) for all connections in all pools in all versions.
     def _connectionMap(self, f):
         self._a()
         try:
-            pools, pooll = self._pools
-            for pool, allocated in pooll:
-                for cc in allocated:
-                    f(cc)
-
-            temps = self._temps
-            if temps:
-                t = []
-                rc = sys.getrefcount
-                for cc in temps:
-                    if rc(cc) > 3:
-                        f(cc)
-                self._temps = t
+            for pool in self._pools.itervalues():
+                for c in pool.all_as_list():
+                    f(c)
         finally:
             self._r()
 
@@ -445,7 +444,6 @@
         if connection is not None:
             version = connection._version
         # Update modified in version cache
-        # XXX must make this work with list or dict to backport to 2.6
         for oid in oids.keys():
             h = hash(oid) % 131
             o = self._miv_cache.get(h, None)
@@ -453,25 +451,12 @@
                 del self._miv_cache[h]
 
         # Notify connections
-        for pool, allocated in self._pools[1]:
-            for cc in allocated:
+        for pool in self._pools.values():
+            for cc in pool.all_as_list():
                 if (cc is not connection and
-                    (not version or cc._version==version)):
-                    if sys.getrefcount(cc) <= 3:
-                        cc.close()
+                      (not version or cc._version == version)):
                     cc.invalidate(tid, oids)
 
-        if self._temps:
-            t = []
-            for cc in self._temps:
-                if sys.getrefcount(cc) > 3:
-                    if (cc is not connection and
-                        (not version or cc._version == version)):
-                        cc.invalidate(tid, oids)
-                    t.append(cc)
-                else:
-                    cc.close()
-            self._temps = t
 
     def modifiedInVersion(self, oid):
         h = hash(oid) % 131
@@ -495,11 +480,6 @@
         The optional `version` argument can be used to specify that a
         version connection is desired.
 
-        The optional transaction argument can be provided to cause the
-        connection to be automatically closed when a transaction is
-        terminated.  In addition, connections per transaction are
-        reused, if possible.
-
         Note that the connection pool is managed as a stack, to
         increase the likelihood that the connection's stack will
         include useful objects.
@@ -532,143 +512,67 @@
 
         self._a()
         try:
-            pools, pooll = self._pools
+            # pool <- the _ConnectionPool for this version
+            pool = self._pools.get(version)
+            if pool is None:
+                if version:
+                    size = self._version_pool_size
+                else:
+                    size = self._pool_size
+                self._pools[version] = pool = _ConnectionPool(size)
 
-            # pools is a mapping object:
-            #
-            #   {version -> (pool, allocated, lock)
-            #
-            # where:
-            #
-            #   pool is the connection pool for the version,
-            #   allocated is a list of all of the allocated
-            #     connections, and
-            #   lock is a lock that is used to block when a pool is
-            #     empty and no more connections can be allocated.
-            #
-            # pooll is a list of all of the pools and allocated for
-            # use in cases where we need to iterate over all
-            # connections or all inactive connections.
-
-            # Pool locks are tricky.  Basically, the lock needs to be
-            # set whenever the pool becomes empty so that threads are
-            # forced to wait until the pool gets a connection in it.
-            # The lock is acquired when the (empty) pool is
-            # created.  The lock is acquired just prior to removing
-            # the last connection from the pool and released just after
-            # adding a connection to an empty pool.
-
-
-            if pools.has_key(version):
-                pool, allocated, pool_lock = pools[version]
-            else:
-                pool, allocated, pool_lock = pools[version] = (
-                    [], [], allocate_lock())
-                pooll.append((pool, allocated))
-                pool_lock.acquire()
-
-
-            if not pool:
-                c = None
+            # result <- a connection
+            if pool.num_available() == 0:
                 if version:
-                    if self._version_pool_size > len(allocated):
-                        c = self.klass(version=version,
-                                       cache_size=self._version_cache_size,
-                                       mvcc=mvcc, txn_mgr=txn_mgr)
-                        allocated.append(c)
-                        pool.append(c)
-                elif self._pool_size > len(allocated):
-                    c = self.klass(version=version,
-                                   cache_size=self._cache_size,
-                                   mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
-                    allocated.append(c)
-                    pool.append(c)
+                    cache = self._version_cache_size
+                else:
+                    cache = self._cache_size
+                c = self.klass(version=version, cache_size=cache,
+                               mvcc=mvcc, txn_mgr=txn_mgr)
+                pool.push(c)
+            result = pool.pop()
 
-                if c is None:
-                    self._r()
-                    pool_lock.acquire()
-                    self._a()
-                    if len(pool) > 1:
-                        # Note that the pool size will normally be 1 here,
-                        # but it could be higher due to a race condition.
-                        pool_lock.release()
+            # Tell the connection it belongs to self.
+            result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
 
-            elif len(pool)==1:
-                # Taking last one, lock the pool.
-                # Note that another thread might grab the lock
-                # before us, so we might actually block, however,
-                # when we get the lock back, there *will* be a
-                # connection in the pool.  OTOH, there's no limit on
-                # how long we may need to wait:  if the other thread
-                # grabbed the lock in this section too, we'll wait
-                # here until another connection is closed.
-                # checkConcurrentUpdates1Storage provoked this frequently
-                # on a hyperthreaded machine, with its second thread
-                # timing out after waiting 5 minutes for DB.open() to
-                # return.  So, if we can't get the pool lock immediately,
-                # now we make a recursive call.  This allows the current
-                # thread to allocate a new connection instead of waiting
-                # arbitrarily long for the single connection in the pool
-                # right now.
-                self._r()
-                if not pool_lock.acquire(0):
-                    result = DB.open(self, version, transaction, temporary,
-                                     force, waitflag)
-                    self._a()
-                    return result
-                self._a()
-                if len(pool) > 1:
-                    # Note that the pool size will normally be 1 here,
-                    # but it could be higher due to a race condition.
-                    pool_lock.release()
+            # A good time to do some cache cleanup.
+            for pool in self._pools.itervalues():
+                for c in pool.all_as_list():
+                    c.cacheGC()
 
-            c = pool.pop()
-            c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
-            for pool, allocated in pooll:
-                for cc in pool:
-                    cc.cacheGC()
+            return result
 
-            return c
-
         finally:
             self._r()
 
     def removeVersionPool(self, version):
-        pools, pooll = self._pools
-        info = pools.get(version)
-        if info:
-            del pools[version]
-            pool, allocated, pool_lock = info
-            pooll.remove((pool, allocated))
-            try:
-                pool_lock.release()
-            except: # XXX Do we actually expect this to fail?
-                pass
-            del pool[:]
-            del allocated[:]
+        try:
+            del self._pools[version]
+        except KeyError:
+            pass
 
     def connectionDebugInfo(self):
-        r = []
-        pools, pooll = self._pools
+        result = []
         t = time()
-        for version, (pool, allocated, lock) in pools.items():
-            for c in allocated:
+        for version, pool in self._pools.items():
+            for c in pool.all_as_list():
                 o = c._opened
                 d = c._debug_info
                 if d:
-                    if len(d)==1:
+                    if len(d) == 1:
                         d = d[0]
                 else:
-                    d=''
+                    d = ''
                 d = "%s (%s)" % (d, len(c._cache))
 
-                r.append({
+                result.append({
                     'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
                     'info': d,
                     'version': version,
                     })
-        return r
 
+        return result
+
     def getActivityMonitor(self):
         return self._activity_monitor
 
@@ -696,34 +600,45 @@
             logger.error("packing", exc_info=True)
             raise
 
-    def setCacheSize(self, v):
-        self._cache_size = v
-        d = self._pools[0]
-        pool_info = d.get('')
-        if pool_info is not None:
-            for c in pool_info[1]:
-                c._cache.cache_size = v
+    def setActivityMonitor(self, am):
+        self._activity_monitor = am
 
     def classFactory(self, connection, modulename, globalname):
         # Zope will rebind this method to arbitrary user code at runtime.
         return find_global(modulename, globalname)
 
-    def setPoolSize(self, v):
-        self._pool_size = v
+    def setCacheSize(self, v):
+        self._cache_size = v
+        pool = self._pools.get('')
+        if pool is not None:
+            for c in pool.all_as_list():
+                c._cache.cache_size = v
 
-    def setActivityMonitor(self, am):
-        self._activity_monitor = am
-
     def setVersionCacheSize(self, v):
         self._version_cache_size = v
-        for ver in self._pools[0].keys():
-            if ver:
-                for c in self._pools[0][ver][1]:
+        for version, pool in self._pools.items():
+            if version:
+                for c in pool.all_as_list():
                     c._cache.cache_size = v
 
-    def setVersionPoolSize(self, v):
-        self._version_pool_size=v
+    def setPoolSize(self, size):
+        self._pool_size = size
+        self._reset_pool_sizes(size, for_versions=False)
 
+    def setVersionPoolSize(self, size):
+        self._version_pool_size = size
+        self._reset_pool_sizes(size, for_versions=True)
+
+    def _reset_pool_sizes(self, size, for_versions=False):
+        self._a()
+        try:
+            for version, pool in self._pools.items():
+                if (version != '') == for_versions:
+                    pool.set_pool_size(size)
+
+        finally:
+            self._r()
+
     def undo(self, id, txn=None):
         """Undo a transaction identified by id.
 

Modified: ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py	2004-10-25 23:33:07 UTC (rev 28246)
+++ ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py	2004-10-26 03:02:45 UTC (rev 28247)
@@ -23,6 +23,10 @@
 
 from ZODB.tests.MinPO import MinPO
 
+# Return total number of connections across all pools in a db._pools.
+def nconn(pools):
+    return sum([len(pool.all) for pool in pools.values()])
+
 class DBTests(unittest.TestCase):
 
     def setUp(self):
@@ -75,22 +79,22 @@
         c12.close() # return to pool
         self.assert_(c1 is c12) # should be same
 
-        pools, pooll = self.db._pools
+        pools = self.db._pools
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
         self.db.removeVersionPool('v1')
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12 = self.db.open('v1')
         c12.close() # return to pool
         self.assert_(c1 is not c12) # should be different
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
     def _test_for_leak(self):
         self.dowork()
@@ -112,27 +116,27 @@
         c12 = self.db.open('v1')
         self.assert_(c1 is c12) # should be same
 
-        pools, pooll = self.db._pools
+        pools = self.db._pools
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
         self.db.removeVersionPool('v1')
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12.close() # should leave pools alone
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12 = self.db.open('v1')
         c12.close() # return to pool
         self.assert_(c1 is not c12) # should be different
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
 
 def test_suite():



More information about the Zodb-checkins mailing list