[Zodb-checkins] SVN: ZODB/trunk/ Merge tim-simpler_connection branch.

Tim Peters tim.one at comcast.net
Tue Nov 2 14:38:48 EST 2004


Log message for revision 28323:
  Merge tim-simpler_connection branch.
  
  There's no longer a hard limit on # of open connections per DB.
  
  Introduced a sane scheme for raising deprecation warnings.
  Sane ==
  
  1. The machinery ensures that a "this will be removed in ZODB 3.6"
     blurb gets attached to all deprecation warnings.
  
  and
  
  2. It will be dead easy to find these when it's time for 3.6.
  

Changed:
  U   ZODB/trunk/NEWS.txt
  U   ZODB/trunk/setup.py
  U   ZODB/trunk/src/ZODB/Connection.py
  U   ZODB/trunk/src/ZODB/DB.py
  A   ZODB/trunk/src/ZODB/tests/dbopen.txt
  U   ZODB/trunk/src/ZODB/tests/testConnection.py
  U   ZODB/trunk/src/ZODB/tests/testDB.py
  U   ZODB/trunk/src/ZODB/tests/testZODB.py
  A   ZODB/trunk/src/ZODB/tests/test_doctest_files.py
  U   ZODB/trunk/src/ZODB/utils.py
  U   ZODB/trunk/src/persistent/tests/test_persistent.py
  U   ZODB/trunk/src/transaction/_manager.py
  U   ZODB/trunk/src/transaction/_transaction.py

-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/NEWS.txt	2004-11-02 19:38:48 UTC (rev 28323)
@@ -2,7 +2,34 @@
 ========================
 Release date: DD-MMM-2004
 
+DB
+--
 
+- There is no longer a hard limit on the number of connections that
+  ``DB.open()`` will create.  In other words, ``DB.open()`` never blocks
+  anymore waiting for an earlier connection to close, and ``DB.open()``
+  always returns a connection now (while it wasn't documented, it was
+  possible for ``DB.open()`` to return ``None`` before).
+
+  ``pool_size`` continues to default to 7, but its meaning has changed:
+  if more than ``pool_size`` connections are obtained from ``DB.open()``
+  and not closed, a warning is logged; if more than twice ``pool_size``, a
+  critical problem is logged.  ``pool_size`` should be set to the maximum
+  number of connections from the ``DB`` instance you expect to have open
+  simultaneously.
+
+  In addition, if a connection obtained from ``DB.open()`` becomes
+  unreachable without having been explicitly closed, when Python's garbage
+  collection reclaims that connection it no longer counts against the
+  ``pool_size`` thresholds for logging messages.
+
+  The following optional arguments to ``DB.open()`` are deprecated:
+  ``transaction``, ``waitflag``, ``force`` and ``temporary``.  If one
+  is specified, its value is ignored, and ``DeprecationWarning`` is
+  raised.  In ZODB 3.6, these optional arguments will be removed.
+
+
+
 Tools
 -----
 

Modified: ZODB/trunk/setup.py
===================================================================
--- ZODB/trunk/setup.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/setup.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -182,6 +182,7 @@
         "ZConfig/tests/library/widget",
         "ZEO",
         "ZODB",
+        "ZODB/tests",
         "zdaemon",
         "zdaemon/tests",
         ]:

Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/Connection.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -34,6 +34,8 @@
 from ZODB.utils import u64, oid_repr, z64, positive_id
 from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
 from ZODB.interfaces import IConnection
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+
 from zope.interface import implements
 
 global_reset_counter = 0
@@ -262,9 +264,8 @@
         method.  You can pass a transaction manager (TM) to DB.open()
         to control which TM the Connection uses.
         """
-        warnings.warn("getTransaction() is deprecated. "
-                      "Use the txn_mgr argument to DB.open() instead.",
-                      DeprecationWarning)
+        deprecated36("getTransaction() is deprecated. "
+                     "Use the txn_mgr argument to DB.open() instead.")
         return self._txn_mgr.get()
 
     def setLocalTransaction(self):
@@ -276,9 +277,8 @@
         can pass a transaction manager (TM) to DB.open() to control
         which TM the Connection uses.
         """
-        warnings.warn("setLocalTransaction() is deprecated. "
-                      "Use the txn_mgr argument to DB.open() instead.",
-                      DeprecationWarning)
+        deprecated36("setLocalTransaction() is deprecated. "
+                     "Use the txn_mgr argument to DB.open() instead.")
         if self._txn_mgr is transaction.manager:
             if self._synch:
                 self._txn_mgr.unregisterSynch(self)
@@ -486,14 +486,14 @@
 
     def cacheFullSweep(self, dt=None):
         # XXX needs doc string
-        warnings.warn("cacheFullSweep is deprecated. "
-                      "Use cacheMinimize instead.", DeprecationWarning)
+        deprecated36("cacheFullSweep is deprecated. "
+                     "Use cacheMinimize instead.")
         if dt is None:
             self._cache.full_sweep()
         else:
             self._cache.full_sweep(dt)
 
-    def cacheMinimize(self, dt=None):
+    def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
         """Deactivate all unmodified objects in the cache.
 
         Call _p_deactivate() on each cached object, attempting to turn
@@ -503,9 +503,8 @@
         :Parameters:
           - `dt`: ignored.  It is provided only for backwards compatibility.
         """
-        if dt is not None:
-            warnings.warn("The dt argument to cacheMinimize is ignored.",
-                          DeprecationWarning)
+        if dt is not DEPRECATED_ARGUMENT:
+            deprecated36("cacheMinimize() dt= is ignored.")
         self._cache.minimize()
 
     def cacheGC(self):
@@ -781,8 +780,8 @@
             # an oid is being registered.  I can't think of any way to
             # achieve that without assignment to _p_jar.  If there is
             # a way, this will be a very confusing warning.
-            warnings.warn("Assigning to _p_jar is deprecated",
-                          DeprecationWarning)
+            deprecated36("Assigning to _p_jar is deprecated, and will be "
+                         "changed to raise an exception.")
         elif obj._p_oid in self._added:
             # It was registered before it was added to _added.
             return

Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/DB.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -16,7 +16,7 @@
 $Id$"""
 
 import cPickle, cStringIO, sys
-from thread import allocate_lock
+import threading
 from time import time, ctime
 import warnings
 import logging
@@ -25,11 +25,117 @@
 from ZODB.utils import z64
 from ZODB.Connection import Connection
 from ZODB.serialize import referencesf
+from ZODB.utils import WeakSet
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
 
 import transaction
 
 logger = logging.getLogger('ZODB.DB')
 
+class _ConnectionPool(object):
+    """Manage a pool of connections.
+
+    CAUTION:  Methods should be called under the protection of a lock.
+    This class does no locking of its own.
+
+    There's no limit on the number of connections this can keep track of,
+    but a warning is logged if there are more than pool_size active
+    connections, and a critical problem if more than twice pool_size.
+
+    New connections are registered via push().  This will log a message if
+    "too many" connections are active.
+
+    When a connection is explicitly closed, tell the pool via repush().
+    That adds the connection to a stack of connections available for
+    reuse, and throws away the oldest stack entries if the stack is too large.
+    pop() pops this stack.
+
+    When a connection is obtained via pop(), the pool holds only a weak
+    reference to it thereafter.  It's not necessary to inform the pool
+    if the connection goes away.  A connection handed out by pop() counts
+    against pool_size only so long as it exists, and provided it isn't
+    repush()'ed.  A weak reference is retained so that DB methods like
+    connectionDebugInfo() can still gather statistics.
+    """
+
+    def __init__(self, pool_size):
+        # The largest # of connections we expect to see alive simultaneously.
+        self.pool_size = pool_size
+
+        # A weak set of all connections we've seen.  A connection vanishes
+        # from this set if pop() hands it out, it's not reregistered via
+        # repush(), and it becomes unreachable.
+        self.all = WeakSet()
+
+        # A stack of connections available to hand out.  This is a subset
+        # of self.all.  push() and repush() add to this, and may remove
+        # the oldest available connections if the pool is too large.
+        # pop() pops this stack.  There are never more than pool_size entries
+        # in this stack.
+        # In Python 2.4, a collections.deque would make more sense than
+        # a list (we push only "on the right", but may pop from both ends).
+        self.available = []
+
+    # Change our belief about the expected maximum # of live connections.
+    # If the pool_size is smaller than the current value, this may discard
+    # the oldest available connections.
+    def set_pool_size(self, pool_size):
+        self.pool_size = pool_size
+        self._reduce_size()
+
+    # Register a new available connection.  We must not know about c already.
+    # c will be pushed onto the available stack even if we're over the
+    # pool size limit.
+    def push(self, c):
+        assert c not in self.all
+        assert c not in self.available
+        self._reduce_size(strictly_less=True)
+        self.all.add(c)
+        self.available.append(c)
+        n, limit = len(self.all), self.pool_size
+        if n > limit:
+            reporter = logger.warn
+            if n > 2 * limit:
+                reporter = logger.critical
+            reporter("DB.open() has %s open connections with a pool_size "
+                     "of %s", n, limit)
+
+    # Reregister an available connection formerly obtained via pop().  This
+    # pushes it on the stack of available connections, and may discard
+    # older available connections.
+    def repush(self, c):
+        assert c in self.all
+        assert c not in self.available
+        self._reduce_size(strictly_less=True)
+        self.available.append(c)
+
+    # Throw away the oldest available connections until we're strictly under
+    # our target size (strictly_less=True) or no more than that
+    # (strictly_less=False, the default).
+    def _reduce_size(self, strictly_less=False):
+        target = self.pool_size - bool(strictly_less)
+        while len(self.available) > target:
+            c = self.available.pop(0)
+            self.all.remove(c)
+
+    # Pop an available connection and return it, or return None if none are
+    # available.  In the latter case, the caller should create a new
+    # connection, register it via push(), and call pop() again.  The
+    # caller is responsible for serializing this sequence.
+    def pop(self):
+        result = None
+        if self.available:
+            result = self.available.pop()
+            # Leave it in self.all, so we can still get at it for statistics
+            # while it's alive.
+            assert result in self.all
+        return result
+
+    # Return a list of all connections we currently know about.
+    def all_as_list(self):
+        return self.all.as_list()
+
+
 class DB(object):
     """The Object Database
     -------------------
@@ -41,9 +147,9 @@
     The DB instance manages a pool of connections.  If a connection is
     closed, it is returned to the pool and its object cache is
     preserved.  A subsequent call to open() will reuse the connection.
-    There is a limit to the pool size; if all its connections are in
-    use, calls to open() will block until one of the open connections
-    is closed.
+    There is no hard limit on the pool size.  If more than `pool_size`
+    connections are opened, a warning is logged, and if more than twice
+    that many, a critical problem is logged.
 
     The class variable 'klass' is used by open() to create database
     connections.  It is set to Connection, but a subclass could override
@@ -81,41 +187,42 @@
     def __init__(self, storage,
                  pool_size=7,
                  cache_size=400,
-                 cache_deactivate_after=None,
+                 cache_deactivate_after=DEPRECATED_ARGUMENT,
                  version_pool_size=3,
                  version_cache_size=100,
-                 version_cache_deactivate_after=None,
+                 version_cache_deactivate_after=DEPRECATED_ARGUMENT,
                  ):
         """Create an object database.
 
         :Parameters:
           - `storage`: the storage used by the database, e.g. FileStorage
-          - `pool_size`: maximum number of open connections
+          - `pool_size`: expected maximum number of open connections
           - `cache_size`: target size of Connection object cache
-          - `cache_deactivate_after`: ignored
-          - `version_pool_size`: maximum number of connections (per version)
+          - `version_pool_size`: expected maximum number of connections (per
+            version)
           - `version_cache_size`: target size of Connection object cache for
             version connections
+          - `cache_deactivate_after`: ignored
           - `version_cache_deactivate_after`: ignored
         """
-        # Allocate locks:
-        l = allocate_lock()
-        self._a = l.acquire
-        self._r = l.release
+        # Allocate lock.
+        x = threading.RLock()
+        self._a = x.acquire
+        self._r = x.release
 
         # Setup connection pools and cache info
-        self._pools = {},[]
-        self._temps = []
+        # _pools maps a version string to a _ConnectionPool object.
+        self._pools = {}
         self._pool_size = pool_size
         self._cache_size = cache_size
         self._version_pool_size = version_pool_size
         self._version_cache_size = version_cache_size
 
         # warn about use of deprecated arguments
-        if (cache_deactivate_after is not None or
-            version_cache_deactivate_after is not None):
-            warnings.warn("cache_deactivate_after has no effect",
-                          DeprecationWarning)
+        if cache_deactivate_after is not DEPRECATED_ARGUMENT:
+            deprecated36("cache_deactivate_after has no effect")
+        if version_cache_deactivate_after is not DEPRECATED_ARGUMENT:
+            deprecated36("version_cache_deactivate_after has no effect")
 
         self._miv_cache = {}
 
@@ -151,6 +258,7 @@
         if hasattr(storage, 'undoInfo'):
             self.undoInfo = storage.undoInfo
 
+    # This is called by Connection.close().
     def _closeConnection(self, connection):
         """Return a connection to the pool.
 
@@ -165,10 +273,10 @@
             am = self._activity_monitor
             if am is not None:
                 am.closedConnection(connection)
+
             version = connection._version
-            pools, pooll = self._pools
             try:
-                pool, allocated, pool_lock = pools[version]
+                pool = self._pools[version]
             except KeyError:
                 # No such version. We must have deleted the pool.
                 # Just let the connection go.
@@ -177,30 +285,18 @@
                 # XXX What objects are involved in the cycle?
                 connection.__dict__.clear()
                 return
+            pool.repush(connection)
 
-            pool.append(connection)
-            if len(pool) == 1:
-                # Pool now usable again, unlock it.
-                pool_lock.release()
         finally:
             self._r()
 
+    # Call f(c) for all connections c in all pools in all versions.
     def _connectionMap(self, f):
         self._a()
         try:
-            pools, pooll = self._pools
-            for pool, allocated in pooll:
-                for cc in allocated:
-                    f(cc)
-
-            temps = self._temps
-            if temps:
-                t = []
-                rc = sys.getrefcount
-                for cc in temps:
-                    if rc(cc) > 3:
-                        f(cc)
-                self._temps = t
+            for pool in self._pools.values():
+                for c in pool.all_as_list():
+                    f(c)
         finally:
             self._r()
 
@@ -216,12 +312,12 @@
         """
 
         detail = {}
-        def f(con, detail=detail, have_detail=detail.has_key):
+        def f(con, detail=detail):
             for oid, ob in con._cache.items():
                 module = getattr(ob.__class__, '__module__', '')
                 module = module and '%s.' % module or ''
                 c = "%s%s" % (module, ob.__class__.__name__)
-                if have_detail(c):
+                if c in detail:
                     detail[c] += 1
                 else:
                     detail[c] = 1
@@ -276,7 +372,7 @@
         self._connectionMap(lambda c: c._cache.full_sweep())
 
     def cacheLastGCTime(self):
-        m=[0]
+        m = [0]
         def f(con, m=m):
             t = con._cache.cache_last_gc_time
             if t > m[0]:
@@ -289,7 +385,7 @@
         self._connectionMap(lambda c: c._cache.minimize())
 
     def cacheSize(self):
-        m=[0]
+        m = [0]
         def f(con, m=m):
             m[0] += con._cache.cache_non_ghost_count
 
@@ -299,9 +395,9 @@
     def cacheDetailSize(self):
         m = []
         def f(con, m=m):
-            m.append({'connection':repr(con),
-                      'ngsize':con._cache.cache_non_ghost_count,
-                      'size':len(con._cache)})
+            m.append({'connection': repr(con),
+                      'ngsize': con._cache.cache_non_ghost_count,
+                      'size': len(con._cache)})
         self._connectionMap(f)
         m.sort()
         return m
@@ -358,39 +454,24 @@
         if connection is not None:
             version = connection._version
         # Update modified in version cache
-        # XXX must make this work with list or dict to backport to 2.6
         for oid in oids.keys():
             h = hash(oid) % 131
             o = self._miv_cache.get(h, None)
             if o is not None and o[0]==oid:
                 del self._miv_cache[h]
 
-        # Notify connections
-        for pool, allocated in self._pools[1]:
-            for cc in allocated:
-                if (cc is not connection and
-                    (not version or cc._version==version)):
-                    if sys.getrefcount(cc) <= 3:
-                        cc.close()
-                    cc.invalidate(tid, oids)
+        # Notify connections.
+        def inval(c):
+            if (c is not connection and
+                  (not version or c._version == version)):
+                c.invalidate(tid, oids)
+        self._connectionMap(inval)
 
-        if self._temps:
-            t = []
-            for cc in self._temps:
-                if sys.getrefcount(cc) > 3:
-                    if (cc is not connection and
-                        (not version or cc._version == version)):
-                        cc.invalidate(tid, oids)
-                    t.append(cc)
-                else:
-                    cc.close()
-            self._temps = t
-
     def modifiedInVersion(self, oid):
         h = hash(oid) % 131
         cache = self._miv_cache
-        o=cache.get(h, None)
-        if o and o[0]==oid:
+        o = cache.get(h, None)
+        if o and o[0] == oid:
             return o[1]
         v = self._storage.modifiedInVersion(oid)
         cache[h] = oid, v
@@ -399,203 +480,108 @@
     def objectCount(self):
         return len(self._storage)
 
-    def open(self, version='', transaction=None, temporary=0, force=None,
-             waitflag=1, mvcc=True, txn_mgr=None, synch=True):
+    def open(self, version='',
+             transaction=DEPRECATED_ARGUMENT, temporary=DEPRECATED_ARGUMENT,
+             force=DEPRECATED_ARGUMENT, waitflag=DEPRECATED_ARGUMENT,
+             mvcc=True, txn_mgr=None, synch=True):
         """Return a database Connection for use by application code.
 
-        The optional version argument can be used to specify that a
+        The optional `version` argument can be used to specify that a
         version connection is desired.
 
-        The optional transaction argument can be provided to cause the
-        connection to be automatically closed when a transaction is
-        terminated.  In addition, connections per transaction are
-        reused, if possible.
-
         Note that the connection pool is managed as a stack, to
-        increate the likelihood that the connection's stack will
+        increase the likelihood that the connection's stack will
         include useful objects.
 
         :Parameters:
           - `version`: the "version" that all changes will be made
              in, defaults to no version.
-          - `transaction`: XXX
-          - `temporary`: XXX
-          - `force`: XXX
-          - `waitflag`: XXX
           - `mvcc`: boolean indicating whether MVCC is enabled
           - `txn_mgr`: transaction manager to use.  None means
              used the default transaction manager.
           - `synch`: boolean indicating whether Connection should
              register for afterCompletion() calls.
-
         """
-        self._a()
-        try:
 
-            if transaction is not None:
-                connections = transaction._connections
-                if connections:
-                    if connections.has_key(version) and not temporary:
-                        return connections[version]
-                else:
-                    transaction._connections = connections = {}
-                transaction = transaction._connections
+        if temporary is not DEPRECATED_ARGUMENT:
+            deprecated36("DB.open() temporary= ignored. "
+                         "open() no longer blocks.")
 
-            if temporary:
-                # This is a temporary connection.
-                # We won't bother with the pools.  This will be
-                # a one-use connection.
-                c = self.klass(version=version,
-                               cache_size=self._version_cache_size,
-                               mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
-                c._setDB(self)
-                self._temps.append(c)
-                if transaction is not None:
-                    transaction[id(c)] = c
-                return c
+        if force is not DEPRECATED_ARGUMENT:
+            deprecated36("DB.open() force= ignored. "
+                         "open() no longer blocks.")
 
-            pools, pooll = self._pools
+        if waitflag is not DEPRECATED_ARGUMENT:
+            deprecated36("DB.open() waitflag= ignored. "
+                         "open() no longer blocks.")
 
-            # pools is a mapping object:
-            #
-            #   {version -> (pool, allocated, lock)
-            #
-            # where:
-            #
-            #   pool is the connection pool for the version,
-            #   allocated is a list of all of the allocated
-            #     connections, and
-            #   lock is a lock that is used to block when a pool is
-            #     empty and no more connections can be allocated.
-            #
-            # pooll is a list of all of the pools and allocated for
-            # use in cases where we need to iterate over all
-            # connections or all inactive connections.
+        if transaction is not DEPRECATED_ARGUMENT:
+            deprecated36("DB.open() transaction= ignored.")
 
-            # Pool locks are tricky.  Basically, the lock needs to be
-            # set whenever the pool becomes empty so that threads are
-            # forced to wait until the pool gets a connection in it.
-            # The lock is acquired when the (empty) pool is
-            # created.  The lock is acquired just prior to removing
-            # the last connection from the pool and released just after
-            # adding a connection to an empty pool.
+        self._a()
+        try:
+            # pool <- the _ConnectionPool for this version
+            pool = self._pools.get(version)
+            if pool is None:
+                if version:
+                    size = self._version_pool_size
+                else:
+                    size = self._pool_size
+                self._pools[version] = pool = _ConnectionPool(size)
+            assert pool is not None
 
-
-            if pools.has_key(version):
-                pool, allocated, pool_lock = pools[version]
-            else:
-                pool, allocated, pool_lock = pools[version] = (
-                    [], [], allocate_lock())
-                pooll.append((pool, allocated))
-                pool_lock.acquire()
-
-
-            if not pool:
-                c = None
+            # result <- a connection
+            result = pool.pop()
+            if result is None:
                 if version:
-                    if self._version_pool_size > len(allocated) or force:
-                        c = self.klass(version=version,
-                                       cache_size=self._version_cache_size,
-                                       mvcc=mvcc, txn_mgr=txn_mgr)
-                        allocated.append(c)
-                        pool.append(c)
-                elif self._pool_size > len(allocated) or force:
-                    c = self.klass(version=version,
-                                   cache_size=self._cache_size,
-                                   mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
-                    allocated.append(c)
-                    pool.append(c)
+                    size = self._version_cache_size
+                else:
+                    size = self._cache_size
+                c = self.klass(version=version, cache_size=size,
+                               mvcc=mvcc, txn_mgr=txn_mgr)
+                pool.push(c)
+                result = pool.pop()
+            assert result is not None
 
-                if c is None:
-                    if waitflag:
-                        self._r()
-                        pool_lock.acquire()
-                        self._a()
-                        if len(pool) > 1:
-                            # Note that the pool size will normally be 1 here,
-                            # but it could be higher due to a race condition.
-                            pool_lock.release()
-                    else:
-                        return
+            # Tell the connection it belongs to self.
+            result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
 
-            elif len(pool)==1:
-                # Taking last one, lock the pool.
-                # Note that another thread might grab the lock
-                # before us, so we might actually block, however,
-                # when we get the lock back, there *will* be a
-                # connection in the pool.  OTOH, there's no limit on
-                # how long we may need to wait:  if the other thread
-                # grabbed the lock in this section too, we'll wait
-                # here until another connection is closed.
-                # checkConcurrentUpdates1Storage provoked this frequently
-                # on a hyperthreaded machine, with its second thread
-                # timing out after waiting 5 minutes for DB.open() to
-                # return.  So, if we can't get the pool lock immediately,
-                # now we make a recursive call.  This allows the current
-                # thread to allocate a new connection instead of waiting
-                # arbitrarily long for the single connection in the pool
-                # right now.
-                self._r()
-                if not pool_lock.acquire(0):
-                    result = DB.open(self, version, transaction, temporary,
-                                     force, waitflag)
-                    self._a()
-                    return result
-                self._a()
-                if len(pool) > 1:
-                    # Note that the pool size will normally be 1 here,
-                    # but it could be higher due to a race condition.
-                    pool_lock.release()
+            # A good time to do some cache cleanup.
+            self._connectionMap(lambda c: c.cacheGC())
 
-            c = pool.pop()
-            c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
-            for pool, allocated in pooll:
-                for cc in pool:
-                    cc.cacheGC()
+            return result
 
-            if transaction is not None:
-                transaction[version] = c
-            return c
-
         finally:
             self._r()
 
     def removeVersionPool(self, version):
-        pools, pooll = self._pools
-        info = pools.get(version)
-        if info:
-            del pools[version]
-            pool, allocated, pool_lock = info
-            pooll.remove((pool, allocated))
-            try:
-                pool_lock.release()
-            except: # XXX Do we actually expect this to fail?
-                pass
-            del pool[:]
-            del allocated[:]
+        try:
+            del self._pools[version]
+        except KeyError:
+            pass
 
     def connectionDebugInfo(self):
-        r = []
-        pools, pooll = self._pools
+        result = []
         t = time()
-        for version, (pool, allocated, lock) in pools.items():
-            for c in allocated:
-                o = c._opened
-                d = c._debug_info
-                if d:
-                    if len(d)==1:
-                        d = d[0]
-                else:
-                    d=''
-                d = "%s (%s)" % (d, len(c._cache))
+        def f(c):
+            o = c._opened
+            d = c._debug_info
+            if d:
+                if len(d) == 1:
+                    d = d[0]
+            else:
+                d = ''
+            d = "%s (%s)" % (d, len(c._cache))
 
-                r.append({
-                    'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
-                    'info': d,
-                    'version': version,
-                    })
-        return r
+            result.append({
+                'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
+                'info': d,
+                'version': version,
+                })
 
+        self._connectionMap(f)
+        return result
+
     def getActivityMonitor(self):
         return self._activity_monitor
 
@@ -623,34 +609,52 @@
             logger.error("packing", exc_info=True)
             raise
 
-    def setCacheSize(self, v):
-        self._cache_size = v
-        d = self._pools[0]
-        pool_info = d.get('')
-        if pool_info is not None:
-            for c in pool_info[1]:
-                c._cache.cache_size = v
+    def setActivityMonitor(self, am):
+        self._activity_monitor = am
 
     def classFactory(self, connection, modulename, globalname):
         # Zope will rebind this method to arbitrary user code at runtime.
         return find_global(modulename, globalname)
 
-    def setPoolSize(self, v):
-        self._pool_size = v
+    def setCacheSize(self, v):
+        self._a()
+        try:
+            self._cache_size = v
+            pool = self._pools.get('')
+            if pool is not None:
+                for c in pool.all_as_list():
+                    c._cache.cache_size = v
+        finally:
+            self._r()
 
-    def setActivityMonitor(self, am):
-        self._activity_monitor = am
-
     def setVersionCacheSize(self, v):
-        self._version_cache_size = v
-        for ver in self._pools[0].keys():
-            if ver:
-                for c in self._pools[0][ver][1]:
-                    c._cache.cache_size = v
+        self._a()
+        try:
+            self._version_cache_size = v
+            for version, pool in self._pools.items():
+                if version:
+                    for c in pool.all_as_list():
+                        c._cache.cache_size = v
+        finally:
+            self._r()
 
-    def setVersionPoolSize(self, v):
-        self._version_pool_size=v
+    def setPoolSize(self, size):
+        self._pool_size = size
+        self._reset_pool_sizes(size, for_versions=False)
 
+    def setVersionPoolSize(self, size):
+        self._version_pool_size = size
+        self._reset_pool_sizes(size, for_versions=True)
+
+    def _reset_pool_sizes(self, size, for_versions=False):
+        self._a()
+        try:
+            for version, pool in self._pools.items():
+                if (version != '') == for_versions:
+                    pool.set_pool_size(size)
+        finally:
+            self._r()
+
     def undo(self, id, txn=None):
         """Undo a transaction identified by id.
 
@@ -679,23 +683,19 @@
 
     def getCacheDeactivateAfter(self):
         """Deprecated"""
-        warnings.warn("cache_deactivate_after has no effect",
-                      DeprecationWarning)
+        deprecated36("getCacheDeactivateAfter has no effect")
 
     def getVersionCacheDeactivateAfter(self):
         """Deprecated"""
-        warnings.warn("cache_deactivate_after has no effect",
-                      DeprecationWarning)
+        deprecated36("getVersionCacheDeactivateAfter has no effect")
 
     def setCacheDeactivateAfter(self, v):
         """Deprecated"""
-        warnings.warn("cache_deactivate_after has no effect",
-                      DeprecationWarning)
+        deprecated36("setCacheDeactivateAfter has no effect")
 
     def setVersionCacheDeactivateAfter(self, v):
         """Deprecated"""
-        warnings.warn("cache_deactivate_after has no effect",
-                      DeprecationWarning)
+        deprecated36("setVersionCacheDeactivateAfter has no effect")
 
 class ResourceManager(object):
     """Transaction participation for a version or undo resource."""

Copied: ZODB/trunk/src/ZODB/tests/dbopen.txt (from rev 28322, ZODB/branches/tim-simpler_connection/src/ZODB/tests/dbopen.txt)


Property changes on: ZODB/trunk/src/ZODB/tests/dbopen.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnection.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testConnection.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -414,8 +414,9 @@
         >>> len(hook.warnings)
         1
         >>> message, category, filename, lineno = hook.warnings[0]
-        >>> message
-        'The dt argument to cacheMinimize is ignored.'
+        >>> print message
+        This will be removed in ZODB 3.6:
+        cacheMinimize() dt= is ignored.
         >>> category.__name__
         'DeprecationWarning'
         >>> hook.clear()
@@ -434,8 +435,9 @@
         >>> len(hook.warnings)
         2
         >>> message, category, filename, lineno = hook.warnings[0]
-        >>> message
-        'cacheFullSweep is deprecated. Use cacheMinimize instead.'
+        >>> print message
+        This will be removed in ZODB 3.6:
+        cacheFullSweep is deprecated. Use cacheMinimize instead.
         >>> category.__name__
         'DeprecationWarning'
         >>> message, category, filename, lineno = hook.warnings[1]

Modified: ZODB/trunk/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testDB.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testDB.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -23,6 +23,10 @@
 
 from ZODB.tests.MinPO import MinPO
 
+# Return total number of connections across all pools in a db._pools.
+def nconn(pools):
+    return sum([len(pool.all) for pool in pools.values()])
+
 class DBTests(unittest.TestCase):
 
     def setUp(self):
@@ -75,22 +79,22 @@
         c12.close() # return to pool
         self.assert_(c1 is c12) # should be same
 
-        pools, pooll = self.db._pools
+        pools = self.db._pools
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
         self.db.removeVersionPool('v1')
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12 = self.db.open('v1')
         c12.close() # return to pool
         self.assert_(c1 is not c12) # should be different
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
     def _test_for_leak(self):
         self.dowork()
@@ -112,27 +116,27 @@
         c12 = self.db.open('v1')
         self.assert_(c1 is c12) # should be same
 
-        pools, pooll = self.db._pools
+        pools = self.db._pools
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
         self.db.removeVersionPool('v1')
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12.close() # should leave pools alone
 
         self.assertEqual(len(pools), 2)
-        self.assertEqual(len(pooll), 2)
+        self.assertEqual(nconn(pools), 2)
 
         c12 = self.db.open('v1')
         c12.close() # return to pool
         self.assert_(c1 is not c12) # should be different
 
         self.assertEqual(len(pools), 3)
-        self.assertEqual(len(pooll), 3)
+        self.assertEqual(nconn(pools), 3)
 
 
 def test_suite():

Modified: ZODB/trunk/src/ZODB/tests/testZODB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testZODB.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/tests/testZODB.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -243,9 +243,13 @@
             self.assertEqual(r1['item'], 2)
             self.assertEqual(r2['item'], 2)
             for msg, obj, filename, lineno in hook.warnings:
-                self.assert_(
-                    msg.startswith("setLocalTransaction() is deprecated.") or
-                    msg.startswith("getTransaction() is deprecated."))
+                self.assert_(msg in [
+                    "This will be removed in ZODB 3.6:\n"
+                        "setLocalTransaction() is deprecated. "
+                        "Use the txn_mgr argument to DB.open() instead.",
+                    "This will be removed in ZODB 3.6:\n"
+                        "getTransaction() is deprecated. "
+                        "Use the txn_mgr argument to DB.open() instead."])
         finally:
             conn1.close()
             conn2.close()

Copied: ZODB/trunk/src/ZODB/tests/test_doctest_files.py (from rev 28322, ZODB/branches/tim-simpler_connection/src/ZODB/tests/test_doctest_files.py)


Property changes on: ZODB/trunk/src/ZODB/tests/test_doctest_files.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/ZODB/utils.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -18,6 +18,8 @@
 from binascii import hexlify
 import cPickle
 import cStringIO
+import weakref
+import warnings
 
 from persistent.TimeStamp import TimeStamp
 
@@ -34,8 +36,27 @@
            'positive_id',
            'get_refs',
            'readable_tid_repr',
+           'WeakSet',
+           'DEPRECATED_ARGUMENT',
+           'deprecated36',
           ]
 
+# A unique marker to give as the default value for a deprecated argument.
+# The method should then do a
+#
+#     if that_arg is not DEPRECATED_ARGUMENT:
+#         complain
+#
+# dance.
+DEPRECATED_ARGUMENT = object()
+
+# Issue a DeprecationWarning, noting that the deprecated thing will go
+# away in ZODB 3.6.  Point to the caller of our caller (i.e., at the
+# code using the deprecated thing).
+def deprecated36(msg):
+    warnings.warn("This will be removed in ZODB 3.6:\n%s" % msg,
+                  DeprecationWarning, stacklevel=3)
+
 z64 = '\0'*8
 
 # TODO The purpose of t32 is unclear.  Code that uses it is usually
@@ -164,3 +185,46 @@
     u.noload() # class info
     u.noload() # instance state info
     return refs
+
+# A simple implementation of weak sets, supplying just enough of Python's
+# sets.Set interface for our needs.
+
+class WeakSet(object):
+    """A set of objects that doesn't keep its elements alive.
+
+    The objects in the set must be weakly referenceable.
+    The objects need not be hashable, and need not support comparison.
+    Two objects are considered to be the same iff their id()s are equal.
+
+    When the only references to an object are weak references (including
+    those from WeakSets), the object can be garbage-collected, and
+    will vanish from any WeakSets it may be a member of at that time.
+    """
+
+    def __init__(self):
+        # Map id(obj) to obj.  By using ids as keys, we avoid requiring
+        # that the elements be hashable or comparable.
+        self.data = weakref.WeakValueDictionary()
+
+    def __len__(self):
+        return len(self.data)
+
+    def __contains__(self, obj):
+        return id(obj) in self.data
+
+    # Same as a Set, add obj to the collection.
+    def add(self, obj):
+        self.data[id(obj)] = obj
+
+    # Same as a Set, remove obj from the collection, and raise
+    # KeyError if obj not in the collection.
+    def remove(self, obj):
+        del self.data[id(obj)]
+
+    # Return a list of all the objects in the collection.
+    # Because a weak dict is used internally, iteration
+    # is dicey (the underlying dict may change size during
+    # iteration, due to gc or activity from other threads).
+    # as_list() attempts to be safe by returning a separate list.
+    def as_list(self):
+        return self.data.values()

Modified: ZODB/trunk/src/persistent/tests/test_persistent.py
===================================================================
--- ZODB/trunk/src/persistent/tests/test_persistent.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/persistent/tests/test_persistent.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -11,44 +11,16 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-import doctest
-import os
-import sys
-import unittest
 
-import persistent.tests
 from persistent import Persistent
 
+from zope.testing.doctestunit import DocFileSuite
+
 class P(Persistent):
     def __init__(self):
         self.x = 0
     def inc(self):
         self.x += 1
 
-try:
-    DocFileSuite = doctest.DocFileSuite # >= Python 2.4.0a2
-except AttributeError:
-    # <= Python 2.4.0a1
-    def DocFileSuite(path, globs=None):
-        # It's not entirely obvious how to connection this single string
-        # with unittest.  For now, re-use the _utest() function that comes
-        # standard with doctest in Python 2.3.  One problem is that the
-        # error indicator doesn't point to the line of the doctest file
-        # that failed.
-
-        path = os.path.join(persistent.tests.__path__[0], path)
-        
-        source = open(path).read()
-        if globs is None:
-            globs = sys._getframe(1).f_globals
-        t = doctest.Tester(globs=globs)
-        def runit():
-            doctest._utest(t, path, source, path, 0)
-        f = unittest.FunctionTestCase(runit,
-                                      description="doctest from %s" % path)
-        suite = unittest.TestSuite()
-        suite.addTest(f)
-        return suite
-
 def test_suite():
     return DocFileSuite("persistent.txt", globs={"P": P})

Modified: ZODB/trunk/src/transaction/_manager.py
===================================================================
--- ZODB/trunk/src/transaction/_manager.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/transaction/_manager.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -18,7 +18,6 @@
 """
 
 import thread
-import weakref
 
 from transaction._transaction import Transaction
 
@@ -28,48 +27,16 @@
 # practice not to explicitly close Connection objects, and keeping
 # a Connection alive keeps a potentially huge number of other objects
 # alive (e.g., the cache, and everything reachable from it too).
+# Therefore we use "weak sets" internally.
 #
-# Therefore we use "weak sets" internally.  The implementation here
-# implements just enough of Python's sets.Set interface for our needs.
+# Obscure:  because of the __init__.py maze, we can't import WeakSet
+# at top level here.
 
-class WeakSet(object):
-    """A set of objects that doesn't keep its elements alive.
-
-    The objects in the set must be weakly referencable.
-    The objects need not be hashable, and need not support comparison.
-    Two objects are considered to be the same iff their id()s are equal.
-
-    When the only references to an object are weak references (including
-    those from WeakSets), the object can be garbage-collected, and
-    will vanish from any WeakSets it may be a member of at that time.
-    """
-
-    def __init__(self):
-        # Map id(obj) to obj.  By using ids as keys, we avoid requiring
-        # that the elements be hashable or comparable.
-        self.data = weakref.WeakValueDictionary()
-
-    # Same as a Set, add obj to the collection.
-    def add(self, obj):
-        self.data[id(obj)] = obj
-
-    # Same as a Set, remove obj from the collection, and raise
-    # KeyError if obj not in the collection.
-    def remove(self, obj):
-        del self.data[id(obj)]
-
-    # Return a list of all the objects in the collection.
-    # Because a weak dict is used internally, iteration
-    # is dicey (the underlying dict may change size during
-    # iteration, due to gc or activity from other threads).
-    # as_list() attempts to be safe.
-    def as_list(self):
-        return self.data.values()
-
-
 class TransactionManager(object):
 
     def __init__(self):
+        from ZODB.utils import WeakSet
+
         self._txn = None
         self._synchs = WeakSet()
 
@@ -135,6 +102,8 @@
         del self._txns[tid]
 
     def registerSynch(self, synch):
+        from ZODB.utils import WeakSet
+
         tid = thread.get_ident()
         ws = self._synchs.get(tid)
         if ws is None:

Modified: ZODB/trunk/src/transaction/_transaction.py
===================================================================
--- ZODB/trunk/src/transaction/_transaction.py	2004-11-02 19:13:05 UTC (rev 28322)
+++ ZODB/trunk/src/transaction/_transaction.py	2004-11-02 19:38:48 UTC (rev 28323)
@@ -261,9 +261,10 @@
                 self._resources.append(adapter)
 
     def begin(self):
-        warnings.warn("Transaction.begin() should no longer be used; use "
-                      "the begin() method of a transaction manager.",
-                      DeprecationWarning, stacklevel=2)
+        from ZODB.utils import deprecated36
+
+        deprecated36("Transaction.begin() should no longer be used; use "
+                      "the begin() method of a transaction manager.")
         if (self._resources or
               self._sub or
               self._nonsub or



More information about the Zodb-checkins mailing list