[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Reimplemented MappingStorage to be more full featured, with a cleaner
Jim Fulton
jim at zope.com
Sat Oct 25 20:36:37 EDT 2008
Log message for revision 92565:
Reimplemented MappingStorage to be more full featured, with a cleaner
and more instructive implementation.
Changed:
U ZODB/trunk/src/ZODB/MappingStorage.py
U ZODB/trunk/src/ZODB/tests/testMappingStorage.py
-=-
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py 2008-10-26 00:36:28 UTC (rev 92564)
+++ ZODB/trunk/src/ZODB/MappingStorage.py 2008-10-26 00:36:36 UTC (rev 92565)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
+# Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -11,135 +11,346 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
-"""Very Simple Mapping ZODB storage
+"""A simple in-memory mapping-based ZODB storage
-The Mapping storage provides an extremely simple storage implementation that
-doesn't provide undo or version support.
+This storage provides an example implementation of a fairly full
+storage without distracting storage details.
+"""
-It is meant to illustrate the simplest possible storage.
+import BTrees
+import cPickle
+import time
+import threading
+import ZODB.interfaces
+import ZODB.POSException
+import ZODB.TimeStamp
+import ZODB.utils
+import zope.interface
-The Mapping storage uses a single data structure to map object ids to data.
-"""
+class MappingStorage:
+
+ zope.interface.implements(
+ ZODB.interfaces.IStorage,
+ ZODB.interfaces.IStorageIteration,
+ )
-import ZODB.BaseStorage
-from ZODB.utils import u64, z64
-from ZODB import POSException
-from persistent.TimeStamp import TimeStamp
+ def __init__(self, name='MappingStorage'):
+ self.__name__ = name
+ self._data = {} # {oid->{tid->pickle}}
+ self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
+ self._ltid = None
+ self._last_pack = None
+ _lock = threading.RLock()
+ self._lock_acquire = _lock.acquire
+ self._lock_release = _lock.release
+ self._commit_lock = threading.Lock()
+ self._opened = True
+ self._transaction = None
+ self._oid = 0
+ ######################################################################
+ # Preconditions:
+
+ def opened(self):
+ """The storage is open
+ """
+ return self._opened
-class MappingStorage(ZODB.BaseStorage.BaseStorage):
+ def not_in_transaction(self):
+ """The storage is not committing a transaction
+ """
+ return self._transaction is None
- def __init__(self, name='Mapping Storage'):
- ZODB.BaseStorage.BaseStorage.__init__(self, name)
- # ._index maps an oid to a string s. s[:8] is the tid of the
- # transaction that created oid's current state, and s[8:] is oid's
- # current state.
- self._index = {}
- self._clear_temp()
- self._ltid = None
- # Note: If you subclass this and use a persistent mapping facility
- # (e.g. a dbm file), you will need to get the maximum key and save it
- # as self._oid. See dbmStorage.
+ #
+ ######################################################################
- def __len__(self):
- return len(self._index)
+ # testing framework (lame)
+ def cleanup(self):
+ pass
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked
+ def close(self):
+ self._opened = False
+
+ # ZODB.interfaces.IStorage
+ def getName(self):
+ return self.__name__
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
def getSize(self):
- self._lock_acquire()
- try:
- # These constants are for Python object memory overheads. Heh.
- s = 32
- for p in self._index.itervalues():
- s += 56 + len(p)
- return s
- finally:
- self._lock_release()
+ size = 0
+ for oid, tid_data in self._data.items():
+ size += 50
+ for tid, pickle in tid_data.items():
+ size += 100+len(pickle)
+ return size
- def load(self, oid, version):
- self._lock_acquire()
- try:
+ # ZEO.interfaces.IServeable
+ @ZODB.utils.locked(opened)
+ def getTid(self, oid):
+ tid_data = self._data.get(oid)
+ if tid_data:
+ return tid_data.maxKey()
+ raise ZODB.POSException.POSKeyError(oid)
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def history(self, oid, size=1):
+ tid_data = self._data.get(oid)
+ if not tid_data:
+ raise ZODB.POSException.POSKeyError(oid)
+
+ tids = tid_data.keys()[-size:]
+ tids.reverse()
+ return [
+ dict(
+ time = ZODB.TimeStamp.TimeStamp(tid),
+ tid = tid,
+ serial = tid,
+ user_name = self._transactions[tid].user,
+ description = self._transactions[tid].description,
+ extension = self._transactions[tid].extension,
+ size = len(tid_data[tid])
+ )
+ for tid in tids]
+
+ # ZODB.interfaces.IStorage
+ def isReadOnly(self):
+ return False
+
+ # ZODB.interfaces.IStorageIteration
+ def iterator(self, start=None, end=None):
+ for transaction_record in self._transactions.values(start, end):
+ yield transaction_record
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def lastTransaction(self):
+ if self._ltid is not None:
+ return self._ltid
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def __len__(self):
+ return len(self._data)
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def load(self, oid, version=''):
+ assert not version, "Versions are not supported"
+ tid_data = self._data.get(oid)
+ if tid_data:
+ tid = tid_data.maxKey()
+ return tid_data[tid], tid
+ raise ZODB.POSException.POSKeyError(oid)
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def loadBefore(self, oid, tid):
+ tid_data = self._data.get(oid)
+ if tid_data:
+ before = ZODB.utils.u64(tid)
+ if not before:
+ return None
+ before = ZODB.utils.p64(before-1)
+ tids_before = tid_data.keys(None, before)
+ if tids_before:
+ tids_after = tid_data.keys(tid, None)
+ tid = tids_before[-1]
+ return (tid_data[tid], tid,
+ (tids_after and tids_after[0] or None)
+ )
+ else:
+ raise ZODB.POSException.POSKeyError(oid)
+
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def loadSerial(self, oid, serial):
+ tid_data = self._data.get(oid)
+ if tid_data:
try:
- p = self._index[oid]
- return p[8:], p[:8] # pickle, serial
+ return tid_data[serial]
except KeyError:
- raise POSException.POSKeyError(oid)
- finally:
- self._lock_release()
+ pass
- def getTid(self, oid):
- self._lock_acquire()
- try:
- # The tid is the first 8 bytes of the buffer.
- return self._index[oid][:8]
- finally:
- self._lock_release()
+ raise ZODB.POSException.POSBeforeKeyError(oid)
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def new_oid(self):
+ self._oid += 1
+ return ZODB.utils.p64(self._oid)
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def pack(self, t, referencesf, gc=True):
+ if not self._data:
+ return
+
+ stop = `ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
+ if self._last_pack is not None and self._last_pack >= stop:
+ if self._last_pack == stop:
+ return
+ raise ValueError("Already packed to a later time")
+
+ self._last_pack = stop
+ transactions = self._transactions
+
+ # Step 1, remove old non-current records
+ for oid, tid_data in self._data.items():
+ tids_to_remove = tid_data.keys(None, stop)
+ if tids_to_remove:
+ tids_to_remove.pop() # Keep the last, if any
+
+ if tids_to_remove:
+ for tid in tids_to_remove:
+ del tid_data[tid]
+ if transactions[tid].pack(oid):
+ del transactions[tid]
+
+ if gc:
+ # Step 2, GC. A simple sweep+copy
+ new_data = BTrees.OOBTree.OOBTree()
+ to_copy = set([ZODB.utils.z64])
+ while to_copy:
+ oid = to_copy.pop()
+ tid_data = self._data.pop(oid)
+ new_data[oid] = tid_data
+ for pickle in tid_data.values():
+ for oid in referencesf(pickle):
+ if oid in new_data:
+ continue
+ to_copy.add(oid)
+
+ # Remove left over data from transactions
+ for oid, tid_data in self._data.items():
+ for tid in tid_data:
+ if transactions[tid].pack(oid):
+ del transactions[tid]
+
+ self._data = new_data
+
+ # ZODB.interfaces.IStorage
+ def registerDB(self, db):
+ pass
+
+ # ZODB.interfaces.IStorage
+ def sortKey(self):
+ return self.__name__
+
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
def store(self, oid, serial, data, version, transaction):
+ assert not version, "Versions are not supported"
if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
+ old_tid = None
+ tid_data = self._data.get(oid)
+ if tid_data:
+ old_tid = tid_data.maxKey()
+ if serial != old_tid:
+ raise ZODB.POSException.ConflictError(
+ oid=oid, serials=(old_tid, serial), data=data)
- self._lock_acquire()
- try:
- if oid in self._index:
- oserial = self._index[oid][:8]
- if serial != oserial:
- raise POSException.ConflictError(oid=oid,
- serials=(oserial, serial),
- data=data)
- self._tindex[oid] = self._tid + data
- finally:
- self._lock_release()
+ self._tdata[oid] = data
+
return self._tid
- def _clear_temp(self):
- # store() saves data in _tindex; if the transaction completes
- # successfully, _finish() merges _tindex into _index.
- self._tindex = {}
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def tpc_abort(self, transaction):
+ if transaction is not self._transaction:
+ return
+ self._transaction = None
+ self._commit_lock.release()
- def _finish(self, tid, user, desc, ext):
- self._index.update(self._tindex)
- self._ltid = self._tid
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def tpc_begin(self, transaction, tid=None):
+ # The tid argument exists to support testing.
+ if transaction is self._transaction:
+ return
+ self._lock_release()
+ self._commit_lock.acquire()
+ self._lock_acquire()
+ self._transaction = transaction
+ self._tdata = {}
+ if tid is None:
+ tid = ZODB.utils.newTid(self._ltid)
+ self._tid = tid
- def lastTransaction(self):
- return self._ltid
+ # ZODB.interfaces.IStorage
+ @ZODB.utils.locked(opened)
+ def tpc_finish(self, transaction, func = lambda tid: None):
+ if (transaction is not self._transaction) or not self._tdata:
+ return
- def pack(self, t, referencesf):
- self._lock_acquire()
- try:
- if not self._index:
- return
- # Build an index of *only* those objects reachable from the root.
- rootl = [z64]
- pindex = {}
- while rootl:
- oid = rootl.pop()
- if oid not in pindex:
- # Scan non-version pickle for references.
- r = self._index[oid]
- pindex[oid] = r
- referencesf(r[8:], rootl)
- self._index = pindex
+ tid = self._tid
+ func(tid)
- finally:
- self._lock_release()
+ tdata = self._tdata
+ for oid in tdata:
+ tid_data = self._data.get(oid)
+ if tid_data is None:
+ tid_data = BTrees.OOBTree.OOBucket()
+ self._data[oid] = tid_data
+ tid_data[tid] = tdata[oid]
- def _splat(self):
- """Spit out a string showing state."""
- o = ['Index:']
- keys = self._index.keys()
- keys.sort()
- for oid in keys:
- r = self._index[oid]
- o.append(' %s: %s, %s' %
- (u64(oid), TimeStamp(r[:8]), repr(r[8:])))
+ self._ltid = tid
+ self._transactions[tid] = TransactionRecord(tid, transaction, tdata)
+ self._transaction = None
+ self._commit_lock.release()
+
+ # ZEO.interfaces.IServeable
+ @ZODB.utils.locked(opened)
+ def tpc_transaction(self):
+ return self._transaction
- return '\n'.join(o)
-
- def cleanup(self):
+ # ZODB.interfaces.IStorage
+ def tpc_vote(self, transaction):
pass
- def close(self):
- pass
+class TransactionRecord:
+
+ status = ' '
+
+ def __init__(self, tid, transaction, data):
+ self.tid = tid
+ self.user = transaction.user
+ self.description = transaction.description
+ extension = transaction._extension
+ self.extension = extension
+ self.data = data
+
+ _extension = property(lambda self: self._extension,
+ lambda self, v: setattr(self, '_extension', v),
+ )
+
+ def __iter__(self):
+ for oid, data in self.data.items():
+ yield DataRecord(oid, self.tid, data, None)
+
+ def pack(self, oid):
+ self.status = 'p'
+ del self.data[oid]
+ return not self.data
+
+class DataRecord(object):
+ """Abstract base class for iterator protocol"""
+
+ zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
+
+ version = ''
+
+ def __init__(self, oid, tid, data, prev):
+ self.oid = oid
+ self.tid = tid
+ self.data = data
+ self.data_txn = prev
+
+def DB(*args, **kw):
+ return ZODB.DB(MappingStorage(), *args, **kw)
Modified: ZODB/trunk/src/ZODB/tests/testMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMappingStorage.py 2008-10-26 00:36:28 UTC (rev 92564)
+++ ZODB/trunk/src/ZODB/tests/testMappingStorage.py 2008-10-26 00:36:36 UTC (rev 92565)
@@ -14,17 +14,31 @@
import ZODB.MappingStorage
import unittest
-from ZODB.tests import StorageTestBase
-from ZODB.tests import BasicStorage, MTStorage, Synchronization
-from ZODB.tests import PackableStorage
-class MappingStorageTests(StorageTestBase.StorageTestBase,
- BasicStorage.BasicStorage,
- MTStorage.MTStorage,
- PackableStorage.PackableStorage,
- Synchronization.SynchronizedStorage,
- ):
+from ZODB.tests import (
+ BasicStorage,
+ HistoryStorage,
+ IteratorStorage,
+ MTStorage,
+ PackableStorage,
+ RevisionStorage,
+ StorageTestBase,
+ Synchronization,
+ )
+class MappingStorageTests(
+ StorageTestBase.StorageTestBase,
+ BasicStorage.BasicStorage,
+
+ HistoryStorage.HistoryStorage,
+ IteratorStorage.ExtendedIteratorStorage,
+ IteratorStorage.IteratorStorage,
+ MTStorage.MTStorage,
+ PackableStorage.PackableStorage,
+ RevisionStorage.RevisionStorage,
+ Synchronization.SynchronizedStorage,
+ ):
+
def setUp(self):
self._storage = ZODB.MappingStorage.MappingStorage()
@@ -36,8 +50,11 @@
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
+
+ def checkLoadBeforeUndo(self):
+ pass # we don't support undo yet
+ checkUndoZombie = checkLoadBeforeUndo
-
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
return suite
More information about the Zodb-checkins
mailing list