[Zodb-checkins] SVN: ZODB/trunk/src/Z Got rid of more version
support.
Jim Fulton
jim at zope.com
Sun Jan 6 20:14:54 EST 2008
Log message for revision 82726:
Got rid of more version support.
Changed:
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/CommitLog.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/TransactionBuffer.py
U ZODB/trunk/src/ZEO/__init__.py
U ZODB/trunk/src/ZEO/interfaces.py
U ZODB/trunk/src/ZEO/monitor.py
U ZODB/trunk/src/ZEO/scripts/timeout.py
U ZODB/trunk/src/ZEO/tests/ConnectionTests.py
U ZODB/trunk/src/ZEO/tests/InvalidationTests.py
U ZODB/trunk/src/ZEO/tests/registerDB.test
U ZODB/trunk/src/ZEO/tests/testTransactionBuffer.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/tests/test_cache.py
U ZODB/trunk/src/ZODB/interfaces.py
U ZODB/trunk/src/ZODB/tests/BasicStorage.py
-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -265,11 +265,11 @@
# _server_addr is used by sortKey()
self._server_addr = None
- self._tfile = None
- self._pickler = None
+
+ self._verification_invalidations = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
- 'supportsUndo':0, 'supportsVersions': 0}
+ 'supportsUndo':0}
self._tbuf = self.TransactionBufferClass()
self._db = None
@@ -572,6 +572,13 @@
# it should set self._server. If it goes through full cache
# verification, then endVerify() should set self._server.
+# if not self._cache:
+# log2("No verification necessary -- empty cache")
+# self._server = server
+# self._ready.set()
+# return "full verification"
+
+
last_inval_tid = self._cache.getLastTid()
if last_inval_tid is not None:
ltid = server.lastTransaction()
@@ -594,17 +601,15 @@
self._server = server
self._ready.set()
return "quick verification"
-
+
log2("Verifying cache")
# setup tempfile to hold zeoVerify results
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
+ self._verification_invalidations = []
# TODO: should batch these operations for efficiency; would need
# to acquire lock ...
for oid, tid, version in self._cache.contents():
- server.verify(oid, version, tid)
+ server.verify(oid, tid)
self._pending_server = server
server.endZeoVerify()
return "full verification"
@@ -667,10 +672,6 @@
"""Storage API: return whether we support undo."""
return self._info['supportsUndo']
- def supportsVersions(self):
- """Storage API: return whether we support versions."""
- return self._info['supportsVersions']
-
def isReadOnly(self):
"""Storage API: return whether we are in read-only mode."""
if self._is_read_only:
@@ -689,44 +690,10 @@
raise POSException.StorageTransactionError(self._transaction,
trans)
- def abortVersion(self, version, txn):
- """Storage API: clear any changes made by the given version."""
- self._check_trans(txn)
- tid, oids = self._server.abortVersion(version, id(txn))
- # When a version aborts, invalidate the version and
- # non-version data. The non-version data should still be
- # valid, but older versions of ZODB will change the
- # non-version serialno on an abort version. With those
- # versions of ZODB, you'd get a conflict error if you tried to
- # commit a transaction with the cached data.
-
- # If we could guarantee that ZODB gave the right answer,
- # we could just invalidate the version data.
- for oid in oids:
- self._tbuf.invalidate(oid, '')
- return tid, oids
-
- def commitVersion(self, source, destination, txn):
- """Storage API: commit the source version in the destination."""
- self._check_trans(txn)
- tid, oids = self._server.commitVersion(source, destination, id(txn))
- if destination:
- # just invalidate our version data
- for oid in oids:
- self._tbuf.invalidate(oid, source)
- else:
- # destination is "", so invalidate version and non-version
- for oid in oids:
- self._tbuf.invalidate(oid, "")
- return tid, oids
-
- def history(self, oid, version, length=1):
+ def history(self, oid, length=1):
"""Storage API: return a sequence of HistoryEntry objects.
-
- This does not support the optional filter argument defined by
- the Storage API.
"""
- return self._server.history(oid, version, length)
+ return self._server.history(oid, length)
def record_iternext(self, next=None):
"""Storage API: get the next database record.
@@ -743,21 +710,18 @@
"""Storage API: load a historical revision of an object."""
return self._server.loadSerial(oid, serial)
- def load(self, oid, version):
+ def load(self, oid, version=''):
"""Storage API: return the data for a given object.
This returns the pickle data and serial number for the object
- specified by the given object id and version, if they exist;
+ specified by the given object id, if they exist;
otherwise a KeyError is raised.
"""
- return self.loadEx(oid, version)[:2]
-
- def loadEx(self, oid, version):
self._lock.acquire() # for atomic processing of invalidations
try:
- t = self._cache.load(oid, version)
+ t = self._cache.load(oid, '')
if t:
- return t
+ return t[:2] # XXX strip version
finally:
self._lock.release()
@@ -773,19 +737,19 @@
finally:
self._lock.release()
- data, tid, ver = self._server.loadEx(oid, version)
+ data, tid = self._server.loadEx(oid)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
- self._cache.store(oid, ver, tid, None, data)
+ self._cache.store(oid, '', tid, None, data)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
- return data, tid, ver
+ return data, tid
def loadBefore(self, oid, tid):
self._lock.acquire()
@@ -823,20 +787,6 @@
return data, start, end
- def modifiedInVersion(self, oid):
- """Storage API: return the version, if any, that modfied an object.
-
- If no version modified the object, return an empty string.
- """
- self._lock.acquire()
- try:
- v = self._cache.modifiedInVersion(oid)
- if v is not None:
- return v
- finally:
- self._lock.release()
- return self._server.modifiedInVersion(oid)
-
def new_oid(self):
"""Storage API: return a new object identifier."""
if self._is_read_only:
@@ -887,24 +837,25 @@
def store(self, oid, serial, data, version, txn):
"""Storage API: store data for an object."""
+ assert not version
+
self._check_trans(txn)
- self._server.storea(oid, serial, data, version, id(txn))
- self._tbuf.store(oid, version, data)
+ self._server.storea(oid, serial, data, id(txn))
+ self._tbuf.store(oid, data)
return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
- serials = self.store(oid, serial, data, version, txn)
+ assert not version
+ serials = self.store(oid, serial, data, '', txn)
if self.shared_blob_dir:
- self._storeBlob_shared(
- oid, serial, data, blobfilename, version, txn)
+ self._storeBlob_shared(oid, serial, data, blobfilename, txn)
else:
- self._server.storeBlob(
- oid, serial, data, blobfilename, version, txn)
+ self._server.storeBlob(oid, serial, data, blobfilename, txn)
self._tbuf.storeBlob(oid, blobfilename)
return serials
- def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
+ def _storeBlob_shared(self, oid, serial, data, filename, txn):
# First, move the blob into the blob directory
dir = self.fshelper.getPathForOID(oid)
if not os.path.exists(dir):
@@ -924,8 +875,7 @@
# Now tell the server where we put it
self._server.storeBlobShared(
- oid, serial, data,
- os.path.basename(target), version, id(txn))
+ oid, serial, data, os.path.basename(target), id(txn))
def _have_blob(self, blob_filename, oid, serial):
if os.path.exists(blob_filename):
@@ -1161,14 +1111,14 @@
if self._cache is None:
return
- for oid, version, data in self._tbuf:
- self._cache.invalidate(oid, version, tid)
+ for oid, data in self._tbuf:
+ self._cache.invalidate(oid, '', tid)
# If data is None, we just invalidate.
if data is not None:
s = self._seriald[oid]
if s != ResolvedSerial:
assert s == tid, (s, tid)
- self._cache.store(oid, version, s, None, data)
+ self._cache.store(oid, '', s, None, data)
if self.fshelper is not None:
@@ -1197,7 +1147,7 @@
self._check_trans(txn)
tid, oids = self._server.undo(trans_id, id(txn))
for oid in oids:
- self._tbuf.invalidate(oid, '')
+ self._tbuf.invalidate(oid)
return tid, oids
def undoInfo(self, first=0, last=-20, specification=None):
@@ -1216,14 +1166,6 @@
return []
return self._server.undoLog(first, last)
- def versionEmpty(self, version):
- """Storage API: return whether the version has no transactions."""
- return self._server.versionEmpty(version)
-
- def versions(self, max=None):
- """Storage API: return a sequence of versions in the storage."""
- return self._server.versions(max)
-
# Below are methods invoked by the StorageServer
def serialnos(self, args):
@@ -1235,55 +1177,38 @@
self._info.update(dict)
def invalidateVerify(self, args):
- """Server callback to invalidate an (oid, version) pair.
+ """Server callback to invalidate an (oid, '') pair.
This is called as part of cache validation.
"""
# Invalidation as result of verify_cache().
# Queue an invalidate for the end of the verification procedure.
- if self._pickler is None:
+ if self._verification_invalidations is None:
# This should never happen. TODO: assert it doesn't, or log
# if it does.
return
- self._pickler.dump(args)
+ self._verification_invalidations.append(args[0])
- def _process_invalidations(self, invs):
- # Invalidations are sent by the ZEO server as a sequence of
- # oid, version pairs. The DB's invalidate() method expects a
- # dictionary of oids.
-
+ def _process_invalidations(self, tid, oids):
self._lock.acquire()
try:
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version, tid in invs:
+ for oid in oids:
if oid == self._load_oid:
self._load_status = 0
- self._cache.invalidate(oid, version, tid)
- oids = versions.get((version, tid))
- if not oids:
- versions[(version, tid)] = [oid]
- else:
- oids.append(oid)
+ self._cache.invalidate(oid, '', tid)
if self._db is not None:
- for (version, tid), d in versions.items():
- self._db.invalidate(tid, d, version=version)
+ self._db.invalidate(tid, oids)
finally:
self._lock.release()
def endVerify(self):
"""Server callback to signal end of cache validation."""
- if self._pickler is None:
+ if self._verification_invalidations is None:
return
# write end-of-data marker
- self._pickler.dump((None, None))
- self._pickler = None
- self._tfile.seek(0)
- f = self._tfile
- self._tfile = None
- self._process_invalidations(InvalidationLogIterator(f))
- f.close()
+ self._process_invalidations(None, self._verification_invalidations)
+ self._verification_invalidations = None
log2("endVerify finishing")
self._server = self._pending_server
@@ -1293,19 +1218,18 @@
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
+ oids = (a[0] for a in args)
self._lock.acquire()
try:
self._cache.setLastTid(tid)
finally:
self._lock.release()
- if self._pickler is not None:
+ if self._verification_invalidations is not None:
log2("Transactional invalidation during cache verification",
level=BLATHER)
- for t in args:
- self._pickler.dump(t)
+ self._verification_invalidations.extend(oids)
return
- self._process_invalidations([(oid, version, tid)
- for oid, version in args])
+ self._process_invalidations(tid, list(oids))
# The following are for compatibility with protocol version 2.0.0
@@ -1316,10 +1240,3 @@
end = endVerify
Invalidate = invalidateTrans
-def InvalidationLogIterator(fileobj):
- unpickler = cPickle.Unpickler(fileobj)
- while 1:
- oid, version = unpickler.load()
- if oid is None:
- break
- yield oid, version, None
Modified: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/CommitLog.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -34,8 +34,8 @@
def size(self):
return self.file.tell()
- def store(self, oid, serial, data, version):
- self.pickler.dump((oid, serial, data, version))
+ def store(self, oid, serial, data):
+ self.pickler.dump((oid, serial, data))
self.stores += 1
def get_loader(self):
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -116,23 +116,21 @@
# server will make an asynchronous invalidateVerify() call.
# @param oid object id
# @param s serial number on non-version data
- # @param sv serial number of version data or None
# @defreturn async
- def zeoVerify(self, oid, s, sv):
- self.rpc.callAsync('zeoVerify', oid, s, sv)
+ def zeoVerify(self, oid, s):
+ self.rpc.callAsync('zeoVerify', oid, s, None)
##
- # Check whether current serial number is valid for oid and version.
+ # Check whether current serial number is valid for oid.
# If the serial number is not current, the server will make an
# asynchronous invalidateVerify() call.
# @param oid object id
- # @param version name of version for oid
# @param serial client's current serial number
# @defreturn async
- def verify(self, oid, version, serial):
- self.rpc.callAsync('verify', oid, version, serial)
+ def verify(self, oid, serial):
+ self.rpc.callAsync('verify', oid, '', serial)
##
# Signal to the server that cache verification is done.
@@ -166,34 +164,26 @@
self.rpc.call('pack', t, wait)
##
- # Return current data for oid. Version data is returned if
- # present.
+ # Return current data for oid.
# @param oid object id
- # @defreturn 5-tuple
- # @return 5-tuple, current non-version data, serial number,
- # version name, version data, version data serial number
+ # @defreturn 2-tuple
+ # @return 2-tuple, current non-version data, serial number
# @exception KeyError if oid is not found
def zeoLoad(self, oid):
- return self.rpc.call('zeoLoad', oid)
+ return self.rpc.call('zeoLoad', oid)[:2]
##
- # Return current data for oid in version, the tid of the
- # transaction that wrote the most recent revision, and the name of
- # the version for the data returned. Note that if the object
- # wasn't modified in the version, then the non-version data is
- # returned and the returned version is an empty string.
+ # Return current data for oid, and the tid of the
+ # transaction that wrote the most recent revision.
# @param oid object id
- # @param version string, name of version
- # @defreturn 3-tuple
- # @return data, transaction id, version
- # where version is the name of the version the data came
- # from or "" for non-version data
+ # @defreturn 2-tuple
+ # @return data, transaction id
# @exception KeyError if oid is not found
- def loadEx(self, oid, version):
- return self.rpc.call("loadEx", oid, version)
+ def loadEx(self, oid):
+ return self.rpc.call("loadEx", oid, '')[:2]
##
# Return non-current data along with transaction ids that identify
@@ -213,14 +203,13 @@
# @param oid object id
# @param serial serial number that this transaction read
# @param data new data record for oid
- # @param version name of version or ""
# @param id id of current transaction
# @defreturn async
- def storea(self, oid, serial, data, version, id):
- self.rpc.callAsync('storea', oid, serial, data, version, id)
+ def storea(self, oid, serial, data, id):
+ self.rpc.callAsync('storea', oid, serial, data, '', id)
- def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+ def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to read all of
# the data into memory, so we use a message iterator. This
@@ -235,13 +224,13 @@
break
yield ('storeBlobChunk', (chunk, ))
f.close()
- yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
+ yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
self.rpc.callAsyncIterator(store())
- def storeBlobShared(self, oid, serial, data, filename, version, id):
+ def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
- version, id)
+ '', id)
##
# Start two-phase commit for a transaction
@@ -267,23 +256,17 @@
def tpc_abort(self, id):
self.rpc.callAsync('tpc_abort', id)
- def abortVersion(self, src, id):
- return self.rpc.call('abortVersion', src, id)
-
- def commitVersion(self, src, dest, id):
- return self.rpc.call('commitVersion', src, dest, id)
-
- def history(self, oid, version, length=None):
+ def history(self, oid, length=None):
if length is None:
- return self.rpc.call('history', oid, version)
+ return self.rpc.call('history', oid, '')
else:
- return self.rpc.call('history', oid, version, length)
+ return self.rpc.call('history', oid, '', length)
def record_iternext(self, next):
return self.rpc.call('record_iternext', next)
- def load(self, oid, version):
- return self.rpc.call('load', oid, version)
+ def load(self, oid):
+ return self.rpc.call('load', oid, '')
def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial)
@@ -294,14 +277,11 @@
def loadSerial(self, oid, serial):
return self.rpc.call('loadSerial', oid, serial)
- def modifiedInVersion(self, oid):
- return self.rpc.call('modifiedInVersion', oid)
-
def new_oid(self):
return self.rpc.call('new_oid')
- def store(self, oid, serial, data, version, trans):
- return self.rpc.call('store', oid, serial, data, version, trans)
+ def store(self, oid, serial, data, trans):
+ return self.rpc.call('store', oid, serial, data, '', trans)
def undo(self, trans_id, trans):
return self.rpc.call('undo', trans_id, trans)
@@ -312,15 +292,6 @@
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
- def versionEmpty(self, vers):
- return self.rpc.call('versionEmpty', vers)
-
- def versions(self, max=None):
- if max is None:
- return self.rpc.call('versions')
- else:
- return self.rpc.call('versions', max)
-
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
self.rpc = rpc
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -162,17 +162,6 @@
storage = self.storage
info = self.get_info()
- if info['supportsVersions']:
- self.versionEmpty = storage.versionEmpty
- self.versions = storage.versions
- self.modifiedInVersion = storage.modifiedInVersion
- else:
- self.versionEmpty = lambda version: True
- self.versions = lambda max=None: ()
- self.modifiedInVersion = lambda oid: ''
- def commitVersion(*a, **k):
- raise NotImplementedError
- self.commitVersion = self.abortVersion = commitVersion
if not info['supportsUndo']:
self.undoLog = self.undoInfo = lambda *a,**k: ()
@@ -277,13 +266,6 @@
storage = self.storage
try:
- supportsVersions = storage.supportsVersions
- except AttributeError:
- supportsVersions = False
- else:
- supportsVersions = supportsVersions()
-
- try:
supportsUndo = storage.supportsUndo
except AttributeError:
supportsUndo = False
@@ -294,7 +276,7 @@
'size': storage.getSize(),
'name': storage.getName(),
'supportsUndo': supportsUndo,
- 'supportsVersions': supportsVersions,
+ 'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'),
}
@@ -307,13 +289,10 @@
def getExtensionMethods(self):
return self._extensions
- def loadEx(self, oid, version):
+ def loadEx(self, oid, version=''):
self.stats.loads += 1
if version:
- oversion = self.storage.modifiedInVersion(oid)
- if oversion == version:
- data, serial = self.storage.load(oid, version)
- return data, serial, version
+ raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.load(oid, '')
return data, serial, ''
@@ -324,20 +303,8 @@
def zeoLoad(self, oid):
self.stats.loads += 1
- v = self.storage.modifiedInVersion(oid)
- if v:
- pv, sv = self.storage.load(oid, v)
- else:
- pv = sv = None
- try:
- p, s = self.storage.load(oid, '')
- except KeyError:
- if sv:
- # Created in version, no non-version data
- p = s = None
- else:
- raise
- return p, s, v, pv, sv
+ p, s = self.storage.load(oid, '')
+ return p, s, '', None, None
def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
@@ -348,20 +315,17 @@
return invtid, invlist
def verify(self, oid, version, tid):
+ if version:
+ raise StorageServerError("Versions aren't supported.")
try:
t = self.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ""))
else:
if tid != t:
- # This will invalidate non-version data when the
- # client only has invalid version data. Since this is
- # an uncommon case, we avoid the cost of checking
- # whether the serial number matches the current
- # non-version data.
- self.client.invalidateVerify((oid, version))
+ self.client.invalidateVerify((oid, ''))
- def zeoVerify(self, oid, s, sv):
+ def zeoVerify(self, oid, s, sv=None):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
@@ -374,22 +338,8 @@
# invalidation is right. It could be an application bug
# that left a dangling reference, in which case it's bad.
else:
- # If the client has version data, the logic is a bit more
- # complicated. If the current serial number matches the
- # client serial number, then the non-version data must
- # also be valid. If the current serialno is for a
- # version, then the non-version data can't change.
-
- # If the version serialno isn't valid, then the
- # non-version serialno may or may not be valid. Rather
- # than trying to figure it whether it is valid, we just
- # invalidate it. Sending an invalidation for the
- # non-version data implies invalidating the version data
- # too, since an update to non-version data can only occur
- # after the version is aborted or committed.
if sv:
- if sv != os:
- self.client.invalidateVerify((oid, ''))
+ raise StorageServerError("Versions aren't supported.")
else:
if s != os:
self.client.invalidateVerify((oid, ''))
@@ -521,9 +471,11 @@
# an _.
def storea(self, oid, serial, data, version, id):
+ if version:
+ raise StorageServerError("Versions aren't supported.")
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
- self.txnlog.store(oid, serial, data, version)
+ self.txnlog.store(oid, serial, data)
def storeBlobStart(self):
assert self.blob_tempfile is None
@@ -534,16 +486,20 @@
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, version, id):
+ if version:
+ raise StorageServerError("Versions aren't supported.")
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
- self.blob_log.append((oid, serial, data, tempname, version))
+ self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, version, id):
+ if version:
+ raise StorageServerError("Versions aren't supported.")
# Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
filename)
- self.blob_log.append((oid, serial, data, filename, version))
+ self.blob_log.append((oid, serial, data, filename))
def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
@@ -558,20 +514,6 @@
else:
return self._wait(lambda: self._vote())
- def abortVersion(self, src, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._abortVersion(src)
- else:
- return self._wait(lambda: self._abortVersion(src))
-
- def commitVersion(self, src, dest, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._commitVersion(src, dest)
- else:
- return self._wait(lambda: self._commitVersion(src, dest))
-
def undo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
@@ -585,10 +527,10 @@
self.stats.lock_time = time.time()
self.storage.tpc_begin(txn, tid, status)
- def _store(self, oid, serial, data, version):
+ def _store(self, oid, serial, data):
err = None
try:
- newserial = self.storage.store(oid, serial, data, version,
+ newserial = self.storage.store(oid, serial, data, '',
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
@@ -616,7 +558,7 @@
newserial = [(oid, err)]
else:
if serial != "\0\0\0\0\0\0\0\0":
- self.invalidated.append((oid, version))
+ self.invalidated.append((oid, ''))
if isinstance(newserial, str):
newserial = [(oid, newserial)]
@@ -645,21 +587,6 @@
self.client.serialnos(self.serials)
return
- def _abortVersion(self, src):
- tid, oids = self.storage.abortVersion(src, self.transaction)
- inv = [(oid, src) for oid in oids]
- self.invalidated.extend(inv)
- return tid, oids
-
- def _commitVersion(self, src, dest):
- tid, oids = self.storage.commitVersion(src, dest, self.transaction)
- inv = [(oid, dest) for oid in oids]
- self.invalidated.extend(inv)
- if dest:
- inv = [(oid, src) for oid in oids]
- self.invalidated.extend(inv)
- return tid, oids
-
def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids]
@@ -706,9 +633,9 @@
# Blob support
while self.blob_log:
- oid, oldserial, data, blobfilename, version = self.blob_log.pop()
+ oid, oldserial, data, blobfilename = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
- version, self.transaction,)
+ '', self.transaction,)
resp = self._thunk()
if delay is not None:
@@ -742,6 +669,20 @@
else:
return 1
+ def modifiedInVersion(self, oid):
+ return ''
+
+ def versions(self):
+ return ()
+
+ def versionEmpty(self, version):
+ return True
+
+ def commitVersion(self, *a, **k):
+ raise NotImplementedError
+
+ abortVersion = commitVersion
+
class StorageServerDB:
def __init__(self, server, storage_id):
@@ -750,10 +691,12 @@
self.references = ZODB.serialize.referencesf
def invalidate(self, tid, oids, version=''):
+ if version:
+ raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id
self.server.invalidate(
None, storage_id, tid,
- [(oid, version) for oid in oids],
+ [(oid, '') for oid in oids],
)
for zeo_server in self.server.connections.get(storage_id, ())[:]:
try:
@@ -1026,7 +969,7 @@
This is called from several ZEOStorage methods.
- invalidated is a sequence of oid, version pairs.
+ invalidated is a sequence of oid, empty-string pairs.
This can do three different things:
Modified: ZODB/trunk/src/ZEO/TransactionBuffer.py
===================================================================
--- ZODB/trunk/src/ZEO/TransactionBuffer.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/TransactionBuffer.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -77,34 +77,28 @@
finally:
self.lock.release()
- def store(self, oid, version, data):
+ def store(self, oid, data):
+ """Store oid, version, data for later retrieval"""
self.lock.acquire()
try:
- self._store(oid, version, data)
+ if self.closed:
+ return
+ self.pickler.dump((oid, data))
+ self.count += 1
+ # Estimate per-record cache size
+ self.size = self.size + len(data) + 31
finally:
self.lock.release()
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
- def _store(self, oid, version, data):
- """Store oid, version, data for later retrieval"""
- if self.closed:
- return
- self.pickler.dump((oid, version, data))
- self.count += 1
- # Estimate per-record cache size
- self.size = self.size + len(data) + 31
- if version:
- # Assume version data has same size as non-version data
- self.size = self.size + len(version) + len(data) + 12
-
- def invalidate(self, oid, version):
+ def invalidate(self, oid):
self.lock.acquire()
try:
if self.closed:
return
- self.pickler.dump((oid, version, None))
+ self.pickler.dump((oid, None))
self.count += 1
finally:
self.lock.release()
Modified: ZODB/trunk/src/ZEO/__init__.py
===================================================================
--- ZODB/trunk/src/ZEO/__init__.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/__init__.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -20,6 +20,3 @@
http://www.zope.org/Wikis/ZODB
"""
-
-# The next line must use double quotes, so release.py recognizes it.
-version = "3.7.0b3"
Modified: ZODB/trunk/src/ZEO/interfaces.py
===================================================================
--- ZODB/trunk/src/ZEO/interfaces.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/interfaces.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -41,7 +41,7 @@
performed by the most recent transactions.
An iterable of up to size entries must be returned, where each
- entry is a transaction id and a sequence of object-id/version
- pairs describing the objects and versions written by the
+ entry is a transaction id and a sequence of object-id/empty-string
+ pairs describing the objects written by the
transaction, in chronological order.
"""
Modified: ZODB/trunk/src/ZEO/monitor.py
===================================================================
--- ZODB/trunk/src/ZEO/monitor.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/monitor.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -24,6 +24,18 @@
import ZEO
+zeo_version = 'unknown'
+try:
+ import pkg_resources
+except ImportError:
+ pass
+else:
+ zeo_dist = pkg_resources.working_set.find(
+ pkg_resources.Requirement.parse('ZODB3')
+ )
+ if zeo_dist is not None:
+ zeo_version = zeo_dist.version
+
class StorageStats:
"""Per-storage usage statistics."""
@@ -149,7 +161,7 @@
f.close()
def dump(self, f):
- print >> f, "ZEO monitor server version %s" % ZEO.version
+ print >> f, "ZEO monitor server version %s" % zeo_version
print >> f, time.ctime()
print >> f
Modified: ZODB/trunk/src/ZEO/scripts/timeout.py
===================================================================
--- ZODB/trunk/src/ZEO/scripts/timeout.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/scripts/timeout.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -49,14 +49,13 @@
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
- version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
- storage.store(oid, revid, pickled_data, version, t)
+ storage.store(oid, revid, pickled_data, '', t)
print "Stored. Now voting..."
storage.tpc_vote(t)
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -111,9 +111,17 @@
self._newAddr()
self.startServer()
+# self._old_log_level = logging.getLogger().getEffectiveLevel()
+# logging.getLogger().setLevel(logging.WARNING)
+# self._log_handler = logging.StreamHandler()
+# logging.getLogger().addHandler(self._log_handler)
+
def tearDown(self):
"""Try to cause the tests to halt"""
- logging.info("tearDown() %s" % self.id())
+# logging.getLogger().setLevel(self._old_log_level)
+# logging.getLogger().removeHandler(self._log_handler)
+# logging.info("tearDown() %s" % self.id())
+
for p in self.conf_paths:
os.remove(p)
if getattr(self, '_storage', None) is not None:
Modified: ZODB/trunk/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/InvalidationTests.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/InvalidationTests.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -24,8 +24,7 @@
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
-from ZODB.POSException \
- import ReadConflictError, ConflictError, VersionLockError
+from ZODB.POSException import ReadConflictError, ConflictError
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
@@ -433,44 +432,6 @@
db1.close()
db2.close()
- # TODO: Temporarily disabled. I know it fails, and there's no point
- # getting an endless number of reports about that.
- def xxxcheckConcurrentUpdatesInVersions(self):
- self._storage = storage1 = self.openClientStorage()
- db1 = DB(storage1)
- db2 = DB(self.openClientStorage())
- stop = threading.Event()
-
- cn = db1.open()
- tree = cn.root()["tree"] = OOBTree()
- transaction.commit()
- cn.close()
-
- # Run three threads that update the BTree.
- # Two of the threads share a single storage so that it
- # is possible for both threads to read the same object
- # at the same time.
-
- cd = {}
- t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
- t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
- t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
- self.go(stop, cd, t1, t2, t3)
-
- while db1.lastTransaction() != db2.lastTransaction():
- db1._storage.sync()
- db2._storage.sync()
-
-
- cn = db1.open()
- tree = cn.root()["tree"]
- self._check_tree(cn, tree)
- self._check_threads(tree, t1, t2, t3)
-
- cn.close()
- db1.close()
- db2.close()
-
def checkConcurrentLargeUpdates(self):
# Use 3 threads like the 2StorageMT test above.
self._storage = storage1 = self.openClientStorage()
Modified: ZODB/trunk/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/registerDB.test 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/registerDB.test 2008-01-07 01:14:50 UTC (rev 82726)
@@ -81,17 +81,17 @@
invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
- >>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
+ >>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1
- [('ob1', 'v'), ('ob2', 'v')]
+ [('ob1', ''), ('ob2', '')]
invalidateTransaction trans3 2
- [('ob1', 'v'), ('ob2', 'v')]
+ [('ob1', ''), ('ob2', '')]
The storage server's queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
- 'trans3' [('ob1', 'v'), ('ob2', 'v')]
+ 'trans3' [('ob1', ''), ('ob2', '')]
'trans2' [('ob1', ''), ('ob2', '')]
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
Modified: ZODB/trunk/src/ZEO/tests/testTransactionBuffer.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testTransactionBuffer.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/testTransactionBuffer.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -23,18 +23,18 @@
def new_store_data():
"""Return arbitrary data to use as argument to store() method."""
- return random_string(8), '', random_string(random.randrange(1000))
+ return random_string(8), random_string(random.randrange(1000))
def new_invalidate_data():
"""Return arbitrary data to use as argument to invalidate() method."""
- return random_string(8), ''
+ return random_string(8)
class TransBufTests(unittest.TestCase):
def checkTypicalUsage(self):
tbuf = TransactionBuffer()
tbuf.store(*new_store_data())
- tbuf.invalidate(*new_invalidate_data())
+ tbuf.invalidate(new_invalidate_data())
for o in tbuf:
pass
@@ -45,13 +45,13 @@
tbuf.store(*d)
data.append(d)
d = new_invalidate_data()
- tbuf.invalidate(*d)
+ tbuf.invalidate(d)
data.append(d)
for i, x in enumerate(tbuf):
- if x[2] is None:
+ if x[1] is None:
# the tbuf adds a dummy None to invalidates
- x = x[:2]
+ x = x[0]
self.assertEqual(x, data[i])
def checkOrderPreserved(self):
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -517,7 +517,6 @@
handle_serials
import transaction
- version = ''
somedata = 'a' * 10
blob = Blob()
@@ -680,9 +679,6 @@
def supportsUndo(self):
return False
- def supportsVersions(self):
- return False
-
def new_oid(self):
return self.server.new_oids(1)[0]
@@ -696,8 +692,8 @@
del self.server.client.serials[:]
return result
- def store(self, oid, serial, data, version, transaction):
- self.server.storea(oid, serial, data, version, id(transaction))
+ def store(self, oid, serial, data, version_ignored, transaction):
+ self.server.storea(oid, serial, data, '', id(transaction))
def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction))
@@ -792,7 +788,7 @@
>>> from ZODB.utils import u64
- >>> sorted([int(u64(oid)) for (oid, version) in oids])
+ >>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
(Note that the fact that we get oids for 92-100 is actually an
@@ -840,7 +836,7 @@
>>> ntid == last[-1]
True
- >>> sorted([int(u64(oid)) for (oid, version) in oids])
+ >>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 101, 102, 103, 104]
"""
Modified: ZODB/trunk/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/test_cache.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZEO/tests/test_cache.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -94,12 +94,10 @@
def testLoad(self):
data1 = "data for n1"
self.assertEqual(self.cache.load(n1, ""), None)
- self.assertEqual(self.cache.load(n1, "version"), None)
self.cache.store(n1, "", n3, None, data1)
self.assertEqual(self.cache.load(n1, ""), (data1, n3, ""))
# The cache doesn't know whether version exists, because it
# only has non-version data.
- self.assertEqual(self.cache.load(n1, "version"), None)
self.assertEqual(self.cache.modifiedInVersion(n1), None)
def testInvalidate(self):
Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZODB/interfaces.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -217,7 +217,7 @@
Parameters:
tid: the storage-level id of the transaction that committed
- oids: oids is a set of oids, represented as a dict with oids as keys.
+ oids: oids is an iterable of oids.
"""
def root():
Modified: ZODB/trunk/src/ZODB/tests/BasicStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/BasicStorage.py 2008-01-07 00:34:04 UTC (rev 82725)
+++ ZODB/trunk/src/ZODB/tests/BasicStorage.py 2008-01-07 01:14:50 UTC (rev 82726)
@@ -47,7 +47,7 @@
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
- 0, 1, 2, 3, transaction.Transaction())
+ 0, 1, 2, '', transaction.Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
More information about the Zodb-checkins
mailing list