[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Refactored the connection pool implementation. (I have a feeling that
Jim Fulton
jim at zope.com
Thu Oct 30 17:03:45 EDT 2008
Log message for revision 92720:
Refactored the connection pool implementation. (I have a feeling that
it could be made simpler still.)
Changed:
U ZODB/trunk/src/ZODB/DB.py
U ZODB/trunk/src/ZODB/historical_connections.txt
U ZODB/trunk/src/ZODB/tests/dbopen.txt
U ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py
-=-
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/DB.py 2008-10-30 21:03:41 UTC (rev 92720)
@@ -103,22 +103,21 @@
def getTimeout(self):
return self._timeout
- timeout = property(getTimeout, setTimeout)
+ timeout = property(getTimeout, lambda self, v: self.setTimeout(v))
- size = property(getSize, setSize)
+ size = property(getSize, lambda self, v: self.setSize(v))
class ConnectionPool(AbstractConnectionPool):
- def __init__(self, size, timeout=None):
+ def __init__(self, size, timeout=time()):
super(ConnectionPool, self).__init__(size, timeout)
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than size entries
- # in this stack. The keys are time.time() values of the push or
- # repush calls.
- self.available = BTrees.OOBTree.Bucket()
+ # in this stack.
+ self.available = []
def push(self, c):
"""Register a new available connection.
@@ -127,10 +126,10 @@
stack even if we're over the pool size limit.
"""
assert c not in self.all
- assert c not in self.available.values()
+ assert c not in self.available
self._reduce_size(strictly_less=True)
self.all.add(c)
- self.available[time()] = c
+ self.available.append((time(), c))
n = len(self.all)
limit = self.size
if n > limit:
@@ -147,44 +146,43 @@
older available connections.
"""
assert c in self.all
- assert c not in self.available.values()
+ assert c not in self.available
self._reduce_size(strictly_less=True)
- self.available[time()] = c
+ self.available.append((time(), c))
def _reduce_size(self, strictly_less=False):
"""Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that
(strictly_less=True).
"""
- if self.timeout is None:
- threshhold = None
- else:
- threshhold = time() - self.timeout
+ threshhold = time() - self.timeout
target = self.size
if strictly_less:
target -= 1
- for t, c in list(self.available.items()):
- if (len(self.available) > target or
- threshhold is not None and t < threshhold):
- del self.available[t]
- self.all.remove(c)
- # While application code may still hold a reference to `c`,
- # there's little useful that can be done with this Connection
- # anymore. Its cache may be holding on to limited resources,
- # and we replace the cache with an empty one now so that we
- # don't have to wait for gc to reclaim it. Note that it's not
- # possible for DB.open() to return `c` again: `c` can never be
- # in an open state again.
- # TODO: Perhaps it would be better to break the reference
- # cycles between `c` and `c._cache`, so that refcounting
- # reclaims both right now. But if user code _does_ have a
- # strong reference to `c` now, breaking the cycle would not
- # reclaim `c` now, and `c` would be left in a user-visible
- # crazy state.
- c._resetCache()
- else:
- break
+ available = self.available
+ while (
+ (len(available) > target)
+ or
+ (available and available[0][0] < threshhold)
+ ):
+ t, c = available.pop(0)
+ self.all.remove(c)
+ # While application code may still hold a reference to `c`,
+ # there's little useful that can be done with this Connection
+ # anymore. Its cache may be holding on to limited resources,
+ # and we replace the cache with an empty one now so that we
+ # don't have to wait for gc to reclaim it. Note that it's not
+ # possible for DB.open() to return `c` again: `c` can never be
+ # in an open state again.
+ # TODO: Perhaps it would be better to break the reference
+ # cycles between `c` and `c._cache`, so that refcounting
+ # reclaims both right now. But if user code _does_ have a
+ # strong reference to `c` now, breaking the cycle would not
+ # reclaim `c` now, and `c` would be left in a user-visible
+ # crazy state.
+ c._resetCache()
+
def reduce_size(self):
self._reduce_size()
@@ -197,7 +195,7 @@
"""
result = None
if self.available:
- result = self.available.pop(self.available.maxKey())
+ _, result = self.available.pop()
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert result in self.all
@@ -212,19 +210,15 @@
If a connection is no longer viable because it has timed out, it is
garbage collected."""
- if self.timeout is None:
- threshhold = None
- else:
- threshhold = time() - self.timeout
- for t, c in tuple(self.available.items()):
- if threshhold is not None and t < threshhold:
+ threshhold = time() - self.timeout
+ for t, c in list(self.available):
+ if t < threshhold:
del self.available[t]
self.all.remove(c)
c._resetCache()
else:
c.cacheGC()
-
class KeyedConnectionPool(AbstractConnectionPool):
# this pool keeps track of keyed connections all together. It makes
# it possible to make assertions about total numbers of keyed connections.
@@ -233,100 +227,69 @@
# see the comments in ConnectionPool for method descriptions.
- def __init__(self, size, timeout=None):
+ def __init__(self, size, timeout=time()):
super(KeyedConnectionPool, self).__init__(size, timeout)
- # key: {time.time: connection}
- self.available = BTrees.family32.OO.Bucket()
- # time.time: key
- self.closed = BTrees.family32.OO.Bucket()
+ self.pools = {}
+ def setSize(self, v):
+ self._size = v
+ for pool in self.pools.values():
+ pool.setSize(v)
+
+ def setTimeout(self, v):
+ self._timeout = v
+ for pool in self.pools.values():
+ pool.setTimeout(v)
+
def push(self, c, key):
- assert c not in self.all
- available = self.available.get(key)
- if available is None:
- available = self.available[key] = BTrees.family32.OO.Bucket()
- else:
- assert c not in available.values()
- self._reduce_size(strictly_less=True)
- self.all.add(c)
- t = time()
- available[t] = c
- self.closed[t] = key
- n = len(self.all)
- limit = self.size
- if n > limit:
- reporter = logger.warn
- if n > 2 * limit:
- reporter = logger.critical
- reporter("DB.open() has %s open connections with a size "
- "of %s", n, limit)
+ pool = self.pools.get(key)
+ if pool is None:
+ pool = self.pools[key] = ConnectionPool(self.size, self.timeout)
+ pool.push(c)
def repush(self, c, key):
- assert c in self.all
- self._reduce_size(strictly_less=True)
- available = self.available.get(key)
- if available is None:
- available = self.available[key] = BTrees.family32.OO.Bucket()
- else:
- assert c not in available.values()
- t = time()
- available[t] = c
- self.closed[t] = key
+ self.pools[key].repush(c)
def _reduce_size(self, strictly_less=False):
- if self.timeout is None:
- threshhold = None
- else:
- threshhold = time() - self.timeout
- target = self.size
- if strictly_less:
- target -= 1
- for t, key in tuple(self.closed.items()):
- if (len(self.available) > target or
- threshhold is not None and t < threshhold):
- del self.closed[t]
- c = self.available[key].pop(t)
- if not self.available[key]:
- del self.available[key]
- self.all.remove(c)
- c._resetCache()
- else:
- break
+ for key, pool in list(self.pools.items()):
+ pool._reduce_size(strictly_less)
+ if not pool.all:
+ del self.pools[key]
def reduce_size(self):
self._reduce_size()
def pop(self, key):
- result = None
- available = self.available.get(key)
- if available:
- t = available.maxKey()
- result = available.pop(t)
- del self.closed[t]
- if not available:
- del self.available[key]
- assert result in self.all
- return result
+ pool = self.pools.get(key)
+ if pool is not None:
+ return pool.pop()
def map(self, f):
- self.all.map(f)
+ for pool in self.pools.itervalues():
+ pool.map(f)
def availableGC(self):
- if self.timeout is None:
- threshhold = None
- else:
- threshhold = time() - self.timeout
- for t, key in tuple(self.closed.items()):
- if threshhold is not None and t < threshhold:
- del self.closed[t]
- c = self.available[key].pop(t)
- if not self.available[key]:
- del self.available[key]
- self.all.remove(c)
- c._resetCache()
- else:
- self.available[key][t].cacheGC()
+ for key, pool in self.pools.items():
+ pool.availableGC()
+ if not pool.all:
+ del self.pools[key]
+ @property
+ def test_all(self):
+ result = set()
+ for pool in self.pools.itervalues():
+ result.update(pool.all)
+ return frozenset(result)
+
+ @property
+ def test_available(self):
+ result = []
+ for pool in self.pools.itervalues():
+ result.extend(pool.available)
+ return tuple(result)
+
+
+
def toTimeStamp(dt):
utc_struct = dt.utctimetuple()
# if this is a leapsecond, this will probably fail. That may be a good
Modified: ZODB/trunk/src/ZODB/historical_connections.txt
===================================================================
--- ZODB/trunk/src/ZODB/historical_connections.txt 2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/historical_connections.txt 2008-10-30 21:03:41 UTC (rev 92720)
@@ -13,11 +13,7 @@
A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage
-that supports ``loadBefore``. Unfortunately that does not include
-MappingStorage, so we use a FileStorage instance. Also unfortunately, as of
-this writing there is no reliable way to determine if a storage truly
-implements loadBefore, or if it simply returns None (as in BaseStorage), other
-than reading code.
+that supports ``loadBefore``.
We'll begin our example with a fairly standard set up. We
@@ -28,11 +24,8 @@
- modify the database again; and
- commit a transaction.
- >>> import ZODB.FileStorage
- >>> storage = ZODB.FileStorage.FileStorage(
- ... 'HistoricalConnectionTests.fs', create=True)
- >>> import ZODB
- >>> db = ZODB.DB(storage)
+ >>> import ZODB.MappingStorage
+ >>> db = ZODB.MappingStorage.DB()
>>> conn = db.open()
>>> import persistent.mapping
@@ -42,14 +35,13 @@
>>> import transaction
>>> transaction.commit()
-We wait for some ttime to pass, and then make some other changes.
+We wait for some time to pass, record he time, and then make some other changes.
>>> import time
>>> t = time.time()
>>> while time.time() <= t:
... time.sleep(.001)
-
>>> import datetime
>>> now = datetime.datetime.utcnow()
@@ -164,187 +156,81 @@
>>> db.getHistoricalTimeout()
400
-All three of these values can be specified in a ZConfig file. We're using
-mapping storage for simplicity, but remember, as we said at the start of this
-document, mapping storage will not work for historical connections (and in fact
-may seem to work but then fail confusingly) because it does not implement
-loadBefore.
+All three of these values can be specified in a ZConfig file.
>>> import ZODB.config
>>> db2 = ZODB.config.databaseFromString('''
... <zodb>
... <mappingstorage/>
- ... historical-pool-size 5
+ ... historical-pool-size 3
... historical-cache-size 1500
... historical-timeout 6m
... </zodb>
... ''')
>>> db2.getHistoricalPoolSize()
- 5
+ 3
>>> db2.getHistoricalCacheSize()
1500
>>> db2.getHistoricalTimeout()
360
-Let's actually look at these values at work by shining some light into what
-has been a black box up to now. We'll actually do some white box examination
-of what is going on in the database, pools and connections.
-Historical connections are held in a single connection pool with mappings
-from the ``before`` TID to available connections. First we'll put a new
-pool on the database so we have a clean slate.
+The pool lets us reuse connections. To see this, we'll open some
+connections, close them, and then open them again:
- >>> historical_conn.close()
- >>> from ZODB.DB import KeyedConnectionPool
- >>> db.historical_pool = KeyedConnectionPool(
- ... db.historical_pool.size, db.historical_pool.timeout)
+ >>> conns1 = [db2.open(before=serial) for i in range(4)]
+ >>> _ = [c.close() for c in conns1]
+ >>> conns2 = [db2.open(before=serial) for i in range(4)]
-Now lets look what happens to the pool when we create and close an historical
-connection.
+Now let's look at what we got. The first connection in conns2 is the
+last connection in conns1, because it was the last connection closed.
- >>> pool = db.historical_pool
- >>> len(pool.all)
- 0
- >>> len(pool.available)
- 0
- >>> historical_conn = db.open(
- ... transaction_manager=transaction1, before=serial)
- >>> len(pool.all)
- 1
- >>> len(pool.available)
- 0
- >>> historical_conn in pool.all
+ >>> conns2[0] is conns1[-1]
True
- >>> historical_conn.close()
- >>> len(pool.all)
- 1
- >>> len(pool.available)
- 1
- >>> pool.available.keys()[0] == serial
- True
- >>> len(pool.available.values()[0])
- 1
-Now we'll open and close two for the same serial to see what happens to the
-data structures.
+Also for the next two:
- >>> historical_conn is db.open(
- ... transaction_manager=transaction1, before=serial)
- True
- >>> len(pool.all)
- 1
- >>> len(pool.available)
- 0
- >>> transaction2 = transaction.TransactionManager()
- >>> historical_conn2 = db.open(
- ... transaction_manager=transaction2, before=serial)
- >>> len(pool.all)
- 2
- >>> len(pool.available)
- 0
- >>> historical_conn2.close()
- >>> len(pool.all)
- 2
- >>> len(pool.available)
- 1
- >>> len(pool.available.values()[0])
- 1
- >>> historical_conn.close()
- >>> len(pool.all)
- 2
- >>> len(pool.available)
- 1
- >>> len(pool.available.values()[0])
- 2
+ >>> (conns2[1] is conns1[-2]), (conns2[2] is conns1[-3])
+ (True, True)
-If you change the historical cache size, that changes the size of the
-persistent cache on our connection.
+But not for the last:
- >>> historical_conn._cache.cache_size
- 2000
- >>> db.setHistoricalCacheSize(1500)
- >>> historical_conn._cache.cache_size
- 1500
+ >>> conns2[3] is conns1[-4]
+ False
-Now let's look at pool sizes. We'll set it to two, then open and close three
-connections. We should end up with only two available connections.
+Because the pool size was set to 3.
- >>> db.setHistoricalPoolSize(2)
+Connections are also discarded if they haven't been used in a while.
+To see this, let's close two of the connections:
- >>> historical_conn = db.open(
- ... transaction_manager=transaction1, before=serial)
- >>> historical_conn2 = db.open(
- ... transaction_manager=transaction2, before=serial)
- >>> transaction3 = transaction.TransactionManager()
- >>> historical_conn3 = db.open(
- ... transaction_manager=transaction3, at=historical_serial)
- >>> len(pool.all)
- 3
- >>> len(pool.available)
- 0
+ >>> conns2[0].close(); conns2[1].close()
- >>> historical_conn3.close()
- >>> len(pool.all)
- 3
- >>> len(pool.available)
- 1
- >>> len(pool.available.values()[0])
- 1
+We'll also set the historical timeout to be very low:
- >>> historical_conn2.close()
- >>> len(pool.all)
- 3
- >>> len(pool.available)
- 2
- >>> len(pool.available.values()[0])
- 1
- >>> len(pool.available.values()[1])
- 1
+ >>> db2.setHistoricalTimeout(.01)
+ >>> time.sleep(.1)
+ >>> conns2[2].close(); conns2[3].close()
- >>> historical_conn.close()
- >>> len(pool.all)
- 2
- >>> len(pool.available)
- 1
- >>> len(pool.available.values()[0])
- 2
+Now, when we open 4 connections:
-Notice it dumped the one that was closed at the earliest time.
+ >>> conns1 = [db2.open(before=serial) for i in range(4)]
-Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for
-this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
-make the DB class mask the DB module.)
+We'll see that only the last 2 connections from conns2 are in the
+result:
- >>> db.getHistoricalTimeout()
- 400
- >>> import time
- >>> delta = 200
- >>> def stub_time():
- ... return time.time() + delta
- ...
- >>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
- >>> original_time = DB_module.time
- >>> DB_module.time = stub_time
+ >>> [c in conns1 for c in conns2]
+ [False, False, True, True]
- >>> historical_conn = db.open(before=serial)
- >>> len(pool.all)
- 2
- >>> len(pool.available)
- 1
+If you change the historical cache size, that changes the size of the
+persistent cache on our connection.
-A close or an open will do garbage collection on the timed out connections.
+ >>> historical_conn._cache.cache_size
+ 2000
+ >>> db.setHistoricalCacheSize(1500)
+ >>> historical_conn._cache.cache_size
+ 1500
- >>> delta += 200
- >>> historical_conn.close()
-
- >>> len(pool.all)
- 1
- >>> len(pool.available)
- 1
- >>> len(pool.available.values()[0])
- 1
-
Invalidations
=============
Modified: ZODB/trunk/src/ZODB/tests/dbopen.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/dbopen.txt 2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/tests/dbopen.txt 2008-10-30 21:03:41 UTC (rev 92720)
@@ -239,12 +239,12 @@
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
- >>> [c.MARKER for c in pool.available.values()]
+ >>> [c.MARKER for (t, c) in pool.available]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
- >>> [c.MARKER for c in pool.available.values()]
+ >>> [c.MARKER for (t, c) in pool.available]
[1, 2, 3]
Similarly for the other two:
@@ -252,7 +252,7 @@
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
- >>> [c.MARKER for c in pool.available.values()]
+ >>> [c.MARKER for (t, c) in pool.available]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
@@ -260,7 +260,7 @@
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
- >>> [c.MARKER for c in pool.available.values()]
+ >>> [c.MARKER for (t, c) in pool.available]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
Modified: ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py 2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py 2008-10-30 21:03:41 UTC (rev 92720)
@@ -25,10 +25,7 @@
def tearDown(test):
test.globs['db'].close()
test.globs['db2'].close()
- test.globs['storage'].close()
# the DB class masks the module because of __init__ shenanigans
- DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
- DB_module.time = test.globs['original_time']
module.tearDown(test)
ZODB.tests.util.tearDown(test)
More information about the Zodb-checkins
mailing list