[Zodb-checkins] SVN: ZODB/branches/tim-simpler_connection/src/ZODB/
Rewrote DB to use new _ConnectionPool class.
Tim Peters
tim.one at comcast.net
Mon Oct 25 23:02:46 EDT 2004
Log message for revision 28247:
Rewrote DB to use new _ConnectionPool class.
Most relevant code is much simpler now; there's no longer
a hard limit on the # of connections per DB. All the
tests pass, but test coverage of the sprawling DB API isn't
very good.
Changed:
U ZODB/branches/tim-simpler_connection/src/ZODB/DB.py
U ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py
-=-
Modified: ZODB/branches/tim-simpler_connection/src/ZODB/DB.py
===================================================================
--- ZODB/branches/tim-simpler_connection/src/ZODB/DB.py 2004-10-25 23:33:07 UTC (rev 28246)
+++ ZODB/branches/tim-simpler_connection/src/ZODB/DB.py 2004-10-26 03:02:45 UTC (rev 28247)
@@ -50,11 +50,11 @@
When a connection is explicitly closed, tell the pool via repush().
That adds the connection to a stack of connections available for
reuse, and throws away the oldest stack entries if the pool is too large.
- get() pops this stack.
+ pop() pops this stack.
- If a connection is obtained via get(), the pool holds only a weak
+ When a connection is obtained via pop(), the pool holds only a weak
reference to it thereafter. It's not necessary to inform the pool
- if the connection goes away. A connection handed out by get() counts
+ if the connection goes away. A connection handed out by pop() counts
against pool_size only so long as it exists, and provided it isn't
repush()'ed.
"""
@@ -94,26 +94,37 @@
reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit)
- # Reregister an available connection formerly obtained via get().
+ # Reregister an available connection formerly obtained via pop().
def repush(self, c):
assert c in self.all
assert c not in self.available
self._reduce_size()
self.available.append(c)
- # Prior to pushing a connection onto self.available, throw away the
- # oldest available connections until we're under our target size.
- # It may not be possible be achieve this.
+ # Throw away the oldest available connections until we're under our
+ # target size. It may not be possible be achieve this.
def _reduce_size(self):
while self.available and len(self.all) >= self.pool_size:
c = self.available.pop(0)
self.all.remove(c)
- def get(self):
+ # The number of available connections.
+ def num_available(self):
+ return len(self.available)
+
+ # Pop an available connection and return it. A caller must ensurue
+ # that num_available() > 0 before calling pop(), and if it's not,
+ # create a connection and register it via push() first.
+ def pop(self):
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
- return self.available.pop()
+ c = self.available.pop()
+ assert c in self.all
+ return c
+ # Return a list of all connections we currently know about.
+ def all_as_list(self):
+ return self.all.as_list()
class DB(object):
@@ -178,11 +189,11 @@
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- - `cache_deactivate_after`: ignored
- `version_pool_size`: expected maximum number of connections (per
version)
- `version_cache_size`: target size of Connection object cache for
version connections
+ - `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored
"""
# Allocate locks:
@@ -191,8 +202,8 @@
self._r = l.release
# Setup connection pools and cache info
- self._pools = {},[]
- self._temps = []
+ # _pools maps a version string to a _ConnectionPool object.
+ self._pools = {}
self._pool_size = pool_size
self._cache_size = cache_size
self._version_pool_size = version_pool_size
@@ -252,10 +263,10 @@
am = self._activity_monitor
if am is not None:
am.closedConnection(connection)
+
version = connection._version
- pools, pooll = self._pools
try:
- pool, allocated, pool_lock = pools[version]
+ pool = self._pools[version]
except KeyError:
# No such version. We must have deleted the pool.
# Just let the connection go.
@@ -264,30 +275,18 @@
# XXX What objects are involved in the cycle?
connection.__dict__.clear()
return
+ pool.repush(connection)
- pool.append(connection)
- if len(pool) == 1:
- # Pool now usable again, unlock it.
- pool_lock.release()
finally:
self._r()
+ # Call f(c) for all connections in all pools in all versions.
def _connectionMap(self, f):
self._a()
try:
- pools, pooll = self._pools
- for pool, allocated in pooll:
- for cc in allocated:
- f(cc)
-
- temps = self._temps
- if temps:
- t = []
- rc = sys.getrefcount
- for cc in temps:
- if rc(cc) > 3:
- f(cc)
- self._temps = t
+ for pool in self._pools.itervalues():
+ for c in pool.all_as_list():
+ f(c)
finally:
self._r()
@@ -445,7 +444,6 @@
if connection is not None:
version = connection._version
# Update modified in version cache
- # XXX must make this work with list or dict to backport to 2.6
for oid in oids.keys():
h = hash(oid) % 131
o = self._miv_cache.get(h, None)
@@ -453,25 +451,12 @@
del self._miv_cache[h]
# Notify connections
- for pool, allocated in self._pools[1]:
- for cc in allocated:
+ for pool in self._pools.values():
+ for cc in pool.all_as_list():
if (cc is not connection and
- (not version or cc._version==version)):
- if sys.getrefcount(cc) <= 3:
- cc.close()
+ (not version or cc._version == version)):
cc.invalidate(tid, oids)
- if self._temps:
- t = []
- for cc in self._temps:
- if sys.getrefcount(cc) > 3:
- if (cc is not connection and
- (not version or cc._version == version)):
- cc.invalidate(tid, oids)
- t.append(cc)
- else:
- cc.close()
- self._temps = t
def modifiedInVersion(self, oid):
h = hash(oid) % 131
@@ -495,11 +480,6 @@
The optional `version` argument can be used to specify that a
version connection is desired.
- The optional transaction argument can be provided to cause the
- connection to be automatically closed when a transaction is
- terminated. In addition, connections per transaction are
- reused, if possible.
-
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
include useful objects.
@@ -532,143 +512,67 @@
self._a()
try:
- pools, pooll = self._pools
+ # pool <- the _ConnectionPool for this version
+ pool = self._pools.get(version)
+ if pool is None:
+ if version:
+ size = self._version_pool_size
+ else:
+ size = self._pool_size
+ self._pools[version] = pool = _ConnectionPool(size)
- # pools is a mapping object:
- #
- # {version -> (pool, allocated, lock)
- #
- # where:
- #
- # pool is the connection pool for the version,
- # allocated is a list of all of the allocated
- # connections, and
- # lock is a lock that is used to block when a pool is
- # empty and no more connections can be allocated.
- #
- # pooll is a list of all of the pools and allocated for
- # use in cases where we need to iterate over all
- # connections or all inactive connections.
-
- # Pool locks are tricky. Basically, the lock needs to be
- # set whenever the pool becomes empty so that threads are
- # forced to wait until the pool gets a connection in it.
- # The lock is acquired when the (empty) pool is
- # created. The lock is acquired just prior to removing
- # the last connection from the pool and released just after
- # adding a connection to an empty pool.
-
-
- if pools.has_key(version):
- pool, allocated, pool_lock = pools[version]
- else:
- pool, allocated, pool_lock = pools[version] = (
- [], [], allocate_lock())
- pooll.append((pool, allocated))
- pool_lock.acquire()
-
-
- if not pool:
- c = None
+ # result <- a connection
+ if pool.num_available() == 0:
if version:
- if self._version_pool_size > len(allocated):
- c = self.klass(version=version,
- cache_size=self._version_cache_size,
- mvcc=mvcc, txn_mgr=txn_mgr)
- allocated.append(c)
- pool.append(c)
- elif self._pool_size > len(allocated):
- c = self.klass(version=version,
- cache_size=self._cache_size,
- mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- allocated.append(c)
- pool.append(c)
+ cache = self._version_cache_size
+ else:
+ cache = self._cache_size
+ c = self.klass(version=version, cache_size=cache,
+ mvcc=mvcc, txn_mgr=txn_mgr)
+ pool.push(c)
+ result = pool.pop()
- if c is None:
- self._r()
- pool_lock.acquire()
- self._a()
- if len(pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- pool_lock.release()
+ # Tell the connection it belongs to self.
+ result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- elif len(pool)==1:
- # Taking last one, lock the pool.
- # Note that another thread might grab the lock
- # before us, so we might actually block, however,
- # when we get the lock back, there *will* be a
- # connection in the pool. OTOH, there's no limit on
- # how long we may need to wait: if the other thread
- # grabbed the lock in this section too, we'll wait
- # here until another connection is closed.
- # checkConcurrentUpdates1Storage provoked this frequently
- # on a hyperthreaded machine, with its second thread
- # timing out after waiting 5 minutes for DB.open() to
- # return. So, if we can't get the pool lock immediately,
- # now we make a recursive call. This allows the current
- # thread to allocate a new connection instead of waiting
- # arbitrarily long for the single connection in the pool
- # right now.
- self._r()
- if not pool_lock.acquire(0):
- result = DB.open(self, version, transaction, temporary,
- force, waitflag)
- self._a()
- return result
- self._a()
- if len(pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- pool_lock.release()
+ # A good time to do some cache cleanup.
+ for pool in self._pools.itervalues():
+ for c in pool.all_as_list():
+ c.cacheGC()
- c = pool.pop()
- c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- for pool, allocated in pooll:
- for cc in pool:
- cc.cacheGC()
+ return result
- return c
-
finally:
self._r()
def removeVersionPool(self, version):
- pools, pooll = self._pools
- info = pools.get(version)
- if info:
- del pools[version]
- pool, allocated, pool_lock = info
- pooll.remove((pool, allocated))
- try:
- pool_lock.release()
- except: # XXX Do we actually expect this to fail?
- pass
- del pool[:]
- del allocated[:]
+ try:
+ del self._pools[version]
+ except KeyError:
+ pass
def connectionDebugInfo(self):
- r = []
- pools, pooll = self._pools
+ result = []
t = time()
- for version, (pool, allocated, lock) in pools.items():
- for c in allocated:
+ for version, pool in self._pools.items():
+ for c in pool.all_as_list():
o = c._opened
d = c._debug_info
if d:
- if len(d)==1:
+ if len(d) == 1:
d = d[0]
else:
- d=''
+ d = ''
d = "%s (%s)" % (d, len(c._cache))
- r.append({
+ result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d,
'version': version,
})
- return r
+ return result
+
def getActivityMonitor(self):
return self._activity_monitor
@@ -696,34 +600,45 @@
logger.error("packing", exc_info=True)
raise
- def setCacheSize(self, v):
- self._cache_size = v
- d = self._pools[0]
- pool_info = d.get('')
- if pool_info is not None:
- for c in pool_info[1]:
- c._cache.cache_size = v
+ def setActivityMonitor(self, am):
+ self._activity_monitor = am
def classFactory(self, connection, modulename, globalname):
# Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname)
- def setPoolSize(self, v):
- self._pool_size = v
+ def setCacheSize(self, v):
+ self._cache_size = v
+ pool = self._pools.get('')
+ if pool is not None:
+ for c in pool.all_as_list():
+ c._cache.cache_size = v
- def setActivityMonitor(self, am):
- self._activity_monitor = am
-
def setVersionCacheSize(self, v):
self._version_cache_size = v
- for ver in self._pools[0].keys():
- if ver:
- for c in self._pools[0][ver][1]:
+ for version, pool in self._pools.items():
+ if version:
+ for c in pool.all_as_list():
c._cache.cache_size = v
- def setVersionPoolSize(self, v):
- self._version_pool_size=v
+ def setPoolSize(self, size):
+ self._pool_size = size
+ self._reset_pool_sizes(size, for_versions=False)
+ def setVersionPoolSize(self, size):
+ self._version_pool_size = size
+ self._reset_pool_sizes(size, for_versions=True)
+
+ def _reset_pool_sizes(self, size, for_versions=False):
+ self._a()
+ try:
+ for version, pool in self._pools.items():
+ if (version != '') == for_versions:
+ pool.set_pool_size(size)
+
+ finally:
+ self._r()
+
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
Modified: ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py 2004-10-25 23:33:07 UTC (rev 28246)
+++ ZODB/branches/tim-simpler_connection/src/ZODB/tests/testDB.py 2004-10-26 03:02:45 UTC (rev 28247)
@@ -23,6 +23,10 @@
from ZODB.tests.MinPO import MinPO
+# Return total number of connections across all pools in a db._pools.
+def nconn(pools):
+ return sum([len(pool.all) for pool in pools.values()])
+
class DBTests(unittest.TestCase):
def setUp(self):
@@ -75,22 +79,22 @@
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
- pools, pooll = self.db._pools
+ pools = self.db._pools
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
def _test_for_leak(self):
self.dowork()
@@ -112,27 +116,27 @@
c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same
- pools, pooll = self.db._pools
+ pools = self.db._pools
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
def test_suite():
More information about the Zodb-checkins
mailing list