[Zodb-checkins] SVN: ZODB/trunk/ incrementally remove some version
support (this time from connection and DB);
add support for historical read-only connections.
Gary Poster
gary at zope.com
Tue Nov 13 20:10:27 EST 2007
Log message for revision 81822:
incrementally remove some version support (this time from connection and DB); add support for historical read-only connections.
Changed:
U ZODB/trunk/NEWS.txt
U ZODB/trunk/src/ZEO/tests/testConnection.py
U ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/DB.py
U ZODB/trunk/src/ZODB/ExportImport.py
U ZODB/trunk/src/ZODB/POSException.py
U ZODB/trunk/src/ZODB/component.xml
U ZODB/trunk/src/ZODB/config.py
A ZODB/trunk/src/ZODB/historical_connections.txt
U ZODB/trunk/src/ZODB/interfaces.py
U ZODB/trunk/src/ZODB/serialize.py
U ZODB/trunk/src/ZODB/tests/VersionStorage.py
U ZODB/trunk/src/ZODB/tests/dbopen.txt
U ZODB/trunk/src/ZODB/tests/testDB.py
U ZODB/trunk/src/ZODB/tests/testZODB.py
A ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py
-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/NEWS.txt 2007-11-14 01:10:26 UTC (rev 81822)
@@ -28,6 +28,14 @@
using `int` for memory sizes which caused errors on x86_64 Intel Xeon
machines (using 64-bit Linux).
+- (unreleased, after 3.9.0a1) Removed version support from connections and
+ DB. Versions are still in the storages; this is an incremental step.
+
+- (unreleased, after 3.9.0a1) Added support for read-only, historical
+ connections based on datetimes or serials (TIDs). See
+ src/ZODB/historical_connections.txt.
+
+
ZEO
---
Modified: ZODB/trunk/src/ZEO/tests/testConnection.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testConnection.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZEO/tests/testConnection.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -46,7 +46,7 @@
def getConfig(self, path, create, read_only):
return """<mappingstorage 1/>"""
-
+
class FileStorageConnectionTests(
FileStorageConfig,
ConnectionTests.ConnectionTests,
Modified: ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo-fan-out.test 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZEO/tests/zeo-fan-out.test 2007-11-14 01:10:26 UTC (rev 81822)
@@ -4,7 +4,7 @@
We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work.
-We'll use some helper functions. The first is a helpter that starts
+We'll use some helper functions. The first is a helper that starts
ZEO servers for us and another one that picks ports.
We'll start the first server:
@@ -16,7 +16,7 @@
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
-Then we''ll start 2 others that use this one:
+Then we'll start 2 others that use this one:
>>> port1 = ZEO.tests.testZEO.get_port()
>>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/Connection.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -44,7 +44,7 @@
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
-from ZODB.POSException import Unsupported
+from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
@@ -79,17 +79,20 @@
##########################################################################
# Connection methods, ZODB.IConnection
- def __init__(self, db, version='', cache_size=400):
+ def __init__(self, db, cache_size=400, before=None):
"""Create a new Connection."""
self._log = logging.getLogger('ZODB.Connection')
self._debug_info = ()
self._db = db
+
+ # historical connection
+ self.before = before
+
# Multi-database support
self.connections = {self._db.database_name: self}
- self._version = version
self._normal_storage = self._storage = db._storage
self.new_oid = db._storage.new_oid
self._savepoint_storage = None
@@ -112,13 +115,6 @@
# objects immediately load their state when they get their
# persistent data set.
self._pre_cache = {}
-
- if version:
- # Caches for versions end up empty if the version
- # is not used for a while. Non-version caches
- # keep their content indefinitely.
- # Unclear: Why do we want version caches to behave this way?
- self._cache.cache_drain_resistance = 100
# List of all objects (not oids) registered as modified by the
# persistence machinery, or by add(), or whose access caused a
@@ -186,8 +182,6 @@
# the upper bound on transactions visible to this connection.
# That is, all object revisions must be written before _txn_time.
# If it is None, then the current revisions are acceptable.
- # If the connection is in a version, mvcc will be disabled, because
- # loadBefore() only returns non-version data.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
@@ -240,8 +234,11 @@
# This appears to be an MVCC violation because we are loading
# the most recent data when perhaps we shouldn't. The key is
- # that we are only creating a ghost!
- p, serial = self._storage.load(oid, self._version)
+ # that we are only creating a ghost!
+ # A disadvantage to this optimization is that _p_serial cannot be
+ # trusted until the object has been loaded, which affects both MVCC
+ # and historical connections.
+ p, serial = self._storage.load(oid, '')
obj = self._reader.getGhost(p)
# Avoid infinite loop if obj tries to load its state before
@@ -318,13 +315,17 @@
return self._db
def isReadOnly(self):
- """Returns True if the storage for this connection is read only."""
+ """Returns True if this connection is read only."""
if self._opened is None:
raise ConnectionStateError("The database connection is closed")
- return self._storage.isReadOnly()
+ return self.before is not None or self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
+ if self.before is not None and tid > self.before:
+ # this is an historical connection, and the tid is after the
+ # freeze. Invalidations are irrelevant.
+ return
self._inv_lock.acquire()
try:
if self._txn_time is None:
@@ -339,25 +340,18 @@
self._invalidatedCache = True
finally:
self._inv_lock.release()
-
def root(self):
"""Return the database root object."""
return self.get(z64)
- def getVersion(self):
- """Returns the version this connection is attached to."""
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
- return self._version
-
def get_connection(self, database_name):
"""Return a Connection for the named database."""
connection = self.connections.get(database_name)
if connection is None:
new_con = self._db.databases[database_name].open(
transaction_manager=self.transaction_manager,
- version=self._version,
+ before=self.before,
)
self.connections.update(new_con.connections)
new_con.connections = self.connections
@@ -539,6 +533,9 @@
def _commit(self, transaction):
"""Commit changes to an object"""
+
+ if self.before is not None:
+ raise ReadOnlyHistoryError()
if self._import:
# We are importing an export file. We always do this
@@ -618,15 +615,14 @@
raise ValueError("Can't commit with opened blobs.")
s = self._storage.storeBlob(oid, serial, p,
obj._uncommitted(),
- self._version, transaction)
+ '', transaction)
# we invalidate the object here in order to ensure
# that the next attribute access of its name
# unghostifies it, which will cause its blob data
# to be reattached "cleanly"
obj._p_invalidate()
else:
- s = self._storage.store(oid, serial, p, self._version,
- transaction)
+ s = self._storage.store(oid, serial, p, '', transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
@@ -825,37 +821,63 @@
# the code if we could drop support for it.
# (BTrees.Length does.)
- # There is a harmless data race with self._invalidated. A
- # dict update could go on in another thread, but we don't care
- # because we have to check again after the load anyway.
-
- if self._invalidatedCache:
- raise ReadConflictError()
-
- if (obj._p_oid in self._invalidated and
- not myhasattr(obj, "_p_independent")):
- # If the object has _p_independent(), we will handle it below.
- self._load_before_or_conflict(obj)
- return
-
- p, serial = self._storage.load(obj._p_oid, self._version)
- self._load_count += 1
-
- self._inv_lock.acquire()
- try:
- invalid = obj._p_oid in self._invalidated
- finally:
- self._inv_lock.release()
-
- if invalid:
- if myhasattr(obj, "_p_independent"):
- # This call will raise a ReadConflictError if something
- # goes wrong
- self._handle_independent(obj)
+ if self.before is not None:
+ # Load data that was current before the time we have.
+ if self._txn_time is not None: # MVCC for readonly future conn.
+ before = self._txn_time
+ has_invalidated = True
else:
+ before = self.before
+ has_invalidated = False
+ t = self._storage.loadBefore(obj._p_oid, before)
+ if t is None:
+ raise POSKeyError()
+ p, serial, end = t
+ if not has_invalidated and end is None:
+ # MVCC: make sure another thread has not beaten us to the punch
+ self._inv_lock.acquire()
+ try:
+ txn_time = self._txn_time
+ finally:
+ self._inv_lock.release()
+ if txn_time is not None and txn_time < before:
+ t = self._storage.loadBefore(obj._p_oid, txn_time)
+ if t is None:
+ raise POSKeyError()
+ p, serial, end = t
+
+ else:
+ # There is a harmless data race with self._invalidated. A
+ # dict update could go on in another thread, but we don't care
+ # because we have to check again after the load anyway.
+
+ if self._invalidatedCache:
+ raise ReadConflictError()
+
+ if (obj._p_oid in self._invalidated and
+ not myhasattr(obj, "_p_independent")):
+ # If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
+
+ p, serial = self._storage.load(obj._p_oid, '')
+ self._load_count += 1
+
+ self._inv_lock.acquire()
+ try:
+ invalid = obj._p_oid in self._invalidated
+ finally:
+ self._inv_lock.release()
+
+ if invalid:
+ if myhasattr(obj, "_p_independent"):
+ # This call will raise a ReadConflictError if something
+ # goes wrong
+ self._handle_independent(obj)
+ else:
+ self._load_before_or_conflict(obj)
+ return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
@@ -867,7 +889,7 @@
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
- if not ((not self._version) and self._setstate_noncurrent(obj)):
+ if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
@@ -1029,11 +1051,7 @@
# Python protocol
def __repr__(self):
- if self._version:
- ver = ' (in version %s)' % `self._version`
- else:
- ver = ''
- return '<Connection at %08x%s>' % (positive_id(self), ver)
+ return '<Connection at %08x>' % (positive_id(self),)
# Python protocol
##########################################################################
@@ -1043,17 +1061,6 @@
__getitem__ = get
- def modifiedInVersion(self, oid):
- """Returns the version the object with the given oid was modified in.
-
- If it wasn't modified in a version, the current version of this
- connection is returned.
- """
- try:
- return self._db.modifiedInVersion(oid)
- except KeyError:
- return self.getVersion()
-
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
@@ -1079,7 +1086,7 @@
def savepoint(self):
if self._savepoint_storage is None:
- tmpstore = TmpStore(self._version, self._normal_storage)
+ tmpstore = TmpStore(self._normal_storage)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
@@ -1124,7 +1131,7 @@
if isinstance(self._reader.getGhost(data), Blob):
blobfilename = src.loadBlob(oid, serial)
s = self._storage.storeBlob(oid, serial, data, blobfilename,
- self._version, transaction)
+ '', transaction)
# we invalidate the object here in order to ensure
# that the next attribute access of its name
# unghostifies it, which will cause its blob data
@@ -1132,7 +1139,7 @@
self.invalidate(s, {oid:True})
else:
s = self._storage.store(oid, serial, data,
- self._version, transaction)
+ '', transaction)
self._handle_serial(s, oid, change=False)
src.close()
@@ -1182,23 +1189,13 @@
implements(IBlobStorage)
- def __init__(self, base_version, storage):
+ def __init__(self, storage):
self._storage = storage
for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
):
setattr(self, method, getattr(storage, method))
- try:
- supportsVersions = storage.supportsVersions
- except AttributeError:
- pass
- else:
- if supportsVersions():
- self.modifiedInVersion = storage.modifiedInVersion
- self.versionEmpty = storage.versionEmpty
-
- self._base_version = base_version
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
@@ -1216,7 +1213,7 @@
def load(self, oid, version):
pos = self.index.get(oid)
if pos is None:
- return self._storage.load(oid, self._base_version)
+ return self._storage.load(oid, '')
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
@@ -1231,7 +1228,7 @@
def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
# commit logic
- assert version == self._base_version
+ assert version == ''
self._file.seek(self.position)
l = len(data)
if serial is None:
@@ -1245,7 +1242,8 @@
def storeBlob(self, oid, serial, data, blobfilename, version,
transaction):
- serial = self.store(oid, serial, data, version, transaction)
+ assert version == ''
+ serial = self.store(oid, serial, data, '', transaction)
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/DB.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -21,6 +21,8 @@
import threading
from time import time, ctime
import logging
+import datetime
+import calendar
from ZODB.broken import find_global
from ZODB.utils import z64
@@ -31,9 +33,12 @@
from zope.interface import implements
from ZODB.interfaces import IDatabase
+import BTrees.OOBTree
import transaction
+from persistent.TimeStamp import TimeStamp
+
logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object):
@@ -62,10 +67,14 @@
connectionDebugInfo() can still gather statistics.
"""
- def __init__(self, pool_size):
+ def __init__(self, pool_size, timeout=None):
# The largest # of connections we expect to see alive simultaneously.
self.pool_size = pool_size
+ # The minimum number of seconds that an available connection should
+ # be kept, or None.
+ self.timeout = timeout
+
# A weak set of all connections we've seen. A connection vanishes
# from this set if pop() hands it out, it's not reregistered via
# repush(), and it becomes unreachable.
@@ -75,10 +84,9 @@
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than pool_size entries
- # in this stack.
- # In Python 2.4, a collections.deque would make more sense than
- # a list (we push only "on the right", but may pop from both ends).
- self.available = []
+ # in this stack. The keys are time.time() values of the push or
+ # repush calls.
+ self.available = BTrees.OOBTree.Bucket()
def set_pool_size(self, pool_size):
"""Change our belief about the expected maximum # of live connections.
@@ -89,6 +97,13 @@
self.pool_size = pool_size
self._reduce_size()
+ def set_timeout(self, timeout):
+ old = self.timeout
+ self.timeout = timeout
+ if timeout is not None and old != timeout and (
+ old is None or old > timeout):
+ self._reduce_size()
+
def push(self, c):
"""Register a new available connection.
@@ -96,10 +111,10 @@
stack even if we're over the pool size limit.
"""
assert c not in self.all
- assert c not in self.available
+ assert c not in self.available.values()
self._reduce_size(strictly_less=True)
self.all.add(c)
- self.available.append(c)
+ self.available[time()] = c
n = len(self.all)
limit = self.pool_size
if n > limit:
@@ -116,35 +131,47 @@
older available connections.
"""
assert c in self.all
- assert c not in self.available
+ assert c not in self.available.values()
self._reduce_size(strictly_less=True)
- self.available.append(c)
+ self.available[time()] = c
def _reduce_size(self, strictly_less=False):
"""Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that
(strictly_less=True).
"""
+ if self.timeout is None:
+ threshhold = None
+ else:
+ threshhold = time() - self.timeout
target = self.pool_size
if strictly_less:
target -= 1
- while len(self.available) > target:
- c = self.available.pop(0)
- self.all.remove(c)
- # While application code may still hold a reference to `c`,
- # there's little useful that can be done with this Connection
- # anymore. Its cache may be holding on to limited resources,
- # and we replace the cache with an empty one now so that we
- # don't have to wait for gc to reclaim it. Note that it's not
- # possible for DB.open() to return `c` again: `c` can never
- # be in an open state again.
- # TODO: Perhaps it would be better to break the reference
- # cycles between `c` and `c._cache`, so that refcounting reclaims
- # both right now. But if user code _does_ have a strong
- # reference to `c` now, breaking the cycle would not reclaim `c`
- # now, and `c` would be left in a user-visible crazy state.
- c._resetCache()
+ for t, c in list(self.available.items()):
+ if (len(self.available) > target or
+ threshhold is not None and t < threshhold):
+ del self.available[t]
+ self.all.remove(c)
+ # While application code may still hold a reference to `c`,
+ # there's little useful that can be done with this Connection
+ # anymore. Its cache may be holding on to limited resources,
+ # and we replace the cache with an empty one now so that we
+ # don't have to wait for gc to reclaim it. Note that it's not
+ # possible for DB.open() to return `c` again: `c` can never be
+ # in an open state again.
+ # TODO: Perhaps it would be better to break the reference
+ # cycles between `c` and `c._cache`, so that refcounting
+ # reclaims both right now. But if user code _does_ have a
+ # strong reference to `c` now, breaking the cycle would not
+ # reclaim `c` now, and `c` would be left in a user-visible
+ # crazy state.
+ c._resetCache()
+ else:
+ break
+ def reduce_size(self):
+ self._reduce_size()
+
def pop(self):
"""Pop an available connection and return it.
@@ -154,24 +181,57 @@
"""
result = None
if self.available:
- result = self.available.pop()
+ result = self.available.pop(self.available.maxKey())
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert result in self.all
return result
- def map(self, f, open_connections=True):
- """For every live connection c, invoke f(c).
+ def map(self, f):
+ """For every live connection c, invoke f(c)."""
+ self.all.map(f)
- If `open_connections` is false then only call f(c) on closed
- connections.
+ def availableGC(self):
+ """Perform garbage collection on available connections.
+
+ If a connection is no longer viable because it has timed out, it is
+ garbage collected."""
+ if self.timeout is None:
+ threshhold = None
+ else:
+ threshhold = time() - self.timeout
+ for t, c in tuple(self.available.items()):
+ if threshhold is not None and t < threshhold:
+ del self.available[t]
+ self.all.remove(c)
+ c._resetCache()
+ else:
+ c.cacheGC()
- """
- if open_connections:
- self.all.map(f)
+def toTimeStamp(dt):
+ utc_struct = dt.utctimetuple()
+ # if this is a leapsecond, this will probably fail. That may be a good
+ # thing: leapseconds are not really accounted for with serials.
+ args = utc_struct[:5]+(utc_struct[5] + dt.microsecond/1000000.0,)
+ return TimeStamp(*args)
+
+def getTID(at, before):
+ if at is not None:
+ if before is not None:
+ raise ValueError('can only pass zero or one of `at` and `before`')
+ if isinstance(at, datetime.datetime):
+ at = toTimeStamp(at)
else:
- map(f, self.available)
+ at = TimeStamp(at)
+ before = repr(at.laterThan(at))
+ elif before is not None:
+ if isinstance(before, datetime.datetime):
+ before = repr(toTimeStamp(before))
+ else:
+ before = repr(TimeStamp(before))
+ return before
+
class DB(object):
"""The Object Database
-------------------
@@ -202,27 +262,27 @@
- `User Methods`: __init__, open, close, undo, pack, classFactory
- `Inspection Methods`: getName, getSize, objectCount,
getActivityMonitor, setActivityMonitor
- - `Connection Pool Methods`: getPoolSize, getVersionPoolSize,
- removeVersionPool, setPoolSize, setVersionPoolSize
+ - `Connection Pool Methods`: getPoolSize, getHistoricalPoolSize,
+ removeHistoricalPool, setPoolSize, setHistoricalPoolSize,
+ getHistoricalTimeout, setHistoricalTimeout
- `Transaction Methods`: invalidate
- `Other Methods`: lastTransaction, connectionDebugInfo
- - `Version Methods`: modifiedInVersion, abortVersion, commitVersion,
- versionEmpty
- `Cache Inspection Methods`: cacheDetail, cacheExtremeDetail,
cacheFullSweep, cacheLastGCTime, cacheMinimize, cacheSize,
- cacheDetailSize, getCacheSize, getVersionCacheSize, setCacheSize,
- setVersionCacheSize
+ cacheDetailSize, getCacheSize, getHistoricalCacheSize, setCacheSize,
+ setHistoricalCacheSize
"""
implements(IDatabase)
klass = Connection # Class to use for connections
- _activity_monitor = None
+ _activity_monitor = next = previous = None
def __init__(self, storage,
pool_size=7,
cache_size=400,
- version_pool_size=3,
- version_cache_size=100,
+ historical_pool_size=3,
+ historical_cache_size=1000,
+ historical_timeout=300,
database_name='unnamed',
databases=None,
):
@@ -232,10 +292,12 @@
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- - `version_pool_size`: expected maximum number of connections (per
- version)
- - `version_cache_size`: target size of Connection object cache for
- version connections
+ - `historical_pool_size`: expected maximum number of connections (per
+ historical, or transaction, identifier)
+ - `historical_cache_size`: target size of Connection object cache for
+ historical (`at` or `before`) connections
+ - `historical_timeout`: minimum number of seconds that
+ an unused historical connection will be kept, or None.
"""
# Allocate lock.
x = threading.RLock()
@@ -243,12 +305,13 @@
self._r = x.release
# Setup connection pools and cache info
- # _pools maps a version string to a _ConnectionPool object.
+ # _pools maps a tid identifier, or '', to a _ConnectionPool object.
self._pools = {}
self._pool_size = pool_size
self._cache_size = cache_size
- self._version_pool_size = version_pool_size
- self._version_cache_size = version_cache_size
+ self._historical_pool_size = historical_pool_size
+ self._historical_cache_size = historical_cache_size
+ self._historical_timeout = historical_timeout
# Setup storage
self._storage=storage
@@ -296,7 +359,6 @@
databases[database_name] = self
self._setupUndoMethods()
- self._setupVersionMethods()
self.history = storage.history
def _setupUndoMethods(self):
@@ -316,25 +378,6 @@
raise NotImplementedError
self.undo = undo
- def _setupVersionMethods(self):
- storage = self._storage
- try:
- self.supportsVersions = storage.supportsVersions
- except AttributeError:
- self.supportsVersions = lambda : False
-
- if self.supportsVersions():
- self.versionEmpty = storage.versionEmpty
- self.versions = storage.versions
- self.modifiedInVersion = storage.modifiedInVersion
- else:
- self.versionEmpty = lambda version: True
- self.versions = lambda max=None: ()
- self.modifiedInVersion = lambda oid: ''
- def commitVersion(*a, **k):
- raise NotImplementedError
- self.commitVersion = self.abortVersion = commitVersion
-
# This is called by Connection.close().
def _returnToPool(self, connection):
"""Return a connection to the pool.
@@ -351,11 +394,11 @@
if am is not None:
am.closedConnection(connection)
- version = connection._version
+ before = connection.before or ''
try:
- pool = self._pools[version]
+ pool = self._pools[before]
except KeyError:
- # No such version. We must have deleted the pool.
+ # No such tid. We must have deleted the pool.
# Just let the connection go.
# We need to break circular refs to make it really go.
@@ -368,29 +411,16 @@
finally:
self._r()
- def _connectionMap(self, f, open_connections=True):
- """Call f(c) for all connections c in all pools in all versions.
-
- If `open_connections` is false then f(c) is only called on closed
- connections.
-
+ def _connectionMap(self, f):
+ """Call f(c) for all connections c in all pools, live and historical.
"""
self._a()
try:
for pool in self._pools.values():
- pool.map(f, open_connections=open_connections)
+ pool.map(f)
finally:
self._r()
- def abortVersion(self, version, txn=None):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
- if txn is None:
- txn = transaction.get()
- txn.register(AbortVersion(self, version))
-
def cacheDetail(self):
"""Return information on objects in the various caches
@@ -503,15 +533,6 @@
"""
self._storage.close()
- def commitVersion(self, source, destination='', txn=None):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
- if txn is None:
- txn = transaction.get()
- txn.register(CommitVersion(self, source, destination))
-
def getCacheSize(self):
return self._cache_size
@@ -527,20 +548,15 @@
def getSize(self):
return self._storage.getSize()
- def getVersionCacheSize(self):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
- return self._version_cache_size
+ def getHistoricalCacheSize(self):
+ return self._historical_cache_size
- def getVersionPoolSize(self):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
- return self._version_pool_size
+ def getHistoricalPoolSize(self):
+ return self._historical_pool_size
+ def getHistoricalTimeout(self):
+ return self._historical_timeout
+
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
@@ -549,13 +565,11 @@
passed in to prevent useless (but harmless) messages to the
connection.
"""
- if connection is not None:
- version = connection._version
-
+ # Storages, esp. ZEO tests, need the version argument still. :-/
+ assert version==''
# Notify connections.
def inval(c):
- if (c is not connection and
- (not version or c._version == version)):
+ if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)
@@ -567,52 +581,51 @@
def objectCount(self):
return len(self._storage)
- def open(self, version='', transaction_manager=None):
+ def open(self, transaction_manager=None, at=None, before=None):
"""Return a database Connection for use by application code.
- The optional `version` argument can be used to specify that a
- version connection is desired.
-
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
include useful objects.
:Parameters:
- - `version`: the "version" that all changes will be made
- in, defaults to no version.
- `transaction_manager`: transaction manager to use. None means
- use the default transaction manager.
+ use the default transaction manager.
+ - `at`: a datetime.datetime or 8 character transaction id of the
+ time to open the database with a read-only connection. Passing
+ both `at` and `before` raises a ValueError, and passing neither
+ opens a standard writable connection to the newest state.
+ A timezone-naive datetime.datetime is treated as a UTC value.
+ - `before`: like `at`, but opens the readonly state before the
+ tid or datetime.
"""
+ # `at` is normalized to `before`, since we use storage.loadBefore
+ # as the underlying implementation of both.
+ before = getTID(at, before)
- if version:
- if not self.supportsVersions():
- raise ValueError(
- "Versions are not supported by this database.")
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
-
self._a()
try:
- # pool <- the _ConnectionPool for this version
- pool = self._pools.get(version)
+ # pool <- the _ConnectionPool for this `before` tid
+ pool = self._pools.get(before or '')
if pool is None:
- if version:
- size = self._version_pool_size
+ if before is not None:
+ size = self._historical_pool_size
+ timeout = self._historical_timeout
else:
size = self._pool_size
- self._pools[version] = pool = _ConnectionPool(size)
+ timeout = None
+ self._pools[before or ''] = pool = _ConnectionPool(
+ size, timeout)
assert pool is not None
# result <- a connection
result = pool.pop()
if result is None:
- if version:
- size = self._version_cache_size
+ if before is not None:
+ size = self._historical_cache_size
else:
size = self._cache_size
- c = self.klass(self, version, size)
+ c = self.klass(self, size, before)
pool.push(c)
result = pool.pop()
assert result is not None
@@ -621,16 +634,23 @@
result.open(transaction_manager)
# A good time to do some cache cleanup.
- self._connectionMap(lambda c: c.cacheGC(), open_connections=False)
+ # (note we already have the lock)
+ for key, pool in tuple(self._pools.items()):
+ pool.availableGC()
+ if not len(pool.available) and not len(pool.all):
+ del self._pools[key]
return result
finally:
self._r()
- def removeVersionPool(self, version):
+ def removeHistoricalPool(self, at=None, before=None):
+ if at is None and before is None:
+ raise ValueError('must pass one of `at` or `before`')
+ before = getTID(at, before)
try:
- del self._pools[version]
+ del self._pools[before]
except KeyError:
pass
@@ -639,7 +659,7 @@
t = time()
def get_info(c):
- # `result`, `time` and `version` are lexically inherited.
+ # `result`, `time` and `before` are lexically inherited.
o = c._opened
d = c.getDebugInfo()
if d:
@@ -652,10 +672,10 @@
result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d,
- 'version': version,
+ 'before': before,
})
- for version, pool in self._pools.items():
+ for before, pool in self._pools.items():
pool.map(get_info)
return result
@@ -705,43 +725,47 @@
finally:
self._r()
- def setVersionCacheSize(self, size):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
+ def setHistoricalCacheSize(self, size):
self._a()
try:
- self._version_cache_size = size
+ self._historical_cache_size = size
def setsize(c):
c._cache.cache_size = size
- for version, pool in self._pools.items():
- if version:
+ for tid, pool in self._pools.items():
+ if tid:
pool.map(setsize)
finally:
self._r()
def setPoolSize(self, size):
self._pool_size = size
- self._reset_pool_sizes(size, for_versions=False)
+ self._reset_pool_sizes(size, for_historical=False)
- def setVersionPoolSize(self, size):
- warnings.warn(
- "Versions are deprecated and will become unsupported "
- "in ZODB 3.9",
- DeprecationWarning, 2)
- self._version_pool_size = size
- self._reset_pool_sizes(size, for_versions=True)
+ def setHistoricalPoolSize(self, size):
+ self._historical_pool_size = size
+ self._reset_pool_sizes(size, for_historical=True)
- def _reset_pool_sizes(self, size, for_versions=False):
+ def _reset_pool_sizes(self, size, for_historical=False):
self._a()
try:
- for version, pool in self._pools.items():
- if (version != '') == for_versions:
+ for tid, pool in self._pools.items():
+ if (tid != '') == for_historical:
pool.set_pool_size(size)
finally:
self._r()
+ def setHistoricalTimeout(self, timeout):
+ self._historical_timeout = timeout
+ self._a()
+ try:
+ for tid, pool in tuple(self._pools.items()):
+ if tid:
+ pool.set_timeout(timeout)
+ if not pool.available and not pool.all:
+ del self._pools[tid]
+ finally:
+ self._r()
+
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
@@ -768,7 +792,7 @@
resource_counter = 0
class ResourceManager(object):
- """Transaction participation for a version or undo resource."""
+ """Transaction participation for an undo resource."""
# XXX This implementation is broken. Subclasses invalidate oids
# in their commit calls. Invalidations should not be sent until
@@ -811,39 +835,6 @@
def commit(self, obj, txn):
raise NotImplementedError
-class CommitVersion(ResourceManager):
-
- def __init__(self, db, version, dest=''):
- super(CommitVersion, self).__init__(db)
- self._version = version
- self._dest = dest
-
- def commit(self, ob, t):
- # XXX see XXX in ResourceManager
- dest = self._dest
- tid, oids = self._db._storage.commitVersion(self._version,
- self._dest,
- t)
- oids = dict.fromkeys(oids, 1)
- self._db.invalidate(tid, oids, version=self._dest)
- if self._dest:
- # the code above just invalidated the dest version.
- # now we need to invalidate the source!
- self._db.invalidate(tid, oids, version=self._version)
-
-class AbortVersion(ResourceManager):
-
- def __init__(self, db, version):
- super(AbortVersion, self).__init__(db)
- self._version = version
-
- def commit(self, ob, t):
- # XXX see XXX in ResourceManager
- tid, oids = self._db._storage.abortVersion(self._version, t)
- self._db.invalidate(tid,
- dict.fromkeys(oids, 1),
- version=self._version)
-
class TransactionalUndo(ResourceManager):
def __init__(self, db, tid):
Modified: ZODB/trunk/src/ZODB/ExportImport.py
===================================================================
--- ZODB/trunk/src/ZODB/ExportImport.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/ExportImport.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -47,7 +47,7 @@
continue
done_oids[oid] = True
try:
- p, serial = load(oid, self._version)
+ p, serial = load(oid, '')
except:
logger.debug("broken reference for oid %s", repr(oid),
exc_info=True)
@@ -55,16 +55,16 @@
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
- if supports_blobs:
- if not isinstance(self._reader.getGhost(p), Blob):
- continue # not a blob
-
- blobfilename = self._storage.loadBlob(oid, serial)
- f.write(blob_begin_marker)
- f.write(p64(os.stat(blobfilename).st_size))
- blobdata = open(blobfilename, "rb")
- cp(blobdata, f)
- blobdata.close()
+ if supports_blobs:
+ if not isinstance(self._reader.getGhost(p), Blob):
+ continue # not a blob
+
+ blobfilename = self._storage.loadBlob(oid, serial)
+ f.write(blob_begin_marker)
+ f.write(p64(os.stat(blobfilename).st_size))
+ blobdata = open(blobfilename, "rb")
+ cp(blobdata, f)
+ blobdata.close()
f.write(export_end_marker)
return f
@@ -127,8 +127,6 @@
return Ghost(oid)
- version = self._version
-
while 1:
header = f.read(16)
if header == export_end_marker:
@@ -180,9 +178,9 @@
if blob_filename is not None:
self._storage.storeBlob(oid, None, data, blob_filename,
- version, transaction)
+ '', transaction)
else:
- self._storage.store(oid, None, data, version, transaction)
+ self._storage.store(oid, None, data, '', transaction)
export_end_marker = '\377'*16
Modified: ZODB/trunk/src/ZODB/POSException.py
===================================================================
--- ZODB/trunk/src/ZODB/POSException.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/POSException.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -234,6 +234,10 @@
return "from %s to %s" % (oid_repr(self.referer),
oid_repr(self.missing))
+
+############################################################################
+# Only used in storages; versions are no longer supported.
+
class VersionError(POSError):
"""An error in handling versions occurred."""
@@ -246,6 +250,7 @@
An attempt was made to modify an object that has been modified in an
unsaved version.
"""
+############################################################################
class UndoError(POSError):
"""An attempt was made to undo a non-undoable transaction."""
@@ -292,6 +297,9 @@
class Unsupported(POSError):
"""A feature was used that is not supported by the storage."""
+class ReadOnlyHistoryError(POSError):
+ """Unable to add or modify objects in an historical connection."""
+
class InvalidObjectReference(POSError):
"""An object contains an invalid reference to another object.
Modified: ZODB/trunk/src/ZODB/component.xml
===================================================================
--- ZODB/trunk/src/ZODB/component.xml 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/component.xml 2007-11-14 01:10:26 UTC (rev 81822)
@@ -180,16 +180,22 @@
and exceeding twice pool-size connections causes a critical
message to be logged.
</description>
- <key name="version-pool-size" datatype="integer" default="3"/>
+ <key name="historical-pool-size" datatype="integer" default="3"/>
<description>
The expected maximum number of connections simultaneously open
- per version.
+ per historical revision.
</description>
- <key name="version-cache-size" datatype="integer" default="100"/>
+ <key name="historical-cache-size" datatype="integer" default="1000"/>
<description>
- Target size, in number of objects, of each version connection's
+ Target size, in number of objects, of each historical connection's
object cache.
</description>
+ <key name="historical-timeout" datatype="time-interval"
+ default="5m"/>
+ <description>
+ The minimum interval that an unused historical connection should be
+ kept.
+ </description>
<key name="database-name" default="unnamed"/>
<description>
When multidatabases are in use, this is the name given to this
Modified: ZODB/trunk/src/ZODB/config.py
===================================================================
--- ZODB/trunk/src/ZODB/config.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/config.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -68,7 +68,6 @@
def storageFromConfig(section):
return section.open()
-
class BaseConfig:
"""Object representing a configured storage or database.
@@ -99,8 +98,9 @@
return ZODB.DB(storage,
pool_size=section.pool_size,
cache_size=section.cache_size,
- version_pool_size=section.version_pool_size,
- version_cache_size=section.version_cache_size,
+ historical_pool_size=section.historical_pool_size,
+ historical_cache_size=section.historical_cache_size,
+ historical_timeout=section.historical_timeout,
database_name=section.database_name,
databases=databases)
except:
Added: ZODB/trunk/src/ZODB/historical_connections.txt
===================================================================
--- ZODB/trunk/src/ZODB/historical_connections.txt (rev 0)
+++ ZODB/trunk/src/ZODB/historical_connections.txt 2007-11-14 01:10:26 UTC (rev 81822)
@@ -0,0 +1,399 @@
+======================
+Historical Connections
+======================
+
+Usage
+=====
+
+A database can be opened with a read-only, historical connection when given
+a specific transaction or datetime. This can enable full-context,
+application-level conflict resolution, historical exploration and preparation
+for reverts,
+or even the use of a historical database revision as "production" while
+development continues on a "development" head.
+
+A database can be opened historically ``at`` or ``before`` a given transaction
+serial or datetime. Here's a simple example. It should work with any storage
+that supports ``loadBefore``. Unfortunately that does not include
+MappingStorage, so we use a FileStorage instance. Also unfortunately, as of
+this writing there is no reliable way to determine if a storage truly
+implements loadBefore, or if it simply returns None (as in BaseStorage), other
+than reading code.
+
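+A best-effort runtime probe is possible, although it is only a sketch and, as
+just noted, it cannot distinguish a real implementation from a stub storage
+that happens to hold no data (the probe and its name are illustrative, not
+part of the ZODB API)::
+
+    from ZODB.utils import p64, z64
+
+    def seems_to_support_loadBefore(storage):
+        # A storage that truly implements loadBefore returns a
+        # (data, serial, end_tid) triple for the root object when
+        # asked for its state before the maximum transaction id; a
+        # BaseStorage-style stub returns None regardless.
+        return storage.loadBefore(z64, p64(2**64 - 1)) is not None
+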
+We'll begin our example with a fairly standard set up. We
+
+- make a storage and a database;
+- open a normal connection;
+- modify the database through the connection;
+- commit a transaction, remembering the time in UTC;
+- modify the database again; and
+- commit a transaction.
+
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ... 'HistoricalConnectionTests.fs', create=True)
+ >>> import ZODB
+ >>> db = ZODB.DB(storage)
+ >>> conn = db.open()
+
+ >>> import persistent.mapping
+
+ >>> conn.root()['first'] = persistent.mapping.PersistentMapping(count=0)
+
+ >>> import transaction
+ >>> transaction.commit()
+
+ >>> import datetime
+ >>> now = datetime.datetime.utcnow()
+
+ >>> root = conn.root()
+ >>> root['second'] = persistent.mapping.PersistentMapping()
+ >>> root['first']['count'] += 1
+
+ >>> transaction.commit()
+
+Now we will show a historical connection. We'll open one using the ``now``
+value we generated above, and then demonstrate that the state of the original
+connection, at the mutable head of the database, is different than the
+historical state.
+
+ >>> transaction1 = transaction.TransactionManager()
+
+ >>> historical_conn = db.open(transaction_manager=transaction1, at=now)
+
+ >>> sorted(conn.root().keys())
+ ['first', 'second']
+ >>> conn.root()['first']['count']
+ 1
+
+ >>> historical_conn.root().keys()
+ ['first']
+ >>> historical_conn.root()['first']['count']
+ 0
+
+Moreover, the historical connection cannot commit changes.
+
+ >>> historical_conn.root()['first']['count'] += 1
+ >>> historical_conn.root()['first']['count']
+ 1
+ >>> transaction1.commit()
+ Traceback (most recent call last):
+ ...
+ ReadOnlyHistoryError
+ >>> transaction1.abort()
+ >>> historical_conn.root()['first']['count']
+ 0
+
+(It is because of the mutable behavior outside of transactional semantics that
+we must have a separate connection, and associated object cache, per thread,
+even though the semantics should be readonly.)
+
+As demonstrated, a timezone-naive datetime will be interpreted as UTC. You
+can also pass a timezone-aware datetime or a serial (transaction id).
+Here's opening with a serial--the serial of the root at the time of the first
+commit.
+
+ >>> historical_serial = historical_conn.root()._p_serial
+ >>> historical_conn.close()
+
+ >>> historical_conn = db.open(transaction_manager=transaction1,
+ ... at=historical_serial)
+ >>> historical_conn.root().keys()
+ ['first']
+ >>> historical_conn.root()['first']['count']
+ 0
+ >>> historical_conn.close()
+
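+A timezone-aware datetime works as well. Here is a sketch (not executed as
+part of this document; the fixed-offset ``tzinfo`` subclass is plain standard
+library boilerplate, not ZODB API)::
+
+    import datetime
+
+    class UTC(datetime.tzinfo):
+        def utcoffset(self, dt):
+            return datetime.timedelta(0)
+        def tzname(self, dt):
+            return 'UTC'
+        def dst(self, dt):
+            return datetime.timedelta(0)
+
+    # equivalent to passing the timezone-naive `now` from above
+    historical_conn = db.open(at=now.replace(tzinfo=UTC()))
+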
+We've shown the ``at`` argument. You can also ask to look ``before`` a datetime
+or serial. (It's an error to pass both [#not_both]_.) In this example, we're
+looking at the database immediately prior to the most recent change to the
+root.
+
+ >>> serial = conn.root()._p_serial
+ >>> historical_conn = db.open(
+ ... transaction_manager=transaction1, before=serial)
+ >>> historical_conn.root().keys()
+ ['first']
+ >>> historical_conn.root()['first']['count']
+ 0
+
+In fact, ``at`` arguments are translated into ``before`` values because the
+underlying mechanism is a storage's loadBefore method. When you look at a
+connection's ``before`` attribute, it is normalized into a ``before`` serial,
+no matter what you pass into ``db.open``.
+
+ >>> print conn.before
+ None
+ >>> historical_conn.before == serial
+ True
+
+ >>> conn.close()
+
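+Concretely, the translation mirrors an internal helper in ZODB.DB: an ``at``
+serial is bumped to the smallest strictly later timestamp, which then serves
+as the exclusive ``before`` bound (the function name here is illustrative)::
+
+    from persistent.TimeStamp import TimeStamp
+
+    def at_to_before(at_serial):
+        ts = TimeStamp(at_serial)
+        # laterThan returns the smallest TimeStamp greater than ts;
+        # its repr is the raw 8-byte serial used as `before`.
+        return repr(ts.laterThan(ts))
+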
+Configuration
+=============
+
+As with normal connections, the database lets you set how many historical
+connections can be active for a given serial without generating a warning, and
+how many objects should be kept in each connection's object cache.
+
+ >>> db.getHistoricalPoolSize()
+ 3
+ >>> db.setHistoricalPoolSize(4)
+ >>> db.getHistoricalPoolSize()
+ 4
+
+ >>> db.getHistoricalCacheSize()
+ 1000
+ >>> db.setHistoricalCacheSize(2000)
+ >>> db.getHistoricalCacheSize()
+ 2000
+
+In addition, you can specify the minimum number of seconds that an unused
+historical connection should be kept.
+
+ >>> db.getHistoricalTimeout()
+ 300
+ >>> db.setHistoricalTimeout(400)
+ >>> db.getHistoricalTimeout()
+ 400
+
+All three of these values can be specified in a ZConfig file. We're using
+mapping storage for simplicity, but remember, as we said at the start of this
+document, mapping storage will not work for historical connections (and in fact
+may seem to work but then fail confusingly) because it does not implement
+loadBefore.
+
+ >>> import ZODB.config
+ >>> db2 = ZODB.config.databaseFromString('''
+ ... <zodb>
+ ... <mappingstorage/>
+ ... historical-pool-size 5
+ ... historical-cache-size 1500
+ ... historical-timeout 6m
+ ... </zodb>
+ ... ''')
+ >>> db2.getHistoricalPoolSize()
+ 5
+ >>> db2.getHistoricalCacheSize()
+ 1500
+ >>> db2.getHistoricalTimeout()
+ 360
+
+Let's look at these values at work by shining some light into what has been a
+black box up to now: a white-box examination of what is going on in the
+database, pools, and connections.
+
+First we'll clean out all the old historical pools so we have a clean slate.
+
+ >>> historical_conn.close()
+ >>> db.removeHistoricalPool(at=now)
+ >>> db.removeHistoricalPool(at=historical_serial)
+ >>> db.removeHistoricalPool(before=serial)
+
+Now let's look at what happens to the pools when we create an historical
+connection.
+
+ >>> pools = db._pools
+ >>> len(pools)
+ 1
+ >>> pools.keys()
+ ['']
+ >>> historical_conn = db.open(
+ ... transaction_manager=transaction1, before=serial)
+ >>> len(pools)
+ 2
+ >>> set(pools.keys()) == set(('', serial))
+ True
+ >>> pool = pools[serial]
+ >>> len(pool.all)
+ 1
+ >>> len(pool.available)
+ 0
+
+If you change the historical cache size, that changes the size of the
+persistent cache on our connection.
+
+ >>> historical_conn._cache.cache_size
+ 2000
+ >>> db.setHistoricalCacheSize(1500)
+ >>> historical_conn._cache.cache_size
+ 1500
+
+Now let's look at pool sizes. We'll set it to two, then make and close three
+connections. We should end up with only two available connections.
+
+ >>> db.setHistoricalPoolSize(2)
+
+ >>> transaction2 = transaction.TransactionManager()
+ >>> historical_conn2 = db.open(
+ ... transaction_manager=transaction2, before=serial)
+ >>> len(pools)
+ 2
+ >>> len(pool.all)
+ 2
+ >>> len(pool.available)
+ 0
+
+ >>> transaction3 = transaction.TransactionManager()
+ >>> historical_conn3 = db.open(
+ ... transaction_manager=transaction3, before=serial)
+ >>> len(pools)
+ 2
+ >>> len(pool.all)
+ 3
+ >>> len(pool.available)
+ 0
+
+ >>> historical_conn3.close()
+ >>> len(pool.all)
+ 3
+ >>> len(pool.available)
+ 1
+
+ >>> historical_conn2.close()
+ >>> len(pool.all)
+ 3
+ >>> len(pool.available)
+ 2
+
+ >>> historical_conn.close()
+ >>> len(pool.all)
+ 2
+ >>> len(pool.available)
+ 2
+
+Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for
+this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
+make the DB class mask the DB module.)
+
+ >>> import time
+ >>> delta = 200
+ >>> def stub_time():
+ ... return time.time() + delta
+ ...
+ >>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
+ >>> original_time = DB_module.time
+ >>> DB_module.time = stub_time
+
+ >>> historical_conn = db.open(before=serial)
+
+ >>> len(pools)
+ 2
+ >>> len(pool.all)
+ 2
+ >>> len(pool.available)
+ 1
+
+A close or an open will do garbage collection on the timed out connections.
+
+ >>> delta += 200
+ >>> historical_conn.close()
+
+ >>> len(pools)
+ 2
+ >>> len(pool.all)
+ 1
+ >>> len(pool.available)
+ 1
+
+An open also does garbage collection on the pools themselves.
+
+ >>> delta += 400
+ >>> conn = db.open() # normal connection
+ >>> len(pools)
+ 1
+ >>> len(pool.all)
+ 0
+ >>> len(pool.available)
+ 0
+ >>> serial in pools
+ False
+
+Invalidations
+=============
+
+In general, invalidations are ignored for historical connections, assuming
+that you have really specified a point in history. This is another white box
+test.
+
+ >>> historical_conn = db.open(
+ ... transaction_manager=transaction1, at=serial)
+ >>> sorted(conn.root().keys())
+ ['first', 'second']
+ >>> conn.root()['first']['count']
+ 1
+ >>> sorted(historical_conn.root().keys())
+ ['first', 'second']
+ >>> historical_conn.root()['first']['count']
+ 1
+ >>> conn.root()['first']['count'] += 1
+ >>> conn.root()['third'] = persistent.mapping.PersistentMapping()
+ >>> transaction.commit()
+ >>> len(historical_conn._invalidated)
+ 0
+ >>> historical_conn.close()
+
+If you specify a time in the future, you get a read-only connection that
+still receives invalidations, rather than an error. The main reason for this
+is that, in some cases, the most recent transaction id is in the future, so
+there's no easy way to reasonably disallow such values. Beyond that, read-only
+connections are useful in their own right, though this spelling isn't
+especially appealing for the general case. This "future history" also works
+with MVCC.
+
+ >>> THE_FUTURE = datetime.datetime(2038, 1, 19)
+ >>> historical_conn = db.open(
+ ... transaction_manager=transaction1, at=THE_FUTURE)
+ >>> conn.root()['first']['count'] += 1
+ >>> conn.root()['fourth'] = persistent.mapping.PersistentMapping()
+ >>> transaction.commit()
+ >>> len(historical_conn._invalidated)
+ 2
+ >>> historical_conn.root()['first']['count'] # MVCC
+ 2
+ >>> historical_conn.sync()
+ >>> len(historical_conn._invalidated)
+ 0
+ >>> historical_conn.root()['first']['count']
+ 3
+ >>> historical_conn.root()['first']['count'] = 0
+ >>> transaction1.commit()
+ Traceback (most recent call last):
+ ...
+ ReadOnlyHistoryError
+ >>> transaction1.abort()
+ >>> historical_conn.close()
+
+Warnings
+========
+
+First, if you use datetimes to get a historical connection, be aware that the
+conversion from datetime to transaction id has some pitfalls. Generally, the
+transaction ids in the database are only as time-accurate as the system clock
+was when the transaction id was created. Moreover, leap seconds are handled
+somewhat naively in the ZODB (largely because they are handled naively in Unix/
+POSIX time) so any minute that contains a leap second may contain serials that
+are a bit off. This is not generally a problem for the ZODB, because serials
+are guaranteed to increase, but it does highlight the fact that serials are not
+guaranteed to be accurately connected to time. Generally, they are about as
+reliable as time.time.
+
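+For concreteness, here is a sketch of how a datetime becomes a serial,
+mirroring the internal conversion in ZODB.DB (the function name is
+illustrative; ``persistent.TimeStamp`` is the real machinery)::
+
+    from persistent.TimeStamp import TimeStamp
+
+    def datetime_to_tid(dt):
+        # dt is a UTC datetime; the result is an 8-byte serial whose
+        # accuracy is bounded by the clock that stamped the original
+        # transactions, as described above.
+        utc = dt.utctimetuple()
+        args = utc[:5] + (utc[5] + dt.microsecond / 1000000.0,)
+        return repr(TimeStamp(*args))
+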
+Second, historical connections currently introduce potentially wide variance
+in memory requirements for applications. Since you can open up many
+connections to different serials, and each gets its own pool, you may collect
+quite a few connections. For now, at least, if you use this feature you need to
+be particularly careful of your memory usage. Get rid of pools when you know
+you can, and reuse the exact same values for ``at`` or ``before`` when
+possible. If historical connections are used for conflict resolution, these
+connections will probably be temporary--not saved in a pool--so that the extra
+memory usage would also be brief and unlikely to overlap.
+
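+As a sketch of that advice (variable names are illustrative), reuse one
+canonical serial for all of the historical work and discard the pool as soon
+as the work is finished::
+
+    canonical = conn.root()._p_serial  # reuse this exact value throughout
+    historical = db.open(before=canonical)
+    try:
+        pass  # ... read-only historical work ...
+    finally:
+        historical.close()
+    db.removeHistoricalPool(before=canonical)
+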
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#not_both] It is an error to try and pass both `at` and `before`.
+
+ >>> historical_conn = db.open(
+ ... transaction_manager=transaction1, at=now, before=historical_serial)
+ Traceback (most recent call last):
+ ...
+ ValueError: can only pass zero or one of `at` and `before`
\ No newline at end of file
Property changes on: ZODB/trunk/src/ZODB/historical_connections.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/interfaces.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -34,9 +34,10 @@
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
- A Connection can be associated with a single version when it is
- created. By default, a Connection is not associated with a
- version; it uses non-version data.
+ A Connection can be frozen to a serial--a transaction id, a single point in
+ history-- when it is created. By default, a Connection is not associated
+ with a serial; it uses current data. A Connection frozen to a serial is
+ read-only.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
@@ -101,8 +102,7 @@
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC,
- cacheFullSweep, cacheMinimize, getVersion,
- modifiedInVersion
+ cacheFullSweep, cacheMinimize
Experimental Methods:
onCloseCallbacks
@@ -226,9 +226,6 @@
The root is a persistent.mapping.PersistentMapping.
"""
- def getVersion():
- """Returns the version this connection is attached to."""
-
# Multi-database support.
connections = Attribute(
@@ -325,7 +322,7 @@
there would be so many that it would be inefficient to do so.
"""
- def invalidate(transaction_id, oids, version=''):
+ def invalidate(transaction_id, oids):
"""Invalidate object ids committed by the given transaction
The oids argument is an iterable of object identifiers.
@@ -356,13 +353,15 @@
entry.
""")
- def open(version='', transaction_manager=None):
+ def open(transaction_manager=None, serial=''):
"""Return an IConnection object for use by application code.
- version: the "version" that all changes will be made
- in, defaults to no version.
transaction_manager: transaction manager to use. None means
use the default transaction manager.
+ serial: the serial (transaction id) of the database to open.
+ An empty string (the default) means to open it to the newest
+ serial. Specifying a serial results in a read-only historical
+ connection.
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
@@ -441,7 +440,7 @@
This is used solely for informational purposes.
"""
- def history(oid, version, size=1):
+ def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
@@ -457,10 +456,6 @@
tid
The transaction identifier of the transaction that
committed the version.
- version
- The version that the revision is in. If the storage
- doesn't support versions, then this must be an empty
- string.
user_name
The user identifier, if any (or an empty string) of the
user on whose behalf the revision was committed.
@@ -491,18 +486,14 @@
This is used solely for informational purposes.
"""
- def load(oid, version):
- """Load data for an object id and version
+ def load(oid):
+ """Load data for an object id
A data record and serial are returned. The serial is a
transaction identifier of the transaction that wrote the data
record.
- A POSKeyError is raised if there is no record for the object
- id and version.
-
- Storages that don't support versions must ignore the version
- argument.
+ A POSKeyError is raised if there is no record for the object id.
"""
def loadBefore(oid, tid):
@@ -575,7 +566,7 @@
has a reasonable chance of being unique.
"""
- def store(oid, serial, data, version, transaction):
+ def store(oid, serial, data, transaction):
"""Store data for the object id, oid.
Arguments:
@@ -594,11 +585,6 @@
data
The data record. This is opaque to the storage.
- version
- The version to store the data is. If the storage doesn't
- support versions, this should be an empty string and the
- storage is allowed to ignore it.
-
transaction
A transaction object. This should match the current
transaction for the storage, set by tpc_begin.
@@ -707,7 +693,7 @@
# failed to take into account records after the pack time.
- def restore(oid, serial, data, version, prev_txn, transaction):
+ def restore(oid, serial, data, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
@@ -727,9 +713,6 @@
The record data. This will be None if the transaction
undid the creation of the object.
- version
- The version identifier for the record
-
prev_txn
The identifier of a previous transaction that held the
object data. The target storage can sometimes use this
@@ -746,7 +729,6 @@
"""
oid = Attribute("The object id")
- version = Attribute("The version")
data = Attribute("The data record")
class IStorageTransactionInformation(Interface):
@@ -936,7 +918,7 @@
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
- def storeBlob(oid, oldserial, data, blob, version, transaction):
+ def storeBlob(oid, oldserial, data, blob, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial):
Modified: ZODB/trunk/src/ZODB/serialize.py
===================================================================
--- ZODB/trunk/src/ZODB/serialize.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/serialize.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -371,7 +371,7 @@
return oid
# Note that we never get here for persistent classes.
- # We'll use driect refs for normal classes.
+ # We'll use direct refs for normal classes.
if database_name:
return ['m', (database_name, oid, klass)]
Modified: ZODB/trunk/src/ZODB/tests/VersionStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/VersionStorage.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/tests/VersionStorage.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -394,153 +394,6 @@
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
- def checkPackVersions(self):
- db = DB(self._storage)
- cn = db.open(version="testversion")
- root = cn.root()
-
- obj = root["obj"] = MinPO("obj")
- root["obj2"] = MinPO("obj2")
- txn = transaction.get()
- txn.note("create 2 objs in version")
- txn.commit()
-
- obj.value = "77"
- txn = transaction.get()
- txn.note("modify obj in version")
- txn.commit()
-
- # undo the modification to generate a mix of backpointers
- # and versions for pack to chase
- info = db.undoInfo()
- db.undo(info[0]["id"])
- txn = transaction.get()
- txn.note("undo modification")
- txn.commit()
-
- snooze()
- self._storage.pack(time.time(), referencesf)
-
- db.commitVersion("testversion")
- txn = transaction.get()
- txn.note("commit version")
- txn.commit()
-
- cn = db.open()
- root = cn.root()
- root["obj"] = "no version"
-
- txn = transaction.get()
- txn.note("modify obj")
- txn.commit()
-
- self._storage.pack(time.time(), referencesf)
-
- def checkPackVersionsInPast(self):
- db = DB(self._storage)
- cn = db.open(version="testversion")
- root = cn.root()
-
- obj = root["obj"] = MinPO("obj")
- root["obj2"] = MinPO("obj2")
- txn = transaction.get()
- txn.note("create 2 objs in version")
- txn.commit()
-
- obj.value = "77"
- txn = transaction.get()
- txn.note("modify obj in version")
- txn.commit()
-
- t0 = time.time()
- snooze()
-
- # undo the modification to generate a mix of backpointers
- # and versions for pack to chase
- info = db.undoInfo()
- db.undo(info[0]["id"])
- txn = transaction.get()
- txn.note("undo modification")
- txn.commit()
-
- self._storage.pack(t0, referencesf)
-
- db.commitVersion("testversion")
- txn = transaction.get()
- txn.note("commit version")
- txn.commit()
-
- cn = db.open()
- root = cn.root()
- root["obj"] = "no version"
-
- txn = transaction.get()
- txn.note("modify obj")
- txn.commit()
-
- self._storage.pack(time.time(), referencesf)
-
- def checkPackVersionReachable(self):
- db = DB(self._storage)
- cn = db.open()
- root = cn.root()
-
- names = "a", "b", "c"
-
- for name in names:
- root[name] = MinPO(name)
- transaction.commit()
-
- for name in names:
- cn2 = db.open(version=name)
- rt2 = cn2.root()
- obj = rt2[name]
- obj.value = MinPO("version")
- transaction.commit()
- cn2.close()
-
- root["d"] = MinPO("d")
- transaction.commit()
- snooze()
-
- self._storage.pack(time.time(), referencesf)
- cn.sync()
-
- # make sure all the non-version data is there
- for name, obj in root.items():
- self.assertEqual(name, obj.value)
-
- # make sure all the version-data is there,
- # and create a new revision in the version
- for name in names:
- cn2 = db.open(version=name)
- rt2 = cn2.root()
- obj = rt2[name].value
- self.assertEqual(obj.value, "version")
- obj.value = "still version"
- transaction.commit()
- cn2.close()
-
- db.abortVersion("b")
- txn = transaction.get()
- txn.note("abort version b")
- txn.commit()
-
- t = time.time()
- snooze()
-
- L = db.undoInfo()
- db.undo(L[0]["id"])
- txn = transaction.get()
- txn.note("undo abort")
- txn.commit()
-
- self._storage.pack(t, referencesf)
-
- cn2 = db.open(version="b")
- rt2 = cn2.root()
- self.assertEqual(rt2["b"].value.value, "still version")
-
def checkLoadBeforeVersion(self):
eq = self.assertEqual
oid = self._storage.new_oid()
Modified: ZODB/trunk/src/ZODB/tests/dbopen.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/dbopen.txt 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/tests/dbopen.txt 2007-11-14 01:10:26 UTC (rev 81822)
@@ -239,12 +239,12 @@
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
- >>> [c.MARKER for c in pool.available]
+ >>> [c.MARKER for c in pool.available.values()]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
- >>> [c.MARKER for c in pool.available]
+ >>> [c.MARKER for c in pool.available.values()]
[1, 2, 3]
Similarly for the other two:
@@ -252,7 +252,7 @@
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
- >>> [c.MARKER for c in pool.available]
+ >>> [c.MARKER for c in pool.available.values()]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
@@ -260,7 +260,7 @@
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
- >>> [c.MARKER for c in pool.available]
+ >>> [c.MARKER for c in pool.available.values()]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
Modified: ZODB/trunk/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testDB.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/tests/testDB.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -14,7 +14,7 @@
import os
import time
import unittest
-import warnings
+import datetime
import transaction
@@ -35,8 +35,6 @@
self.__path = os.path.abspath('test.fs')
store = ZODB.FileStorage.FileStorage(self.__path)
self.db = ZODB.DB(store)
- warnings.filterwarnings(
- 'ignore', message='Versions are deprecated', module=__name__)
def tearDown(self):
self.db.close()
@@ -44,8 +42,8 @@
if os.path.exists(self.__path+s):
os.remove(self.__path+s)
- def dowork(self, version=''):
- c = self.db.open(version)
+ def dowork(self):
+ c = self.db.open()
r = c.root()
o = r[time.time()] = MinPO(0)
transaction.commit()
@@ -53,85 +51,95 @@
o.value = MinPO(i)
transaction.commit()
o = o.value
+ serial = o._p_serial
+ root_serial = r._p_serial
c.close()
+ return serial, root_serial
# make sure the basic methods are callable
def testSets(self):
self.db.setCacheSize(15)
- self.db.setVersionCacheSize(15)
+ self.db.setHistoricalCacheSize(15)
- def test_removeVersionPool(self):
- # Test that we can remove a version pool
+ def test_removeHistoricalPool(self):
+ # Test that we can remove a historical pool
# This is white box because we check some internal data structures
- self.dowork()
- self.dowork('v2')
- c1 = self.db.open('v1')
+ serial1, root_serial1 = self.dowork()
+ now = datetime.datetime.utcnow()
+ serial2, root_serial2 = self.dowork()
+ self.failUnless(root_serial1 < root_serial2)
+ c1 = self.db.open(at=now)
+ root = c1.root()
+ root.keys() # wake up object to get proper serial set
+ self.assertEqual(root._p_serial, root_serial1)
c1.close() # return to pool
- c12 = self.db.open('v1')
+ c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools = self.db._pools
- self.assertEqual(len(pools), 3)
- self.assertEqual(nconn(pools), 3)
-
- self.db.removeVersionPool('v1')
-
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
- c12 = self.db.open('v1')
+ self.db.removeHistoricalPool(at=now)
+
+ self.assertEqual(len(pools), 1)
+ self.assertEqual(nconn(pools), 1)
+
+ c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
- self.assertEqual(len(pools), 3)
- self.assertEqual(nconn(pools), 3)
+ self.assertEqual(len(pools), 2)
+ self.assertEqual(nconn(pools), 2)
def _test_for_leak(self):
self.dowork()
- self.dowork('v2')
+ now = datetime.datetime.utcnow()
+ self.dowork()
while 1:
- c1 = self.db.open('v1')
- self.db.removeVersionPool('v1')
+ c1 = self.db.open(at=now)
+ self.db.removeHistoricalPool(at=now)
c1.close() # return to pool
- def test_removeVersionPool_while_connection_open(self):
+ def test_removeHistoricalPool_while_connection_open(self):
# Test that we can remove a historical pool
# This is white box because we check some internal data structures
self.dowork()
- self.dowork('v2')
- c1 = self.db.open('v1')
+ now = datetime.datetime.utcnow()
+ self.dowork()
+ c1 = self.db.open(at=now)
c1.close() # return to pool
- c12 = self.db.open('v1')
+ c12 = self.db.open(at=now)
self.assert_(c1 is c12) # should be same
pools = self.db._pools
- self.assertEqual(len(pools), 3)
- self.assertEqual(nconn(pools), 3)
-
- self.db.removeVersionPool('v1')
-
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
+ self.db.removeHistoricalPool(at=now)
+
+ self.assertEqual(len(pools), 1)
+ self.assertEqual(nconn(pools), 1)
+
c12.close() # should leave pools alone
- self.assertEqual(len(pools), 2)
- self.assertEqual(nconn(pools), 2)
+ self.assertEqual(len(pools), 1)
+ self.assertEqual(nconn(pools), 1)
- c12 = self.db.open('v1')
+ c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
- self.assertEqual(len(pools), 3)
- self.assertEqual(nconn(pools), 3)
+ self.assertEqual(len(pools), 2)
+ self.assertEqual(nconn(pools), 2)
def test_references(self):
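Distilled from the tests above, the pattern for historical pools looks roughly like this (assuming `db` is an open DB and at least one transaction was committed before `now`):

    import datetime

    now = datetime.datetime.utcnow()
    # ... commit further changes here ...
    conn = db.open(at=now)           # read-only view as of `now`
    conn.close()                     # parks it in the historical pool
    db.removeHistoricalPool(at=now)  # discard that pool entirely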
Modified: ZODB/trunk/src/ZODB/tests/testZODB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testZODB.py 2007-11-13 18:43:03 UTC (rev 81821)
+++ ZODB/trunk/src/ZODB/tests/testZODB.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -136,20 +136,6 @@
def checkExportImportAborted(self):
self.checkExportImport(abort_it=True)
- def checkVersionOnly(self):
- # Make sure the changes to make empty transactions a no-op
- # still allow things like abortVersion(). This should work
- # because abortVersion() calls tpc_begin() itself.
- conn = self._db.open("version")
- try:
- r = conn.root()
- r[1] = 1
- transaction.commit()
- finally:
- conn.close()
- self._db.abortVersion("version")
- transaction.commit()
-
def checkResetCache(self):
# The cache size after a reset should be 0. Note that
# _resetCache is not a public API, but the resetCaches()
Added: ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py (rev 0)
+++ ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py 2007-11-14 01:10:26 UTC (rev 81822)
@@ -0,0 +1,44 @@
+##############################################################################
+#
+# Copyright (c) 2007 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+$Id$
+"""
+import unittest
+from zope.testing import doctest, module
+
+def setUp(test):
+ module.setUp(test, 'historical_connections_txt')
+
+def tearDown(test):
+ test.globs['db'].close()
+ test.globs['db2'].close()
+ test.globs['storage'].close()
+ test.globs['storage'].cleanup()
+ # the DB class masks the module because of __init__ shenanigans
+ DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
+ DB_module.time = test.globs['original_time']
+ module.tearDown(test)
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocFileSuite('../historical_connections.txt',
+ setUp=setUp,
+ tearDown=tearDown,
+ optionflags=doctest.INTERPRET_FOOTNOTES,
+ ),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+