[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/DB.py Changed the way
delegation of storage methods was done to work with
Jim Fulton
jim at zope.com
Thu Apr 26 19:20:25 EDT 2007
Log message for revision 74842:
Changed the way delegation of storage methods was done to work with
storages that don't support versions or that don't support undo.
Changed some comments to doc strings.
Deprecated storages without tpc_vote.
Removed the modifiedInVersion cache.
Added an XXX about the broken ResourceManager implementations.
Changed:
U ZODB/trunk/src/ZODB/DB.py
-=-
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2007-04-26 23:20:21 UTC (rev 74841)
+++ ZODB/trunk/src/ZODB/DB.py 2007-04-26 23:20:25 UTC (rev 74842)
@@ -15,6 +15,8 @@
$Id$"""
+import warnings
+
import cPickle, cStringIO, sys
import threading
from time import time, ctime
@@ -31,6 +33,7 @@
import transaction
+
logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object):
@@ -77,23 +80,28 @@
# a list (we push only "on the right", but may pop from both ends).
self.available = []
- # Change our belief about the expected maximum # of live connections.
- # If the pool_size is smaller than the current value, this may discard
- # the oldest available connections.
def set_pool_size(self, pool_size):
+ """Change our belief about the expected maximum # of live connections.
+
+ If the pool_size is smaller than the current value, this may discard
+ the oldest available connections.
+ """
self.pool_size = pool_size
self._reduce_size()
- # Register a new available connection. We must not know about c already.
- # c will be pushed onto the available stack even if we're over the
- # pool size limit.
def push(self, c):
+ """Register a new available connection.
+
+ We must not know about c already. c will be pushed onto the available
+ stack even if we're over the pool size limit.
+ """
assert c not in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.all.add(c)
self.available.append(c)
- n, limit = len(self.all), self.pool_size
+ n = len(self.all)
+ limit = self.pool_size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
@@ -101,20 +109,25 @@
reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit)
- # Reregister an available connection formerly obtained via pop(). This
- # pushes it on the stack of available connections, and may discard
- # older available connections.
def repush(self, c):
+ """Reregister an available connection formerly obtained via pop().
+
+ This pushes it on the stack of available connections, and may discard
+ older available connections.
+ """
assert c in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.available.append(c)
- # Throw away the oldest available connections until we're under our
- # target size (strictly_less=False) or no more than that (strictly_less=
- # True, the default).
def _reduce_size(self, strictly_less=False):
- target = self.pool_size - bool(strictly_less)
+ """Throw away the oldest available connections until we're under our
+ target size (strictly_less=False, the default) or no more than that
+ (strictly_less=True).
+ """
+ target = self.pool_size
+ if strictly_less:
+ target -= 1
while len(self.available) > target:
c = self.available.pop(0)
self.all.remove(c)
@@ -132,11 +145,13 @@
# now, and `c` would be left in a user-visible crazy state.
c._resetCache()
- # Pop an available connection and return it, or return None if none are
- # available. In the latter case, the caller should create a new
- # connection, register it via push(), and call pop() again. The
- # caller is responsible for serializing this sequence.
def pop(self):
+ """Pop an available connection and return it.
+
+ Return None if none are available - in this case, the caller should
+ create a new connection, register it via push(), and call pop() again.
+ The caller is responsible for serializing this sequence.
+ """
result = None
if self.available:
result = self.available.pop()
@@ -145,8 +160,8 @@
assert result in self.all
return result
- # For every live connection c, invoke f(c).
def map(self, f):
+ """For every live connection c, invoke f(c)."""
self.all.map(f)
class DB(object):
@@ -227,8 +242,6 @@
self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size
- self._miv_cache = {}
-
# Setup storage
self._storage=storage
self.references = ZODB.serialize.referencesf
@@ -238,7 +251,13 @@
storage.registerDB(self, None) # Backward compat
if not hasattr(storage, 'tpc_vote'):
+ warnings.warn(
+ "Storage doesn't have a tpc_vote and this violates "
+ "the stirage API. Violently monkeypatching in a do-nothing "
+ "tpc_vote.",
+ DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None
+
try:
storage.load(z64, '')
except KeyError:
@@ -268,14 +287,46 @@
database_name)
databases[database_name] = self
- # Pass through methods:
- for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
- 'versionEmpty', 'versions']:
- setattr(self, m, getattr(storage, m))
+ self._setupUndoMethods()
+ self._setupVersionMethods()
+ self.history = storage.history
- if hasattr(storage, 'undoInfo'):
- self.undoInfo = storage.undoInfo
+ def _setupUndoMethods(self):
+ storage = self._storage
+ try:
+ self.supportsUndo = storage.supportsUndo
+ except AttributeError:
+ self.supportsUndo = lambda : False
+ if self.supportsUndo():
+ self.undoLog = storage.undoLog
+ if hasattr(storage, 'undoInfo'):
+ self.undoInfo = storage.undoInfo
+ else:
+ self.undoLog = self.undoInfo = lambda *a,**k: ()
+ def undo(*a, **k):
+ raise NotImplementedError
+ self.undo = undo
+
+ def _setupVersionMethods(self):
+ storage = self._storage
+ try:
+ self.supportsVersions = storage.supportsVersions
+ except AttributeError:
+ self.supportsVersions = lambda : False
+
+ if self.supportsVersions():
+ self.versionEmpty = storage.versionEmpty
+ self.versions = storage.versions
+ self.modifiedInVersion = storage.modifiedInVersion
+ else:
+ self.versionEmpty = lambda version: True
+ self.versions = lambda max=None: ()
+ self.modifiedInVersion = lambda oid: ''
+ def commitVersion(*a, **k):
+ raise NotImplementedError
+ self.commitVersion = self.abortVersion = commitVersion
+
# This is called by Connection.close().
def _returnToPool(self, connection):
"""Return a connection to the pool.
@@ -471,12 +522,6 @@
"""
if connection is not None:
version = connection._version
- # Update modified in version cache
- for oid in oids:
- h = hash(oid) % 131
- o = self._miv_cache.get(h, None)
- if o is not None and o[0]==oid:
- del self._miv_cache[h]
# Notify connections.
def inval(c):
@@ -487,20 +532,9 @@
def invalidateCache(self):
"""Invalidate each of the connection caches
- """
- self._miv_cache.clear()
+ """
self._connectionMap(lambda c: c.invalidateCache())
- def modifiedInVersion(self, oid):
- h = hash(oid) % 131
- cache = self._miv_cache
- o = cache.get(h, None)
- if o and o[0] == oid:
- return o[1]
- v = self._storage.modifiedInVersion(oid)
- cache[h] = oid, v
- return v
-
def objectCount(self):
return len(self._storage)
@@ -687,17 +721,18 @@
txn = transaction.get()
txn.register(TransactionalUndo(self, id))
- def versionEmpty(self, version):
- return self._storage.versionEmpty(version)
-
-
resource_counter_lock = threading.Lock()
resource_counter = 0
class ResourceManager(object):
"""Transaction participation for a version or undo resource."""
+ # XXX This implementation is broken. Subclasses invalidate oids
+ # in their commit calls. Invalidations should not be sent until
+ # tpc_finish is called. In fact, invalidations should be sent to
+ # the db *while* tpc_finish is being called on the storage.
+
def __init__(self, db):
self._db = db
# Delegate the actual 2PC methods to the storage
@@ -729,10 +764,10 @@
# argument to the methods below is self.
def abort(self, obj, txn):
- pass
+ raise NotImplementedError
def commit(self, obj, txn):
- pass
+ raise NotImplementedError
class CommitVersion(ResourceManager):
@@ -742,6 +777,7 @@
self._dest = dest
def commit(self, ob, t):
+ # XXX see XXX in ResourceManager
dest = self._dest
tid, oids = self._db._storage.commitVersion(self._version,
self._dest,
@@ -760,6 +796,7 @@
self._version = version
def commit(self, ob, t):
+ # XXX see XXX in ResourceManager
tid, oids = self._db._storage.abortVersion(self._version, t)
self._db.invalidate(tid,
dict.fromkeys(oids, 1),
@@ -772,5 +809,6 @@
self._tid = tid
def commit(self, ob, t):
+ # XXX see XXX in ResourceManager
tid, oids = self._db._storage.undo(self._tid, t)
self._db.invalidate(tid, dict.fromkeys(oids, 1))
More information about the Zodb-checkins
mailing list