[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/DB.py Changed the way delegation of storage methods was done to work with

Jim Fulton jim at zope.com
Thu Apr 26 19:20:25 EDT 2007


Log message for revision 74842:
  Changed the way delegation of storage methods was done to work with
  storages that don't support versions or that don't support undo.
  
  Changed some comments to doc strings.
  
  Deprecated storages without tpc_vote.
  
  Removed the modifiedInVersion cache.
  
  Added an XXX about the broken ResourceManager implementations.
  

Changed:
  U   ZODB/trunk/src/ZODB/DB.py

-=-
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py	2007-04-26 23:20:21 UTC (rev 74841)
+++ ZODB/trunk/src/ZODB/DB.py	2007-04-26 23:20:25 UTC (rev 74842)
@@ -15,6 +15,8 @@
 
 $Id$"""
 
+import warnings
+
 import cPickle, cStringIO, sys
 import threading
 from time import time, ctime
@@ -31,6 +33,7 @@
 
 import transaction
 
+
 logger = logging.getLogger('ZODB.DB')
 
 class _ConnectionPool(object):
@@ -77,23 +80,28 @@
         # a list (we push only "on the right", but may pop from both ends).
         self.available = []
 
-    # Change our belief about the expected maximum # of live connections.
-    # If the pool_size is smaller than the current value, this may discard
-    # the oldest available connections.
     def set_pool_size(self, pool_size):
+        """Change our belief about the expected maximum # of live connections.
+
+        If the pool_size is smaller than the current value, this may discard
+        the oldest available connections.
+        """
         self.pool_size = pool_size
         self._reduce_size()
 
-    # Register a new available connection.  We must not know about c already.
-    # c will be pushed onto the available stack even if we're over the
-    # pool size limit.
     def push(self, c):
+        """Register a new available connection.
+
+        We must not know about c already. c will be pushed onto the available
+        stack even if we're over the pool size limit.
+        """
         assert c not in self.all
         assert c not in self.available
         self._reduce_size(strictly_less=True)
         self.all.add(c)
         self.available.append(c)
-        n, limit = len(self.all), self.pool_size
+        n = len(self.all)
+        limit = self.pool_size
         if n > limit:
             reporter = logger.warn
             if n > 2 * limit:
@@ -101,20 +109,25 @@
             reporter("DB.open() has %s open connections with a pool_size "
                      "of %s", n, limit)
 
-    # Reregister an available connection formerly obtained via pop().  This
-    # pushes it on the stack of available connections, and may discard
-    # older available connections.
     def repush(self, c):
+        """Reregister an available connection formerly obtained via pop().
+
+        This pushes it on the stack of available connections, and may discard
+        older available connections.
+        """
         assert c in self.all
         assert c not in self.available
         self._reduce_size(strictly_less=True)
         self.available.append(c)
 
-    # Throw away the oldest available connections until we're under our
-    # target size (strictly_less=False) or no more than that (strictly_less=
-    # True, the default).
     def _reduce_size(self, strictly_less=False):
-        target = self.pool_size - bool(strictly_less)
+        """Throw away the oldest available connections until we're under our
+        target size (strictly_less=True) or no more than that
+        (strictly_less=False, the default).
+        """
+        target = self.pool_size
+        if strictly_less:
+            target -= 1
         while len(self.available) > target:
             c = self.available.pop(0)
             self.all.remove(c)
@@ -132,11 +145,13 @@
             # now, and `c` would be left in a user-visible crazy state.
             c._resetCache()
 
-    # Pop an available connection and return it, or return None if none are
-    # available.  In the latter case, the caller should create a new
-    # connection, register it via push(), and call pop() again.  The
-    # caller is responsible for serializing this sequence.
     def pop(self):
+        """Pop an available connection and return it.
+
+        Return None if none are available - in this case, the caller should
+        create a new connection, register it via push(), and call pop() again.
+        The caller is responsible for serializing this sequence.
+        """
         result = None
         if self.available:
             result = self.available.pop()
@@ -145,8 +160,8 @@
             assert result in self.all
         return result
 
-    # For every live connection c, invoke f(c).
     def map(self, f):
+        """For every live connection c, invoke f(c)."""
         self.all.map(f)
 
 class DB(object):
@@ -227,8 +242,6 @@
         self._version_pool_size = version_pool_size
         self._version_cache_size = version_cache_size
 
-        self._miv_cache = {}
-
         # Setup storage
         self._storage=storage
         self.references = ZODB.serialize.referencesf
@@ -238,7 +251,13 @@
             storage.registerDB(self, None) # Backward compat
             
         if not hasattr(storage, 'tpc_vote'):
+            warnings.warn(
+                "Storage doesn't have a tpc_vote and this violates "
+                "the storage API. Violently monkeypatching in a do-nothing "
+                "tpc_vote.",
+                DeprecationWarning, 2)
             storage.tpc_vote = lambda *args: None
+
         try:
             storage.load(z64, '')
         except KeyError:
@@ -268,14 +287,46 @@
                              database_name)
         databases[database_name] = self
 
-        # Pass through methods:
-        for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
-                  'versionEmpty', 'versions']:
-            setattr(self, m, getattr(storage, m))
+        self._setupUndoMethods()
+        self._setupVersionMethods()
+        self.history = storage.history
 
-        if hasattr(storage, 'undoInfo'):
-            self.undoInfo = storage.undoInfo
+    def _setupUndoMethods(self):
+        storage = self._storage
+        try:
+            self.supportsUndo = storage.supportsUndo
+        except AttributeError:
+            self.supportsUndo = lambda : False
 
+        if self.supportsUndo():
+            self.undoLog = storage.undoLog
+            if hasattr(storage, 'undoInfo'):
+                self.undoInfo = storage.undoInfo
+        else:
+            self.undoLog = self.undoInfo = lambda *a,**k: ()
+            def undo(*a, **k):
+                raise NotImplementedError
+            self.undo = undo
+
+    def _setupVersionMethods(self):
+        storage = self._storage
+        try:
+            self.supportsVersions = storage.supportsVersions
+        except AttributeError:
+            self.supportsVersions = lambda : False
+
+        if self.supportsVersions():
+            self.versionEmpty = storage.versionEmpty
+            self.versions = storage.versions
+            self.modifiedInVersion = storage.modifiedInVersion
+        else:
+            self.versionEmpty = lambda version: True
+            self.versions = lambda max=None: ()
+            self.modifiedInVersion = lambda oid: ''
+            def commitVersion(*a, **k):
+                raise NotImplementedError
+            self.commitVersion = self.abortVersion = commitVersion
+
     # This is called by Connection.close().
     def _returnToPool(self, connection):
         """Return a connection to the pool.
@@ -471,12 +522,6 @@
         """
         if connection is not None:
             version = connection._version
-        # Update modified in version cache
-        for oid in oids:
-            h = hash(oid) % 131
-            o = self._miv_cache.get(h, None)
-            if o is not None and o[0]==oid:
-                del self._miv_cache[h]
 
         # Notify connections.
         def inval(c):
@@ -487,20 +532,9 @@
 
     def invalidateCache(self):
         """Invalidate each of the connection caches
-        """        
-        self._miv_cache.clear()
+        """
         self._connectionMap(lambda c: c.invalidateCache())
 
-    def modifiedInVersion(self, oid):
-        h = hash(oid) % 131
-        cache = self._miv_cache
-        o = cache.get(h, None)
-        if o and o[0] == oid:
-            return o[1]
-        v = self._storage.modifiedInVersion(oid)
-        cache[h] = oid, v
-        return v
-
     def objectCount(self):
         return len(self._storage)
 
@@ -687,17 +721,18 @@
             txn = transaction.get()
         txn.register(TransactionalUndo(self, id))
 
-    def versionEmpty(self, version):
-        return self._storage.versionEmpty(version)
 
-
-
 resource_counter_lock = threading.Lock()
 resource_counter = 0
 
 class ResourceManager(object):
     """Transaction participation for a version or undo resource."""
 
+    # XXX This implementation is broken.  Subclasses invalidate oids
+    # in their commit calls. Invalidations should not be sent until
+    # tpc_finish is called.  In fact, invalidations should be sent to
+    # the db *while* tpc_finish is being called on the storage.
+
     def __init__(self, db):
         self._db = db
         # Delegate the actual 2PC methods to the storage
@@ -729,10 +764,10 @@
     # argument to the methods below is self.
 
     def abort(self, obj, txn):
-        pass
+        raise NotImplementedError
 
     def commit(self, obj, txn):
-        pass
+        raise NotImplementedError
 
 class CommitVersion(ResourceManager):
 
@@ -742,6 +777,7 @@
         self._dest = dest
 
     def commit(self, ob, t):
+        # XXX see XXX in ResourceManager
         dest = self._dest
         tid, oids = self._db._storage.commitVersion(self._version,
                                                     self._dest,
@@ -760,6 +796,7 @@
         self._version = version
 
     def commit(self, ob, t):
+        # XXX see XXX in ResourceManager
         tid, oids = self._db._storage.abortVersion(self._version, t)
         self._db.invalidate(tid,
                             dict.fromkeys(oids, 1),
@@ -772,5 +809,6 @@
         self._tid = tid
 
     def commit(self, ob, t):
+        # XXX see XXX in ResourceManager
         tid, oids = self._db._storage.undo(self._tid, t)
         self._db.invalidate(tid, dict.fromkeys(oids, 1))



More information about the Zodb-checkins mailing list