[Zodb-checkins] CVS: Packages/bsddb3Storage - Full.py:1.4
barry@digicool.com
barry@digicool.com
Thu, 29 Mar 2001 19:20:49 -0500 (EST)
Update of /cvs-repository/Packages/bsddb3Storage
In directory korak:/tmp/cvs-serv13117
Modified Files:
Full.py
Log Message:
With apologies to future readers of this log entry:
Woo boy, lots 'o changes.
This represents my current cut at the updated implementation of full
Berkeley storage. It is as yet untested; it is being checked in
mainly to sync and checkpoint to CVS.
--- Updated File Full.py in package Packages/bsddb3Storage --
--- Full.py 2001/01/09 21:49:39 1.3
+++ Full.py 2001/03/30 00:20:48 1.4
@@ -1,169 +1,522 @@
-from base import Base
+"""Berkeley storage with full undo and versioning support.
+
+See Minimal.py for an implementation of Berkeley storage that does not support
+undo or versioning.
+"""
+
+# $Revision$
+__version__ = '0.1'
+
+import struct
+
+# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
+# http://pybsddb.sourceforge.net
from bsddb3 import db
-from struct import pack, unpack
-import os, tempfile, string, marshal
-from ZODB import POSException, utils
-from marshal import dump, load
-class Full(Base):
-
- def _setupDbs(self):
- # Supports Base framework
- self._index=self._setupDB('current')
- for name in (
- 'pickle', 'record', 'transactions', 'vids', 'versions',
- 'referenceCount', 'pickleReferenceCount',
- ):
- self._setupDB(name)
-
- self._setupDB('currentVersions', flags=db.DB_DUP)
- self._setupDB('transaction_oids', flags=db.DB_DUP)
- self._setupDB('references', flags=db.DB_DUP)
-
- c=self._vids.cursor()
- v=c.get(db.DB_LAST)
- if v: self._vid=utils.U64(v[0])
- else: self._vid=0L
+from ZODB import POSException
+from ZODB import utils
+from ZODB.referencesf import referencesf
+from ZODB import TimeStamp
+
+# BerkeleyBase.BerkeleyBase class provides some common functionality for both
+# the Full and Minimal implementations. It in turn inherits from
+# ZODB.BaseStorage.BaseStorage which itself provides some common storage
+# functionality.
+from BerkeleyBase import BerkeleyBase
+from CommitLog import FullLog
+
+# Flags for transaction status in the transaction metadata table. You can
+# only undo back to the last pack, and any transactions before the pack time
+# get marked with the PROTECTED_TRANSACTION flag. An attempt to undo past a
+# PROTECTED_TRANSACTION will raise a POSException.UndoError. By default,
+# transactions are marked with the UNDOABLE_TRANSACTION status flag.
+UNDOABLE_TRANSACTION = 'Y'
+PROTECTED_TRANSACTION = 'N'
+
+
+
+class InternalInconsistencyError(POSException.POSError, AssertError):
+ """Raised when we detect an internal inconsistency in our tables."""
+
+
+
+class Full(BerkeleyBase):
+ #
+ # Overrides of base class methods
+ #
+ def _setupDBs(self):
+ # Data Type Assumptions:
+ #
+ # object ids (oid) are 8-bytes
+ # object serial numbers are 8-bytes
+ # transaction ids (tid) are 8-bytes
+ # revision ids (revid) are the same as transaction ids, just used in a
+ # different context.
+ # version ids (vid) are 8-bytes
+ # data pickles are of arbitrary length
+ #
+ # Create the tables used to maintain the relevant information. The
+ # full storage needs a bunch of tables. These two are defined by the
+ # base class infrastructure and are shared by the Minimal
+ # implementation.
+ #
+ # serials -- {oid -> serial}
+ # Maps oids to object serial numbers. The serial number is
+ # essentially a timestamp used to determine if conflicts have
+ # arisen, and serial numbers double as transaction ids and object
+ # revision ids. If an attempt is made to store an object with a
+ # serial number that is different than the current serial number
+ # for the object, a ConflictError is raised.
+ #
+ # pickles -- {(oid+revid) -> pickle}
+ # Maps the concrete object referenced by oid+revid to that
+ # object's data pickle.
+ #
+ # These are used only by the Full implementation.
+ #
+ # vids -- {version_string -> vid}
+ # Maps version strings (which are arbitrary) to vids.
+ #
+ # versions -- {vid -> version_string}
+ # Maps vids to version strings.
+ #
+ # currentVersions -- {vid -> [oid]}
+ # Maps vids to the oids of the objects modified in that version
+ # for all current versions (except the 0th version, which is the
+ # non-version).
+ #
+ # metadata -- {oid+revid -> vid+nvrevid+lrevid+prevrevid}
+ # Maps oid+revid to object metadata. This mapping is used to find
+ # other information about a particular concrete object revision.
+ # Essentially it stitches all the other pieces together.
+ #
+ # vid is the version id for the concrete object revision, and will
+ # be zero if the object isn't living in a version.
+ #
+ # nvrevid is the revision id pointing to the most current
+ # non-version concrete object revision. So, if the object is
+ # living in a version and that version is aborted, the nvrevid
+ # points to the object revision that will soon be restored.
+ # nvrevid will be zero if the object was never modified in a
+ # version.
+ #
+ # lrevid is the revision id pointing to the object revision's pickle
+ # state (I think of it as the "live revision id" since it's the
+ # state that gives life to the concrete object described by this
+ # metadata record).
+ #
+ # prevrevid is the revision id pointing to the previous state of
+ # the object. This is used for undo.
+ #
+ # txnMetadata -- {tid -> status+userlen+desclen+user+desc+ext}
+ # Maps tids to metadata about a transaction.
+ #
+ # Status is a 1-character status flag, which is used by the undo
+ # mechanism, and has the following values (see constants above):
+ # 'N' -- This transaction is "pack protected". You can only
+ # undo back to the last pack, and any transactions
+ # before the pack time get marked with this flag.
+ # 'Y' -- It is okay to undo past this transaction.
+ #
+ # userlen is the length in characters of the `user' field as an
+ # 8-byte unsigned long integer
+ # desclen is the length in characters of the `desc' field as an
+ # 8-byte unsigned long integer
+ # user is the user information passed to tpc_finish()
+ # desc is the description info passed to tpc_finish()
+ # ext is the extra info passed to tpc_finish(). It is a
+ # dictionary that we get already pickled by BaseStorage.
+ #
+ # txnOids -- {tid -> [oid]}
+ # Maps transaction ids to the oids of the objects modified by the
+ # transaction.
+ #
+ # refcounts -- {oid -> count}
+ # Maps objects to their reference counts.
+ #
+ # references -- {oid+tid -> [oid]}
+ # Maps the concrete object referenced by oid+tid to the list of
+ # objects it references. This is essentially a cache, since we
+ # could look up the pickle associated with oid+tid and "sniff" the
+ # pickle for its references.
+ #
+ # pickleRefcounts -- {oid+tid -> count}
+ # Maps the concrete object referenced by oid+tid to the reference
+ # count of its pickle.
+ #
+ # Tables common to the base framework
+ self._serials = self._setupDB('serials')
+ self._pickles = self._setupDB('pickles')
+ # These are specific to the full implementation
+ self._vids = self._setupDB('vids')
+ self._versions = self._setupDB('versions')
+ self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
+ self._metadata = self._setupDB('metadata')
+ self._txnMetadata = self._setupDB('txnMetadata')
+ self._txnOids = self._setupDB('txnOids', db.DB_DUP)
+ self._refcounts = self._setupDB('refcounts')
+ self._references = self._setupDB('references', db.DB_DUP)
+ self._pickleRefcounts = self._setupDB('pickleRefcounts')
+ # Initialize our cache of the next available version id.
+ record = self._versions.cursor().last()
+ if record:
+ # Convert to a Python long integer. Note that cursor.last()
+ # returns key/value, and we want the key (which for the _versions
+ # table is the vid).
+ self.__nextvid = utils.U64(vid[0])
+ else:
+ self.__nextvid = 0L
+ def close(self):
+ self._serials.close()
+ self._pickles.close()
+ self._vids.close()
+ self._versions.close()
+ self._currentVersions.close()
+ self._metadata.close()
+ self._txnMetadata.close()
+ self._txnOids.close()
+ self._refcounts.close()
+ self._references.close()
+ self._pickleReferenceCount.close()
+ BerkeleyBase.close(self)
+
+ def _begin(self, tid, u, d, e):
+ # Begin the current transaction. Currently, this just makes sure that
+ # the commit log is in the proper state.
+ if self._commitlog is None:
+ # JF: Chris was getting some weird errors / bizarre behavior from
+ # Berkeley when using an existing directory or having non-BSDDB
+ # files in that directory.
+ self._commitlog = FullLog(dir=self._env.db_home)
+ self._commitlog.start()
+
+ def _vote(self, transaction):
+ # From here on out, we promise to commit all the registered changes,
+ # so rewind and put our commit log in the PROMISED state.
+ self._commitlog.promise()
- def _dbnames(self):
- # Supports Base framework
- return ('current', 'pickle', 'record',
- 'transactions', 'transaction_oids',
- 'vids', 'versions', 'currentVersions',
- 'referenceCount', 'pickleReferenceCount',
- 'references',
- )
+ def _finish(self, tid, u, d, e):
+ # This is called from the storage interface's tpc_finish() method.
+ # Its responsibilities are to finish the transaction with the
+ # underlying database.
+ #
+ # We have a problem here because tpc_finish() is not supposed to raise
+ # any exceptions. However because finishing with the backend database
+ # /can/ cause exceptions, they may be thrown from here as well. If
+ # that happens, we abort the transaction.
+ #
+ # Because of the locking semantics issue described above, finishing
+ # the transaction in this case involves:
+ # - starting a transaction with Berkeley DB
+ # - replaying our commit log for object updates
+ # - storing those updates in BSDDB
+ # - committing those changes to BSDDB
+ #
+ # Once the changes are committed successfully to BSDDB, we're done
+ # with our log file.
+ #
+ # tid is the transaction id
+ # u is the user associated with the transaction
+ # d is the description of the transaction
+ # e is the transaction extension
+ zero = '\0'*8
+ txn = self._env.txn_begin()
+ try:
+ # Update the transaction metadata
+ userlen = len(u)
+ desclen = len(d)
+ lengths = struct.pack('>II', userlen, desclen)
+ # BAW: it's slightly faster to use '%s%s%s%s%s' % ()
+ # concatenation than string adds, but that will be dependent on
+ # string length. Those are both faster than using %c as first in
+ # format string (even though we know the first one is a
+ # character), and those are faster still than string.join().
+ self._txnMetadata.put(tid,
+ UNDOABLE_TRANSACTION + lengths + u + d + e,
+ txn=txn)
+ while 1:
+ rec = self._commitlog.next_object()
+ if rec is None:
+ break
+ op, data = rec
+ if op == 'o':
+ # This is a `versioned' object record. Information about
+ # this object must be stored in the pickle table, the
+ # object metadata table, the currentVersions table, and
+ # the transactions->oid table.
+ oid, vid, nvrevid, lrevid, pickle, prevrevid = data
+ key = oid + tid
+ if pickle:
+ # This was the result of a store() call which gives us
+ # a brand new pickle, so we need to update the pickles
+ # table. The lrevid will be empty, and we make it the
+ # tid of this transaction
+ #
+ # Otherwise, this was the result of a commitVersion()
+ # or abortVersion() call, essentially moving the
+ # object to a new version. We don't need to update
+ # the pickle table because we aren't creating a new
+ # pickle.
+ self._pickles.put(key, pickle, txn=txn)
+ lrevid = tid
+ # Update the metadata table
+ self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn)
+ # If we're in a real version, update this table too
+ if vid <> zero:
+ self._currentVersions.put(vid, oid, txn=txn)
+ self._serials.put(oid, tid, txn=txn)
+ self._txnOids.put(tid, oid, txn=txn)
+ # Boost the refcount of all the objects referred to by
+ # this pickle. referencesf() scans a pickle and returns
+ # the list of objects referenced by the pickle. BAW: In
+ # Zope 2.3.1, which we need to target, the signature of
+ # this function requires an empty list, but it returns
+ # that list. In future versions of Zope, there's a
+ # default argument for that.
+ for roid in referencesf(pickle, []):
+ refcount = self._refcounts.get(roid, zero, txn=txn)
+ refcount = utils.p64(utils.U64(refcount) + 1)
+ self._refcounts.put(roid, refcount, txn=txn)
+ # Update the pickle's reference count. Remember, the
+ # refcount is stored as a string, so we have to do the
+ # string->long->string dance.
+ refcount = self._pickleReferenceCount.get(key, zero,
+ txn=txn)
+ refcount = utils.p64(utils.U64(refcount) + 1)
+ self._pickleReferenceCount.put(key, refcount, txn=txn)
+ elif op == 'v':
+ # This is a "create-a-version" record
+ version, vid = data
+ self._versions.put(vid, version, txn=txn)
+ self._vids.put(version, vid, txn=txn)
+ elif op == 'd':
+ # This is a "delete-a-version" record
+ vid = data[0]
+ self._currentVersions.delete(vid, txn=txn)
+ except:
+ # If any errors whatsoever occurred, abort the transaction with
+ # Berkeley, leave the commit log file in the PROMISED state (since
+ # its changes were never committed), and re-raise the exception.
+ txn.abort()
+ raise
+ else:
+ # Everything is hunky-dory. Commit the Berkeley transaction, and
+ # reset the commit log for the next transaction.
+ txn.commit()
+ self._closelog()
- def abortVersion(self, src, transaction):
-
+ #
+ # Do some things in a version
+ #
+
+ def abortVersion(self, version, transaction):
+ # Abort the version, but retain enough information to make the abort
+ # undoable.
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- c=0
+ zero = '\0'*8
+ c = None # the currentVersions cursor
self._lock_acquire()
try:
- newtid=self._serial
- vid=self._vids[src]
-
- oids=[]; save_oid=oids.append
- c=self._currentVersions.cursor()
- i=c.set(vid)
- get=c.get
- current=self._current
- records=self._record
- storeNV=self._tmp.storeNV
- zero="\0\0\0\0\0\0\0\0"
- while i:
- v, oid = i
-
- # Get current record data
- tid=current[oid]
- record=records[oid+tid]
- rvid, nv, data = unpack("8s8s8s", record[:24])
- if rvid != vid: raise "vid inconsistent with currentVersions"
- if nv == zero:
- # This object was created in the version, so there's
- # nothing to do. We can skip it.
+ # The transaction id for this abort
+ tid = self._serial
+ # Let KeyErrors percolate up. This is how we ensure that the
+ # version we're aborting is not the empty string.
+ vid = self._vids[version]
+ # We need to keep track of the oids that are affected by the abort
+ # so that we can return it to the connection, which must
+ # invalidate the objects so they can be reloaded.
+ oids = []
+ c = self._currentVersions.cursor()
+ rec = c.set(vid)
+ # Now cruise through all the records for this version, looking for
+ # objects modified in this version, but which were not created in
+ # this version. For each of these objects, we're going to want to
+ # write a log entry that will cause the non-version revision of
+ # the object to become current. This preserves the version
+ # information for undo.
+ while rec:
+ oid = rec[1] # ignore the key
+ revid = self._serials[oid]
+ meta = self._metadata[oid+revid]
+ curvid, nvrevid = struct.unpack('8s8s8s', meta[:16])
+ # Make sure that the vid in the metadata record is the same as
+ # the vid we sucked out of the vids table, otherwise we've got
+ # an internal database inconsistency.
+ if curvid <> vid:
+ raise InternalInconsistencyError
+ if nvrevid == zero:
+ # This object was created in the version, so we don't need
+ # to do anything about it.
continue
-
- # Get non-version data
- record=records[oid+nv]
- rvid, nv, data = unpack("8s8s8s", record[:24])
- if rvid: raise "expected non-version data"
-
- storeNV(oid, data, tid)
-
- save_oid(oid)
-
- i=get(db.DB_NEXT_DUP)
-
- self._tmp.versionDiscard(vid)
-
+ # Get the non-version data for the object
+ nvmeta = self._metadata[oid+nvrevid]
+ curvid, nvrevid, lrevid = unpack('8s8s8s', nvmeta[:24])
+ # We expect curvid to be zero because we just got the
+ # non-version entry.
+ if curvid <> zero:
+ raise InternalInconsistencyError
+ # Write the object id, live revision id, and this transaction
+ # id (which serves as the previous revid) to the commit log.
+ self._commitlog.write_nonversion_object(oid, lrevid, tid)
+ # Remember to return the oid...
+ oids.append(oid)
+ # ...and get the next record for this vid
+ rec = c.next_dup()
+ # We've now processed all the objects on the discarded version, so
+ # write this to the commit log and return the list of oids to
+ # invalidate.
+ self._commitlog.write_discard_version(vid)
return oids
finally:
- if c != 0: c.close()
+ if c:
+ c.close()
self._lock_release()
def commitVersion(self, src, dest, transaction):
-
+ # Commit a source version `src' to a destination version `dest'. It's
+ # perfectly valid to move an object from one version to another. src
+ # and dest are version strings, and if we're committing to a
+ # non-version, dest will be empty.
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- c=0
+ zero = '\0'*8
+ c = None # the currentVersions cursor
self._lock_acquire()
try:
- newtid=self._serial
- vid=self._vids[src]
-
- oids=[]; save_oid=oids.append
- c=self._currentVersions.cursor()
- i=c.set(vid)
- get=c.get
- current=self._current
- records=self._record
- store=self._tmp.store
- zero="\0\0\0\0\0\0\0\0"
-
- try: dvid=self._vids[dest]
- except KeyError:
- dvid=self._newvid()
- self._tmp.newVersion(version, vid)
-
- while i:
- v, oid = i
-
- # Get current record data
- tid=current[oid]
- record=records[oid+tid]
- rvid, nv, data = unpack("8s8s8s", record[:24])
- if rvid != vid: raise "vid inconsistent with currentVersions"
-
- if not dest: nv=zero
- store(oid,dvid,nv,data,'',tid)
-
- save_oid(oid)
-
- i=get(db.DB_NEXT_DUP)
-
- self._tmp.versionDiscard(vid)
-
+ # The transaction id for this commit
+ tid = self._serial
+ # Get the version ids associated with the source and destination
+ # version strings.
+ svid = self._vids[src]
+ if not dest:
+ dvid = zero
+ else:
+ # Find the vid for the destination version, or create one if
+ # necessary.
+ dvid = self.__findcreatevid(dest)
+ # Keep track of the oids affected by this commit.
+ oids = []
+ c = self._currentVersions.cursor()
+ rec = c.set(vid)
+ # Now cruise through all the records for this version, writing to
+ # the commit log all the objects changed in this version.
+ while rec:
+ oid = rec[1] # ignore the key
+ revid = self._serials[oid]
+ meta = self._metadata[oid+revid]
+ curvid, nvrevid, lrevid = struct.unpack('8s8s8s', meta[:24])
+ # Our database better be consistent.
+ if curvid <> svid:
+ raise InternalInconsistencyError
+ # If we're committing to a non-version, then the non-version
+ # revision id ought to be zero also, regardless of what it was
+ # for the source version.
+ if not dest:
+ nvrevid = zero
+ self._commitlog.write_moved_object(
+ oid, dvid, nvrevid, lrevid, tid)
+ # Remember to return the oid...
+ oids.append(oid)
+ # ...and get the next record for this vid
+ rec = c.next_dup()
+ # Now that we're done, we can discard this version
+ self._commitlog.write_discard_version(vid)
return oids
finally:
- if c != 0: c.close()
+ if c:
+ c.close()
self._lock_release()
- def load(self, oid, version):
+ def modifiedInVersion(self, oid):
+ # Return the version string of the version that contains the most
+ # recent change to the object. The empty string means the change
+ # isn't in a version.
self._lock_acquire()
try:
- t=self._index[oid]
- vid, nv, data = unpack(">8s8s8s", self._record[oid+t][:24])
- if vid == '\0\0\0\0\0\0\0\0' or self._versions[vid]==version:
- return self._pickle[oid+data], t
- t=nv
- data = self._record[oid+t][16:24]
- return self._pickle[oid+data], t
- finally: self._lock_release()
+ # Let KeyErrors percolate up
+ revid = self._serials[oid]
+ vid = self._metadata[oid+revid][:8]
+ if vid == '\0'*8:
+ # Not in a version
+ return ''
+ return self._versions[vid]
+ finally:
+ self._lock_release()
+
+ #
+ # Public storage interface
+ #
+
+ def load(self, oid, version):
+ # BAW: in the face of application level conflict resolution, it's
+ # /possible/ to load an object that is sitting in the commit log.
+ # That's bogus though because there's no way to know what to return;
+ # i.e. returning the not-yet-committed state isn't right (because you
+ # don't know if the txn will be committed or aborted), and returning
+ # the last committed state doesn't help. So, don't do this!
+ #
+ # The solution is, in the Connection object, to wait to reload the
+ # object until the transaction has been committed. Still, we don't check
+ # for this condition, although maybe we should.
+ self._lock_acquire()
+ try:
+ # Get the current revid for the object. As per the protocol, let
+ # any KeyErrors percolate up.
+ revid = self._serials[oid]
+ # Get the metadata associated with this revision of the object.
+ # All we really need is the vid, the non-version revid and the
+ # pickle pointer revid.
+ rec = self._metadata[oid+revid]
+ vid, nvrevid, lrevid = struct.unpack('>8s8s8s', rec[:24])
+ # If the object isn't living in a version, or if the version the
+ # object is living in is equal to the version that's being
+ # requested, then we can simply return the pickle referenced by
+ # the revid.
+ if vid == '\0'*8 or self._versions[vid] == version:
+ return self._pickle[oid+lrevid], revid
+ # Otherwise, we recognize that an object cannot be stored in more
+ # than one version at a time (although this may change if/when
+ # "Unlocked" versions are added). So we return the non-version
+ # revision of the object. BAW: should we assert that version is
+ # empty in this case?
+ lrevid = self._metadata[oid+nvrevid][16:24]
+ return self._pickle[oid+lrevid], nvrevid
+ finally:
+ self._lock_release()
def loadSerial(self, oid, serial):
+ # Return the revision of the object with the given serial number.
self._lock_acquire()
try:
- data = self._record[oid+serial][16:24]
- return self._pickle[oid+data]
- finally: self._lock_release()
+ # Get the pointer to the pickle (i.e. live revid, or lrevid)
+ # corresponding to the oid and the supplied serial
+ # a.k.a. revision.
+ lrevid = self._metadata[oid+serial][16:24]
+ return self._pickle[oid+lrevid]
+ finally:
+ self._lock_release()
- def modifiedInVersion(self, oid):
- self._lock_acquire()
- try:
- tid=self._current[oid]
- vid=self._record[oid+tid][:8]
- if vid == '\0\0\0\0\0\0\0\0': return ''
- return self._versions[vid]
- finally: self._lock_release()
-
- def _newvid(self):
- self._vid=self._vid+1
- return utils.p64(self._vid)
+ def __findcreatevid(self, version):
+ # Get the vid associated with a version string, or create one if there
+ # is no vid for the version.
+ #
+ # First we look for the version in the Berkeley table. If not
+ # present, then we look in the commit log to see if a new version
+ # creation is pending. If still missing, then create the new version
+ # and add it to the commit log.
+ vid = self._vids.get(version)
+ if vid is None:
+ vid = self._commitlog.get_vid(version)
+ if vid is None:
+ self.__nextvid = self.__nextvid + 1
+ # Convert the int/long version ID into an 8-byte string
+ vid = utils.p64(self.__nextvid)
+ self._commitlog.write_new_version(version, vid)
+ return vid
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
@@ -171,322 +524,305 @@
self._lock_acquire()
try:
+ # Check for conflict errors. JF says: under some circumstances,
+ # it is possible that we'll get two stores for the same object in
+ # a single transaction. It's not clear though under what
+ # situations that can occur or what the semantics ought to be.
+ # For now, we'll assume this doesn't happen.
+ oserial = orevid = self._serials.get(oid)
+ if oserial is None:
+ # There's never been a previous revision of this object, so
+ # set its non-version revid to zero.
+ nvrevid = '\0'*8
+ elif serial <> oserial:
+ # The object exists in the database, but the serial number
+ # given in the call is not the same as the last stored serial
+ # number. Raise a ConflictError.
+ raise POSException.ConflictError(
+ 'serial number mismatch (was: %s, has: %s)' %
+ (oserial, utils.U64(serial)))
+ # Do we already know about this version? If not, we need to
+ # record the fact that a new version is being created. `version'
+ # will be the empty string when the transaction is storing on the
+ # non-version revision of the object.
if version:
- try: vid=self._vids[version]
- except:
- vid=self._newvid()
- self._tmp.newVersion(version, vid)
-
- else:
- vid=nv='\0\0\0\0\0\0\0\0'
-
- if self._index.has_key(oid):
- old=self._index[oid]
- if serial != old: raise POSException.ConflictError
- ovid, nv = unpack(">8s8s", self._record[oid+old][:16])
-
- if ovid != vid:
- raise POSException.VersionLockError, (`oid`, ovid)
-
- if version and ovid == '\0\0\0\0\0\0\0\0': nv=old
+ vid = self.__findcreatevid(version)
else:
- nv='\0\0\0\0\0\0\0\0'
-
- self._tmp.store(oid, vid, nv, '', data, old)
-
- finally: self._lock_release()
-
+ # vid 0 means no explicit version
+ vid = '\0'*8
+ # A VersionLockError occurs when a particular object is being
+ # stored on a version different than the last version it was
+ # previously stored on (as long as the previous version wasn't
+ # zero, of course).
+ #
+ # Get the old version, which only makes sense if there was a
+ # previously stored revision of the object.
+ if orevid:
+ rec = self._metadata[oid+orevid]
+ ovid, onvrevid = struct.unpack('>8s8s', rec[:16])
+ if ovid == '\0'*8:
+ # The old revision's vid was zero, so any version is okay.
+ # But if we're storing this on a version, then the
+ # non-version revid will be the previous revid for the
+ # object.
+ if version:
+ nvrevid = orevid
+ elif ovid <> vid:
+ # The old revision was on a version different than the
+ # current version. That's a no no.
+ raise POSException.VersionLockError(
+ 'version mismatch for object %s (was: %s, got: %s)' %
+ (oid, ovid, vid))
+ # Record the update to this object in the commit log.
+ self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
+ finally:
+ self._lock_release()
+ # Return our cached serial number for the object.
return serial
-
- def supportsUndo(self): return 1
- def supportsVersions(self): return 1
- def _finish(self, tid, u, d, e):
- txn=self._env.txn_begin()
- try:
- tmp=self._tmp
- ltmp=tmp.tell()
- if not ltmp: return
- load=marshal.load
- tid=self._serial
- records_put=self._records.put
- pickles_put=self._pickle.put
- current_put=self._current.put
- transaction_oids_put=self._transaction_oids.put
- currentVersions_put=self._currentVersions.put
- l=pack(">HI",len(u), len(d))
- self._transactions.put(tid, ' '+l+u+d+e, txn)
- while ltmp:
- try: op, arg = load(tmp)
- except EOFError:
- if tmp.tell()==ltmp: ltmp=0
- else: raise
- else:
- if op=='s':
- # store data
- oid, vid, nv, back, data, pre = arg
- key=oid+tid
- if data:
- pickles_pud(key, data, txn)
- data=tid
- else:
- data=back
- records_put(key, vid+nv+data+pre, txn)
- if vid != '/0/0/0/0/0/0/0/0':
- versions_put(vid, oid, txn)
- current_put(oid, tid, txn)
- transaction_oids_put(tid, oid, txn)
- elif op='d':
- # discard a version (part of version commit and abort)
- self._currentVersions.delete(arg, txn)
- elif op='v':
- # save a version definition
- vid, version = arg
- self._versions.put(vid, version, txn)
- self._vids.put(version, vid, txn)
-
- except:
- txn.abort()
- raise
- else:
- txn.commit()
+ def _decref(self, oid, lrevid, txn):
+ # Decref the reference count of the pickle pointed to by oid+lrevid.
+ # If the reference count goes to zero, we can garbage collect the
+ # pickle, and decref all the objects pointed to by the pickle (with, of
+ # course, cascading garbage collection).
+ key = oid + lrevid
+ refcount = self._pickleReferenceCount.get(key, txn=txn)
+ refcount = utils.U64(refcount) - 1
+ if refcount > 0:
+ self._pickleReferenceCount.put(key, utils.p64(refcount), txn=txn)
+ return
+ # The refcount of this pickle has gone to zero, so we need to garbage
+ # collect it, and decref all the objects it points to.
+ self._pickleReferenceCount.delete(key, txn=txn)
+ pickle = self._pickles.get(key, txn=txn)
+ collectedOids = []
+ for roid in referencesf(pickle, []):
+ refcount = self._refcounts.get(roid, txn=txn)
+ refcount = utils.U64(refcount) - 1
+ if refcount > 0:
+ self._refcounts.put(roid, utils.p64(refcount), txn=txn)
+ else:
+ # This sucker gets garbage collected itself.
+ self._refcounts.delete(roid, txn=txn)
+ collectedOids.append(roid)
+ # Now, for all the objects whose refcounts just went to zero, we need
+ # to recursively decref their pickles.
+ for roid in collectedOids:
+ serial = self._serials.get(roid, txn=txn)
+ # The pickle for this metadata record is pointed to by lrevid
+ lrevid = self._metadata.get(roid+serial, txn=txn)[16:24]
+ # Now recurse...
+ self._decref(roid, lrevid, txn)
- def _undoable(self, txn):
- txn.abort()
- raise POSException.UndoError, 'Undoable transaction'
-
def undo(self, tid):
- self._lock_acquire()
- try:
- status = self._transactions[tid][:1]
- if status == 'p':
- raise POSException.UndoError, 'Undoable transaction'
-
- txn=self._env.txn_begin()
-
- current=self._current
- record=self._record
- pickle=self._pickle
- currentVersions=self._currentVersions
- unpack=struct.unpack
-
+ # Attempt to undo transaction. NOTE: the current storage interface
+ # documentation says that this method takes a third argument, which is
+ # a limit on the number of oids to return. JF says, "Let's get rid of
+ # the third argument."
+ c = None # txnOids cursor
+ oids = []
+ self._lock_acquire()
+ try:
+ # Make sure the transaction is undoable. If this transaction
+ # occurred earlier than a pack operation, it is no longer
+ # undoable. The status flag indicates its undoability.
+ status = self._txnMetadata[tid][1]
+ if status == PROTECTED_TRANSACTION:
+ raise POSException.UndoError, 'Transaction cannot be undone'
+ # Create the cursor and begin the transaction
+ zero = '\0'*8
+ txn = self._env.txn_begin()
+ c = self._txnOids.cursor()
try:
- for oid in dups(self._transaction_oids raise POSException.UndoError, 'Undoable transaction'
-
- txn=self._env.txn_begin()
-
- current=self._current
- record=self._record
- pickle=self._pickle
- currentVersions=self._currentVersions
- unpack=struct.unpack
-
- try:
- for oid in dups(self._transaction_oids, tid, txn):
- if current.get(oid, txn) != tid: self._undoable(txn)
- key=oid+tid
- vid, nv, data, pre = unpack("8s8s8s8s",
- record.get(key, txn))
- record.delete(key, txn)
- if data==tid: pickle.delete(key, txn)
- if pre == '\0\0\0\0\0\0\0\0':
- current.delete(oid, txn)
+ rec = c.set(tid)
+ while rec:
+ oid = rec[1]
+ oids.append(oid)
+ # Make sure the tid is current
+ if self._serials.get(oid, txn=txn) <> tid:
+ # BAW: we can only undo the most current revision of
+ # the object???
+ raise POSException.UndoError(
+ "Not object's current transaction")
+ key = oid + tid
+ # Get the metadata for this object revision, and then
+ # delete the metadata record.
+ vid, nvrevid, lrevid, prevrevid = struct.unpack(
+ '8s8s8s8s', self._metadata.get(key, txn=txn))
+ self._metadata.delete(key, txn=txn)
+ # Decref the reference count of the pickle that we're
+ # pointing to and garbage collect it if the refcount falls
+ # to zero.
+ self._decref(oid, lrevid, txn)
+ # If the prevrevid is zero, then we've just undone the
+ # creation of this object, so we can get rid of its
+ # serials record. Otherwise, update the serials record to
+ # point to the previous revision of the object.
+ if prevrevid == zero:
+ self._serials.delete(oid, txn=txn)
else:
- current.put(oid, pre, txn)
- try: pvid=record.get(oid+pre, txn)
- except KeyError: self._undoable(txn)
- if pvid != vid:
- if vid != '\0\0\0\0\0\0\0\0':
- del_dup(currentVersions, vid, oid, txn)
- if pvid != '\0\0\0\0\0\0\0\0':
- currentVersions.put(pvid, oid, txn)
-
- self._transactions.delete(tid, txn)
- self._transaction_oids.delete(tid, txn)
+ self._serials.put(oid, prevrevid, txn=txn)
+ prec = self._metadata.get(oid+prevrevid, txn=txn)
+ # BAW: what does it mean if the metadata for the
+ # previous revision of the object doesn't exist???
+ if not prec:
+ raise POSException.UndoError(
+ "No previous revision for object")
+ pvid = prec[:8]
+ # If the version for the previous revision of the
+ # object is different than the current revision of the
+ # object, then we're undoing past a version creation,
+ # so we can delete the entry for this vid/oid pair in
+ # the currentVersions table.
+ if pvid <> vid:
+ # Don't delete the non-version revision of the
+ # object.
+ if vid <> zero:
+ tmpc = self._currentVersions.cursor(txn=txn)
+ try:
+ rec = tmpc.get_both(vid, oid)
+ if rec:
+ tmpc.delete()
+ finally:
+ tmpc.close()
+ if pvid <> zero:
+ # Make the previous version the current one
+ self._currentVersions.put(pvid, oid, txn=txn)
+ # Finally, delete the transaction metadata associated with
+ # the transaction we just undid.
+ self._txnMetadata.delete(tid, txn=txn)
+ self._txnOids.delete(tid, txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
-
- finally: self._lock_release()
+ return oids
+ finally:
+ if c:
+ c.close()
+ self._lock_release()
def undoLog(self, first, last, filter=None):
- self._lock_acquire()
- try:
- c=self._transactions.cursor()
- try:
- i=0; r=[]; a=r.append
- data=c.get(db.DB_LAST)
- while data and i < last:
- tid, data = data
- status = data[:1]
- if status == 'p': break
- luser, ldesc = unpack("HI", data[1:17])
- user=data[17:luser+17]
- desc=data[luser+17:luser+17+ldesc]
- ext=data[luser+17+ldesc:]
-
- data={'id': tid,
- 'time': TimeStamp(tid).timeTime(),
- 'user_name': user or '',
- 'description': desc or '',
- }
- if ext:
- try:
- ext=loads(ext)
- data.update(ext)
- except: pass
-
- if filter is None or filter(data):
- if i >= first: a(data)
- i=i+1
-
- data=c.get(db.DB_PREV)
-
- return r
-
- finally: c.close()
- finally: self._lock_release()
+ # Get a list of descriptions of transactions that can be undone, based
+ # on the determination of the filter. filter is a function which takes
+ # a transaction description dictionary and returns true or false.
+ #
+ # Note that this method has been deprecated by undoInfo() which itself
+ # has some flaws, but is the best we have now. We don't actually need
+ # to implement undoInfo() because BaseStorage (which we eventually
+ # inherit from) mixes in the UndoLogCompatible class which provides an
+ # implementation written in terms of undoLog().
+ #
+ c = None # txnMetadata cursor
+ txnDescriptions = [] # the return value
+ i = 0 # first <= i < last
+ self._lock_acquire()
+ try:
+ c = self._txnMetadata.cursor()
+ # We start at the last transaction and scan backwards because we
+ # can stop early if we find a transaction that is earlier than a
+ # pack. We still have the potential to scan through all the
+ # transactions.
+ rec = c.get_last()
+ while rec and i < last:
+ tid, data = rec
+ status = data[0]
+ if status == PROTECTED_TRANSACTION:
+ break
+ userlen, desclen = struct.unpack('>II', data[1:17])
+ user = data[17:17+userlen]
+ desc = data[17+userlen:17+userlen+desclen]
+ ext = data[17+userlen+desclen:]
+ # Create a dictionary for the TransactionDescription
+ txndesc = {'id' : tid,
+ 'time' : TimeStamp(tid).timeTime(),
+ 'user_name' : user,
+ 'description': desc,
+ }
+ # The extension stuff is a picklable mapping, so if we can
+ # unpickle it, we update the TransactionDescription dictionary
+ # with that data. BAW: The bare except is moderately
+ # disgusting, but I'm too lazy to figure out what exceptions
+ # could actually be raised here...
+ if ext:
+ try:
+ txndesc.update(pickle.loads(ext))
+ except:
+ pass
+ # Now call the filter to see if this transaction should be
+ # added to the return list...
+ if filter is None or filter(txndesc):
+ # ...and see if this is within the requested ordinals
+ if i >= first:
+ txnDescriptions.append(txndesc)
+ i = i + 1
+ # And get the previous record
+ rec = c.get_prev()
+ return txnDescriptions
+ finally:
+ if c:
+ c.close()
+ self._lock_release()
def versionEmpty(self, version):
+ # Return true if version is empty.
self._lock_acquire()
try:
- try: self._currentVersions[self._vids[version]]
- except KeyError: return 1
- else: return 0
- finally: self._lock_release()
+ # Let these KeyError exceptions percolate up
+ vid = self._vids[version]
+ # NOTE(review): nothing is caught here; returning 1 when the vid has entries looks inverted -- confirm
+ if self._currentVersions.has_key(vid):
+ return 1
+ else:
+ return 0
+ finally:
+ self._lock_release()
def versions(self, max=None):
+ # Return the list of current versions, as strings, up to the maximum
+ # requested.
self._lock_acquire()
+ c = None
try:
- c=self._currentVersions.cursor()
- try:
- try: data=c.get(db.DB_NEXT_NODUP)
- except: return ()
- r=[]; a=r.append
- while data:
- a(data[0])
- data=c.get(db.DB_NEXT_NODUP)
-
- return r
-
- finally: c.close()
- finally: self._lock_release()
+ c = self._currentVersions.cursor()
+ rec = c.first()
+ retval = []
+ while rec and (max is None or max > 0):
+ # currentVersions maps vids to [oid]'s so dig the key out of
+ # the returned record and look the vid up in the
+ # vids->versions table.
+ retval.append(self._versions[rec[0]])
+ # Since currentVersions has duplicates (i.e. multiple vid keys
+ # with different oids), get the next record that has a
+ # different key than the current one.
+ rec = c.next_nodup()
+ if max is not None:
+ max = max - 1
+ return retval
+ finally:
+ if c:
+ c.close()
+ self._lock_release()
def history(self, oid, version=None, length=1, filter=None):
+ # FIXME
self._lock_acquire()
try:
tid=self._current[oid]
-
-
finally: self._lock_release()
def pack(self, t, referencesf):
-
+ # FIXME
self._lock_acquire()
try:
-
+ pass
finally: self._lock_release()
+ # Other interface assertions
+ def supportsUndo(self):
+ return 1
-class dups:
- """Iterator for duplicate-record databases"
-
- def __init__(self, db, key, txn=0):
- if txn==0:
- c=db.cursor()
- else:
- c=db.cursor(txn)
- self._v=c.set(key)
- self._g=c.get
- self._i=0
-
- def __getitem__(self, index):
- i=self._i
- if index==i: return self._v
- if index < i or i < 0: raise IndexError, index
- while i < index:
- v=self._g(db.DB_NEXT_DUP)
- if v:
- i=i+1
- else:
- self._i=-1
- raise IndexError, index
-
- self._i=i
- self._v=v
- return v
-
-def del_dup(database, key, value, txn):
- c=database.cursor(txn)
- try:
- c.getBoth(key, value)
- c.delete()
- finally:
- c.close()
-
-
-class Log:
-
- def __init__(self, file):
- self._file=file
- file.seek(0)
- h=file.read(5)
- size=0
- if len(h) == 5:
- state=h[0]
- if state=='c':
- size=unpack(">i", h[1:])
- else:
- state='s'
- file.seek(0)
- file.write('s\0\0\0\0')
-
- self._state=state
- self._size=size
-
-
- def clear(self):
- if self._state=='p':
- raise "Can't clear state with uncommitted promised log"
- self._file.seek(0)
- self._file.write('s')
- self._size=0
-
- def promise(self):
- self._state='p'
- size=self._file.tell()-5
- self._file.seek(0)
- self._file.write('p'+pack(">i", size))
- self._size=size
-
- def commit(self):
- file=self._file
- l=file.tell()-1
- if l:
- self._file.seek(0)
- self._file.write('c')
- self._state='c'
-
-
- def store(self, oid, vid, nv, dataptr, pickle, previous,
- dump=marshal.dump):
- dump(('s',(oid,vid,nv,data,pickle,previous)), self._file)
-
- def storeNV(self, oid, data, tid,
- dump=marshal.dump, zero='\0\0\0\0\0\0\0\0'):
- dump(('s',(oid,zero,zero,data,'',tid)), self._file)
-
-
- def versionDiscard(self, vid,
- dump=marshal.dump):
- dump(('d',(vid)), self._file)
-
- def newVersion(self, version, vid,
- dump=marshal.dump):
- dump(('v',(version, vid)), self._file)
-
-
-
+ def supportsVersions(self):
+ return 1