[Zodb-checkins] CVS: Zope3/src/zodb/storage - demo.py:1.6
Barry Warsaw
barry@wooz.org
Thu, 3 Apr 2003 16:45:08 -0500
Update of /cvs-repository/Zope3/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv7835
Modified Files:
demo.py
Log Message:
A start of a re-implementation of the demo storage based on the memory
storage. We need to flesh out the test_demo test cases to
specifically check for situations where state in the demo depends on
state in the backing storage.
=== Zope3/src/zodb/storage/demo.py 1.5 => 1.6 ===
--- Zope3/src/zodb/storage/demo.py:1.5 Fri Mar 14 12:13:49 2003
+++ Zope3/src/zodb/storage/demo.py Thu Apr 3 16:45:06 2003
@@ -13,316 +13,151 @@
##############################################################################
"""Demo ZODB storage
-The Demo storage serves two purposes:
+The Demo storage provides for a two-level storage: there is a read-only
+backing storage and an writable fronting storage. There are at least two use
+cases for a demo storage:
- - Provide an example implementation of a full storage without
- distracting storage details,
+- The backing storage can be on CDROM
- - Provide a volatile storage that is useful for giving demonstrations.
+- Functional tests can load up the backing storage with initial state, then
+ easily get back to that initial state by zapping the front storages.
-The demo strorage can have a "base" storage that is used in a
-read-only fashion. The base storage must not contain version data.
+The fronting storage is an in-memory storage.
"""
-from __future__ import generators
-
-import base64
-
-from zodb.interfaces import *
-from zodb.storage.base import BaseStorage
+from zodb.interfaces import VersionLockError, ConflictError
+from zodb.storage.memory import MemoryFullStorage
from zodb.storage.interfaces import *
-from zodb.timestamp import TimeStamp
-
-class TxnRecord(object):
- def __init__(self, tid, user, desc, ext):
- self.tid = tid
- self.user = user
- self.desc = desc
- self.ext = ext
- self.packed = False
- self.data = []
-
- def setPack(self):
- self.packed = True
-
- def append(self, rec):
- self.data.append(rec)
-
- def __iter__(self):
- for rec in self.data:
- yield rec
-
- def undoInfo(self):
- return {"id": base64.encodestring(self.tid).rstrip(),
- "time": TimeStamp(self.tid).timeTime(),
- "user_name": self.user,
- "description": self.desc}
-
-class DataRecord(object):
-
- # XXX Need to add a prev_txn field for transactions created
- # by undo or commit/abort version.
-
- def __init__(self, oid, serial, data, refs, version, prev):
- self.oid = oid
- self.serial = serial
- self.data = data
- self.refs = refs
- self.version = version
- self.prev = prev
- self.rec_version = None
- self.rec_nonversion = None
-
- def setVersionData(self, rec):
- assert self.rec_nonversion is None
- self.rec_version = rec
-
- def setNonVersionData(self, rec):
- assert self.rec_version is None
- self.rec_nonversion = rec
-class DemoStorage(BaseStorage):
- """Support ephemeral updates to a base storage.
+class DemoStorage(MemoryFullStorage):
+ """Support ephemeral updates to a read-only backing storage.
The DemoStorage extends a read-only base storage so that ephemeral
- transactions can be commited in the demo. The transactions are
- only stored in memory; the base storage is not modified and
- updates are lost when the DemoStorage object is closed.
+ transactions can be commited in the demo. The transactions are only
+ stored in memory; the base storage is not modified and updates are lost
+ when the DemoStorage object is closed.
"""
- # The implementation uses three dictionaries to manage the
- # ephemeral transactions.
- # _data maps tid to TxnRecords
- # _index maps oid to DataRecords
- # _vindex maps version name to version indexes (like _index)
- # A version index maps oids to DataRecords.
- # _index is stored in _vindex under the key ""
-
- # XXX What if the base storage doesn't support undo or versions?
-
__implements__ = IStorage, IUndoStorage, IVersionStorage
- def __init__(self, base, name=None):
- if name is None:
- name = "DemoStorage"
- BaseStorage.__init__(self, name)
- self._base = base
- # XXX waste an oid
- self._oid = base.newObjectId()
-
- self._data = {}
- self._index = {}
- self._vindex = {"": self._index}
- self._cur_txn = None
-
- # delegate some methods to _base, even though they may
- # be provided by BaseStorage.
- self.getVersion = self._base.getVersion
- self.setVersion = self._base.setVersion
+ def __init__(self, name, backstorage, config=None):
+ self._back = backstorage
+ super(DemoStorage, self).__init__(name, config)
+ # After initializing the memory storage, be sure to initialize the
+ # last transaction id from the backing storage.
+ self._ltid = self._back.lastTransaction()
def close(self):
- self._base.close()
+ self._back.close()
def cleanup(self):
- # XXX Don't cleanup the base.
+ # XXX Don't cleanup the backing storage
pass
- def _load(self, oid, version=""):
- index = self._vindex.get(version)
- if index is None:
- rec = None
- else:
- rec = index.get(oid)
- if rec is None and version:
- rec = self._vindex[""].get(oid)
- return rec
-
- def _loadCurrent(self, oid):
- # Return the current record for oid.
- # If the most recent revision is on a version, return that.
- rec = self._load(oid)
- if rec is None:
- # Consult the base storage and create a faux DataRecord
- try:
- version = self._base.modifiedInVersion(oid)
- except KeyError:
- return None
- data, serial = self._base.load(oid, version)
- return DataRecord(oid, serial, data, version, None)
- # If there is a pointer to version data from the nonversion
- # date, then it must be current.
- if rec.rec_version:
- rec = rec.rec_version
- return rec
+ def _datarec(self, storage, oid, version=''):
+ # We want a record containing the oid, serial, data, refs, version,
+ # previous serial, the rec_version and the rec_nonversion.
+ #
+ # See if we have the current record for the object
+ try:
+ data, serial = storage.load(oid, version)
+ except KeyError:
+ return None
+ it = iter(storage.iterator(serial))
+ txnrec = it.next()
+ assert txnrec.tid == serial
+ for datarec in txnrec:
+ if datarec.oid == oid:
+ return datarec
+ return None
- def load(self, oid, version):
+ def load(self, oid, version=''):
self._lock_acquire()
try:
- rec = self._load(oid, version)
- if rec is not None:
- return rec.data, rec.serial
- else:
- return self._base.load(oid, version)
+ try:
+ return super(DemoStorage, self).load(oid, version)
+ except KeyError:
+ return self._back.load(oid, version)
finally:
self._lock_release()
def loadSerial(self, oid, serial):
- rec = self._loadCurrent(oid)
- while rec is not None:
- if rec.serial == serial:
- return rec.data
- rec = rec.prev
- # XXX Need to fallback to base storage...
- raise POSKeyError(oid)
+ self._lock_acquire()
+ try:
+ try:
+ return super(DemoStorage, self).loadSerial(oid, serial)
+ except KeyError:
+ return self._back.loadSerial(oid, serial)
+ finally:
+ self._lock_release()
def getSerial(self, oid):
- rec = self._loadCurrent(oid)
- if rec is None:
- raise POSKeyError(oid)
- return rec.serial
-
- def store(self, oid, serial, data, refs, version, txn):
- if txn is not self._transaction:
- raise StorageTransactionError(self, txn)
- prev = self._loadCurrent(oid)
- if prev:
- if prev.version and prev.version != version:
- raise VersionLockError(oid, prev.version)
- if prev.serial != serial:
- raise ConflictError(oid=oid, serials=(prev.serial, serial))
- rec = DataRecord(oid, self._serial, data, refs, version, prev)
- if prev and version:
- rec.setNonVersionData(prev.rec_nonversion or prev)
- self._cur_txn.append(rec)
- return self._serial
-
- def restore(self, oid, serial, data, version, prev_txn, txn):
- pass
+ self._lock_acquire()
+ try:
+ try:
+ return super(DemoStorage, self).getSerial(oid)
+ except KeyError:
+ return self._back.getSerial(oid)
+ finally:
+ self._lock_release()
- def _begin(self, tid, user, desc, ext):
- self._cur_txn = TxnRecord(tid, user, desc, ext)
+ def lastSerial(self, oid):
+ self._lock_acquire()
+ try:
+ serial = super(DemoStorage, self).lastSerial(oid)
+ if serial is None:
+ return self._back.lastSerial(oid)
+ finally:
+ self._lock_release()
- def _finish(self, tid, user, desc, ext):
- self._data[tid] = self._cur_txn
- for rec in self._cur_txn:
- vindex = self._vindex.setdefault(rec.version, {})
- vindex[rec.oid] = rec
- nvrec = self._index.get(rec.oid)
- if rec.prev and rec.prev.version != rec.version:
- # If the current version doesn't match the previous
- # version, we may need to remove the object from a
- # version index.
- version = rec.prev.version
- if version:
- del self._vindex[version][rec.oid]
- if nvrec is not None:
- nvrec.setVersionData(rec)
- self._cur_txn = None
-
- def _abort(self):
- self._cur_txn = None
-
- # XXX Should it be possible to undo a transaction in the base storage?
-
- def undo(self, base64_tid, txn):
- tid = base64.decodestring(base64_tid + "\n")
- txn = self._data.get(tid)
- if txn is None:
- raise UndoError("Invalid transaction id")
- if txn.packed:
- raise UndoError("Can't undo packed transaction")
- oids = []
- for rec in txn:
- # In order to be able to undo this transaction, we must be
- # undoing either the current revision of the object, or we
- # must be restoring the exact same pickle (identity compared)
- # that would be restored if we were undoing the current
- # revision. Otherwise, we attempt application level conflict
- # resolution. If that fails, we raise an exception.
- if rec != self._loadCurrent(rec.oid):
- raise UndoError(rec.oid, "Can't undo transaction, "
- "because data is not current")
- if rec.prev:
- prev = rec.prev
- new = DataRecord(prev.oid, prev.serial, prev.data, prev.refs,
- prev.version, prev.prev)
+ def modifiedInVersion(self, oid):
+ version = super(DemoStorage, self).modifiedInVersion(oid)
+ if version == '':
+ # See if the backing storage has this object in a version, but
+ # watch out for KeyErrors that might occur if the back knows
+ # nothing about the object.
+ try:
+ backvers = self._back.modifiedInVersion(oid)
+ except KeyError:
+ pass
else:
- new = DataRecord(rec.oid, None, None, None, None, rec)
- self._cur_txn.append(new)
- oids.append(new.oid)
- return oids
-
- def undoLog(self, first, last, filter=None):
- if last < 0:
- last = first - last + 1
- L = self._data.keys()
- L.sort()
- L.reverse()
- i = 0
- results = []
- for tid in L:
- rec = self._data[tid]
- if rec.packed:
- break
- info = rec.undoInfo()
- if not filter or filter(info):
- if i > first:
- results.append(info)
- i += 1
- if i > last:
- break
- return results
-
- def versionEmpty(self, version):
- if self._vindex.get(version):
- return False
- else:
- return True
+ version = backvers
+ return version
def versions(self):
- return [v for v in self._vindex.keys()
- if v and not self.versionEmpty(v)]
+ vset = {}
+ for v in super(DemoStorage, self).versions():
+ vset[v] = True
+ for v in self._back.versions():
+ vset[v] = True
+ return vset.keys()
- def pack(self):
- pass
-
- def abortVersion(self, src, txn):
- if txn is not self._transaction:
- raise StorageTransactionError(self, txn)
- if not src:
- raise VersionError("Invalid version: %r" % src)
- vindex = self._vindex.get(src)
- if vindex is None:
- return []
- oids = []
- for rec in vindex.itervalues():
- if rec.rec_nonversion:
- data = rec.rec_nonversion.data
- refs = rec.rec_nonversion.refs
- new = DataRecord(rec.oid, self._serial, data, refs, "", rec)
- else:
- new = DataRecord(rec.oid, None, None, None, None, rec)
- self._cur_txn.append(new)
- oids.append(rec.oid)
- return oids
-
- def commitVersion(self, src, dest, txn):
+ def store(self, oid, serial, data, refs, version, txn):
+ superself = super(DemoStorage, self)
if txn is not self._transaction:
raise StorageTransactionError(self, txn)
- if not src:
- raise VersionCommitError("Invalid source version")
- if src == dest:
- raise VersionCommitError("Can't commit to same version: %r" % src)
- src_index = self._vindex.get(src)
- if src_index is None:
- return []
- oids = []
- for rec in src_index.itervalues():
- new = DataRecord(rec.oid, self._serial, rec.data,
- rec.refs, dest, rec)
- if dest and rec.rec_nonversion:
- new.setNonVersionData(rec.rec_nonversion)
- self._cur_txn.append(new)
- oids.append(rec.oid)
- return oids
+ self._lock_acquire()
+ try:
+ # See if we have the current record for the oid in the version
+ datarec = self._datarec(superself, oid, version)
+ if datarec is None:
+ # Try to get the datarec from the backing storage
+ try:
+ v = self._back.modifiedInVersion(oid)
+ except KeyError:
+ datarec = None
+ else:
+ datarec = self._datarec(self._back, oid, v)
+ if datarec is not None:
+ if datarec.version and datarec.version <> version:
+ raise VersionLockError(oid, datarec.version)
+ if datarec.serial <> serial:
+ data, refs = self._conflict.resolve(
+ oid, datarec.serial, serial, data)
+ # Either this is the first store of this object or we have the
+ # current revision, not the backing storage. Either way, we can
+ # store the new revision.
+ return superself.store(oid, serial, data, refs, version, txn)
+ finally:
+ self._lock_release()