[Zodb-checkins] SVN: ZODB/trunk/ Merge tim-simpler_connection
branch.
Tim Peters
tim.one at comcast.net
Tue Nov 2 14:38:48 EST 2004
Log message for revision 28323:
Merge tim-simpler_connection branch.
There's no longer a hard limit on # of open connections per DB.
Introduced a sane scheme for raising deprecation warnings.
Sane ==
1. The machinery ensures that a "this will be removed in ZODB 3.6"
blurb gets attached to all deprecation warnings.
and
2. It will be dead easy to find these when it's time for 3.6.
Changed:
U ZODB/trunk/NEWS.txt
U ZODB/trunk/setup.py
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/DB.py
A ZODB/trunk/src/ZODB/tests/dbopen.txt
U ZODB/trunk/src/ZODB/tests/testConnection.py
U ZODB/trunk/src/ZODB/tests/testDB.py
U ZODB/trunk/src/ZODB/tests/testZODB.py
A ZODB/trunk/src/ZODB/tests/test_doctest_files.py
U ZODB/trunk/src/ZODB/utils.py
U ZODB/trunk/src/persistent/tests/test_persistent.py
U ZODB/trunk/src/transaction/_manager.py
U ZODB/trunk/src/transaction/_transaction.py
-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/NEWS.txt 2004-11-02 19:38:48 UTC (rev 28323)
@@ -2,7 +2,34 @@
========================
Release date: DD-MMM-2004
+DB
+--
+- There is no longer a hard limit on the number of connections that
+ ``DB.open()`` will create. In other words, ``DB.open()`` never blocks
+ anymore waiting for an earlier connection to close, and ``DB.open()``
+ always returns a connection now (while it wasn't documented, it was
+ possible for ``DB.open()`` to return ``None`` before).
+
+ ``pool_size`` continues to default to 7, but its meaning has changed:
+ if more than ``pool_size`` connections are obtained from ``DB.open()``
+ and not closed, a warning is logged; if more than twice ``pool_size``, a
+ critical problem is logged. ``pool_size`` should be set to the maximum
+ number of connections from the ``DB`` instance you expect to have open
+ simultaneously.
+
+ In addition, if a connection obtained from ``DB.open()`` becomes
+ unreachable without having been explicitly closed, when Python's garbage
+ collection reclaims that connection it no longer counts against the
+ ``pool_size`` thresholds for logging messages.
+
+ The following optional arguments to ``DB.open()`` are deprecated:
+ ``transaction``, ``waitflag``, ``force`` and ``temporary``. If one
+ is specified, its value is ignored, and ``DeprecationWarning`` is
+ raised. In ZODB 3.6, these optional arguments will be removed.
+
+
+
Tools
-----
Modified: ZODB/trunk/setup.py
===================================================================
--- ZODB/trunk/setup.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/setup.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -182,6 +182,7 @@
"ZConfig/tests/library/widget",
"ZEO",
"ZODB",
+ "ZODB/tests",
"zdaemon",
"zdaemon/tests",
]:
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/Connection.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -34,6 +34,8 @@
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+
from zope.interface import implements
global_reset_counter = 0
@@ -262,9 +264,8 @@
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
- warnings.warn("getTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.",
- DeprecationWarning)
+ deprecated36("getTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
@@ -276,9 +277,8 @@
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
- warnings.warn("setLocalTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.",
- DeprecationWarning)
+ deprecated36("setLocalTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
@@ -486,14 +486,14 @@
def cacheFullSweep(self, dt=None):
# XXX needs doc string
- warnings.warn("cacheFullSweep is deprecated. "
- "Use cacheMinimize instead.", DeprecationWarning)
+ deprecated36("cacheFullSweep is deprecated. "
+ "Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
- def cacheMinimize(self, dt=None):
+ def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
@@ -503,9 +503,8 @@
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
- if dt is not None:
- warnings.warn("The dt argument to cacheMinimize is ignored.",
- DeprecationWarning)
+ if dt is not DEPRECATED_ARGUMENT:
+ deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
def cacheGC(self):
@@ -781,8 +780,8 @@
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
- warnings.warn("Assigning to _p_jar is deprecated",
- DeprecationWarning)
+ deprecated36("Assigning to _p_jar is deprecated, and will be "
+ "changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/DB.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -16,7 +16,7 @@
$Id$"""
import cPickle, cStringIO, sys
-from thread import allocate_lock
+import threading
from time import time, ctime
import warnings
import logging
@@ -25,11 +25,117 @@
from ZODB.utils import z64
from ZODB.Connection import Connection
from ZODB.serialize import referencesf
+from ZODB.utils import WeakSet
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
import transaction
logger = logging.getLogger('ZODB.DB')
+class _ConnectionPool(object):
+ """Manage a pool of connections.
+
+ CAUTION: Methods should be called under the protection of a lock.
+ This class does no locking of its own.
+
+ There's no limit on the number of connections this can keep track of,
+ but a warning is logged if there are more than pool_size active
+ connections, and a critical problem if more than twice pool_size.
+
+ New connections are registered via push(). This will log a message if
+ "too many" connections are active.
+
+ When a connection is explicitly closed, tell the pool via repush().
+ That adds the connection to a stack of connections available for
+ reuse, and throws away the oldest stack entries if the stack is too large.
+ pop() pops this stack.
+
+ When a connection is obtained via pop(), the pool holds only a weak
+ reference to it thereafter. It's not necessary to inform the pool
+ if the connection goes away. A connection handed out by pop() counts
+ against pool_size only so long as it exists, and provided it isn't
+ repush()'ed. A weak reference is retained so that DB methods like
+ connectionDebugInfo() can still gather statistics.
+ """
+
+ def __init__(self, pool_size):
+ # The largest # of connections we expect to see alive simultaneously.
+ self.pool_size = pool_size
+
+ # A weak set of all connections we've seen. A connection vanishes
+ # from this set if pop() hands it out, it's not reregistered via
+ # repush(), and it becomes unreachable.
+ self.all = WeakSet()
+
+ # A stack of connections available to hand out. This is a subset
+ # of self.all. push() and repush() add to this, and may remove
+ # the oldest available connections if the pool is too large.
+ # pop() pops this stack. There are never more than pool_size entries
+ # in this stack.
+ # In Python 2.4, a collections.deque would make more sense than
+ # a list (we push only "on the right", but may pop from both ends).
+ self.available = []
+
+ # Change our belief about the expected maximum # of live connections.
+ # If the pool_size is smaller than the current value, this may discard
+ # the oldest available connections.
+ def set_pool_size(self, pool_size):
+ self.pool_size = pool_size
+ self._reduce_size()
+
+ # Register a new available connection. We must not know about c already.
+ # c will be pushed onto the available stack even if we're over the
+ # pool size limit.
+ def push(self, c):
+ assert c not in self.all
+ assert c not in self.available
+ self._reduce_size(strictly_less=True)
+ self.all.add(c)
+ self.available.append(c)
+ n, limit = len(self.all), self.pool_size
+ if n > limit:
+ reporter = logger.warn
+ if n > 2 * limit:
+ reporter = logger.critical
+ reporter("DB.open() has %s open connections with a pool_size "
+ "of %s", n, limit)
+
+ # Reregister an available connection formerly obtained via pop(). This
+ # pushes it on the stack of available connections, and may discard
+ # older available connections.
+ def repush(self, c):
+ assert c in self.all
+ assert c not in self.available
+ self._reduce_size(strictly_less=True)
+ self.available.append(c)
+
+ # Throw away the oldest available connections until we're under our
+ # target size (strictly_less=False) or no more than that (strictly_less=
+ # True, the default).
+ def _reduce_size(self, strictly_less=False):
+ target = self.pool_size - bool(strictly_less)
+ while len(self.available) > target:
+ c = self.available.pop(0)
+ self.all.remove(c)
+
+ # Pop an available connection and return it, or return None if none are
+ # available. In the latter case, the caller should create a new
+ # connection, register it via push(), and call pop() again. The
+ # caller is responsible for serializing this sequence.
+ def pop(self):
+ result = None
+ if self.available:
+ result = self.available.pop()
+ # Leave it in self.all, so we can still get at it for statistics
+ # while it's alive.
+ assert result in self.all
+ return result
+
+ # Return a list of all connections we currently know about.
+ def all_as_list(self):
+ return self.all.as_list()
+
+
class DB(object):
"""The Object Database
-------------------
@@ -41,9 +147,9 @@
The DB instance manages a pool of connections. If a connection is
closed, it is returned to the pool and its object cache is
preserved. A subsequent call to open() will reuse the connection.
- There is a limit to the pool size; if all its connections are in
- use, calls to open() will block until one of the open connections
- is closed.
+ There is no hard limit on the pool size. If more than `pool_size`
+ connections are opened, a warning is logged, and if more than twice
+ that many, a critical problem is logged.
The class variable 'klass' is used by open() to create database
connections. It is set to Connection, but a subclass could override
@@ -81,41 +187,42 @@
def __init__(self, storage,
pool_size=7,
cache_size=400,
- cache_deactivate_after=None,
+ cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3,
version_cache_size=100,
- version_cache_deactivate_after=None,
+ version_cache_deactivate_after=DEPRECATED_ARGUMENT,
):
"""Create an object database.
:Parameters:
- `storage`: the storage used by the database, e.g. FileStorage
- - `pool_size`: maximum number of open connections
+ - `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- - `cache_deactivate_after`: ignored
- - `version_pool_size`: maximum number of connections (per version)
+ - `version_pool_size`: expected maximum number of connections (per
+ version)
- `version_cache_size`: target size of Connection object cache for
version connections
+ - `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored
"""
- # Allocate locks:
- l = allocate_lock()
- self._a = l.acquire
- self._r = l.release
+ # Allocate lock.
+ x = threading.RLock()
+ self._a = x.acquire
+ self._r = x.release
# Setup connection pools and cache info
- self._pools = {},[]
- self._temps = []
+ # _pools maps a version string to a _ConnectionPool object.
+ self._pools = {}
self._pool_size = pool_size
self._cache_size = cache_size
self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size
# warn about use of deprecated arguments
- if (cache_deactivate_after is not None or
- version_cache_deactivate_after is not None):
- warnings.warn("cache_deactivate_after has no effect",
- DeprecationWarning)
+ if cache_deactivate_after is not DEPRECATED_ARGUMENT:
+ deprecated36("cache_deactivate_after has no effect")
+ if version_cache_deactivate_after is not DEPRECATED_ARGUMENT:
+ deprecated36("version_cache_deactivate_after has no effect")
self._miv_cache = {}
@@ -151,6 +258,7 @@
if hasattr(storage, 'undoInfo'):
self.undoInfo = storage.undoInfo
+ # This is called by Connection.close().
def _closeConnection(self, connection):
"""Return a connection to the pool.
@@ -165,10 +273,10 @@
am = self._activity_monitor
if am is not None:
am.closedConnection(connection)
+
version = connection._version
- pools, pooll = self._pools
try:
- pool, allocated, pool_lock = pools[version]
+ pool = self._pools[version]
except KeyError:
# No such version. We must have deleted the pool.
# Just let the connection go.
@@ -177,30 +285,18 @@
# XXX What objects are involved in the cycle?
connection.__dict__.clear()
return
+ pool.repush(connection)
- pool.append(connection)
- if len(pool) == 1:
- # Pool now usable again, unlock it.
- pool_lock.release()
finally:
self._r()
+ # Call f(c) for all connections c in all pools in all versions.
def _connectionMap(self, f):
self._a()
try:
- pools, pooll = self._pools
- for pool, allocated in pooll:
- for cc in allocated:
- f(cc)
-
- temps = self._temps
- if temps:
- t = []
- rc = sys.getrefcount
- for cc in temps:
- if rc(cc) > 3:
- f(cc)
- self._temps = t
+ for pool in self._pools.values():
+ for c in pool.all_as_list():
+ f(c)
finally:
self._r()
@@ -216,12 +312,12 @@
"""
detail = {}
- def f(con, detail=detail, have_detail=detail.has_key):
+ def f(con, detail=detail):
for oid, ob in con._cache.items():
module = getattr(ob.__class__, '__module__', '')
module = module and '%s.' % module or ''
c = "%s%s" % (module, ob.__class__.__name__)
- if have_detail(c):
+ if c in detail:
detail[c] += 1
else:
detail[c] = 1
@@ -276,7 +372,7 @@
self._connectionMap(lambda c: c._cache.full_sweep())
def cacheLastGCTime(self):
- m=[0]
+ m = [0]
def f(con, m=m):
t = con._cache.cache_last_gc_time
if t > m[0]:
@@ -289,7 +385,7 @@
self._connectionMap(lambda c: c._cache.minimize())
def cacheSize(self):
- m=[0]
+ m = [0]
def f(con, m=m):
m[0] += con._cache.cache_non_ghost_count
@@ -299,9 +395,9 @@
def cacheDetailSize(self):
m = []
def f(con, m=m):
- m.append({'connection':repr(con),
- 'ngsize':con._cache.cache_non_ghost_count,
- 'size':len(con._cache)})
+ m.append({'connection': repr(con),
+ 'ngsize': con._cache.cache_non_ghost_count,
+ 'size': len(con._cache)})
self._connectionMap(f)
m.sort()
return m
@@ -358,39 +454,24 @@
if connection is not None:
version = connection._version
# Update modified in version cache
- # XXX must make this work with list or dict to backport to 2.6
for oid in oids.keys():
h = hash(oid) % 131
o = self._miv_cache.get(h, None)
if o is not None and o[0]==oid:
del self._miv_cache[h]
- # Notify connections
- for pool, allocated in self._pools[1]:
- for cc in allocated:
- if (cc is not connection and
- (not version or cc._version==version)):
- if sys.getrefcount(cc) <= 3:
- cc.close()
- cc.invalidate(tid, oids)
+ # Notify connections.
+ def inval(c):
+ if (c is not connection and
+ (not version or c._version == version)):
+ c.invalidate(tid, oids)
+ self._connectionMap(inval)
- if self._temps:
- t = []
- for cc in self._temps:
- if sys.getrefcount(cc) > 3:
- if (cc is not connection and
- (not version or cc._version == version)):
- cc.invalidate(tid, oids)
- t.append(cc)
- else:
- cc.close()
- self._temps = t
-
def modifiedInVersion(self, oid):
h = hash(oid) % 131
cache = self._miv_cache
- o=cache.get(h, None)
- if o and o[0]==oid:
+ o = cache.get(h, None)
+ if o and o[0] == oid:
return o[1]
v = self._storage.modifiedInVersion(oid)
cache[h] = oid, v
@@ -399,203 +480,108 @@
def objectCount(self):
return len(self._storage)
- def open(self, version='', transaction=None, temporary=0, force=None,
- waitflag=1, mvcc=True, txn_mgr=None, synch=True):
+ def open(self, version='',
+ transaction=DEPRECATED_ARGUMENT, temporary=DEPRECATED_ARGUMENT,
+ force=DEPRECATED_ARGUMENT, waitflag=DEPRECATED_ARGUMENT,
+ mvcc=True, txn_mgr=None, synch=True):
"""Return a database Connection for use by application code.
- The optional version argument can be used to specify that a
+ The optional `version` argument can be used to specify that a
version connection is desired.
- The optional transaction argument can be provided to cause the
- connection to be automatically closed when a transaction is
- terminated. In addition, connections per transaction are
- reused, if possible.
-
Note that the connection pool is managed as a stack, to
- increate the likelihood that the connection's stack will
+ increase the likelihood that the connection's stack will
include useful objects.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- - `transaction`: XXX
- - `temporary`: XXX
- - `force`: XXX
- - `waitflag`: XXX
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
-
"""
- self._a()
- try:
- if transaction is not None:
- connections = transaction._connections
- if connections:
- if connections.has_key(version) and not temporary:
- return connections[version]
- else:
- transaction._connections = connections = {}
- transaction = transaction._connections
+ if temporary is not DEPRECATED_ARGUMENT:
+ deprecated36("DB.open() temporary= ignored. "
+ "open() no longer blocks.")
- if temporary:
- # This is a temporary connection.
- # We won't bother with the pools. This will be
- # a one-use connection.
- c = self.klass(version=version,
- cache_size=self._version_cache_size,
- mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- c._setDB(self)
- self._temps.append(c)
- if transaction is not None:
- transaction[id(c)] = c
- return c
+ if force is not DEPRECATED_ARGUMENT:
+ deprecated36("DB.open() force= ignored. "
+ "open() no longer blocks.")
- pools, pooll = self._pools
+ if waitflag is not DEPRECATED_ARGUMENT:
+ deprecated36("DB.open() waitflag= ignored. "
+ "open() no longer blocks.")
- # pools is a mapping object:
- #
- # {version -> (pool, allocated, lock)
- #
- # where:
- #
- # pool is the connection pool for the version,
- # allocated is a list of all of the allocated
- # connections, and
- # lock is a lock that is used to block when a pool is
- # empty and no more connections can be allocated.
- #
- # pooll is a list of all of the pools and allocated for
- # use in cases where we need to iterate over all
- # connections or all inactive connections.
+ if transaction is not DEPRECATED_ARGUMENT:
+ deprecated36("DB.open() transaction= ignored.")
- # Pool locks are tricky. Basically, the lock needs to be
- # set whenever the pool becomes empty so that threads are
- # forced to wait until the pool gets a connection in it.
- # The lock is acquired when the (empty) pool is
- # created. The lock is acquired just prior to removing
- # the last connection from the pool and released just after
- # adding a connection to an empty pool.
+ self._a()
+ try:
+ # pool <- the _ConnectionPool for this version
+ pool = self._pools.get(version)
+ if pool is None:
+ if version:
+ size = self._version_pool_size
+ else:
+ size = self._pool_size
+ self._pools[version] = pool = _ConnectionPool(size)
+ assert pool is not None
-
- if pools.has_key(version):
- pool, allocated, pool_lock = pools[version]
- else:
- pool, allocated, pool_lock = pools[version] = (
- [], [], allocate_lock())
- pooll.append((pool, allocated))
- pool_lock.acquire()
-
-
- if not pool:
- c = None
+ # result <- a connection
+ result = pool.pop()
+ if result is None:
if version:
- if self._version_pool_size > len(allocated) or force:
- c = self.klass(version=version,
- cache_size=self._version_cache_size,
- mvcc=mvcc, txn_mgr=txn_mgr)
- allocated.append(c)
- pool.append(c)
- elif self._pool_size > len(allocated) or force:
- c = self.klass(version=version,
- cache_size=self._cache_size,
- mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- allocated.append(c)
- pool.append(c)
+ size = self._version_cache_size
+ else:
+ size = self._cache_size
+ c = self.klass(version=version, cache_size=size,
+ mvcc=mvcc, txn_mgr=txn_mgr)
+ pool.push(c)
+ result = pool.pop()
+ assert result is not None
- if c is None:
- if waitflag:
- self._r()
- pool_lock.acquire()
- self._a()
- if len(pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- pool_lock.release()
- else:
- return
+ # Tell the connection it belongs to self.
+ result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- elif len(pool)==1:
- # Taking last one, lock the pool.
- # Note that another thread might grab the lock
- # before us, so we might actually block, however,
- # when we get the lock back, there *will* be a
- # connection in the pool. OTOH, there's no limit on
- # how long we may need to wait: if the other thread
- # grabbed the lock in this section too, we'll wait
- # here until another connection is closed.
- # checkConcurrentUpdates1Storage provoked this frequently
- # on a hyperthreaded machine, with its second thread
- # timing out after waiting 5 minutes for DB.open() to
- # return. So, if we can't get the pool lock immediately,
- # now we make a recursive call. This allows the current
- # thread to allocate a new connection instead of waiting
- # arbitrarily long for the single connection in the pool
- # right now.
- self._r()
- if not pool_lock.acquire(0):
- result = DB.open(self, version, transaction, temporary,
- force, waitflag)
- self._a()
- return result
- self._a()
- if len(pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- pool_lock.release()
+ # A good time to do some cache cleanup.
+ self._connectionMap(lambda c: c.cacheGC())
- c = pool.pop()
- c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
- for pool, allocated in pooll:
- for cc in pool:
- cc.cacheGC()
+ return result
- if transaction is not None:
- transaction[version] = c
- return c
-
finally:
self._r()
def removeVersionPool(self, version):
- pools, pooll = self._pools
- info = pools.get(version)
- if info:
- del pools[version]
- pool, allocated, pool_lock = info
- pooll.remove((pool, allocated))
- try:
- pool_lock.release()
- except: # XXX Do we actually expect this to fail?
- pass
- del pool[:]
- del allocated[:]
+ try:
+ del self._pools[version]
+ except KeyError:
+ pass
def connectionDebugInfo(self):
- r = []
- pools, pooll = self._pools
+ result = []
t = time()
- for version, (pool, allocated, lock) in pools.items():
- for c in allocated:
- o = c._opened
- d = c._debug_info
- if d:
- if len(d)==1:
- d = d[0]
- else:
- d=''
- d = "%s (%s)" % (d, len(c._cache))
+ def f(c):
+ o = c._opened
+ d = c._debug_info
+ if d:
+ if len(d) == 1:
+ d = d[0]
+ else:
+ d = ''
+ d = "%s (%s)" % (d, len(c._cache))
- r.append({
- 'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
- 'info': d,
- 'version': version,
- })
- return r
+ result.append({
+ 'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
+ 'info': d,
+ 'version': version,
+ })
+ self._connectionMap(f)
+ return result
+
def getActivityMonitor(self):
return self._activity_monitor
@@ -623,34 +609,52 @@
logger.error("packing", exc_info=True)
raise
- def setCacheSize(self, v):
- self._cache_size = v
- d = self._pools[0]
- pool_info = d.get('')
- if pool_info is not None:
- for c in pool_info[1]:
- c._cache.cache_size = v
+ def setActivityMonitor(self, am):
+ self._activity_monitor = am
def classFactory(self, connection, modulename, globalname):
# Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname)
- def setPoolSize(self, v):
- self._pool_size = v
+ def setCacheSize(self, v):
+ self._a()
+ try:
+ self._cache_size = v
+ pool = self._pools.get('')
+ if pool is not None:
+ for c in pool.all_as_list():
+ c._cache.cache_size = v
+ finally:
+ self._r()
- def setActivityMonitor(self, am):
- self._activity_monitor = am
-
def setVersionCacheSize(self, v):
- self._version_cache_size = v
- for ver in self._pools[0].keys():
- if ver:
- for c in self._pools[0][ver][1]:
- c._cache.cache_size = v
+ self._a()
+ try:
+ self._version_cache_size = v
+ for version, pool in self._pools.items():
+ if version:
+ for c in pool.all_as_list():
+ c._cache.cache_size = v
+ finally:
+ self._r()
- def setVersionPoolSize(self, v):
- self._version_pool_size=v
+ def setPoolSize(self, size):
+ self._pool_size = size
+ self._reset_pool_sizes(size, for_versions=False)
+ def setVersionPoolSize(self, size):
+ self._version_pool_size = size
+ self._reset_pool_sizes(size, for_versions=True)
+
+ def _reset_pool_sizes(self, size, for_versions=False):
+ self._a()
+ try:
+ for version, pool in self._pools.items():
+ if (version != '') == for_versions:
+ pool.set_pool_size(size)
+ finally:
+ self._r()
+
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
@@ -679,23 +683,19 @@
def getCacheDeactivateAfter(self):
"""Deprecated"""
- warnings.warn("cache_deactivate_after has no effect",
- DeprecationWarning)
+ deprecated36("getCacheDeactivateAfter has no effect")
def getVersionCacheDeactivateAfter(self):
"""Deprecated"""
- warnings.warn("cache_deactivate_after has no effect",
- DeprecationWarning)
+ deprecated36("getVersionCacheDeactivateAfter has no effect")
def setCacheDeactivateAfter(self, v):
"""Deprecated"""
- warnings.warn("cache_deactivate_after has no effect",
- DeprecationWarning)
+ deprecated36("setCacheDeactivateAfter has no effect")
def setVersionCacheDeactivateAfter(self, v):
"""Deprecated"""
- warnings.warn("cache_deactivate_after has no effect",
- DeprecationWarning)
+ deprecated36("setVersionCacheDeactivateAfter has no effect")
class ResourceManager(object):
"""Transaction participation for a version or undo resource."""
Copied: ZODB/trunk/src/ZODB/tests/dbopen.txt (from rev 28322, ZODB/branches/tim-simpler_connection/src/ZODB/tests/dbopen.txt)
Property changes on: ZODB/trunk/src/ZODB/tests/dbopen.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnection.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testConnection.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -414,8 +414,9 @@
>>> len(hook.warnings)
1
>>> message, category, filename, lineno = hook.warnings[0]
- >>> message
- 'The dt argument to cacheMinimize is ignored.'
+ >>> print message
+ This will be removed in ZODB 3.6:
+ cacheMinimize() dt= is ignored.
>>> category.__name__
'DeprecationWarning'
>>> hook.clear()
@@ -434,8 +435,9 @@
>>> len(hook.warnings)
2
>>> message, category, filename, lineno = hook.warnings[0]
- >>> message
- 'cacheFullSweep is deprecated. Use cacheMinimize instead.'
+ >>> print message
+ This will be removed in ZODB 3.6:
+ cacheFullSweep is deprecated. Use cacheMinimize instead.
>>> category.__name__
'DeprecationWarning'
>>> message, category, filename, lineno = hook.warnings[1]
Modified: ZODB/trunk/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testDB.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testDB.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -23,6 +23,10 @@
from ZODB.tests.MinPO import MinPO
+# Return total number of connections across all pools in a db._pools.
+def nconn(pools):
+ return sum([len(pool.all) for pool in pools.values()])
+
class DBTests(unittest.TestCase):
def setUp(self):
@@ -75,22 +79,22 @@
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
- pools, pooll = self.db._pools
+ pools = self.db._pools
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
def _test_for_leak(self):
self.dowork()
@@ -112,27 +116,27 @@
c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same
- pools, pooll = self.db._pools
+ pools = self.db._pools
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
- self.assertEqual(len(pooll), 2)
+ self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
- self.assertEqual(len(pooll), 3)
+ self.assertEqual(nconn(pools), 3)
def test_suite():
Modified: ZODB/trunk/src/ZODB/tests/testZODB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testZODB.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testZODB.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -243,9 +243,13 @@
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
for msg, obj, filename, lineno in hook.warnings:
- self.assert_(
- msg.startswith("setLocalTransaction() is deprecated.") or
- msg.startswith("getTransaction() is deprecated."))
+ self.assert_(msg in [
+ "This will be removed in ZODB 3.6:\n"
+ "setLocalTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.",
+ "This will be removed in ZODB 3.6:\n"
+ "getTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead."])
finally:
conn1.close()
conn2.close()
Copied: ZODB/trunk/src/ZODB/tests/test_doctest_files.py (from rev 28322, ZODB/branches/tim-simpler_connection/src/ZODB/tests/test_doctest_files.py)
Property changes on: ZODB/trunk/src/ZODB/tests/test_doctest_files.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/utils.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -18,6 +18,8 @@
from binascii import hexlify
import cPickle
import cStringIO
+import weakref
+import warnings
from persistent.TimeStamp import TimeStamp
@@ -34,8 +36,27 @@
'positive_id',
'get_refs',
'readable_tid_repr',
+ 'WeakSet',
+ 'DEPRECATED_ARGUMENT',
+ 'deprecated36',
]
+# A unique marker to give as the default value for a deprecated argument.
+# The method should then do a
+#
+# if that_arg is not DEPRECATED_ARGUMENT:
+# complain
+#
+# dance.
+DEPRECATED_ARGUMENT = object()
+
+# Raise DeprecationWarning, noting that the deprecated thing will go
+# away in ZODB 3.6. Point to the caller of our caller (i.e., at the
+# code using the deprecated thing).
+def deprecated36(msg):
+ warnings.warn("This will be removed in ZODB 3.6:\n%s" % msg,
+ DeprecationWarning, stacklevel=3)
+
z64 = '\0'*8
# TODO The purpose of t32 is unclear. Code that uses it is usually
@@ -164,3 +185,46 @@
u.noload() # class info
u.noload() # instance state info
return refs
+
+# A simple implementation of weak sets, supplying just enough of Python's
+# sets.Set interface for our needs.
+
+class WeakSet(object):
+ """A set of objects that doesn't keep its elements alive.
+
+ The objects in the set must be weakly referencable.
+ The objects need not be hashable, and need not support comparison.
+ Two objects are considered to be the same iff their id()s are equal.
+
+ When the only references to an object are weak references (including
+ those from WeakSets), the object can be garbage-collected, and
+ will vanish from any WeakSets it may be a member of at that time.
+ """
+
+ def __init__(self):
+ # Map id(obj) to obj. By using ids as keys, we avoid requiring
+ # that the elements be hashable or comparable.
+ self.data = weakref.WeakValueDictionary()
+
+ def __len__(self):
+ return len(self.data)
+
+ def __contains__(self, obj):
+ return id(obj) in self.data
+
+ # Same as a Set, add obj to the collection.
+ def add(self, obj):
+ self.data[id(obj)] = obj
+
+ # Same as a Set, remove obj from the collection, and raise
+ # KeyError if obj not in the collection.
+ def remove(self, obj):
+ del self.data[id(obj)]
+
+ # Return a list of all the objects in the collection.
+ # Because a weak dict is used internally, iteration
+ # is dicey (the underlying dict may change size during
+ # iteration, due to gc or activity from other threads).
+ # as_list() attempts to be safe.
+ def as_list(self):
+ return self.data.values()
Modified: ZODB/trunk/src/persistent/tests/test_persistent.py
===================================================================
--- ZODB/trunk/src/persistent/tests/test_persistent.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/persistent/tests/test_persistent.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -11,44 +11,16 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-import doctest
-import os
-import sys
-import unittest
-import persistent.tests
from persistent import Persistent
+from zope.testing.doctestunit import DocFileSuite
+
class P(Persistent):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
-try:
- DocFileSuite = doctest.DocFileSuite # >= Python 2.4.0a2
-except AttributeError:
- # <= Python 2.4.0a1
- def DocFileSuite(path, globs=None):
- # It's not entirely obvious how to connect this single string
- # with unittest. For now, re-use the _utest() function that comes
- # standard with doctest in Python 2.3. One problem is that the
- # error indicator doesn't point to the line of the doctest file
- # that failed.
-
- path = os.path.join(persistent.tests.__path__[0], path)
-
- source = open(path).read()
- if globs is None:
- globs = sys._getframe(1).f_globals
- t = doctest.Tester(globs=globs)
- def runit():
- doctest._utest(t, path, source, path, 0)
- f = unittest.FunctionTestCase(runit,
- description="doctest from %s" % path)
- suite = unittest.TestSuite()
- suite.addTest(f)
- return suite
-
def test_suite():
return DocFileSuite("persistent.txt", globs={"P": P})
Modified: ZODB/trunk/src/transaction/_manager.py
===================================================================
--- ZODB/trunk/src/transaction/_manager.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/transaction/_manager.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -18,7 +18,6 @@
"""
import thread
-import weakref
from transaction._transaction import Transaction
@@ -28,48 +27,16 @@
# practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
+# Therefore we use "weak sets" internally.
#
-# Therefore we use "weak sets" internally. The implementation here
-# implements just enough of Python's sets.Set interface for our needs.
+# Obscure: because of the __init__.py maze, we can't import WeakSet
+# at top level here.
-class WeakSet(object):
- """A set of objects that doesn't keep its elements alive.
-
- The objects in the set must be weakly referencable.
- The objects need not be hashable, and need not support comparison.
- Two objects are considered to be the same iff their id()s are equal.
-
- When the only references to an object are weak references (including
- those from WeakSets), the object can be garbage-collected, and
- will vanish from any WeakSets it may be a member of at that time.
- """
-
- def __init__(self):
- # Map id(obj) to obj. By using ids as keys, we avoid requiring
- # that the elements be hashable or comparable.
- self.data = weakref.WeakValueDictionary()
-
- # Same as a Set, add obj to the collection.
- def add(self, obj):
- self.data[id(obj)] = obj
-
- # Same as a Set, remove obj from the collection, and raise
- # KeyError if obj not in the collection.
- def remove(self, obj):
- del self.data[id(obj)]
-
- # Return a list of all the objects in the collection.
- # Because a weak dict is used internally, iteration
- # is dicey (the underlying dict may change size during
- # iteration, due to gc or activity from other threads).
- # as_list() attempts to be safe.
- def as_list(self):
- return self.data.values()
-
-
class TransactionManager(object):
def __init__(self):
+ from ZODB.utils import WeakSet
+
self._txn = None
self._synchs = WeakSet()
@@ -135,6 +102,8 @@
del self._txns[tid]
def registerSynch(self, synch):
+ from ZODB.utils import WeakSet
+
tid = thread.get_ident()
ws = self._synchs.get(tid)
if ws is None:
Modified: ZODB/trunk/src/transaction/_transaction.py
===================================================================
--- ZODB/trunk/src/transaction/_transaction.py 2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/transaction/_transaction.py 2004-11-02 19:38:48 UTC (rev 28323)
@@ -261,9 +261,10 @@
self._resources.append(adapter)
def begin(self):
- warnings.warn("Transaction.begin() should no longer be used; use "
- "the begin() method of a transaction manager.",
- DeprecationWarning, stacklevel=2)
+ from ZODB.utils import deprecated36
+
+ deprecated36("Transaction.begin() should no longer be used; use "
+ "the begin() method of a transaction manager.")
if (self._resources or
self._sub or
self._nonsub or
More information about the Zodb-checkins
mailing list