[Zope3-checkins] CVS: Zope3/src/zope/app/advanced/acquisition/tests - BasicStorage.py:1.1 ConflictResolution.py:1.1 Corruption.py:1.1 HistoryStorage.py:1.1 IteratorStorage.py:1.1 LocalStorage.py:1.1 MTStorage.py:1.1 MinPO.py:1.1 PackableStorage.py:1.1 PersistentStorage.py:1.1 ReadOnlyStorage.py:1.1 RecoveryStorage.py:1.1 RevisionStorage.py:1.1 StorageTestBase.py:1.1 Synchronization.py:1.1 TransactionalUndoStorage.py:1.1 TransactionalUndoVersionStorage.py:1.1 VersionStorage.py:1.1 __init__.py:1.1 dangle.py:1.1 speed.py:1.1 testActivityMonitor.py:1.1 testCache.py:1.1 testConfig.py:1.1 testDB.py:1.1 testDemoStorage.py:1.1 testFileStorage.py:1.1 testMappingStorage.py:1.1 testPersistentList.py:1.1 testPersistentMapping.py:1.1 testRecover.py:1.1 testTimeStamp.py:1.1 testTransaction.py:1.1 testUtils.py:1.1 testZODB.py:1.1 testfsIndex.py:1.1

Sidnei da Silva sidnei at awkly.org
Thu Apr 1 13:26:46 EST 2004


Update of /cvs-repository/Zope3/src/zope/app/advanced/acquisition/tests
In directory cvs.zope.org:/tmp/cvs-serv23146/acquisition/tests

Added Files:
	BasicStorage.py ConflictResolution.py Corruption.py 
	HistoryStorage.py IteratorStorage.py LocalStorage.py 
	MTStorage.py MinPO.py PackableStorage.py PersistentStorage.py 
	ReadOnlyStorage.py RecoveryStorage.py RevisionStorage.py 
	StorageTestBase.py Synchronization.py 
	TransactionalUndoStorage.py TransactionalUndoVersionStorage.py 
	VersionStorage.py __init__.py dangle.py speed.py 
	testActivityMonitor.py testCache.py testConfig.py testDB.py 
	testDemoStorage.py testFileStorage.py testMappingStorage.py 
	testPersistentList.py testPersistentMapping.py testRecover.py 
	testTimeStamp.py testTransaction.py testUtils.py testZODB.py 
	testfsIndex.py 
Log Message:
Make a query helper that uses Implicit acquisition, as well as making the acquisition package be importable. With tests.


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/BasicStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the basic tests for a storage as described in the official storage API

The most complete and most out-of-date description of the interface is:
http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storage_Interface_Info.html

All storages should be able to pass these tests.
"""

from ZODB.Transaction import Transaction
from ZODB import POSException

from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
     import zodb_unpickle, zodb_pickle, handle_serials

ZERO = '\0'*8



class BasicStorage:
    def checkBasics(self):
        t = Transaction()
        self._storage.tpc_begin(t)
        # This should simply return
        self._storage.tpc_begin(t)
        # Aborting is easy
        self._storage.tpc_abort(t)
        # Test a few expected exceptions when we're doing operations giving a
        # different Transaction object than the one we've begun on.
        self._storage.tpc_begin(t)
        self.assertRaises(
            POSException.StorageTransactionError,
            self._storage.store,
            0, 0, 0, 0, Transaction())

        try:
            self._storage.abortVersion('dummy', Transaction())
        except (POSException.StorageTransactionError,
                POSException.VersionCommitError):
            pass # test passed ;)
        else:
            assert 0, "Should have failed, invalid transaction."

        try:
            self._storage.commitVersion('dummy', 'dummer', Transaction())
        except (POSException.StorageTransactionError,
                POSException.VersionCommitError):
            pass # test passed ;)
        else:
            assert 0, "Should have failed, invalid transaction."

        self.assertRaises(
            POSException.StorageTransactionError,
            self._storage.store,
            0, 1, 2, 3, Transaction())
        self._storage.tpc_abort(t)

    def checkSerialIsNoneForInitialRevision(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        txn = Transaction()
        self._storage.tpc_begin(txn)
        # Use None for serial.  Don't use _dostore() here because that coerces
        # serial=None to serial=ZERO.
        r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
                                       '', txn)
        r2 = self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)
        newrevid = handle_serials(oid, r1, r2)
        data, revid = self._storage.load(oid, '')
        value = zodb_unpickle(data)
        eq(value, MinPO(11))
        eq(revid, newrevid)

    def checkNonVersionStore(self, oid=None, revid=None, version=None):
        revid = ZERO
        newrevid = self._dostore(revid=revid)
        # Finish the transaction.
        self.assertNotEqual(newrevid, revid)

    def checkNonVersionStoreAndLoad(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        self._dostore(oid=oid, data=MinPO(7))
        data, revid = self._storage.load(oid, '')
        value = zodb_unpickle(data)
        eq(value, MinPO(7))
        # Now do a bunch of updates to an object
        for i in range(13, 22):
            revid = self._dostore(oid, revid=revid, data=MinPO(i))
        # Now get the latest revision of the object
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(21))

    def checkNonVersionModifiedInVersion(self):
        oid = self._storage.new_oid()
        self._dostore(oid=oid)
        self.assertEqual(self._storage.modifiedInVersion(oid), '')

    def checkConflicts(self):
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        self.assertRaises(POSException.ConflictError,
                          self._dostore,
                          oid, revid=revid1, data=MinPO(13))

    def checkWriteAfterAbort(self):
        oid = self._storage.new_oid()
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
        # Now abort this transaction
        self._storage.tpc_abort(t)
        # Now start all over again
        oid = self._storage.new_oid()
        self._dostore(oid=oid, data=MinPO(6))

    def checkAbortAfterVote(self):
        oid1 = self._storage.new_oid()
        revid1 = self._dostore(oid=oid1, data=MinPO(-2))
        oid = self._storage.new_oid()
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
        # Now abort this transaction
        self._storage.tpc_vote(t)
        self._storage.tpc_abort(t)
        # Now start all over again
        oid = self._storage.new_oid()
        revid = self._dostore(oid=oid, data=MinPO(6))

        for oid, revid in [(oid1, revid1), (oid, revid)]:
            data, _revid = self._storage.load(oid, '')
            self.assertEqual(revid, _revid)

    def checkStoreTwoObjects(self):
        noteq = self.assertNotEqual
        p31, p32, p51, p52 = map(MinPO, (31, 32, 51, 52))
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        noteq(oid1, oid2)
        revid1 = self._dostore(oid1, data=p31)
        revid2 = self._dostore(oid2, data=p51)
        noteq(revid1, revid2)
        revid3 = self._dostore(oid1, revid=revid1, data=p32)
        revid4 = self._dostore(oid2, revid=revid2, data=p52)
        noteq(revid3, revid4)

    def checkGetSerial(self):
        if not hasattr(self._storage, 'getSerial'):
            return
        eq = self.assertEqual
        p41, p42 = map(MinPO, (41, 42))
        oid = self._storage.new_oid()
        self.assertRaises(KeyError, self._storage.getSerial, oid)
        # Now store a revision
        revid1 = self._dostore(oid, data=p41)
        eq(revid1, self._storage.getSerial(oid))
        # And another one
        revid2 = self._dostore(oid, revid=revid1, data=p42)
        eq(revid2, self._storage.getSerial(oid))

    def checkTwoArgBegin(self):
        # XXX how standard is three-argument tpc_begin()?
        t = Transaction()
        tid = '\0\0\0\0\0psu'
        self._storage.tpc_begin(t, tid)
        oid = self._storage.new_oid()
        data = zodb_pickle(MinPO(8))
        self._storage.store(oid, None, data, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

    def checkLen(self):
        # len(storage) reports the number of objects.
        # check it is zero when empty
        self.assertEqual(len(self._storage),0)
        # check it is correct when the storage contains two object.
        # len may also be zero, for storages that do not keep track
        # of this number
        self._dostore(data=MinPO(22))
        self._dostore(data=MinPO(23))
        self.assert_(len(self._storage) in [0,2])

    def checkGetSize(self):
        self._dostore(data=MinPO(25))
        size = self._storage.getSize()
        # The storage API doesn't make any claims about what size
        # means except that it ought to be printable.
        str(size)

    def checkNote(self):
        oid = self._storage.new_oid()
        t = Transaction()
        self._storage.tpc_begin(t)
        t.note('this is a test')
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

    def checkGetExtensionMethods(self):
        m = self._storage.getExtensionMethods()
        self.assertEqual(type(m),type({}))
        for k,v in m.items():
            self.assertEqual(v,None)
            self.assert_(callable(getattr(self._storage,k)))


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/ConflictResolution.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for application-level conflict resolution."""

from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError, UndoError
from Persistence import Persistent

from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle

import sys
import types
from cPickle import Pickler, Unpickler
from cStringIO import StringIO

class PCounter(Persistent):

    _value = 0

    def __repr__(self):
        return "<PCounter %d>" % self._value

    def inc(self):
        self._value = self._value + 1

    def _p_resolveConflict(self, oldState, savedState, newState):
        savedDiff = savedState['_value'] - oldState['_value']
        newDiff = newState['_value'] - oldState['_value']

        oldState['_value'] = oldState['_value'] + savedDiff + newDiff

        return oldState

    # XXX What if _p_resolveConflict _thinks_ it resolved the
    # conflict, but did something wrong?

class PCounter2(PCounter):

    def _p_resolveConflict(self, oldState, savedState, newState):
        raise ConflictError

class PCounter3(PCounter):
    def _p_resolveConflict(self, oldState, savedState, newState):
        raise AttributeError, "no attribute (testing conflict resolution)"

class PCounter4(PCounter):
    def _p_resolveConflict(self, oldState, savedState):
        raise RuntimeError, "Can't get here; not enough args"

class ConflictResolvingStorage:

    def checkResolve(self):
        obj = PCounter()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))

        data, serialno = self._storage.load(oid, '')
        inst = zodb_unpickle(data)
        self.assertEqual(inst._value, 5)

    def checkUnresolvable(self):
        obj = PCounter2()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        self.assertRaises(ConflictError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

    def checkZClassesArentResolved(self):
        from ZODB.ConflictResolution import bad_class
        dummy_class_tuple = ('*foobar', ())
        assert bad_class(dummy_class_tuple) == 1

    def checkBuggyResolve1(self):
        obj = PCounter3()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        self.assertRaises(ConflictError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

    def checkBuggyResolve2(self):
        obj = PCounter4()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        self.assertRaises(ConflictError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

class ConflictResolvingTransUndoStorage:

    def checkUndoConflictResolution(self):
        # This test is based on checkNotUndoable in the
        # TransactionalUndoStorage test suite.  Except here, conflict
        # resolution should allow us to undo the transaction anyway.

        obj = PCounter()
        obj.inc()
        oid = self._storage.new_oid()
        revid_a = self._dostore(oid, data=obj)
        obj.inc()
        revid_b = self._dostore(oid, revid=revid_a, data=obj)
        obj.inc()
        revid_c = self._dostore(oid, revid=revid_b, data=obj)
        # Start the undo
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.transactionalUndo(tid, t)
        self._storage.tpc_finish(t)

    def checkUndoUnresolvable(self):
        # This test is based on checkNotUndoable in the
        # TransactionalUndoStorage test suite.  Except here, conflict
        # resolution should allow us to undo the transaction anyway.

        obj = PCounter2()
        obj.inc()
        oid = self._storage.new_oid()
        revid_a = self._dostore(oid, data=obj)
        obj.inc()
        revid_b = self._dostore(oid, revid=revid_a, data=obj)
        obj.inc()
        revid_c = self._dostore(oid, revid=revid_b, data=obj)
        # Start the undo
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(UndoError, self._storage.transactionalUndo,
                          tid, t)
        self._storage.tpc_abort(t)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/Corruption.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Do some minimal tests of data corruption"""

import os
import random
import stat
import tempfile
import unittest

import ZODB, ZODB.FileStorage
from StorageTestBase import StorageTestBase, removefs

class FileStorageCorruptTests(StorageTestBase):

    def setUp(self):
        self.path = tempfile.mktemp()
        self._storage = ZODB.FileStorage.FileStorage(self.path, create=1)

    def tearDown(self):
        self._storage.close()
        removefs(self.path)

    def _do_stores(self):
        oids = []
        for i in range(5):
            oid = self._storage.new_oid()
            revid = self._dostore(oid)
            oids.append((oid, revid))
        return oids

    def _check_stores(self, oids):
        for oid, revid in oids:
            data, s_revid = self._storage.load(oid, '')
            self.assertEqual(s_revid, revid)

    def checkTruncatedIndex(self):
        oids = self._do_stores()
        self._close()

        # truncation the index file
        path = self.path + '.index'
        self.failUnless(os.path.exists(path))
        f = open(path, 'r+')
        f.seek(0, 2)
        size = f.tell()
        f.seek(size / 2)
        f.truncate()
        f.close()

        self._storage = ZODB.FileStorage.FileStorage(self.path)
        self._check_stores(oids)

    def checkCorruptedIndex(self):
        oids = self._do_stores()
        self._close()

        # truncation the index file
        path = self.path + '.index'
        self.failUnless(os.path.exists(path))
        size = os.stat(path)[stat.ST_SIZE]
        f = open(path, 'r+')
        while f.tell() < size:
            f.seek(random.randrange(1, size / 10), 1)
            f.write('\000')
        f.close()

        self._storage = ZODB.FileStorage.FileStorage(self.path)
        self._check_stores(oids)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/HistoryStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the history() related tests for a storage.

Any storage that supports the history() method should be able to pass
all these tests.
"""

from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle



class HistoryStorage:
    def checkSimpleHistory(self):
        eq = self.assertEqual
        # Store a couple of non-version revisions of the object
        oid = self._storage.new_oid()
        self.assertRaises(KeyError,self._storage.history,oid)
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now get various snapshots of the object's history
        h = self._storage.history(oid, size=1)
        eq(len(h), 1)
        d = h[0]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        # Try to get 2 historical revisions
        h = self._storage.history(oid, size=2)
        eq(len(h), 2)
        d = h[0]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[1]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        # Try to get all 3 historical revisions
        h = self._storage.history(oid, size=3)
        eq(len(h), 3)
        d = h[0]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[1]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        d = h[2]
        eq(d['serial'], revid1)
        eq(d['version'], '')
        # There should be no more than 3 revisions
        h = self._storage.history(oid, size=4)
        eq(len(h), 3)
        d = h[0]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[1]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        d = h[2]
        eq(d['serial'], revid1)
        eq(d['version'], '')

    def checkVersionHistory(self):
        if not self._storage.supportsVersions():
            return
        eq = self.assertEqual
        # Store a couple of non-version revisions
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now store some new revisions in a version
        version = 'test-version'
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
                               version=version)
        revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
                               version=version)
        revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
                               version=version)
        # Now, try to get the six historical revisions (first three are in
        # 'test-version', followed by the non-version revisions).
        h = self._storage.history(oid, version, 100)
        eq(len(h), 6)
        d = h[0]
        eq(d['serial'], revid6)
        eq(d['version'], version)
        d = h[1]
        eq(d['serial'], revid5)
        eq(d['version'], version)
        d = h[2]
        eq(d['serial'], revid4)
        eq(d['version'], version)
        d = h[3]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[4]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        d = h[5]
        eq(d['serial'], revid1)
        eq(d['version'], '')

    def checkHistoryAfterVersionCommit(self):
        if not self._storage.supportsVersions():
            return
        eq = self.assertEqual
        # Store a couple of non-version revisions
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now store some new revisions in a version
        version = 'test-version'
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
                               version=version)
        revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
                               version=version)
        revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
                               version=version)
        # Now commit the version
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # After consultation with Jim, we agreed that the semantics of
        # revision id's after a version commit is that the committed object
        # gets a new serial number (a.k.a. revision id).  Note that
        # FileStorage is broken here; the serial number in the post-commit
        # non-version revision will be the same as the serial number of the
        # previous in-version revision.
        #
        # BAW: Using load() is the only way to get the serial number of the
        # current revision of the object.  But at least this works for both
        # broken and working storages.
        ign, revid7 = self._storage.load(oid, '')
        # Now, try to get the six historical revisions (first three are in
        # 'test-version', followed by the non-version revisions).
        h = self._storage.history(oid, version, 100)
        eq(len(h), 7)
        d = h[0]
        eq(d['serial'], revid7)
        eq(d['version'], '')
        d = h[1]
        eq(d['serial'], revid6)
        eq(d['version'], version)
        d = h[2]
        eq(d['serial'], revid5)
        eq(d['version'], version)
        d = h[3]
        eq(d['serial'], revid4)
        eq(d['version'], version)
        d = h[4]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[5]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        d = h[6]
        eq(d['serial'], revid1)
        eq(d['version'], '')

    def checkHistoryAfterVersionAbort(self):
        if not self._storage.supportsVersions():
            return
        eq = self.assertEqual
        # Store a couple of non-version revisions
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now store some new revisions in a version
        version = 'test-version'
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
                               version=version)
        revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
                               version=version)
        revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
                               version=version)
        # Now commit the version
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # After consultation with Jim, we agreed that the semantics of
        # revision id's after a version commit is that the committed object
        # gets a new serial number (a.k.a. revision id).  Note that
        # FileStorage is broken here; the serial number in the post-commit
        # non-version revision will be the same as the serial number of the
        # previous in-version revision.
        #
        # BAW: Using load() is the only way to get the serial number of the
        # current revision of the object.  But at least this works for both
        # broken and working storages.
        ign, revid7 = self._storage.load(oid, '')
        # Now, try to get the six historical revisions (first three are in
        # 'test-version', followed by the non-version revisions).
        h = self._storage.history(oid, version, 100)
        eq(len(h), 7)
        d = h[0]
        eq(d['serial'], revid7)
        eq(d['version'], '')
        d = h[1]
        eq(d['serial'], revid6)
        eq(d['version'], version)
        d = h[2]
        eq(d['serial'], revid5)
        eq(d['version'], version)
        d = h[3]
        eq(d['serial'], revid4)
        eq(d['version'], version)
        d = h[4]
        eq(d['serial'], revid3)
        eq(d['version'], '')
        d = h[5]
        eq(d['serial'], revid2)
        eq(d['version'], '')
        d = h[6]
        eq(d['serial'], revid1)
        eq(d['version'], '')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/IteratorStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run tests against the iterator() interface for storages.

Any storage that supports the iterator() method should be able to pass
all these tests.
"""

from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64
from ZODB.Transaction import Transaction


class IteratorCompare:

    def iter_verify(self, txniter, revids, val0):
        eq = self.assertEqual
        oid = self._oid
        val = val0
        for reciter, revid in zip(txniter, revids + [None]):
            eq(reciter.tid, revid)
            for rec in reciter:
                eq(rec.oid, oid)
                eq(rec.serial, revid)
                eq(rec.version, '')
                eq(zodb_unpickle(rec.data), MinPO(val))
                val = val + 1
        eq(val, val0 + len(revids))
        txniter.close()

class IteratorStorage(IteratorCompare):

    def checkSimpleIteration(self):
        # Store a bunch of revisions of a single object
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now iterate over all the transactions and compare carefully
        txniter = self._storage.iterator()
        self.iter_verify(txniter, [revid1, revid2, revid3], 11)

    def checkClose(self):
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        txniter = self._storage.iterator()
        txniter.close()
        self.assertRaises(IOError, txniter.__getitem__, 0)

    def checkVersionIterator(self):
        if not self._storage.supportsVersions():
            return
        self._dostore()
        self._dostore(version='abort')
        self._dostore()
        self._dostore(version='abort')
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.abortVersion('abort', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        self._dostore(version='commit')
        self._dostore()
        self._dostore(version='commit')
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.commitVersion('commit', '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        txniter = self._storage.iterator()
        for trans in txniter:
            for data in trans:
                pass

    def checkUndoZombieNonVersion(self):
        if not hasattr(self._storage, 'supportsTransactionalUndo'):
            return
        if not self._storage.supportsTransactionalUndo():
            return

        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(94))
        # Get the undo information
        info = self._storage.undoInfo()
        tid = info[0]['id']
        # Undo the creation of the object, rendering it a zombie
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # Now attempt to iterator over the storage
        iter = self._storage.iterator()
        for txn in iter:
            for rec in txn:
                pass

        # The last transaction performed an undo of the transaction that
        # created object oid.  (As Barry points out, the object is now in the
        # George Bailey state.)  Assert that the final data record contains
        # None in the data attribute.
        self.assertEqual(rec.oid, oid)
        self.assertEqual(rec.data, None)

    def checkTransactionExtensionFromIterator(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(1))
        iter = self._storage.iterator()
        count = 0
        for txn in iter:
            self.assertEqual(txn._extension, {})
            count +=1
        self.assertEqual(count, 1)

    def checkIterationIntraTransaction(self):
        # XXX try this test with logging enabled.  If you see something like
        #
        # ZODB FS FS21 warn: FileStorageTests.fs truncated, possibly due to
        # damaged records at 4
        #
        # Then the code in FileIterator.next() hasn't yet been fixed.
        oid = self._storage.new_oid()
        t = Transaction()
        data = zodb_pickle(MinPO(0))
        try:
            self._storage.tpc_begin(t)
            self._storage.store(oid, '\0'*8, data, '', t)
            self._storage.tpc_vote(t)
            # Don't do tpc_finish yet
            it = self._storage.iterator()
            for x in it:
                pass
        finally:
            self._storage.tpc_finish(t)


class ExtendedIteratorStorage(IteratorCompare):

    def checkExtendedIteration(self):
        # Store a bunch of revisions of a single object
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14))
        # Note that the end points are included
        # Iterate over all of the transactions with explicit start/stop
        txniter = self._storage.iterator(revid1, revid4)
        self.iter_verify(txniter, [revid1, revid2, revid3, revid4], 11)
        # Iterate over some of the transactions with explicit start
        txniter = self._storage.iterator(revid3)
        self.iter_verify(txniter, [revid3, revid4], 13)
        # Iterate over some of the transactions with explicit stop
        txniter = self._storage.iterator(None, revid2)
        self.iter_verify(txniter, [revid1, revid2], 11)
        # Iterate over some of the transactions with explicit start+stop
        txniter = self._storage.iterator(revid2, revid3)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify an upper bound somewhere in between values
        revid3a = p64((U64(revid3) + U64(revid4)) / 2)
        txniter = self._storage.iterator(revid2, revid3a)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify a lower bound somewhere in between values.
        # revid2 == revid1+1 is very likely on Windows.  Adding 1 before
        # dividing ensures that "the midpoint" we compute is strictly larger
        # than revid1.
        revid1a = p64((U64(revid1) + 1 + U64(revid2)) / 2)
        assert revid1 < revid1a
        txniter = self._storage.iterator(revid1a, revid3a)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify an empty range
        txniter = self._storage.iterator(revid3, revid2)
        self.iter_verify(txniter, [], 13)
        # Specify a singleton range
        txniter = self._storage.iterator(revid3, revid3)
        self.iter_verify(txniter, [revid3], 13)

class IteratorDeepCompare:
    def compare(self, storage1, storage2):
        eq = self.assertEqual
        iter1 = storage1.iterator()
        iter2 = storage2.iterator()
        for txn1, txn2 in zip(iter1, iter2):
            eq(txn1.tid,         txn2.tid)
            eq(txn1.status,      txn2.status)
            eq(txn1.user,        txn2.user)
            eq(txn1.description, txn2.description)
            eq(txn1._extension,  txn2._extension)
            for rec1, rec2 in zip(txn1, txn2):
                eq(rec1.oid,     rec2.oid)
                eq(rec1.serial,  rec2.serial)
                eq(rec1.version, rec2.version)
                eq(rec1.data,    rec2.data)
            # Make sure there are no more records left in rec1 and rec2,
            # meaning they were the same length.
            self.assertRaises(IndexError, txn1.next)
            self.assertRaises(IndexError, txn2.next)
        # Make sure ther are no more records left in txn1 and txn2, meaning
        # they were the same length
        self.assertRaises(IndexError, iter1.next)
        self.assertRaises(IndexError, iter2.next)
        iter1.close()
        iter2.close()


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/LocalStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
class LocalStorage:
    """A single test that only make sense for local storages.

    A local storage is one that doens't use ZEO. The __len__()
    implementation for ZEO is inexact.
    """
    def checkLen(self):
        eq = self.assertEqual
        # The length of the database ought to grow by one each time
        eq(len(self._storage), 0)
        self._dostore()
        eq(len(self._storage), 1)
        self._dostore()
        eq(len(self._storage), 2)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/MTStorage.py ===
import random
import sys
import threading
import time

import ZODB
from PersistentMapping import PersistentMapping

from ZODB.tests.StorageTestBase \
     import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError

SHORT_DELAY = 0.01

def sort(l):
    "Sort a list in place and return it."
    l.sort()
    return l

class TestThread(threading.Thread):
    """Base class for defining threads that run from unittest.

    If the thread exits with an uncaught exception, catch it and
    re-raise it when the thread is joined.  The re-raise will cause
    the test to fail.

    The subclass should define a runtest() method instead of a run()
    method.
    """

    def __init__(self, test):
        threading.Thread.__init__(self)
        self.test = test
        self._fail = None
        self._exc_info = None

    def run(self):
        try:
            self.runtest()
        except:
            self._exc_info = sys.exc_info()

    def fail(self, msg=""):
        self._test.fail(msg)

    def join(self, timeout=None):
        threading.Thread.join(self, timeout)
        if self._exc_info:
            raise self._exc_info[0], self._exc_info[1], self._exc_info[2]

class ZODBClientThread(TestThread):

    __super_init = TestThread.__init__

    def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
        self.__super_init(test)
        self.setDaemon(1)
        self.db = db
        self.test = test
        self.commits = commits
        self.delay = delay

    def runtest(self):
        conn = self.db.open()
        root = conn.root()
        d = self.get_thread_dict(root)
        if d is None:
            self.test.fail()
        else:
            for i in range(self.commits):
                self.commit(d, i)
        self.test.assertEqual(sort(d.keys()), range(self.commits))

    def commit(self, d, num):
        d[num] = time.time()
        time.sleep(self.delay)
        get_transaction().commit()
        time.sleep(self.delay)

    def get_thread_dict(self, root):
        name = self.getName()
        # arbitrarily limit to 10 re-tries
        for i in range(10):
            try:
                m = PersistentMapping()
                root[name] = m
                get_transaction().commit()
                break
            except ConflictError, err:
                get_transaction().abort()
                root._p_jar.sync()
        for i in range(10):
            try:
                return root.get(name)
            except ConflictError:
                get_transaction().abort()

class StorageClientThread(TestThread):

    __super_init = TestThread.__init__

    def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
        self.__super_init(test)
        self.storage = storage
        self.test = test
        self.commits = commits
        self.delay = delay
        self.oids = {}

    def runtest(self):
        for i in range(self.commits):
            self.dostore(i)
        self.check()

    def check(self):
        for oid, revid in self.oids.items():
            data, serial = self.storage.load(oid, '')
            self.test.assertEqual(serial, revid)
            obj = zodb_unpickle(data)
            self.test.assertEqual(obj.value[0], self.getName())

    def pause(self):
        time.sleep(self.delay)

    def oid(self):
        oid = self.storage.new_oid()
        self.oids[oid] = None
        return oid

    def dostore(self, i):
        data = zodb_pickle(MinPO((self.getName(), i)))
        t = Transaction()
        oid = self.oid()
        self.pause()

        self.storage.tpc_begin(t)
        self.pause()

        # Always create a new object, signified by None for revid
        r1 = self.storage.store(oid, None, data, '', t)
        self.pause()

        r2 = self.storage.tpc_vote(t)
        self.pause()

        self.storage.tpc_finish(t)
        self.pause()

        revid = handle_serials(oid, r1, r2)
        self.oids[oid] = revid

class ExtStorageClientThread(StorageClientThread):

    def runtest(self):
        # pick some other storage ops to execute
        ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
               if meth.startswith('do_')]
        assert ops, "Didn't find an storage ops in %s" % self.storage
        # do a store to guarantee there's at least one oid in self.oids
        self.dostore(0)

        for i in range(self.commits - 1):
            meth = random.choice(ops)
            meth()
            self.dostore(i)
        self.check()

    def pick_oid(self):
        return random.choice(self.oids.keys())

    def do_load(self):
        oid = self.pick_oid()
        self.storage.load(oid, '')

    def do_loadSerial(self):
        oid = self.pick_oid()
        self.storage.loadSerial(oid, self.oids[oid])

    def do_modifiedInVersion(self):
        oid = self.pick_oid()
        self.storage.modifiedInVersion(oid)

    def do_undoLog(self):
        self.storage.undoLog(0, -20)

    def do_iterator(self):
        try:
            iter = self.storage.iterator()
        except AttributeError:
            # XXX It's hard to detect that a ZEO ClientStorage
            # doesn't have this method, but does have all the others.
            return
        for obj in iter:
            pass

class MTStorage:
    "Test a storage with multiple client threads executing concurrently."

    def _checkNThreads(self, n, constructor, *args):
        threads = [constructor(*args) for i in range(n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(60)
        for t in threads:
            self.failIf(t.isAlive(), "thread failed to finish in 60 seconds")

    def check2ZODBThreads(self):
        db = ZODB.DB(self._storage)
        self._checkNThreads(2, ZODBClientThread, db, self)
        db.close()

    def check7ZODBThreads(self):
        db = ZODB.DB(self._storage)
        self._checkNThreads(7, ZODBClientThread, db, self)
        db.close()

    def check2StorageThreads(self):
        self._checkNThreads(2, StorageClientThread, self._storage, self)

    def check7StorageThreads(self):
        self._checkNThreads(7, StorageClientThread, self._storage, self)

    def check4ExtStorageThread(self):
        self._checkNThreads(4, ExtStorageClientThread, self._storage, self)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/MinPO.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A minimal persistent object to use for tests"""

from Persistence import Persistent

class MinPO(Persistent):
    def __init__(self, value=None):
        self.value = value

    def __cmp__(self, aMinPO):
        return cmp(self.value, aMinPO.value)

    def __repr__(self):
        return "MinPO(%s)" % self.value


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PackableStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run some tests relevant for storages that support pack()."""

try:
    import cPickle
    pickle = cPickle
    #import cPickle as pickle
except ImportError:
    import pickle

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

import threading
import time

from ZODB import DB
from Persistence import Persistent
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.POSException import ConflictError, StorageError
from ZODB.PersistentMapping import PersistentMapping

ZERO = '\0'*8


# This class is for the root object.  It must not contain a getoid() method
# (really, attribute).  The persistent pickling machinery -- in the dumps()
# function below -- will pickle Root objects as normal, but any attributes
# which reference persistent Object instances will get pickled as persistent
# ids, not as the object's state.  This makes the referencesf stuff work,
# because it pickle sniffs for persistent ids (so we have to get those
# persistent ids into the root object's pickle).
class Root:
    pass


# This is the persistent Object class.  Because it has a getoid() method, the
# persistent pickling machinery -- in the dumps() function below -- will
# pickle the oid string instead of the object's actual state.  Yee haw, this
# stuff is deep. ;)
class Object:
    def __init__(self, oid):
        self._oid = oid

    def getoid(self):
        return self._oid


class C(Persistent):
    pass

# Here's where all the magic occurs.  Sadly, the pickle module is a bit
# underdocumented, but here's what happens: by setting the persistent_id
# attribute to getpersid() on the pickler, that function gets called for every
# object being pickled.  By returning None when the object has no getoid
# attribute, it signals pickle to serialize the object as normal.  That's how
# the Root instance gets pickled correctly.  But, if the object has a getoid
# attribute, then by returning that method's value, we tell pickle to
# serialize the persistent id of the object instead of the object's state.
# That sets the pickle up for proper sniffing by the referencesf machinery.
# Fun, huh?
def dumps(obj):
    def getpersid(obj):
        if hasattr(obj, 'getoid'):
            return obj.getoid()
        return None
    s = StringIO()
    p = pickle.Pickler(s)
    p.persistent_id = getpersid
    p.dump(obj)
    return s.getvalue()



class PackableStorageBase:
    # We keep a cache of object ids to instances so that the unpickler can
    # easily return any persistent object.
    _cache = {}

    def _newobj(self):
        # This is a convenience method to create a new persistent Object
        # instance.  It asks the storage for a new object id, creates the
        # instance with the given oid, populates the cache and returns the
        # object.
        oid = self._storage.new_oid()
        obj = Object(oid)
        self._cache[obj.getoid()] = obj
        return obj

    def _makeloader(self):
        # This is the other side of the persistent pickling magic.  We need a
        # custom unpickler to mirror our custom pickler above.  By setting the
        # persistent_load function of the unpickler to self._cache.get(),
        # whenever a persistent id is unpickled, it will actually return the
        # Object instance out of the cache.  As far as returning a function
        # with an argument bound to an instance attribute method, we do it
        # this way because it makes the code in the tests more succinct.
        #
        # BUT!  Be careful in your use of loads() vs. pickle.loads().  loads()
        # should only be used on the Root object's pickle since it's the only
        # special one.  All the Object instances should use pickle.loads().
        def loads(str, persfunc=self._cache.get):
            fp = StringIO(str)
            u = pickle.Unpickler(fp)
            u.persistent_load = persfunc
            return u.load()
        return loads



class PackableStorage(PackableStorageBase):
    def _initroot(self):
        try:
            self._storage.load(ZERO, '')
        except KeyError:
            import PersistentMapping
            from ZODB.Transaction import Transaction
            file = StringIO()
            p = cPickle.Pickler(file, 1)
            p.dump((PersistentMapping.PersistentMapping, None))
            p.dump({'_container': {}})
            t=Transaction()
            t.description='initial database creation'
            self._storage.tpc_begin(t)
            self._storage.store(ZERO, None, file.getvalue(), '', t)
            self._storage.tpc_vote(t)
            self._storage.tpc_finish(t)

    def checkPackEmptyStorage(self):
        self._storage.pack(time.time(), referencesf)

    def checkPackTomorrow(self):
        self._initroot()
        self._storage.pack(time.time() + 10000, referencesf)

    def checkPackYesterday(self):
        self._initroot()
        self._storage.pack(time.time() - 10000, referencesf)

    def checkPackAllRevisions(self):
        self._initroot()
        eq = self.assertEqual
        raises = self.assertRaises
        # Create a `persistent' object
        obj = self._newobj()
        oid = obj.getoid()
        obj.value = 1
        # Commit three different revisions
        revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
        obj.value = 2
        revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
        obj.value = 3
        revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        # Now pack all transactions; need to sleep a second to make
        # sure that the pack time is greater than the last commit time.
        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()
        self._storage.pack(packtime, referencesf)
        # All revisions of the object should be gone, since there is no
        # reference from the root object to this object.
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        raises(KeyError, self._storage.loadSerial, oid, revid3)

    def checkPackJustOldRevisions(self):
        eq = self.assertEqual
        raises = self.assertRaises
        loads = self._makeloader()
        # Create a root object.  This can't be an instance of Object,
        # otherwise the pickling machinery will serialize it as a persistent
        # id and not as an object that contains references (persistent ids) to
        # other objects.
        root = Root()
        # Create a persistent object, with some initial state
        obj = self._newobj()
        oid = obj.getoid()
        # Link the root object to the persistent object, in order to keep the
        # persistent object alive.  Store the root object.
        root.obj = obj
        root.value = 0
        revid0 = self._dostoreNP(ZERO, data=dumps(root))
        # Make sure the root can be retrieved
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        # Commit three different revisions of the other object
        obj.value = 1
        revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
        obj.value = 2
        revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
        obj.value = 3
        revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        # Now pack just revisions 1 and 2.  The object's current revision
        # should stay alive because it's pointed to by the root.
        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()
        self._storage.pack(packtime, referencesf)
        # Make sure the revisions are gone, but that object zero and revision
        # 3 are still there and correct
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid, '')
        eq(revid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)

    def checkPackOnlyOneObject(self):
        eq = self.assertEqual
        raises = self.assertRaises
        loads = self._makeloader()
        # Create a root object.  This can't be an instance of Object,
        # otherwise the pickling machinery will serialize it as a persistent
        # id and not as an object that contains references (persistent ids) to
        # other objects.
        root = Root()
        # Create a persistent object, with some initial state
        obj1 = self._newobj()
        oid1 = obj1.getoid()
        # Create another persistent object, with some initial state.  Make
        # sure it's oid is greater than the first object's oid.
        obj2 = self._newobj()
        oid2 = obj2.getoid()
        self.failUnless(oid2 > oid1)
        # Link the root object to the persistent objects, in order to keep
        # them alive.  Store the root object.
        root.obj1 = obj1
        root.obj2 = obj2
        root.value = 0
        revid0 = self._dostoreNP(ZERO, data=dumps(root))
        # Make sure the root can be retrieved
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        # Commit three different revisions of the first object
        obj1.value = 1
        revid1 = self._dostoreNP(oid1, data=pickle.dumps(obj1))
        obj1.value = 2
        revid2 = self._dostoreNP(oid1, revid=revid1, data=pickle.dumps(obj1))
        obj1.value = 3
        revid3 = self._dostoreNP(oid1, revid=revid2, data=pickle.dumps(obj1))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid1, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid1, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid1, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        # Now commit a revision of the second object
        obj2.value = 11
        revid4 = self._dostoreNP(oid2, data=pickle.dumps(obj2))
        # And make sure the revision can be extracted
        data = self._storage.loadSerial(oid2, revid4)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid2)
        eq(pobj.value, 11)
        # Now pack just revisions 1 and 2 of object1.  Object1's current
        # revision should stay alive because it's pointed to by the root, as
        # should Object2's current revision.
        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()
        self._storage.pack(packtime, referencesf)
        # Make sure the revisions are gone, but that object zero, object2, and
        # revision 3 of object1 are still there and correct.
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        raises(KeyError, self._storage.loadSerial, oid1, revid1)
        raises(KeyError, self._storage.loadSerial, oid1, revid2)
        data = self._storage.loadSerial(oid1, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid1, '')
        eq(revid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid2, '')
        eq(revid, revid4)
        eq(loads(data).value, 11)
        data = self._storage.loadSerial(oid2, revid4)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid2)
        eq(pobj.value, 11)

    def checkPackUnlinkedFromRoot(self):
        eq = self.assertEqual
        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        txn = get_transaction()
        txn.note('root')
        txn.commit()

        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()

        obj = C()
        obj.value = 7

        root['obj'] = obj
        txn = get_transaction()
        txn.note('root -> o1')
        txn.commit()

        del root['obj']
        txn = get_transaction()
        txn.note('root -x-> o1')
        txn.commit()

        self._storage.pack(packtime, referencesf)

        log = self._storage.undoLog()
        tid = log[0]['id']
        db.undo(tid)
        txn = get_transaction()
        txn.note('undo root -x-> o1')
        txn.commit()

        conn.sync()

        eq(root['obj'].value, 7)

    def _PackWhileWriting(self, pack_now=0):
        # A storage should allow some reading and writing during
        # a pack.  This test attempts to exercise locking code
        # in the storage to test that it is safe.  It generates
        # a lot of revisions, so that pack takes a long time.

        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        for i in range(10):
            root[i] = MinPO(i)
        get_transaction().commit()

        snooze()
        packt = time.time()

        for j in range(10):
            for i in range(10):
                root[i].value = MinPO(i)
                get_transaction().commit()

        threads = [ClientThread(db) for i in range(4)]
        for t in threads:
            t.start()

        if pack_now:
            db.pack(time.time())
        else:
            db.pack(packt)

        for t in threads:
            t.join(30)
        for t in threads:
            t.join(1)
            self.assert_(not t.isAlive())

        # Iterate over the storage to make sure it's sane, but not every
        # storage supports iterators.
        if not hasattr(self._storage, "iterator"):
            return

        iter = self._storage.iterator()
        for txn in iter:
            for data in txn:
                pass
        iter.close()

    def checkPackWhileWriting(self):
        self._PackWhileWriting(pack_now=0)

    def checkPackNowWhileWriting(self):
        self._PackWhileWriting(pack_now=1)

    def checkRedundantPack(self):
        # It is an error to perform a pack with a packtime earlier
        # than a previous packtime.  The storage can't do a full
        # traversal as of the packtime, because the previous pack may
        # have removed revisions necessary for a full traversal.

        # It should be simple to test that a storage error is raised,
        # but this test case goes to the trouble of constructing a
        # scenario that would lose data if the earlier packtime was
        # honored.

        self._initroot()

        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        root["d"] = d = PersistentMapping()
        get_transaction().commit()
        snooze()

        obj = d["obj"] = C()
        obj.value = 1
        get_transaction().commit()
        snooze()
        packt1 = time.time()
        lost_oid = obj._p_oid

        obj = d["anotherobj"] = C()
        obj.value = 2
        get_transaction().commit()
        snooze()
        packt2 = time.time()

        db.pack(packt2)
        # BDBStorage allows the second pack, but doesn't lose data.
        try:
            db.pack(packt1)
        except StorageError:
            pass
        # This object would be removed by the second pack, even though
        # it is reachable.
        self._storage.load(lost_oid, "")

    def checkPackUndoLog(self):
        self._initroot()
        eq = self.assertEqual
        raises = self.assertRaises
        # Create a `persistent' object
        obj = self._newobj()
        oid = obj.getoid()
        obj.value = 1
        # Commit two different revisions
        revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
        obj.value = 2
        snooze()
        packtime = time.time()
        snooze()
        revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
        # Now pack the first transaction
        self.assertEqual(3,len(self._storage.undoLog()))
        self._storage.pack(packtime, referencesf)
        # The undo log contains only the most resent transaction
        self.assertEqual(1,len(self._storage.undoLog()))

    def dont_checkPackUndoLogUndoable(self):
        # A disabled test. I wanted to test that the content of the
        # undo log was consistent, but every storage appears to
        # include something slightly different. If the result of this
        # method is only used to fill a GUI then this difference
        # doesnt matter.  Perhaps re-enable this test once we agree
        # what should be asserted.

        self._initroot()
        # Create two `persistent' object
        obj1 = self._newobj()
        oid1 = obj1.getoid()
        obj1.value = 1
        obj2 = self._newobj()
        oid2 = obj2.getoid()
        obj2.value = 2

        # Commit the first revision of each of them
        revid11 = self._dostoreNP(oid1, data=pickle.dumps(obj1),
                                  description="1-1")
        revid22 = self._dostoreNP(oid2, data=pickle.dumps(obj2),
                                  description="2-2")

        # remember the time. everything above here will be packed away
        snooze()
        packtime = time.time()
        snooze()
        # Commit two revisions of the first object
        obj1.value = 3
        revid13 = self._dostoreNP(oid1, revid=revid11,
                                  data=pickle.dumps(obj1), description="1-3")
        obj1.value = 4
        revid14 = self._dostoreNP(oid1, revid=revid13,
                                  data=pickle.dumps(obj1), description="1-4")
        # Commit one revision of the second object
        obj2.value = 5
        revid25 = self._dostoreNP(oid2, revid=revid22,
                                  data=pickle.dumps(obj2), description="2-5")
        # Now pack
        self.assertEqual(6,len(self._storage.undoLog()))
        print '\ninitial undoLog was'
        for r in self._storage.undoLog(): print r
        self._storage.pack(packtime, referencesf)
        # The undo log contains only two undoable transaction.
        print '\nafter packing undoLog was'
        for r in self._storage.undoLog(): print r
        # what can we assert about that?

class ClientThread(threading.Thread):

    def __init__(self, db):
        threading.Thread.__init__(self)
        self.root = db.open().root()

    def run(self):
        for j in range(50):
            try:
                self.root[j % 10].value = MinPO(j)
                get_transaction().commit()
            except ConflictError:
                get_transaction().abort()


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PersistentStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test that a storage's values persist across open and close."""

class PersistentStorage:

    def checkUpdatesPersist(self):
        oids = []

        def new_oid_wrapper(l=oids, new_oid=self._storage.new_oid):
            oid = new_oid()
            l.append(oid)
            return oid

        self._storage.new_oid = new_oid_wrapper

        self._dostore()
        oid = self._storage.new_oid()
        revid = self._dostore(oid)
        if self._storage.supportsVersions():
            self._dostore(oid, revid, data=8, version='b')
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=1)
        revid = self._dostore(oid, revid, data=2)
        self._dostore(oid, revid, data=3)

        # keep copies of all the objects
        objects = []
        for oid in oids:
            p, s = self._storage.load(oid, '')
            objects.append((oid, '', p, s))
            ver = self._storage.modifiedInVersion(oid)
            if ver:
                p, s = self._storage.load(oid, ver)
                objects.append((oid, ver, p, s))

        self._storage.close()
        self.open()

        # keep copies of all the objects
        for oid, ver, p, s in objects:
            _p, _s = self._storage.load(oid, ver)
            self.assertEquals(p, _p)
            self.assertEquals(s, _s)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/ReadOnlyStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from ZODB.POSException import ReadOnlyError
from ZODB.Transaction import Transaction

class ReadOnlyStorage:

    def _create_data(self):
        # test a read-only storage that already has some data
        self.oids = {}
        for i in range(10):
            oid = self._storage.new_oid()
            revid = self._dostore(oid)
            self.oids[oid] = revid

    def _make_readonly(self):
        self._storage.close()
        self.open(read_only=1)
        self.assert_(self._storage.isReadOnly())

    def checkReadMethods(self):
        self._create_data()
        self._make_readonly()
        # XXX not going to bother checking all read methods
        for oid in self.oids.keys():
            data, revid = self._storage.load(oid, '')
            self.assertEqual(revid, self.oids[oid])
            self.assert_(not self._storage.modifiedInVersion(oid))
            _data = self._storage.loadSerial(oid, revid)
            self.assertEqual(data, _data)

    def checkWriteMethods(self):
        self._make_readonly()
        self.assertRaises(ReadOnlyError, self._storage.new_oid)
        t = Transaction()
        self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)

        if self._storage.supportsVersions():
            self.assertRaises(ReadOnlyError, self._storage.abortVersion,
                              '', t)
            self.assertRaises(ReadOnlyError, self._storage.commitVersion,
                              '', '', t)

        self.assertRaises(ReadOnlyError, self._storage.store,
                          '\000' * 8, None, '', '', t)

        if self._storage.supportsTransactionalUndo():
            self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
                              '\000' * 8, t)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RecoveryStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""More recovery and iterator tests."""

from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf

import time

class RecoveryStorage(IteratorDeepCompare):
    # Requires a setUp() that creates a self._dst destination storage
    def checkSimpleRecovery(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=11)
        revid = self._dostore(oid, revid=revid, data=12)
        revid = self._dostore(oid, revid=revid, data=13)
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRecoveryAcrossVersions(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=21)
        revid = self._dostore(oid, revid=revid, data=22)
        revid = self._dostore(oid, revid=revid, data=23, version='one')
        revid = self._dostore(oid, revid=revid, data=34, version='one')
        # Now commit the version
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.commitVersion('one', '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRecoverAbortVersion(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=21, version="one")
        revid = self._dostore(oid, revid=revid, data=23, version='one')
        revid = self._dostore(oid, revid=revid, data=34, version='one')
        # Now abort the version and the creation
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion('one', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self.assertEqual(oids, [oid])
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)
        # Also make sure the the last transaction has a data record
        # with None for its data attribute, because we've undone the
        # object.
        for s in self._storage, self._dst:
            iter = s.iterator()
            for trans in iter:
                pass # iterate until we get the last one
            data = trans[0]
            self.assertRaises(IndexError, lambda i, t=trans: t[i], 1)
            self.assertEqual(data.oid, oid)
            self.assertEqual(data.data, None)

    def checkRecoverUndoInVersion(self):
        oid = self._storage.new_oid()
        version = "aVersion"
        revid_a = self._dostore(oid, data=MinPO(91))
        revid_b = self._dostore(oid, revid=revid_a, version=version,
                                data=MinPO(92))
        revid_c = self._dostore(oid, revid=revid_b, version=version,
                                data=MinPO(93))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self._commitVersion(version, '')
        self._undo(self._storage.undoInfo()[0]['id'], oid)

        # now copy the records to a new storage
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

        # The last two transactions were applied directly rather than
        # copied.  So we can't use compare() to verify that they new
        # transactions are applied correctly.  (The new transactions
        # will have different timestamps for each storage.)

        self._abortVersion(version)
        self.assert_(self._storage.versionEmpty(version))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self.assert_(not self._storage.versionEmpty(version))

        # check the data is what we expect it to be
        data, revid = self._storage.load(oid, version)
        self.assertEqual(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        self.assertEqual(zodb_unpickle(data), MinPO(91))

        # and swap the storages
        tmp = self._storage
        self._storage = self._dst
        self._abortVersion(version)
        self.assert_(self._storage.versionEmpty(version))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self.assert_(not self._storage.versionEmpty(version))

        # check the data is what we expect it to be
        data, revid = self._storage.load(oid, version)
        self.assertEqual(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        self.assertEqual(zodb_unpickle(data), MinPO(91))

        # swap them back
        self._storage = tmp

        # Now remove _dst and copy all the transactions a second time.
        # This time we will be able to confirm via compare().
        self._dst.close()
        self._dst.cleanup()
        self._dst = self.new_dest()
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRestoreAcrossPack(self):
        db = DB(self._storage)
        c = db.open()
        r = c.root()
        obj = r["obj1"] = MinPO(1)
        get_transaction().commit()
        obj = r["obj2"] = MinPO(1)
        get_transaction().commit()

        self._dst.copyTransactionsFrom(self._storage)
        self._dst.pack(time.time(), referencesf)

        self._undo(self._storage.undoInfo()[0]['id'])

        # copy the final transaction manually.  even though there
        # was a pack, the restore() ought to succeed.
        it = self._storage.iterator()
        final = list(it)[-1]
        self._dst.tpc_begin(final, final.tid, final.status)
        for r in final:
            self._dst.restore(r.oid, r.serial, r.data, r.version, r.data_txn,
                              final)
        it.close()
        self._dst.tpc_vote(final)
        self._dst.tpc_finish(final)

    def checkPackWithGCOnDestinationAfterRestore(self):
        raises = self.assertRaises
        db = DB(self._storage)
        conn = db.open()
        root = conn.root()
        root.obj = obj1 = MinPO(1)
        txn = get_transaction()
        txn.note('root -> obj')
        txn.commit()
        root.obj.obj = obj2 = MinPO(2)
        txn = get_transaction()
        txn.note('root -> obj -> obj')
        txn.commit()
        del root.obj
        txn = get_transaction()
        txn.note('root -X->')
        txn.commit()
        # Now copy the transactions to the destination
        self._dst.copyTransactionsFrom(self._storage)
        # Now pack the destination.
        snooze()
        self._dst.pack(time.time(),  referencesf)
        # And check to see that the root object exists, but not the other
        # objects.
        data, serial = self._dst.load(root._p_oid, '')
        raises(KeyError, self._dst.load, obj1._p_oid, '')
        raises(KeyError, self._dst.load, obj2._p_oid, '')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RevisionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check loadSerial() on storages that support historical revisions."""

from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle

ZERO = '\0'*8

class RevisionStorage:

    def checkLoadSerial(self):
        oid = self._storage.new_oid()
        revid = ZERO
        revisions = {}
        for i in range(31, 38):
            revid = self._dostore(oid, revid=revid, data=MinPO(i))
            revisions[revid] = MinPO(i)
        # Now make sure all the revisions have the correct value
        for revid, value in revisions.items():
            data = self._storage.loadSerial(oid, revid)
            self.assertEqual(zodb_unpickle(data), value)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/StorageTestBase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Provide a mixin base class for storage tests.

The StorageTestBase class provides basic setUp() and tearDown()
semantics (which you can override), and it also provides a helper
method _dostore() which performs a complete store transaction for a
single object revision.
"""

import errno
import os
import pickle
import string
import sys
import time
import types
import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO

from ZODB.Transaction import Transaction
from ZODB.utils import u64

from ZODB.tests.MinPO import MinPO

ZERO = '\0'*8

def snooze():
    # In Windows, it's possible that two successive time.time() calls return
    # the same value.  Tim guarantees that time never runs backwards.  You
    # usually want to call this before you pack a storage, or must make other
    # guarantees about increasing timestamps.
    now = time.time()
    while now == time.time():
        time.sleep(0.1)

def zodb_pickle(obj):
    """Create a pickle in the format expected by ZODB."""
    f = StringIO()
    p = Pickler(f, 1)
    p.persistent_id = lambda obj: getattr(obj, '_p_oid', None)
    klass = obj.__class__
    assert not hasattr(obj, '__getinitargs__'), "not ready for constructors"
    args = None

    mod = getattr(klass, '__module__', None)
    if mod is not None:
        klass = mod, klass.__name__

    state = obj.__getstate__()

    p.dump((klass, args))
    p.dump(state)
    return f.getvalue(1)

def persistent_load(pid):
    # helper for zodb_unpickle
    return "ref to %s.%s oid=%s" % (pid[1][0], pid[1][1], u64(pid[0]))

def zodb_unpickle(data):
    """Unpickle an object stored using the format expected by ZODB."""
    f = StringIO(data)
    u = Unpickler(f)
    u.persistent_load = persistent_load
    klass_info = u.load()
    if isinstance(klass_info, types.TupleType):
        if isinstance(klass_info[0], types.TupleType):
            modname, klassname = klass_info[0]
            args = klass_info[1]
        else:
            modname, klassname = klass_info
            args = None
        if modname == "__main__":
            ns = globals()
        else:
            mod = import_helper(modname)
            ns = mod.__dict__
        try:
            klass = ns[klassname]
        except KeyError:
            sys.stderr.write("can't find %s in %s" % (klassname,
                                                      repr(ns)))
        inst = klass()
    else:
        raise ValueError, "expected class info: %s" % repr(klass_info)
    state = u.load()
    inst.__setstate__(state)
    return inst

def handle_all_serials(oid, *args):
    """Return dict of oid to serialno from store() and tpc_vote().

    Raises an exception if one of the calls raised an exception.

    The storage interface got complicated when ZEO was introduced.
    Any individual store() call can return None or a sequence of
    2-tuples where the 2-tuple is either oid, serialno or an
    exception to be raised by the client.

    The original interface just returned the serialno for the
    object.
    """
    d = {}
    for arg in args:
        if isinstance(arg, types.StringType):
            d[oid] = arg
        elif arg is None:
            pass
        else:
            for oid, serial in arg:
                if not isinstance(serial, types.StringType):
                    raise serial # error from ZEO server
                d[oid] = serial
    return d

def handle_serials(oid, *args):
    """Return the serialno for oid based on multiple return values.

    A helper for function _handle_all_serials().
    """
    return handle_all_serials(oid, *args)[oid]

def import_helper(name):
    mod = __import__(name)
    return sys.modules[name]

def removefs(base):
    """Remove all files created by FileStorage with path base."""
    for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
        path = base + ext
        try:
            os.remove(path)
        except os.error, err:
            if err[0] != errno.ENOENT:
                raise


class StorageTestBase(unittest.TestCase):

    # XXX It would be simpler if concrete tests didn't need to extend
    # setUp() and tearDown().

    def setUp(self):
        # You need to override this with a setUp that creates self._storage
        self._storage = None

    def _close(self):
        # You should override this if closing your storage requires additional
        # shutdown operations.
        if self._storage is not None:
            self._storage.close()

    def tearDown(self):
        self._close()

    def _dostore(self, oid=None, revid=None, data=None, version=None,
                 already_pickled=0, user=None, description=None):
        """Do a complete storage transaction.  The defaults are:

         - oid=None, ask the storage for a new oid
         - revid=None, use a revid of ZERO
         - data=None, pickle up some arbitrary data (the integer 7)
         - version=None, use the empty string version

        Returns the object's new revision id.
        """
        if oid is None:
            oid = self._storage.new_oid()
        if revid is None:
            revid = ZERO
        if data is None:
            data = MinPO(7)
        if type(data) == types.IntType:
            data = MinPO(data)
        if not already_pickled:
            data = zodb_pickle(data)
        if version is None:
            version = ''
        # Begin the transaction
        t = Transaction()
        if user is not None:
            t.user = user
        if description is not None:
            t.description = description
        try:
            self._storage.tpc_begin(t)
            # Store an object
            r1 = self._storage.store(oid, revid, data, version, t)
            # Finish the transaction
            r2 = self._storage.tpc_vote(t)
            revid = handle_serials(oid, r1, r2)
            self._storage.tpc_finish(t)
        except:
            self._storage.tpc_abort(t)
            raise
        return revid

    def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
                   user=None, description=None):
        return self._dostore(oid, revid, data, version, 1, user, description)

    # The following methods depend on optional storage features.

    def _undo(self, tid, oid=None):
        # Undo a tid that affects a single object (oid).
        # XXX This is very specialized
        t = Transaction()
        t.note("undo")
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        if oid is not None:
            self.assertEqual(len(oids), 1)
            self.assertEqual(oids[0], oid)
        return self._storage.lastTransaction()

    def _commitVersion(self, src, dst):
        t = Transaction()
        t.note("commit %r to %r" % (src, dst))
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(src, dst, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        return oids

    def _abortVersion(self, ver):
        t = Transaction()
        t.note("abort %r" % ver)
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(ver, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        return oids


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/Synchronization.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the storage's implemenetation of the storage synchronization spec.

The Synchronization spec
    http://www.zope.org/Documentation/Developer/Models/ZODB/
    ZODB_Architecture_Storage_Interface_State_Synchronization_Diag.html

It specifies two states committing and non-committing.  A storage
starts in the non-committing state.  tpc_begin() transfers to the
committting state; tpc_abort() and tpc_finish() transfer back to
non-committing.

Several other methods are only allowed in one state or another.  Many
methods allowed only in the committing state require that they apply
to the currently committing transaction.

The spec is silent on a variety of methods that don't appear to modify
the state, e.g. load(), undoLog(), pack().  It's unclear whether there
is a separate set of synchronization rules that apply to these methods
or if the synchronization is implementation dependent, i.e. only what
is need to guarantee a corrected implementation.

The synchronization spec is also silent on whether there is any
contract implied with the caller.  If the storage can assume that a
single client is single-threaded and that it will not call, e.g., store()
until after it calls tpc_begin(), the implementation can be
substantially simplified.

New and/or unspecified methods:

tpc_vote(): handled like tpc_abort
transactionalUndo(): handled like undo()  (which is how?)

Methods that have nothing to do with committing/non-committing:
load(), loadSerial(), getName(), getSize(), __len__(), history(),
undoLog(), modifiedInVersion(), versionEmpty(), versions(), pack().

Specific questions:

The spec & docs say that undo() takes three arguments, the second
being a transaction.  If the specified arg isn't the current
transaction, the undo() should raise StorageTransactionError.  This
isn't implemented anywhere.  It looks like undo can be called at
anytime.

FileStorage does not allow undo() during a pack.  How should this be
tested?  Is it a general restriction?



"""

from ZODB.Transaction import Transaction
from ZODB.POSException import StorageTransactionError

VERSION = "testversion"
OID = "\000" * 8
SERIALNO = "\000" * 8
TID = "\000" * 8

class SynchronizedStorage:

##    def verifyCommitting(self, callable, *args):
##        self.assertRaises(StorageTransactionError, callable *args)

    def verifyNotCommitting(self, callable, *args):
        self.assertRaises(StorageTransactionError, callable, *args)

    def verifyWrongTrans(self, callable, *args):
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(StorageTransactionError, callable, *args)
        self._storage.tpc_abort(t)

    def checkAbortVersionNotCommitting(self):
        self.verifyNotCommitting(self._storage.abortVersion,
                                 VERSION, Transaction())

    def checkAbortVersionWrongTrans(self):
        self.verifyWrongTrans(self._storage.abortVersion,
                              VERSION, Transaction())

    def checkCommitVersionNotCommitting(self):
        self.verifyNotCommitting(self._storage.commitVersion,
                                 VERSION, "", Transaction())

    def checkCommitVersionWrongTrans(self):
        self.verifyWrongTrans(self._storage.commitVersion,
                              VERSION, "", Transaction())


    def checkStoreNotCommitting(self):
        self.verifyNotCommitting(self._storage.store,
                                 OID, SERIALNO, "", "", Transaction())

    def checkStoreWrongTrans(self):
        self.verifyWrongTrans(self._storage.store,
                              OID, SERIALNO, "", "", Transaction())

##    def checkNewOidNotCommitting(self):
##        self.verifyNotCommitting(self._storage.new_oid)

##    def checkNewOidWrongTrans(self):
##        self.verifyWrongTrans(self._storage.new_oid)


    def checkAbortNotCommitting(self):
        self._storage.tpc_abort(Transaction())

    def checkAbortWrongTrans(self):
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_abort(Transaction())
        self._storage.tpc_abort(t)

    def checkFinishNotCommitting(self):
        t = Transaction()
        self._storage.tpc_finish(t)
        self._storage.tpc_abort(t)

    def checkFinishWrongTrans(self):
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_finish(Transaction())
        self._storage.tpc_abort(t)

    def checkBeginCommitting(self):
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_begin(t)
        self._storage.tpc_abort(t)

    # XXX how to check undo?


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check transactionalUndo().

Any storage that supports transactionalUndo() must pass these tests.
"""
from __future__ import nested_scopes

import time
import types
from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
from ZODB.utils import u64, p64
from ZODB import DB

from Persistence import Persistent
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle

ZERO = '\0'*8

class C(Persistent):
    pass

def snooze():
    # In Windows, it's possible that two successive time.time() calls return
    # the same value.  Tim guarantees that time never runs backwards.  You
    # usually want to call this before you pack a storage, or must make other
    # guarantees about increasing timestamps.
    now = time.time()
    while now == time.time():
        time.sleep(0.1)

def listeq(L1, L2):
    """Return True if L1.sort() == L2.sort()"""
    c1 = L1[:]
    c2 = L2[:]
    c1.sort()
    c2.sort()
    return c1 == c2

class TransactionalUndoStorage:

    def _transaction_begin(self):
        self.__serials = {}

    def _transaction_store(self, oid, rev, data, vers, trans):
        r = self._storage.store(oid, rev, data, vers, trans)
        if r:
            if type(r) == types.StringType:
                self.__serials[oid] = r
            else:
                for oid, serial in r:
                    self.__serials[oid] = serial

    def _transaction_vote(self, trans):
        r = self._storage.tpc_vote(trans)
        if r:
            for oid, serial in r:
                self.__serials[oid] = serial

    def _transaction_newserial(self, oid):
        return self.__serials[oid]

    def _multi_obj_transaction(self, objs):
        newrevs = {}
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        for oid, rev, data in objs:
            self._transaction_store(oid, rev, data, '', t)
            newrevs[oid] = None
        self._transaction_vote(t)
        self._storage.tpc_finish(t)
        for oid in newrevs.keys():
            newrevs[oid] = self._transaction_newserial(oid)
        return newrevs

    def _iterate(self):
        """Iterate over the storage in its final state."""
        # This is testing that the iterator() code works correctly.
        # The hasattr() guards against ZEO, which doesn't support iterator.
        if not hasattr(self._storage, "iterator"):
            return
        iter = self._storage.iterator()
        for txn in iter:
            for rec in txn:
                pass

    def checkSimpleTransactionalUndo(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(23))
        revid = self._dostore(oid, revid=revid, data=MinPO(24))
        revid = self._dostore(oid, revid=revid, data=MinPO(25))

        info = self._storage.undoInfo()
        tid = info[0]['id']
        # Now start an undo transaction
        t = Transaction()
        t.note('undo1')
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(24))
        # Do another one
        info = self._storage.undoInfo()
        tid = info[2]['id']
        t = Transaction()
        t.note('undo2')
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(23))
        # Try to undo the first record
        info = self._storage.undoInfo()
        tid = info[4]['id']
        t = Transaction()
        t.note('undo3')
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # This should fail since we've undone the object's creation
        self.assertRaises(KeyError,
                          self._storage.load, oid, '')
        # And now let's try to redo the object's creation
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(23))
        self._iterate()

    def checkCreationUndoneGetSerial(self):
        # create an object
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(23))
        # undo its creation
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        t.note('undo1')
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # Check that calling getSerial on an uncreated object raises a KeyError
        # The current version of FileStorage fails this test
        self.assertRaises(KeyError, self._storage.getSerial, oid)

    def checkUndoCreationBranch1(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(11))
        revid = self._dostore(oid, revid=revid, data=MinPO(12))
        # Undo the last transaction
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(11))
        # Now from here, we can either redo the last undo, or undo the object
        # creation.  Let's undo the object creation.
        info = self._storage.undoInfo()
        tid = info[2]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        self.assertRaises(KeyError, self._storage.load, oid, '')
        self._iterate()

    def checkUndoCreationBranch2(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(11))
        revid = self._dostore(oid, revid=revid, data=MinPO(12))
        # Undo the last transaction
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(11))
        # Now from here, we can either redo the last undo, or undo the object
        # creation.  Let's redo the last undo
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(12))
        self._iterate()

    def checkTwoObjectUndo(self):
        eq = self.assertEqual
        # Convenience
        p31, p32, p51, p52 = map(zodb_pickle,
                                 map(MinPO, (31, 32, 51, 52)))
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        revid1 = revid2 = ZERO
        # Store two objects in the same transaction
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        self._transaction_store(oid1, revid1, p31, '', t)
        self._transaction_store(oid2, revid2, p51, '', t)
        # Finish the transaction
        self._transaction_vote(t)
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        self._storage.tpc_finish(t)
        eq(revid1, revid2)
        # Update those same two objects
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        self._transaction_store(oid1, revid1, p32, '', t)
        self._transaction_store(oid2, revid2, p52, '', t)
        # Finish the transaction
        self._transaction_vote(t)
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        self._storage.tpc_finish(t)
        eq(revid1, revid2)
        # Make sure the objects have the current value
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(32))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(52))
        # Now attempt to undo the transaction containing two objects
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 2)
        self.failUnless(oid1 in oids)
        self.failUnless(oid2 in oids)
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(31))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(51))
        self._iterate()

    def checkTwoObjectUndoAtOnce(self):
        # Convenience
        eq = self.assertEqual
        unless = self.failUnless
        p30, p31, p32, p50, p51, p52 = map(zodb_pickle,
                                           map(MinPO,
                                               (30, 31, 32, 50, 51, 52)))
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        revid1 = revid2 = ZERO
        # Store two objects in the same transaction
        d = self._multi_obj_transaction([(oid1, revid1, p30),
                                         (oid2, revid2, p50),
                                         ])
        eq(d[oid1], d[oid2])
        # Update those same two objects
        d = self._multi_obj_transaction([(oid1, d[oid1], p31),
                                         (oid2, d[oid2], p51),
                                         ])
        eq(d[oid1], d[oid2])
        # Update those same two objects
        d = self._multi_obj_transaction([(oid1, d[oid1], p32),
                                         (oid2, d[oid2], p52),
                                         ])
        eq(d[oid1], d[oid2])
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        eq(revid1, revid2)
        # Make sure the objects have the current value
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(32))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(52))
        # Now attempt to undo the transaction containing two objects
        info = self._storage.undoInfo()
        tid = info[0]['id']
        tid1 = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        oids1 = self._storage.transactionalUndo(tid1, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # We get the finalization stuff called an extra time:
##        self._storage.tpc_vote(t)
##        self._storage.tpc_finish(t)
        eq(len(oids), 2)
        eq(len(oids1), 2)
        unless(oid1 in oids)
        unless(oid2 in oids)
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(30))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(50))
        # Now try to undo the one we just did to undo, whew
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 2)
        unless(oid1 in oids)
        unless(oid2 in oids)
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(32))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(52))
        self._iterate()

    def checkTwoObjectUndoAgain(self):
        eq = self.assertEqual
        p31, p32, p33, p51, p52, p53 = map(
            zodb_pickle,
            map(MinPO, (31, 32, 33, 51, 52, 53)))
        # Like the above, but the first revision of the objects are stored in
        # different transactions.
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        revid1 = self._dostore(oid1, data=p31, already_pickled=1)
        revid2 = self._dostore(oid2, data=p51, already_pickled=1)
        # Update those same two objects
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        self._transaction_store(oid1, revid1, p32, '', t)
        self._transaction_store(oid2, revid2, p52, '', t)
        # Finish the transaction
        self._transaction_vote(t)
        self._storage.tpc_finish(t)
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        eq(revid1, revid2)
        # Now attempt to undo the transaction containing two objects
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 2)
        self.failUnless(oid1 in oids)
        self.failUnless(oid2 in oids)
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(31))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(51))
        # Like the above, but this time, the second transaction contains only
        # one object.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        self._transaction_store(oid1, revid1, p33, '', t)
        self._transaction_store(oid2, revid2, p53, '', t)
        # Finish the transaction
        self._transaction_vote(t)
        self._storage.tpc_finish(t)
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        eq(revid1, revid2)
        # Update in different transactions
        revid1 = self._dostore(oid1, revid=revid1, data=MinPO(34))
        revid2 = self._dostore(oid2, revid=revid2, data=MinPO(54))
        # Now attempt to undo the transaction containing two objects
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        self.failUnless(oid1 in oids)
        self.failUnless(not oid2 in oids)
        data, revid1 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(33))
        data, revid2 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(54))
        self._iterate()


    def checkNotUndoable(self):
        eq = self.assertEqual
        # Set things up so we've got a transaction that can't be undone
        oid = self._storage.new_oid()
        revid_a = self._dostore(oid, data=MinPO(51))
        revid_b = self._dostore(oid, revid=revid_a, data=MinPO(52))
        revid_c = self._dostore(oid, revid=revid_b, data=MinPO(53))
        # Start the undo
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(POSException.UndoError,
                          self._storage.transactionalUndo,
                          tid, t)
        self._storage.tpc_abort(t)
        # Now have more fun: object1 and object2 are in the same transaction,
        # which we'll try to undo to, but one of them has since modified in
        # different transaction, so the undo should fail.
        oid1 = oid
        revid1 = revid_c
        oid2 = self._storage.new_oid()
        revid2 = ZERO
        p81, p82, p91, p92 = map(zodb_pickle,
                                 map(MinPO, (81, 82, 91, 92)))

        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()
        self._transaction_store(oid1, revid1, p81, '', t)
        self._transaction_store(oid2, revid2, p91, '', t)
        self._transaction_vote(t)
        self._storage.tpc_finish(t)
        revid1 = self._transaction_newserial(oid1)
        revid2 = self._transaction_newserial(oid2)
        eq(revid1, revid2)
        # Make sure the objects have the expected values
        data, revid_11 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(81))
        data, revid_22 = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(91))
        eq(revid_11, revid1)
        eq(revid_22, revid2)
        # Now modify oid2
        revid2 = self._dostore(oid2, revid=revid2, data=MinPO(92))
        self.assertNotEqual(revid1, revid2)
        self.assertNotEqual(revid2, revid_22)
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(POSException.UndoError,
                          self._storage.transactionalUndo,
                          tid, t)
        self._storage.tpc_abort(t)
        self._iterate()

    def checkTransactionalUndoAfterPack(self):
        eq = self.assertEqual
        # Add a few object revisions
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(51))
        packtime = time.time()
        snooze()                # time.time() now distinct from packtime
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(52))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(53))
        # Now get the undo log
        info = self._storage.undoInfo()
        eq(len(info), 3)
        tid = info[0]['id']
        # Now pack just the initial revision of the object.  We need the
        # second revision otherwise we won't be able to undo the third
        # revision!
        self._storage.pack(packtime, referencesf)
        # Make some basic assertions about the undo information now
        info2 = self._storage.undoInfo()
        eq(len(info2), 2)
        # And now attempt to undo the last transaction
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        # The object must now be at the second state
        eq(zodb_unpickle(data), MinPO(52))
        self._iterate()

    def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
        eq = self.assertEqual
        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        o1 = C()
        o2 = C()
        root['obj'] = o1
        o1.obj = o2
        txn = get_transaction()
        txn.note('o1 -> o2')
        txn.commit()
        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()

        o3 = C()
        o2.obj = o3
        txn = get_transaction()
        txn.note('o1 -> o2 -> o3')
        txn.commit()

        o1.obj = o3
        txn = get_transaction()
        txn.note('o1 -> o3')
        txn.commit()

        log = self._storage.undoLog()
        eq(len(log), 4)
        for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3',
                               'o1 -> o2', 'initial database creation')):
            eq(entry[0]['description'], entry[1])

        self._storage.pack(packtime, referencesf)

        log = self._storage.undoLog()
        for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3')):
            eq(entry[0]['description'], entry[1])

        tid = log[0]['id']
        db.undo(tid)
        txn = get_transaction()
        txn.note('undo')
        txn.commit()
        # undo does a txn-undo, but doesn't invalidate
        conn.sync()

        log = self._storage.undoLog()
        for entry in zip(log, ('undo', 'o1 -> o3', 'o1 -> o2 -> o3')):
            eq(entry[0]['description'], entry[1])

        eq(o1.obj, o2)
        eq(o1.obj.obj, o3)
        self._iterate()

    def checkPackAfterUndoDeletion(self):
        db = DB(self._storage)
        cn = db.open()
        root = cn.root()

        pack_times = []
        def set_pack_time():
            pack_times.append(time.time())
            snooze()

        root["key0"] = MinPO(0)
        root["key1"] = MinPO(1)
        root["key2"] = MinPO(2)
        txn = get_transaction()
        txn.note("create 3 keys")
        txn.commit()

        set_pack_time()

        del root["key1"]
        txn = get_transaction()
        txn.note("delete 1 key")
        txn.commit()

        set_pack_time()

        root._p_deactivate()
        cn.sync()
        self.assert_(listeq(root.keys(), ["key0", "key2"]))

        L = db.undoInfo()
        db.undo(L[0]["id"])
        txn = get_transaction()
        txn.note("undo deletion")
        txn.commit()

        set_pack_time()

        root._p_deactivate()
        cn.sync()
        self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))

        for t in pack_times:
            self._storage.pack(t, referencesf)

            root._p_deactivate()
            cn.sync()
            self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
            for i in range(3):
                obj = root["key%d" % i]
                self.assertEqual(obj.value, i)
            root.items()
            self._inter_pack_pause()

    def checkPackAfterUndoManyTimes(self):
        db = DB(self._storage)
        cn = db.open()
        rt = cn.root()

        rt["test"] = MinPO(1)
        get_transaction().commit()
        rt["test2"] = MinPO(2)
        get_transaction().commit()
        rt["test"] = MinPO(3)
        txn = get_transaction()
        txn.note("root of undo")
        txn.commit()

        packtimes = []
        for i in range(10):
            L = db.undoInfo()
            db.undo(L[0]["id"])
            txn = get_transaction()
            txn.note("undo %d" % i)
            txn.commit()
            rt._p_deactivate()
            cn.sync()

            self.assertEqual(rt["test"].value, i % 2 and 3 or 1)
            self.assertEqual(rt["test2"].value, 2)

            packtimes.append(time.time())
            snooze()

        for t in packtimes:
            self._storage.pack(t, referencesf)
            cn.sync()
            cn._cache.clear()
            # The last undo set the value to 3 and pack should
            # never change that.
            self.assertEqual(rt["test"].value, 3)
            self.assertEqual(rt["test2"].value, 2)
            self._inter_pack_pause()

    def _inter_pack_pause(self):
        # DirectoryStorage needs a pause between packs,
        # most other storages dont.
        pass

    def checkTransactionalUndoIterator(self):
        # check that data_txn set in iterator makes sense
        if not hasattr(self._storage, "iterator"):
            return

        s = self._storage

        BATCHES = 4
        OBJECTS = 4

        orig = []
        for i in range(BATCHES):
            t = Transaction()
            tid = p64(i + 1)
            s.tpc_begin(t, tid)
            for j in range(OBJECTS):
                oid = s.new_oid()
                obj = MinPO(i * OBJECTS + j)
                revid = s.store(oid, None, zodb_pickle(obj), '', t)
                orig.append((tid, oid, revid))
            s.tpc_vote(t)
            s.tpc_finish(t)

        i = 0
        for tid, oid, revid in orig:
            self._dostore(oid, revid=revid, data=MinPO(revid),
                          description="update %s" % i)

        # Undo the OBJECTS transactions that modified objects created
        # in the ith original transaction.

        def undo(i):
            info = s.undoInfo()
            t = Transaction()
            s.tpc_begin(t)
            base = i * OBJECTS + i
            for j in range(OBJECTS):
                tid = info[base + j]['id']
                s.transactionalUndo(tid, t)
            s.tpc_vote(t)
            s.tpc_finish(t)

        for i in range(BATCHES):
            undo(i)

        # There are now (2 + OBJECTS) * BATCHES transactions:
        #     BATCHES original transactions, followed by
        #     OBJECTS * BATCHES modifications, followed by
        #     BATCHES undos

        iter = s.iterator()
        offset = 0

        eq = self.assertEqual

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1

            tid = p64(i + 1)
            eq(txn.tid, tid)

            L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
            L2 = [(oid, revid, None) for _tid, oid, revid in orig
                  if _tid == tid]

            eq(L1, L2)

        for i in range(BATCHES * OBJECTS):
            txn = iter[offset]
            offset += 1
            eq(len([rec for rec in txn if rec.data_txn is None]), 1)

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1

            # The undos are performed in reverse order.
            otid = p64(BATCHES - i)
            L1 = [(rec.oid, rec.data_txn) for rec in txn]
            L2 = [(oid, otid) for _tid, oid, revid in orig
                  if _tid == otid]
            L1.sort()
            L2.sort()
            eq(L1, L2)

        self.assertRaises(IndexError, iter.__getitem__, offset)

    def checkUndoLogMetadata(self):
        # test that the metadata is correct in the undo log
        t = get_transaction()
        t.note('t1')
        t.setExtendedInfo('k2','this is transaction metadata')
        t.setUser('u3',path='p3')
        db = DB(self._storage)
        conn = db.open()
        root = conn.root()
        o1 = C()
        root['obj'] = o1
        txn = get_transaction()
        txn.commit()
        l = self._storage.undoLog()
        self.assertEqual(len(l),2)
        d = l[0]
        self.assertEqual(d['description'],'t1')
        self.assertEqual(d['k2'],'this is transaction metadata')
        self.assertEqual(d['user_name'],'p3 u3')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoVersionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes

# Check interactions between transactionalUndo() and versions.  Any storage
# that supports both transactionalUndo() and versions must pass these tests.

import time

from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle


class TransactionalUndoVersionStorage:

    def _x_dostore(self, *args, **kwargs):
        # ugh: backwards compatibilty for ZEO 1.0 which runs these
        # tests but has a _dostore() method that does not support the
        # description kwarg.
        try:
            return self._dostore(*args, **kwargs)
        except TypeError:
            # assume that the type error means we've got a _dostore()
            # without the description kwarg
            try:
                del kwargs['description']
            except KeyError:
                pass # not expected
        return self._dostore(*args, **kwargs)

    def _undo(self, tid, oid):
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self.assertEqual(len(oids), 1)
        self.assertEqual(oids[0], oid)

    def checkUndoInVersion(self):
        eq = self.assertEqual
        unless = self.failUnless

        def check_objects(nonversiondata, versiondata):
            data, revid = self._storage.load(oid, version)
            self.assertEqual(zodb_unpickle(data), MinPO(versiondata))
            data, revid = self._storage.load(oid, '')
            self.assertEqual(zodb_unpickle(data), MinPO(nonversiondata))

        oid = self._storage.new_oid()
        version = 'one'
        revid_a = self._dostore(oid, data=MinPO(91))
        revid_b = self._dostore(oid, revid=revid_a, data=MinPO(92),
                                version=version)
        revid_c = self._dostore(oid, revid=revid_b, data=MinPO(93),
                                version=version)

        info = self._storage.undoInfo()
        self._undo(info[0]['id'], oid)

        data, revid = self._storage.load(oid, '')
        eq(revid, revid_a)
        eq(zodb_unpickle(data), MinPO(91))
        data, revid = self._storage.load(oid, version)
        unless(revid > revid_b and revid > revid_c)
        eq(zodb_unpickle(data), MinPO(92))

        # Now commit the version...
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)

        check_objects(92, 92)

        # ...and undo the commit
        info = self._storage.undoInfo()
        self._undo(info[0]['id'], oid)

        check_objects(91, 92)

        oids = self._abortVersion(version)
        assert len(oids) == 1
        assert oids[0] == oid

        check_objects(91, 91)

        # Now undo the abort
        info=self._storage.undoInfo()
        self._undo(info[0]['id'], oid)

        check_objects(91, 92)

    def checkUndoCommitVersion(self):
        def load_value(oid, version=''):
            data, revid = self._storage.load(oid, version)
            return zodb_unpickle(data).value

        # create a bunch of packable transactions
        oid = self._storage.new_oid()
        revid = '\000' * 8
        for i in range(4):
            revid = self._x_dostore(oid, revid, description='packable%d' % i)
        pt = time.time()
        time.sleep(1)

        oid1 = self._storage.new_oid()
        version = 'version'
        revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
        revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
                               version=version, description='version1')
        revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
                               version=version, description='version2')
        self._x_dostore(description='create2')

        t = Transaction()
        t.description = 'commit version'
        self._storage.tpc_begin(t)
        self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        info = self._storage.undoInfo()
        t_id = info[0]['id']

        self.assertEqual(load_value(oid1), 2)
        self.assertEqual(load_value(oid1, version), 2)

        self._storage.pack(pt, referencesf)

        t = Transaction()
        t.description = 'undo commit version'
        self._storage.tpc_begin(t)
        self._storage.transactionalUndo(t_id, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        self.assertEqual(load_value(oid1), 0)
        self.assertEqual(load_value(oid1, version), 2)

    def checkUndoAbortVersion(self):
        def load_value(oid, version=''):
            data, revid = self._storage.load(oid, version)
            return zodb_unpickle(data).value

        # create a bunch of packable transactions
        oid = self._storage.new_oid()
        revid = '\000' * 8
        for i in range(3):
            revid = self._x_dostore(oid, revid, description='packable%d' % i)
        pt = time.time()
        time.sleep(1)

        oid1 = self._storage.new_oid()
        version = 'version'
        revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
        revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
                               version=version, description='version1')
        revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
                               version=version, description='version2')
        self._x_dostore(description='create2')

        t = Transaction()
        t.description = 'abort version'
        self._storage.tpc_begin(t)
        self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        info = self._storage.undoInfo()
        t_id = info[0]['id']

        self.assertEqual(load_value(oid1), 0)
        # after abort, we should see non-version data
        self.assertEqual(load_value(oid1, version), 0)

        t = Transaction()
        t.description = 'undo abort version'
        self._storage.tpc_begin(t)
        self._storage.transactionalUndo(t_id, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        self.assertEqual(load_value(oid1), 0)
        # t undo will re-create the version
        self.assertEqual(load_value(oid1, version), 2)

        info = self._storage.undoInfo()
        t_id = info[0]['id']

        self._storage.pack(pt, referencesf)

        t = Transaction()
        t.description = 'undo undo'
        self._storage.tpc_begin(t)
        self._storage.transactionalUndo(t_id, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)

        # undo of undo will put as back where we started
        self.assertEqual(load_value(oid1), 0)
        # after abort, we should see non-version data
        self.assertEqual(load_value(oid1, version), 0)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/VersionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the version related tests for a storage.

Any storage that supports versions should be able to pass all these tests.
"""

# XXX we should clean this code up to get rid of the #JF# comments.
# They were introduced when Jim reviewed the original version of the
# code.  Barry and Jeremy didn't understand versions then.

import time

from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, snooze
from ZODB import DB

class VersionStorage:

    def checkCommitVersionSerialno(self):
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(12))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
                               version="version")
        oids = self._commitVersion("version", "")
        self.assertEqual([oid], oids)
        data, revid3 = self._storage.load(oid, "")
        # use repr() to avoid getting binary data in a traceback on error
        self.assertNotEqual(`revid1`, `revid3`)
        self.assertNotEqual(`revid2`, `revid3`)

    def checkAbortVersionSerialno(self):
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(12))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
                               version="version")
        oids = self._abortVersion("version")
        self.assertEqual([oid], oids)
        data, revid3 = self._storage.load(oid, "")
        # use repr() to avoid getting binary data in a traceback on error
        self.assertEqual(`revid1`, `revid3`)
        self.assertNotEqual(`revid2`, `revid3`)

    def checkVersionedStoreAndLoad(self):
        eq = self.assertEqual
        # Store a couple of non-version revisions of the object
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(11))
        revid = self._dostore(oid, revid=revid, data=MinPO(12))
        # And now store some new revisions in a version
        version = 'test-version'
        revid = self._dostore(oid, revid=revid, data=MinPO(13),
                              version=version)
        revid = self._dostore(oid, revid=revid, data=MinPO(14),
                              version=version)
        revid = self._dostore(oid, revid=revid, data=MinPO(15),
                              version=version)
        # Now read back the object in both the non-version and version and
        # make sure the values jive.
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(12))
        data, vrevid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(15))
        if hasattr(self._storage, 'getSerial'):
            s = self._storage.getSerial(oid)
            eq(s, max(revid, vrevid))

    def checkVersionedLoadErrors(self):
        oid = self._storage.new_oid()
        version = 'test-version'
        revid = self._dostore(oid, data=MinPO(11))
        revid = self._dostore(oid, revid=revid, data=MinPO(12),
                              version=version)
        # Try to load a bogus oid
        self.assertRaises(KeyError,
                          self._storage.load,
                          self._storage.new_oid(), '')
        # Try to load a bogus version string
        #JF# Nope, fall back to non-version
        #JF# self.assertRaises(KeyError,
        #JF#                   self._storage.load,
        #JF#                   oid, 'bogus')
        data, revid = self._storage.load(oid, 'bogus')
        self.assertEqual(zodb_unpickle(data), MinPO(11))


    def checkVersionLock(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(11))
        version = 'test-version'
        revid = self._dostore(oid, revid=revid, data=MinPO(12),
                              version=version)
        self.assertRaises(POSException.VersionLockError,
                          self._dostore,
                          oid, revid=revid, data=MinPO(14),
                          version='another-version')

    def checkVersionEmpty(self):
        # Before we store anything, these versions ought to be empty
        version = 'test-version'
        #JF# The empty string is not a valid version. I think that this should
        #JF# be an error. Let's punt for now.
        #JF# assert self._storage.versionEmpty('')
        self.failUnless(self._storage.versionEmpty(version))
        # Now store some objects
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(11))
        revid = self._dostore(oid, revid=revid, data=MinPO(12))
        revid = self._dostore(oid, revid=revid, data=MinPO(13),
                              version=version)
        revid = self._dostore(oid, revid=revid, data=MinPO(14),
                              version=version)
        # The blank version should not be empty
        #JF# The empty string is not a valid version. I think that this should
        #JF# be an error. Let's punt for now.
        #JF# assert not self._storage.versionEmpty('')

        # Neither should 'test-version'
        self.failUnless(not self._storage.versionEmpty(version))
        # But this non-existant version should be empty
        self.failUnless(self._storage.versionEmpty('bogus'))

    def checkVersions(self):
        unless = self.failUnless
        # Store some objects in the non-version
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        oid3 = self._storage.new_oid()
        revid1 = self._dostore(oid1, data=MinPO(11))
        revid2 = self._dostore(oid2, data=MinPO(12))
        revid3 = self._dostore(oid3, data=MinPO(13))
        # Now create some new versions
        revid1 = self._dostore(oid1, revid=revid1, data=MinPO(14),
                               version='one')
        revid2 = self._dostore(oid2, revid=revid2, data=MinPO(15),
                               version='two')
        revid3 = self._dostore(oid3, revid=revid3, data=MinPO(16),
                               version='three')
        # Ask for the versions
        versions = self._storage.versions()
        unless('one' in versions)
        unless('two' in versions)
        unless('three' in versions)
        # Now flex the `max' argument
        versions = self._storage.versions(1)
        self.assertEqual(len(versions), 1)
        unless('one' in versions or 'two' in versions or 'three' in versions)

    def _setup_version(self, version='test-version'):
        # Store some revisions in the non-version
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(49))
        revid = self._dostore(oid, revid=revid, data=MinPO(50))
        nvrevid = revid = self._dostore(oid, revid=revid, data=MinPO(51))
        # Now do some stores in a version
        revid = self._dostore(oid, revid=revid, data=MinPO(52),
                              version=version)
        revid = self._dostore(oid, revid=revid, data=MinPO(53),
                              version=version)
        revid = self._dostore(oid, revid=revid, data=MinPO(54),
                              version=version)
        return oid, version

    def checkAbortVersion(self):
        eq = self.assertEqual
        oid, version = self._setup_version()

        # XXX Not sure I can write a test for getSerial() in the
        # presence of aborted versions, because FileStorage and
        # Berkeley storage give a different answer. I think Berkeley
        # is right and FS is wrong.

        oids = self._abortVersion(version)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))

    def checkAbortVersionErrors(self):
        eq = self.assertEqual
        oid, version = self._setup_version()
        # Now abort a bogus version
        t = Transaction()
        self._storage.tpc_begin(t)

        #JF# The spec is silent on what happens if you abort or commit
        #JF# a non-existent version. FileStorage consideres this a noop.
        #JF# We can change the spec, but until we do ....
        #JF# self.assertRaises(POSException.VersionError,
        #JF#                   self._storage.abortVersion,
        #JF#                   'bogus', t)

        # And try to abort the empty version
        if (hasattr(self._storage, 'supportsTransactionalUndo')
            and self._storage.supportsTransactionalUndo()):
            # XXX FileStorage used to be broken on this one
            self.assertRaises(POSException.VersionError,
                              self._storage.abortVersion,
                              '', t)

        # But now we really try to abort the version
        oids = self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))

    def checkCommitVersionErrors(self):
        if not (hasattr(self._storage, 'supportsTransactionalUndo')
            and self._storage.supportsTransactionalUndo()):
            # XXX FileStorage used to be broken on this one
            return
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')
        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        t = Transaction()
        self._storage.tpc_begin(t)
        try:
            self.assertRaises(POSException.VersionCommitError,
                              self._storage.commitVersion,
                              'one', 'one', t)
        finally:
            self._storage.tpc_abort(t)

    def checkNewSerialOnCommitVersionToVersion(self):
        eq = self.assertEqual
        oid, version = self._setup_version()
        data, vserial = self._storage.load(oid, version)
        data, nserial = self._storage.load(oid, '')

        version2 = 'test version 2'
        self._commitVersion(version, version2)
        data, serial = self._storage.load(oid, version2)

        self.failUnless(serial != vserial and serial != nserial,
                        "New serial, %r, should be different from the old "
                        "version, %r, and non-version, %r, serials."
                        % (serial, vserial, nserial))

    def checkModifyAfterAbortVersion(self):
        eq = self.assertEqual
        oid, version = self._setup_version()
        self._abortVersion(version)
        data, revid = self._storage.load(oid, '')
        # And modify it a few times
        revid = self._dostore(oid, revid=revid, data=MinPO(52))
        revid = self._dostore(oid, revid=revid, data=MinPO(53))
        revid = self._dostore(oid, revid=revid, data=MinPO(54))
        data, newrevid = self._storage.load(oid, '')
        eq(newrevid, revid)
        eq(zodb_unpickle(data), MinPO(54))

    def checkCommitToNonVersion(self):
        eq = self.assertEqual
        oid, version = self._setup_version()
        data, revid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(54))
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))
        self._commitVersion(version, '')
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(54))

    def checkCommitToOtherVersion(self):
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')

        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        oid2, version2 = self._setup_version('two')
        data, revid2 = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # make sure we see the non-version data when appropriate
        data, revid2 = self._storage.load(oid1, version2)
        eq(zodb_unpickle(data), MinPO(51))
        data, revid2 = self._storage.load(oid2, version1)
        eq(zodb_unpickle(data), MinPO(51))
        data, revid2 = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        # Okay, now let's commit object1 to version2
        oids = self._commitVersion(version1, version2)
        eq(len(oids), 1)
        eq(oids[0], oid1)
        data, revid = self._storage.load(oid1, version2)
        eq(zodb_unpickle(data), MinPO(54))
        data, revid = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # an object can only exist in one version, so a load from
        # version1 should now give the non-version data
        data, revid2 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(51))

        # as should a version that has never been used
        data, revid2 = self._storage.load(oid1, 'bela lugosi')
        eq(zodb_unpickle(data), MinPO(51))

    def checkAbortOneVersionCommitTheOther(self):
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')
        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        oid2, version2 = self._setup_version('two')
        data, revid2 = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # Let's make sure we can't get object1 in version2
        data, revid2 = self._storage.load(oid1, version2)
        eq(zodb_unpickle(data), MinPO(51))

        oids = self._abortVersion(version1)
        eq(len(oids), 1)
        eq(oids[0], oid1)
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        #JF# Ditto
        #JF# self.assertRaises(POSException.VersionError,
        #JF#                   self._storage.load, oid1, version1)
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))
        #JF# self.assertRaises(POSException.VersionError,
        #JF#                   self._storage.load, oid1, version2)
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        data, revid = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(51))
        data, revid = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))
        # Okay, now let's commit version2 back to the trunk
        oids = self._commitVersion(version2, '')
        eq(len(oids), 1)
        eq(oids[0], oid2)
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        # But the trunk should be up to date now
        data, revid = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(54))
        data, revid = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        #JF# To do a test like you want, you have to add the data in a version
        oid = self._storage.new_oid()
        revid = self._dostore(oid, revid=revid, data=MinPO(54), version='one')
        self.assertRaises(KeyError,
                          self._storage.load, oid, '')
        self.assertRaises(KeyError,
                          self._storage.load, oid, 'two')

    def checkCreateObjectInVersionWithAbort(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=21, version="one")
        revid = self._dostore(oid, revid=revid, data=23, version='one')
        revid = self._dostore(oid, revid=revid, data=34, version='one')
        # Now abort the version and the creation
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion('one', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self.assertEqual(oids, [oid])

    def checkPackVersions(self):
        db = DB(self._storage)
        cn = db.open(version="testversion")
        root = cn.root()

        obj = root["obj"] = MinPO("obj")
        root["obj2"] = MinPO("obj2")
        txn = get_transaction()
        txn.note("create 2 objs in version")
        txn.commit()

        obj.value = "77"
        txn = get_transaction()
        txn.note("modify obj in version")
        txn.commit()

        # undo the modification to generate a mix of backpointers
        # and versions for pack to chase
        info = db.undoInfo()
        db.undo(info[0]["id"])
        txn = get_transaction()
        txn.note("undo modification")
        txn.commit()

        snooze()
        self._storage.pack(time.time(), referencesf)

        db.commitVersion("testversion")
        txn = get_transaction()
        txn.note("commit version")
        txn.commit()

        cn = db.open()
        root = cn.root()
        root["obj"] = "no version"

        txn = get_transaction()
        txn.note("modify obj")
        txn.commit()

        self._storage.pack(time.time(), referencesf)

    def checkPackVersionsInPast(self):
        db = DB(self._storage)
        cn = db.open(version="testversion")
        root = cn.root()

        obj = root["obj"] = MinPO("obj")
        root["obj2"] = MinPO("obj2")
        txn = get_transaction()
        txn.note("create 2 objs in version")
        txn.commit()

        obj.value = "77"
        txn = get_transaction()
        txn.note("modify obj in version")
        txn.commit()

        t0 = time.time()
        snooze()

        # undo the modification to generate a mix of backpointers
        # and versions for pack to chase
        info = db.undoInfo()
        db.undo(info[0]["id"])
        txn = get_transaction()
        txn.note("undo modification")
        txn.commit()

        self._storage.pack(t0, referencesf)

        db.commitVersion("testversion")
        txn = get_transaction()
        txn.note("commit version")
        txn.commit()

        cn = db.open()
        root = cn.root()
        root["obj"] = "no version"

        txn = get_transaction()
        txn.note("modify obj")
        txn.commit()

        self._storage.pack(time.time(), referencesf)

    def checkPackVersionReachable(self):
        db = DB(self._storage)
        cn = db.open()
        root = cn.root()

        names = "a", "b", "c"

        for name in names:
            root[name] = MinPO(name)
            get_transaction().commit()

        for name in names:
            cn2 = db.open(version=name)
            rt2 = cn2.root()
            obj = rt2[name]
            obj.value = MinPO("version")
            get_transaction().commit()
            cn2.close()

        root["d"] = MinPO("d")
        get_transaction().commit()
        snooze()

        self._storage.pack(time.time(), referencesf)
        cn.sync()
        cn._cache.clear()

        # make sure all the non-version data is there
        for name, obj in root.items():
            self.assertEqual(name, obj.value)

        # make sure all the version-data is there,
        # and create a new revision in the version
        for name in names:
            cn2 = db.open(version=name)
            rt2 = cn2.root()
            obj = rt2[name].value
            self.assertEqual(obj.value, "version")
            obj.value = "still version"
            get_transaction().commit()
            cn2.close()

        db.abortVersion("b")
        txn = get_transaction()
        txn.note("abort version b")
        txn.commit()

        t = time.time()
        snooze()

        L = db.undoInfo()
        db.undo(L[0]["id"])
        txn = get_transaction()
        txn.note("undo abort")
        txn.commit()

        self._storage.pack(t, referencesf)

        cn2 = db.open(version="b")
        rt2 = cn2.root()
        self.assertEqual(rt2["b"].value.value, "still version")


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/__init__.py ===
# Having this makes debugging better.


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/dangle.py ===
#! /usr/bin/env python

##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

"""Functional test to produce a dangling reference."""

import time

from ZODB.FileStorage import FileStorage
from ZODB import DB

from Persistence import Persistent

class P(Persistent):
    pass

def create_dangling_ref(db):
    rt = db.open().root()

    rt[1] = o1 = P()
    get_transaction().note("create o1")
    get_transaction().commit()

    rt[2] = o2 = P()
    get_transaction().note("create o2")
    get_transaction().commit()

    c = o1.child = P()
    get_transaction().note("set child on o1")
    get_transaction().commit()

    o1.child = P()
    get_transaction().note("replace child on o1")
    get_transaction().commit()

    time.sleep(2)
    # The pack should remove the reference to c, because it is no
    # longer referenced from o1.  But the object still exists and has
    # an oid, so a new commit of it won't create a new object.
    db.pack()

    print repr(c._p_oid)
    o2.child = c
    get_transaction().note("set child on o2")
    get_transaction().commit()

def main():
    fs = FileStorage("dangle.fs")
    db = DB(fs)
    create_dangling_ref(db)
    db.close()

if __name__ == "__main__":
    main()


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/speed.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
usage="""Test speed of a ZODB storage

Options:

    -d file    The data file to use as input.
               The default is this script.

    -n n       The number of repititions

    -s module  A module that defines a 'Storage'
               attribute, which is an open storage.
               If not specified, a FileStorage will ne
               used.

    -z         Test compressing data

    -D         Run in debug mode

    -L         Test loads as well as stores by minimizing
               the cache after eachrun

    -M         Output means only
"""

import sys, os, getopt, string, time
sys.path.insert(0, os.getcwd())

import ZODB, ZODB.FileStorage
import Persistence

class P(Persistence.Persistent): pass

def main(args):

    opts, args = getopt.getopt(args, 'zd:n:Ds:LM')
    z=s=None
    data=sys.argv[0]
    nrep=5
    minimize=0
    detailed=1
    for o, v in opts:
        if o=='-n': nrep=string.atoi(v)
        elif o=='-d': data=v
        elif o=='-s': s=v
        elif o=='-z':
            global zlib
            import zlib
            z=compress
        elif o=='-L':
            minimize=1
        elif o=='-M':
            detailed=0
        elif o=='-D':
            global debug
            os.environ['STUPID_LOG_FILE']=''
            os.environ['STUPID_LOG_SEVERITY']='-999'

    if s:
        s=__import__(s, globals(), globals(), ('__doc__',))
        s=s.Storage
    else:
        s=ZODB.FileStorage.FileStorage('zeo_speed.fs', create=1)

    data=open(data).read()
    db=ZODB.DB(s,
               # disable cache deactivation
               cache_size=4000,
               cache_deactivate_after=6000,)

    results={1:0, 10:0, 100:0, 1000:0}
    for j in range(nrep):
        for r in 1, 10, 100, 1000:
            t=time.time()
            jar=db.open()
            get_transaction().begin()
            rt=jar.root()
            key='s%s' % r
            if rt.has_key(key): p=rt[key]
            else: rt[key]=p=P()
            for i in range(r):
                if z is not None: d=z(data)
                else: d=data
                v=getattr(p, str(i), P())
                v.d=d
                setattr(p,str(i),v)
            get_transaction().commit()
            jar.close()
            t=time.time()-t
            if detailed:
                sys.stderr.write("%s\t%s\t%.4f\n" % (j, r, t))
                sys.stdout.flush()
            results[r]=results[r]+t
            rt=d=p=v=None # release all references
            if minimize:
                time.sleep(3)
                jar.cacheMinimize(3)

    if detailed: print '-'*24
    for r in 1, 10, 100, 1000:
        t=results[r]/nrep
        sys.stderr.write("mean:\t%s\t%.4f\t%.4f (s/o)\n" % (r, t, t/r))

    db.close()


def compress(s):
    c=zlib.compressobj()
    o=c.compress(s)
    return o+c.flush()

if __name__=='__main__': main(sys.argv[1:])


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testActivityMonitor.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the default activity monitor.

See ZODB/ActivityMonitor.py

$Id: testActivityMonitor.py,v 1.1 2004/04/01 18:26:43 sidnei Exp $
"""

import unittest
import time

from ZODB.ActivityMonitor import ActivityMonitor


class FakeConnection:

    loads = 0
    stores = 0

    def _transferred(self, loads, stores):
        self.loads = self.loads + loads
        self.stores = self.stores + stores

    def getTransferCounts(self, clear=0):
        res = self.loads, self.stores
        if clear:
            self.loads = self.stores = 0
        return res


class Tests(unittest.TestCase):

    def testAddLogEntries(self):
        am = ActivityMonitor(history_length=3600)
        self.assertEqual(len(am.log), 0)
        c = FakeConnection()
        c._transferred(1, 2)
        am.closedConnection(c)
        c._transferred(3, 7)
        am.closedConnection(c)
        self.assertEqual(len(am.log), 2)

    def testTrim(self):
        am = ActivityMonitor(history_length=0.1)
        c = FakeConnection()
        c._transferred(1, 2)
        am.closedConnection(c)
        time.sleep(0.2)
        c._transferred(3, 7)
        am.closedConnection(c)
        self.assert_(len(am.log) <= 1)

    def testSetHistoryLength(self):
        am = ActivityMonitor(history_length=3600)
        c = FakeConnection()
        c._transferred(1, 2)
        am.closedConnection(c)
        time.sleep(0.2)
        c._transferred(3, 7)
        am.closedConnection(c)
        self.assertEqual(len(am.log), 2)
        am.setHistoryLength(0.1)
        self.assertEqual(am.getHistoryLength(), 0.1)
        self.assert_(len(am.log) <= 1)

    def testActivityAnalysis(self):
        am = ActivityMonitor(history_length=3600)
        c = FakeConnection()
        c._transferred(1, 2)
        am.closedConnection(c)
        c._transferred(3, 7)
        am.closedConnection(c)
        res = am.getActivityAnalysis(start=0, end=0, divisions=10)
        lastend = 0
        for n in range(9):
            div = res[n]
            self.assertEqual(div['stores'], 0)
            self.assertEqual(div['loads'], 0)
            self.assert_(div['start'] > 0)
            self.assert_(div['start'] >= lastend)
            self.assert_(div['start'] < div['end'])
            lastend = div['end']
        div = res[9]
        self.assertEqual(div['stores'], 9)
        self.assertEqual(div['loads'], 4)
        self.assert_(div['start'] > 0)
        self.assert_(div['start'] >= lastend)
        self.assert_(div['start'] < div['end'])


def test_suite():
    return unittest.makeSuite(Tests)

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testCache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A few simple tests of the public cache API.

Each DB Connection has a separate PickleCache.  The Cache serves two
purposes. It acts like a memo for unpickling.  It also keeps recent
objects in memory under the assumption that they may be used again.
"""
from __future__ import nested_scopes

import time
import types
import unittest

import ZODB
import ZODB.MappingStorage
from ZODB.cPickleCache import PickleCache
from ZODB.POSException import ConflictError
from ZODB.PersistentMapping import PersistentMapping
from ZODB.tests.MinPO import MinPO
from ZODB.utils import p64

from Persistence import Persistent

class CacheTestBase(unittest.TestCase):

    def setUp(self):
        store = ZODB.MappingStorage.MappingStorage()
        self.db = ZODB.DB(store,
                          cache_size = self.CACHE_SIZE)
        self.conns = []

    def tearDown(self):
        for conn in self.conns:
            conn.close()
        self.db.close()

    CACHE_SIZE = 20

    def noodle_new_connection(self):
        """Do some reads and writes on a new connection."""

        c = self.db.open()
        self.conns.append(c)
        self.noodle_connection(c)

    def noodle_connection(self, c):
        r = c.root()

        i = len(self.conns)
        d = r.get(i)
        if d is None:
            d = r[i] = PersistentMapping()
            get_transaction().commit()

        for i in range(15):
            o = d.get(i)
            if o is None:
                o = d[i] = MinPO(i)
            o.value += 1
        get_transaction().commit()

class DBMethods(CacheTestBase):

    __super_setUp = CacheTestBase.setUp

    def setUp(self):
        self.__super_setUp()
        for i in range(4):
            self.noodle_new_connection()

    def checkCacheDetail(self):
        for name, count in self.db.cacheDetail():
            self.assert_(isinstance(name, types.StringType))
            self.assert_(isinstance(count, types.IntType))

    def checkCacheExtremeDetail(self):
        expected = ['conn_no', 'id', 'oid', 'rc', 'klass', 'state']
        for dict in self.db.cacheExtremeDetail():
            for k, v in dict.items():
                self.assert_(k in expected)

    # XXX not really sure how to do a black box test of the cache.
    # should the full sweep and minimize calls always remove things?

    # The sleep(3) call is based on the implementation of the cache.
    # It's measures time in units of three seconds, so something used
    # within the last three seconds looks like something that is
    # currently being used.  Three seconds old is the youngest
    # something can be and still be collected.

    def checkFullSweep(self):
        old_size = self.db.cacheSize()
        time.sleep(3)
        self.db.cacheFullSweep(0)
        new_size = self.db.cacheSize()
        self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))

    def checkMinimize(self):
        old_size = self.db.cacheSize()
        time.sleep(3)
        self.db.cacheMinimize(0)
        new_size = self.db.cacheSize()
        self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))

    # XXX don't have an explicit test for incrgc, because the
    # connection and database call it internally

    # XXX same for the get and invalidate methods

    def checkLRUitems(self):
        # get a cache
        c = self.conns[0]._cache
        c.lru_items()

    def checkClassItems(self):
        c = self.conns[0]._cache
        c.klass_items()

class LRUCacheTests(CacheTestBase):

    def checkLRU(self):
        # verify the LRU behavior of the cache
        dataset_size = 5
        CACHE_SIZE = dataset_size*2+1
        # a cache big enough to hold the objects added in two
        # transactions, plus the root object
        self.db.setCacheSize(CACHE_SIZE)
        c = self.db.open()
        r = c.root()
        l = {}
        # the root is the only thing in the cache, because all the
        # other objects are new
        self.assertEqual(len(c._cache), 1)
        # run several transactions
        for t in range(5):
            for i in range(dataset_size):
                l[(t,i)] = r[i] = MinPO(i)
            get_transaction().commit()
            # commit() will register the objects, placing them in the
            # cache.  at the end of commit, the cache will be reduced
            # down to CACHE_SIZE items
            if len(l)>CACHE_SIZE:
                self.assertEqual(c._cache.ringlen(), CACHE_SIZE)
        for i in range(dataset_size):
            # Check objects added in the first two transactions.
            # They must all be ghostified.
            self.assertEqual(l[(0,i)]._p_changed, None)
            self.assertEqual(l[(1,i)]._p_changed, None)
            # Check objects added in the last two transactions.
            # They must all still exist in memory, but have
            # had their changes flushed
            self.assertEqual(l[(3,i)]._p_changed, 0)
            self.assertEqual(l[(4,i)]._p_changed, 0)
            # Of the objects added in the middle transaction, most
            # will have been ghostified. There is one cache slot
            # that may be occupied by either one of those objects or
            # the root, depending on precise order of access. We do
            # not bother to check this

    def checkSize(self):
        self.assertEqual(self.db.cacheSize(), 0)
        self.assertEqual(self.db.cacheDetailSize(), [])

        CACHE_SIZE = 10
        self.db.setCacheSize(CACHE_SIZE)

        CONNS = 3
        for i in range(CONNS):
            self.noodle_new_connection()

        self.assertEquals(self.db.cacheSize(), CACHE_SIZE * CONNS)
        details = self.db.cacheDetailSize()
        self.assertEquals(len(details), CONNS)
        for d in details:
            self.assertEquals(d['ngsize'], CACHE_SIZE)
            self.assertEquals(d['size'], CACHE_SIZE)

    def checkDetail(self):
        CACHE_SIZE = 10
        self.db.setCacheSize(CACHE_SIZE)

        CONNS = 3
        for i in range(CONNS):
            self.noodle_new_connection()

        for klass, count in self.db.cacheDetail():
            if klass.endswith('MinPO'):
                self.assertEqual(count, CONNS * CACHE_SIZE)
            if klass.endswith('PersistentMapping'):
                # one root per connection
                self.assertEqual(count, CONNS)

        for details in self.db.cacheExtremeDetail():
            # one dict per object.  keys:
            if details['klass'].endswith('PersistentMapping'):
                self.assertEqual(details['state'], None)
            else:
                self.assert_(details['klass'].endswith('MinPO'))
                self.assertEqual(details['state'], 0)

class StubDataManager:
    def setklassstate(self, object):
        pass

class StubObject(Persistent):
    pass

class CacheErrors(unittest.TestCase):

    def setUp(self):
        self.jar = StubDataManager()
        self.cache = PickleCache(self.jar)

    def checkGetBogusKey(self):
        self.assertRaises(KeyError, self.cache.get, p64(0))
        try:
            self.cache[12]
        except KeyError:
            pass
        else:
            self.fail("expected KeyError")
        try:
            self.cache[12] = 12
        except TypeError:
            pass
        else:
            self.fail("expected TyepError")
        try:
            del self.cache[12]
        except TypeError:
            pass
        else:
            self.fail("expected TypeError")

    def checkBogusObject(self):
        def add(key, obj):
            self.cache[key] = obj

        key = p64(2)
        # value isn't persistent
        self.assertRaises(TypeError, add, key, 12)

        o = StubObject()
        # o._p_oid == None
        self.assertRaises(TypeError, add, key, o)

        o._p_oid = p64(3)
        self.assertRaises(ValueError, add, key, o)

        o._p_oid = key
        # o._p_jar == None
        self.assertRaises(Exception, add, key, o)

        o._p_jar = self.jar
        self.cache[key] = o
        # make sure it can be added multiple times
        self.cache[key] = o

        # same object, different keys
        self.assertRaises(ValueError, add, p64(0), o)

    def checkTwoCaches(self):
        jar2 = StubDataManager()
        cache2 = PickleCache(jar2)

        o = StubObject()
        key = o._p_oid = p64(1)
        o._p_jar = jar2

        cache2[key] = o

        try:
            self.cache[key] = o
        except ValueError:
            pass
        else:
            self.fail("expected ValueError because object already in cache")

    def checkReadOnlyAttrsWhenCached(self):
        o = StubObject()
        key = o._p_oid = p64(1)
        o._p_jar = self.jar
        self.cache[key] = o
        try:
            o._p_oid = p64(2)
        except ValueError:
            pass
        else:
            self.fail("expect that you can't change oid of cached object")
        try:
            del o._p_jar
        except ValueError:
            pass
        else:
            self.fail("expect that you can't delete jar of cached object")

def test_suite():
    s = unittest.makeSuite(DBMethods, 'check')
    s.addTest(unittest.makeSuite(LRUCacheTests, 'check'))
    s.addTest(unittest.makeSuite(CacheErrors, 'check'))
    return s


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testConfig.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import os
import errno
import shutil
import tempfile
import unittest

import ZODB.config
import ZODB.tests
from ZODB.POSException import ReadOnlyError
from ZEO.ClientStorage import ClientDisconnected

class ConfigTestBase(unittest.TestCase):
    def _opendb(self, s):
        return ZODB.config.databaseFromString(s)

    def _test(self, s):
        db = self._opendb(s)
        # Do something with the database to make sure it works
        cn = db.open()
        rt = cn.root()
        rt["test"] = 1
        get_transaction().commit()
        db.close()


class ZODBConfigTest(ConfigTestBase):
    def test_map_config1(self):
        self._test(
            """
            <zodb>
              <mappingstorage/>
            </zodb>
            """)

    def test_map_config2(self):
        self._test(
            """
            <zodb>
              <mappingstorage/>
              cache-size 1000
            </zodb>
            """)

    def test_file_config1(self):
        import ZODB.FileStorage
        path = tempfile.mktemp()
        self._test(
            """
            <zodb>
              <filestorage>
                path %s
              </filestorage>
            </zodb>
            """ % path)
        ZODB.FileStorage.cleanup(path)

    def test_file_config2(self):
        import ZODB.FileStorage
        path = tempfile.mktemp()
        cfg = """
        <zodb>
          <filestorage>
            path %s
            create false
            read-only true
          </filestorage>
        </zodb>
        """ % path
        self.assertRaises(ReadOnlyError, self._test, cfg)
        ZODB.FileStorage.cleanup(path)

    def test_zeo_config(self):
        # We're looking for a port that doesn't exist so a connection attempt
        # will fail.  Instead of elaborate logic to loop over a port
        # calculation, we'll just pick a simple "random", likely to not-exist
        # port number and add an elaborate comment explaining this instead.
        # Go ahead, grep for 9.
        cfg = """
        <zodb>
          <zeoclient>
            server localhost:56897
            wait false
          </zeoclient>
        </zodb>
        """
        self.assertRaises(ClientDisconnected, self._test, cfg)

    def test_demo_config(self):
        cfg = """
        <zodb unused-name>
          <demostorage>
            name foo
            <mappingstorage/>
          </demostorage>
        </zodb>
        """
        self._test(cfg)

class BDBConfigTest(ConfigTestBase):
    def setUp(self):
        self._path = tempfile.mktemp()
        try:
            os.mkdir(self._path)
        except OSError, e:
            if e.errno <> errno.EEXIST:
                raise

    def tearDown(self):
        shutil.rmtree(self._path)

    def test_bdbfull_simple(self):
        cfg = """
        <zodb>
          <fullstorage>
             envdir %s
          </fullstorage>
        </zodb>
        """ % self._path
        self._test(cfg)

    def test_bdbminimal_simple(self):
        cfg = """
        <zodb>
          <minimalstorage>
            envdir %s
          </minimalstorage>
        </zodb>
        """ % self._path
        self._test(cfg)


def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(ZODBConfigTest))
    # Only run the Berkeley tests if they are available
    import BDBStorage
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(BDBConfigTest))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testDB.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os
import time
import unittest

import ZODB
import ZODB.FileStorage

from ZODB.tests.MinPO import MinPO

class DBTests(unittest.TestCase):

    def setUp(self):
        self.__path = os.path.abspath('test.fs')
        store = ZODB.FileStorage.FileStorage(self.__path)
        self.db = ZODB.DB(store)

    def tearDown(self):
        self.db.close()
        for s in ('', '.index', '.lock', '.tmp'):
            if os.path.exists(self.__path+s):
                os.remove(self.__path+s)

    def dowork(self, version=''):
        c = self.db.open(version)
        r = c.root()
        o = r[time.time()] = MinPO(0)
        get_transaction().commit()
        for i in range(25):
            o.value = MinPO(i)
            get_transaction().commit()
            o = o.value
        c.close()

    # make sure the basic methods are callable

    def testSets(self):
        # test set methods that have non-trivial implementations
        self.db.setCacheDeactivateAfter(12) # deprecated
        self.db.setCacheSize(15)
        self.db.setVersionCacheDeactivateAfter(12) # deprecated
        self.db.setVersionCacheSize(15)

    def test_removeVersionPool(self):
        # Test that we can remove a version pool

        # This is white box because we check some internal data structures

        self.dowork()
        self.dowork('v2')
        c1 = self.db.open('v1')
        c1.close() # return to pool
        c12 = self.db.open('v1')
        c12.close() # return to pool
        self.assert_(c1 is c12) # should be same

        pools, pooll = self.db._pools

        self.assertEqual(len(pools), 3)
        self.assertEqual(len(pooll), 3)

        self.db.removeVersionPool('v1')

        self.assertEqual(len(pools), 2)
        self.assertEqual(len(pooll), 2)

        c12 = self.db.open('v1')
        c12.close() # return to pool
        self.assert_(c1 is not c12) # should be different

        self.assertEqual(len(pools), 3)
        self.assertEqual(len(pooll), 3)

    def _test_for_leak(self):
        self.dowork()
        self.dowork('v2')
        while 1:
            c1 = self.db.open('v1')
            self.db.removeVersionPool('v1')
            c1.close() # return to pool

    def test_removeVersionPool_while_connection_open(self):
        # Test that we can remove a version pool

        # This is white box because we check some internal data structures

        self.dowork()
        self.dowork('v2')
        c1 = self.db.open('v1')
        c1.close() # return to pool
        c12 = self.db.open('v1')
        self.assert_(c1 is c12) # should be same

        pools, pooll = self.db._pools

        self.assertEqual(len(pools), 3)
        self.assertEqual(len(pooll), 3)

        self.db.removeVersionPool('v1')

        self.assertEqual(len(pools), 2)
        self.assertEqual(len(pooll), 2)

        c12.close() # should leave pools alone

        self.assertEqual(len(pools), 2)
        self.assertEqual(len(pooll), 2)

        c12 = self.db.open('v1')
        c12.close() # return to pool
        self.assert_(c1 is not c12) # should be different

        self.assertEqual(len(pools), 3)
        self.assertEqual(len(pooll), 3)


def test_suite():
    return unittest.makeSuite(DBTests)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testDemoStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.DemoStorage
import os, unittest

from ZODB.tests import StorageTestBase, BasicStorage, \
     VersionStorage, Synchronization

class DemoStorageTests(StorageTestBase.StorageTestBase,
                       BasicStorage.BasicStorage,
                       VersionStorage.VersionStorage,
                       Synchronization.SynchronizedStorage,
                       ):

    def setUp(self):
        self._storage = ZODB.DemoStorage.DemoStorage()

    def tearDown(self):
        self._storage.close()

    def checkOversizeNote(self):
        # This base class test checks for the common case where a storage
        # doesnt support huge transaction metadata. This storage doesnt
        # have this limit, so we inhibit this test here.
        pass


def test_suite():
    suite = unittest.makeSuite(DemoStorageTests, 'check')
    return suite

if __name__ == "__main__":
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testFileStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes

import ZODB.FileStorage
import sys, os, unittest
import errno
import filecmp
import StringIO
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.fsrecover import recover

from ZODB.tests import StorageTestBase, BasicStorage, \
     TransactionalUndoStorage, VersionStorage, \
     TransactionalUndoVersionStorage, PackableStorage, \
     Synchronization, ConflictResolution, HistoryStorage, \
     IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
     MTStorage, ReadOnlyStorage, RecoveryStorage
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle

class FileStorageTests(
    StorageTestBase.StorageTestBase,
    BasicStorage.BasicStorage,
    TransactionalUndoStorage.TransactionalUndoStorage,
    RevisionStorage.RevisionStorage,
    VersionStorage.VersionStorage,
    TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
    PackableStorage.PackableStorage,
    Synchronization.SynchronizedStorage,
    ConflictResolution.ConflictResolvingStorage,
    ConflictResolution.ConflictResolvingTransUndoStorage,
    HistoryStorage.HistoryStorage,
    IteratorStorage.IteratorStorage,
    IteratorStorage.ExtendedIteratorStorage,
    PersistentStorage.PersistentStorage,
    MTStorage.MTStorage,
    ReadOnlyStorage.ReadOnlyStorage
    ):

    def open(self, **kwargs):
        self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs',
                                                     **kwargs)

    def setUp(self):
        self.open(create=1)

    def tearDown(self):
        self._storage.close()
        StorageTestBase.removefs("FileStorageTests.fs")

    def checkLongMetadata(self):
        s = "X" * 75000
        try:
            self._dostore(user=s)
        except POSException.StorageError:
            pass
        else:
            self.fail("expect long user field to raise error")
        try:
            self._dostore(description=s)
        except POSException.StorageError:
            pass
        else:
            self.fail("expect long user field to raise error")

    def check_use_fsIndex(self):
        from ZODB.fsIndex import fsIndex

        self.assertEqual(self._storage._index.__class__, fsIndex)

    # XXX We could really use some tests for sanity checking

    def check_conversion_to_fsIndex_not_if_readonly(self):

        self.tearDown()

        class OldFileStorage(ZODB.FileStorage.FileStorage):
            def _newIndexes(self):
                return {}, {}, {}, {}, {}, {}, {}


        from ZODB.fsIndex import fsIndex

        # Hack FileStorage to create dictionary indexes
        self._storage = OldFileStorage('FileStorageTests.fs')

        self.assertEqual(type(self._storage._index), type({}))
        for i in range(10):
            self._dostore()

        # Should save the index
        self._storage.close()

        self._storage = ZODB.FileStorage.FileStorage(
            'FileStorageTests.fs', read_only=1)
        self.assertEqual(type(self._storage._index), type({}))

    def check_conversion_to_fsIndex(self):

        self.tearDown()

        class OldFileStorage(ZODB.FileStorage.FileStorage):
            def _newIndexes(self):
                return {}, {}, {}, {}, {}, {}, {}


        from ZODB.fsIndex import fsIndex

        # Hack FileStorage to create dictionary indexes
        self._storage = OldFileStorage('FileStorageTests.fs')

        self.assertEqual(type(self._storage._index), type({}))
        for i in range(10):
            self._dostore()

        oldindex = self._storage._index.copy()

        # Should save the index
        self._storage.close()

        self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs')
        self.assertEqual(self._storage._index.__class__, fsIndex)
        self.failUnless(self._storage._used_index)

        index = {}
        for k, v in self._storage._index.items():
            index[k] = v

        self.assertEqual(index, oldindex)


    def check_save_after_load_with_no_index(self):
        for i in range(10):
            self._dostore()
        self._storage.close()
        os.remove('FileStorageTests.fs.index')
        self.open()
        self.assertEqual(self._storage._saved, 1)


    # This would make the unit tests too slow
    # check_save_after_load_that_worked_hard(self)

    def check_periodic_save_index(self):

        # Check the basic algorithm
        oldsaved = self._storage._saved
        self._storage._records_before_save = 10
        for i in range(4):
            self._dostore()
        self.assertEqual(self._storage._saved, oldsaved)
        self._dostore()
        self.assertEqual(self._storage._saved, oldsaved+1)

        # Now make sure the parameter changes as we get bigger
        for i in range(20):
            self._dostore()

        self.failUnless(self._storage._records_before_save > 20)

    # There are a bunch of tests that the current pack() implementation
    # does not past.  We need to fix pack(), but don't want tests to
    # fail until then.

    def checkPackVersionsInPast(self):
        pass

    def checkPackAfterUndoDeletion(self):
        pass

class FileStorageRecoveryTest(
    StorageTestBase.StorageTestBase,
    RecoveryStorage.RecoveryStorage,
    ):

    def setUp(self):
        StorageTestBase.removefs("Source.fs")
        StorageTestBase.removefs("Dest.fs")
        self._storage = ZODB.FileStorage.FileStorage('Source.fs')
        self._dst = ZODB.FileStorage.FileStorage('Dest.fs')

    def tearDown(self):
        self._storage.close()
        self._dst.close()
        StorageTestBase.removefs("Source.fs")
        StorageTestBase.removefs("Dest.fs")

    def new_dest(self):
        StorageTestBase.removefs('Dest.fs')
        return ZODB.FileStorage.FileStorage('Dest.fs')


def test_suite():
    suite = unittest.makeSuite(FileStorageTests, 'check')
    suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
    suite3 = unittest.makeSuite(FileStorageRecoveryTest, 'check')
    suite.addTest(suite2)
    suite.addTest(suite3)
    return suite

if __name__=='__main__':
    unittest.main()


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testMappingStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.MappingStorage
import os, unittest

from ZODB.tests import StorageTestBase, BasicStorage, Synchronization

class MappingStorageTests(StorageTestBase.StorageTestBase,
                       BasicStorage.BasicStorage,
                       Synchronization.SynchronizedStorage,
                       ):

    def setUp(self):
        self._storage = ZODB.MappingStorage.MappingStorage()

    def tearDown(self):
        self._storage.close()

    def checkOversizeNote(self):
        # This base class test checks for the common case where a storage
        # doesnt support huge transaction metadata. This storage doesnt
        # have this limit, so we inhibit this test here.
        pass

def test_suite():
    suite = unittest.makeSuite(MappingStorageTests, 'check')
    return suite

if __name__ == "__main__":
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testPersistentList.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the list interface to PersistentList
"""

import unittest
from ZODB.PersistentList import PersistentList

l0 = []
l1 = [0]
l2 = [0, 1]

class TestPList(unittest.TestCase):
    def checkTheWorld(self):
        # Test constructors
        u = PersistentList()
        u0 = PersistentList(l0)
        u1 = PersistentList(l1)
        u2 = PersistentList(l2)

        uu = PersistentList(u)
        uu0 = PersistentList(u0)
        uu1 = PersistentList(u1)
        uu2 = PersistentList(u2)

        v = PersistentList(tuple(u))
        class OtherList:
            def __init__(self, initlist):
                self.__data = initlist
            def __len__(self):
                return len(self.__data)
            def __getitem__(self, i):
                return self.__data[i]
        v0 = PersistentList(OtherList(u0))
        vv = PersistentList("this is also a sequence")

        # Test __repr__
        eq = self.assertEqual

        eq(str(u0), str(l0), "str(u0) == str(l0)")
        eq(repr(u1), repr(l1), "repr(u1) == repr(l1)")
        eq(`u2`, `l2`, "`u2` == `l2`")

        # Test __cmp__ and __len__

        def mycmp(a, b):
            r = cmp(a, b)
            if r < 0: return -1
            if r > 0: return 1
            return r

        all = [l0, l1, l2, u, u0, u1, u2, uu, uu0, uu1, uu2]
        for a in all:
            for b in all:
                eq(mycmp(a, b), mycmp(len(a), len(b)),
                      "mycmp(a, b) == mycmp(len(a), len(b))")

        # Test __getitem__

        for i in range(len(u2)):
            eq(u2[i], i, "u2[i] == i")

        # Test __setitem__

        uu2[0] = 0
        uu2[1] = 100
        try:
            uu2[2] = 200
        except IndexError:
            pass
        else:
            raise TestFailed("uu2[2] shouldn't be assignable")

        # Test __delitem__

        del uu2[1]
        del uu2[0]
        try:
            del uu2[0]
        except IndexError:
            pass
        else:
            raise TestFailed("uu2[0] shouldn't be deletable")

        # Test __getslice__

        for i in range(-3, 4):
            eq(u2[:i], l2[:i], "u2[:i] == l2[:i]")
            eq(u2[i:], l2[i:], "u2[i:] == l2[i:]")
            for j in range(-3, 4):
                eq(u2[i:j], l2[i:j], "u2[i:j] == l2[i:j]")

        # Test __setslice__

        for i in range(-3, 4):
            u2[:i] = l2[:i]
            eq(u2, l2, "u2 == l2")
            u2[i:] = l2[i:]
            eq(u2, l2, "u2 == l2")
            for j in range(-3, 4):
                u2[i:j] = l2[i:j]
                eq(u2, l2, "u2 == l2")

        uu2 = u2[:]
        uu2[:0] = [-2, -1]
        eq(uu2, [-2, -1, 0, 1], "uu2 == [-2, -1, 0, 1]")
        uu2[0:] = []
        eq(uu2, [], "uu2 == []")

        # Test __contains__
        for i in u2:
            self.failUnless(i in u2, "i in u2")
        for i in min(u2)-1, max(u2)+1:
            self.failUnless(i not in u2, "i not in u2")

        # Test __delslice__

        uu2 = u2[:]
        del uu2[1:2]
        del uu2[0:1]
        eq(uu2, [], "uu2 == []")

        uu2 = u2[:]
        del uu2[1:]
        del uu2[:1]
        eq(uu2, [], "uu2 == []")

        # Test __add__, __radd__, __mul__ and __rmul__

        #self.failUnless(u1 + [] == [] + u1 == u1, "u1 + [] == [] + u1 == u1")
        self.failUnless(u1 + [1] == u2, "u1 + [1] == u2")
        #self.failUnless([-1] + u1 == [-1, 0], "[-1] + u1 == [-1, 0]")
        self.failUnless(u2 == u2*1 == 1*u2, "u2 == u2*1 == 1*u2")
        self.failUnless(u2+u2 == u2*2 == 2*u2, "u2+u2 == u2*2 == 2*u2")
        self.failUnless(u2+u2+u2 == u2*3 == 3*u2, "u2+u2+u2 == u2*3 == 3*u2")

        # Test append

        u = u1[:]
        u.append(1)
        eq(u, u2, "u == u2")

        # Test insert

        u = u2[:]
        u.insert(0, -1)
        eq(u, [-1, 0, 1], "u == [-1, 0, 1]")

        # Test pop

        u = PersistentList([0, -1, 1])
        u.pop()
        eq(u, [0, -1], "u == [0, -1]")
        u.pop(0)
        eq(u, [-1], "u == [-1]")

        # Test remove

        u = u2[:]
        u.remove(1)
        eq(u, u1, "u == u1")

        # Test count
        u = u2*3
        eq(u.count(0), 3, "u.count(0) == 3")
        eq(u.count(1), 3, "u.count(1) == 3")
        eq(u.count(2), 0, "u.count(2) == 0")


        # Test index

        eq(u2.index(0), 0, "u2.index(0) == 0")
        eq(u2.index(1), 1, "u2.index(1) == 1")
        try:
            u2.index(2)
        except ValueError:
            pass
        else:
            raise TestFailed("expected ValueError")

        # Test reverse

        u = u2[:]
        u.reverse()
        eq(u, [1, 0], "u == [1, 0]")
        u.reverse()
        eq(u, u2, "u == u2")

        # Test sort

        u = PersistentList([1, 0])
        u.sort()
        eq(u, u2, "u == u2")

        # Test extend

        u = u1[:]
        u.extend(u2)
        eq(u, u1 + u2, "u == u1 + u2")


def test_suite():
    return unittest.makeSuite(TestPList, 'check')

if __name__ == "__main__":
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testPersistentMapping.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Verify that PersistentMapping works with old versions of Zope.

The comments in PersistentMapping.py address the issue in some detail.
The pickled form of a PersistentMapping must use _container to store
the actual mapping, because old versions of Zope used this attribute.
If the new code doesn't generate pickles that are consistent with the
old code, developers will have a hard time testing the new code.
"""

import unittest

import ZODB
from ZODB.MappingStorage import MappingStorage
from ZODB.Transaction import Transaction
import cPickle
import cStringIO
import sys

# This pickle contains a persistent mapping pickle created from the
# old code.
pickle = ('((U\x0bPersistenceq\x01U\x11PersistentMappingtq\x02Nt.}q\x03U\n'
          '_containerq\x04}q\x05U\x07versionq\x06U\x03oldq\x07ss.\n')

class PMTests(unittest.TestCase):

    def checkOldStyleRoot(self):
        # insert the pickle in place of the root
        s = MappingStorage()
        t = Transaction()
        s.tpc_begin(t)
        s.store('\000' * 8, None, pickle, '', t)
        s.tpc_vote(t)
        s.tpc_finish(t)

        db = ZODB.DB(s)
        # If the root can be loaded successfully, we should be okay.
        r = db.open().root()
        # But make sure it looks like a new mapping
        self.assert_(hasattr(r, 'data'))
        self.assert_(not hasattr(r, '_container'))

    def checkNewPicklesAreSafe(self):
        s = MappingStorage()
        db = ZODB.DB(s)
        r = db.open().root()
        r[1] = 1
        r[2] = 2
##        r[3] = r
        get_transaction().commit()
        # MappingStorage stores serialno + pickle in its _index.
        root_pickle = s._index['\000' * 8][8:]

        f = cStringIO.StringIO(root_pickle)
        u = cPickle.Unpickler(f)
        klass_info = u.load()
        klass = find_global(*klass_info[0])
        inst = klass()
        state = u.load()
        inst.__setstate__(state)

        self.assert_(hasattr(inst, '_container'))
        self.assert_(not hasattr(inst, 'data'))

def find_global(modulename, classname):
    """Helper for this test suite to get special PersistentMapping"""

    if classname == "PersistentMapping":
        class PersistentMapping:
            def __setstate__(self, state):
                self.__dict__.update(state)
        return PersistentMapping
    else:
        __import__(modulename)
        mod = sys.modules[modulename]
        return getattr(mod, classname)

def test_suite():
    return unittest.makeSuite(PMTests, 'check')

if __name__ == "__main__":
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testRecover.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the file storage recovery script."""

import base64
import os
import random
import sys
import tempfile
import unittest
import StringIO

import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.PersistentMapping import PersistentMapping
from ZODB.fsrecover import recover
from ZODB.tests.StorageTestBase import removefs

from ZODB.fsdump import Dumper

class RecoverTest(unittest.TestCase):

    level = 2

    path = None

    def setUp(self):
        self.path = tempfile.mktemp(suffix=".fs")
        self.storage = FileStorage(self.path)
        self.populate()
        self.dest = tempfile.mktemp(suffix=".fs")
        self.recovered = None

    def tearDown(self):
        self.storage.close()
        if self.recovered is not None:
            self.recovered.close()
        removefs(self.path)
        removefs(self.dest)

    def populate(self):
        db = ZODB.DB(self.storage)
        cn = db.open()
        rt = cn.root()

        # create a whole bunch of objects,
        # looks like a Data.fs > 1MB
        for i in range(50):
            d = rt[i] = PersistentMapping()
            get_transaction().commit()
            for j in range(50):
                d[j] = "a" * j
            get_transaction().commit()

    def damage(self, num, size):
        self.storage.close()
        # Drop size null bytes into num random spots.
        for i in range(num):
            offset = random.randint(0, self.storage._pos - size)
            f = open(self.path, "a+b")
            f.seek(offset)
            f.write("\0" * size)
            f.close()

    ITERATIONS = 5

    def recover(self, source, dest):
        orig = sys.stdout
        try:
            sys.stdout = StringIO.StringIO()
            try:
                recover(self.path, self.dest,
                        verbose=0, partial=1, force=0, pack=1)
            except SystemExit:
                raise RuntimeError, "recover tried to exit"
        finally:
            sys.stdout = orig

    def testOneBlock(self):
        for i in range(self.ITERATIONS):
            self.damage(1, 1024)
            self.recover(self.path, self.dest)
            self.recovered = FileStorage(self.dest)
            self.recovered.close()
            os.remove(self.path)
            os.rename(self.dest, self.path)

    def testFourBlocks(self):
        for i in range(self.ITERATIONS):
            self.damage(4, 512)
            self.recover(self.path, self.dest)
            self.recovered = FileStorage(self.dest)
            self.recovered.close()
            os.remove(self.path)
            os.rename(self.dest, self.path)

    def testBigBlock(self):
        for i in range(self.ITERATIONS):
            self.damage(1, 32 * 1024)
            self.recover(self.path, self.dest)
            self.recovered = FileStorage(self.dest)
            self.recovered.close()
            os.remove(self.path)
            os.rename(self.dest, self.path)

    def testBadTransaction(self):
        # Find transaction headers and blast them.

        L = self.storage.undoLog()
        r = L[3]
        tid = base64.decodestring(r["id"] + "\n")
        pos1 = self.storage._txn_find(tid, 0)

        r = L[8]
        tid = base64.decodestring(r["id"] + "\n")
        pos2 = self.storage._txn_find(tid, 0)

        self.storage.close()

        # Overwrite the entire header.
        f = open(self.path, "a+b")
        f.seek(pos1 - 50)
        f.write("\0" * 100)
        f.close()
        self.recover(self.path, self.dest)
        self.recovered = FileStorage(self.dest)
        self.recovered.close()
        os.remove(self.path)
        os.rename(self.dest, self.path)

        # Overwrite part of the header.
        f = open(self.path, "a+b")
        f.seek(pos2 + 10)
        f.write("\0" * 100)
        f.close()
        self.recover(self.path, self.dest)
        self.recovered = FileStorage(self.dest)
        self.recovered.close()


def test_suite():
    return unittest.makeSuite(RecoverTest)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testTimeStamp.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the TimeStamp utility type"""

import time
import unittest

from ZODB.TimeStamp import TimeStamp

EPSILON = 0.000001

class TimeStampTests(unittest.TestCase):

    def checkYMDTimeStamp(self):
        self._check_ymd(2001, 6, 3)

    def _check_ymd(self, yr, mo, dy):
        ts = TimeStamp(yr, mo, dy)
        self.assertEqual(ts.year(), yr)
        self.assertEqual(ts.month(), mo)
        self.assertEqual(ts.day(), dy)

        self.assertEquals(ts.hour(), 0)
        self.assertEquals(ts.minute(), 0)
        self.assertEquals(ts.second(), 0)

        t = time.gmtime(ts.timeTime())
        self.assertEquals(yr, t[0])
        self.assertEquals(mo, t[1])
        self.assertEquals(dy, t[2])

    def checkFullTimeStamp(self):
        t = time.gmtime()
        ts = TimeStamp(*t[:6])

        # XXX floating point comparison
        self.assertEquals(ts.timeTime() + time.timezone, time.mktime(t))

        self.assertEqual(ts.year(), t[0])
        self.assertEqual(ts.month(), t[1])
        self.assertEqual(ts.day(), t[2])

        self.assertEquals(ts.hour(), t[3])
        self.assertEquals(ts.minute(), t[4])
        self.assert_(abs(ts.second() - t[5]) < EPSILON)

    def checkRawTimestamp(self):
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        ts2 = TimeStamp(`ts1`)

        self.assertEquals(ts1, ts2)
        self.assertEquals(ts1.timeTime(), ts2.timeTime())
        self.assertEqual(ts1.year(), ts2.year())
        self.assertEqual(ts1.month(), ts2.month())
        self.assertEqual(ts1.day(), ts2.day())
        self.assertEquals(ts1.hour(), ts2.hour())
        self.assertEquals(ts1.minute(), ts2.minute())
        self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)

    def checkDictKey(self):
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        ts2 = TimeStamp(2000, *t[1:6])

        d = {}
        d[ts1] = 1
        d[ts2] = 2

        self.assertEquals(len(d), 2)

    def checkCompare(self):
        ts1 = TimeStamp(1972, 6, 27)
        ts2 = TimeStamp(1971, 12, 12)
        self.assert_(ts1 > ts2)
        self.assert_(ts2 <= ts1)

    def checkLaterThan(self):
        t = time.gmtime()
        ts = TimeStamp(*t[:6])
        ts2 = ts.laterThan(ts)
        self.assert_(ts2 > ts)

    # XXX should test for bogus inputs to TimeStamp constructor

    def checkTimeStamp(self):
        # Alternate test suite
        t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
        self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
        self.assertEquals(repr(t), '\x03B9H\x15UUU')
        self.assertEquals(TimeStamp('\x03B9H\x15UUU'), t)
        self.assertEquals(t.year(), 2002)
        self.assertEquals(t.month(), 1)
        self.assertEquals(t.day(), 23)
        self.assertEquals(t.hour(), 10)
        self.assertEquals(t.minute(), 48)
        self.assertEquals(round(t.second()), 5)
        self.assertEquals(t.second(), t.seconds()) # Alias
        self.assertEquals(t.timeTime(), 1011782885)
        t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
        self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
        self.assert_(t == t)
        self.assert_(t != t1)
        self.assert_(t < t1)
        self.assert_(t <= t1)
        self.assert_(t1 >= t)
        self.assert_(t1 > t)
        self.failIf(t == t1)
        self.failIf(t != t)
        self.failIf(t > t1)
        self.failIf(t >= t1)
        self.failIf(t1 < t)
        self.failIf(t1 <= t)
        self.assertEquals(cmp(t, t), 0)
        self.assertEquals(cmp(t, t1), -1)
        self.assertEquals(cmp(t1, t), 1)
        self.assertEquals(t1.laterThan(t), t1)
        self.assert_(t.laterThan(t1) > t1)
        self.assertEquals(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))

def test_suite():
    return unittest.makeSuite(TimeStampTests, 'check')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testTransaction.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

"""
Revision information:
$Id: testTransaction.py,v 1.1 2004/04/01 18:26:43 sidnei Exp $
"""

"""

I wrote these unittests to investigate some odd transaction
behavior when doing unittests of integrating non sub transaction
aware objects, and to insure proper txn behavior. these
tests test the transaction system independent of the rest of the
zodb.

you can see the method calls to a jar by passing the
keyword arg tracing to the modify method of a dataobject.
the value of the arg is a prefix used for tracing print calls
to that objects jar.

the number of times a jar method was called can be inspected
by looking at an attribute of the jar that is the method
name prefixed with a c (count/check).

i've included some tracing examples for tests that i thought
were illuminating as doc strings below.

TODO

    add in tests for objects which are modified multiple times,
    for example an object that gets modified in multiple sub txns.

"""

import random
from types import TupleType
import unittest

from ZODB import Transaction

class TransactionTests(unittest.TestCase):

    def setUp(self):

        Transaction.hosed = 0
        self.sub1 = DataObject()
        self.sub2 = DataObject()
        self.sub3 = DataObject()
        self.nosub1 = DataObject(nost=1)

    def tearDown(self):

        Transaction.free_transaction()

    # basic tests with two sub trans jars
    # really we only need one, so tests for
    # sub1 should identical to tests for sub2
    def testTransactionCommit(self):

        self.sub1.modify()
        self.sub2.modify()

        get_transaction().commit()

        assert self.sub1._p_jar.ccommit_sub == 0
        assert self.sub1._p_jar.ctpc_finish == 1

    def testTransactionAbort(self):

        self.sub1.modify()
        self.sub2.modify()

        get_transaction().abort()

        assert self.sub2._p_jar.cabort == 1

    def testTransactionNote(self):

        t = get_transaction()

        t.note('This is a note.')
        self.assertEqual(t.description, 'This is a note.')
        t.note('Another.')
        self.assertEqual(t.description, 'This is a note.\n\nAnother.')

        t.abort()

    def testSubTransactionCommitCommit(self):

        self.sub1.modify()
        self.sub2.modify()

        get_transaction().commit(1)

        assert self.sub1._p_jar.ctpc_vote == 0
        assert self.sub1._p_jar.ctpc_finish == 1

        get_transaction().commit()

        assert self.sub1._p_jar.ccommit_sub == 1
        assert self.sub1._p_jar.ctpc_vote == 1

    def testSubTransactionCommitAbort(self):

        self.sub1.modify()
        self.sub2.modify()

        get_transaction().commit(1)
        get_transaction().abort()

        assert self.sub1._p_jar.ctpc_vote == 0
        assert self.sub1._p_jar.cabort == 0
        assert self.sub1._p_jar.cabort_sub == 1

    def testMultipleSubTransactionCommitCommit(self):

        # add it
        self.sub1.modify()

        get_transaction().commit(1)

        # add another
        self.sub2.modify()

        # reset a flag on the original to test it again
        self.sub1.ctpc_finish = 0

        get_transaction().commit(1)

        # this is interesting.. we go through
        # every subtrans commit with all subtrans capable
        # objects... i don't like this but its an impl artifact

        assert self.sub1._p_jar.ctpc_vote == 0
        assert self.sub1._p_jar.ctpc_finish > 0

        # add another before we do the entire txn commit
        self.sub3.modify()

        get_transaction().commit()

        # we did an implicit sub commit, is this impl artifiact?
        assert self.sub3._p_jar.ccommit_sub == 1
        assert self.sub1._p_jar.ctpc_finish > 1


    def testMultipleSubTransactionCommitAbortSub(self):
        """
        sub1 calling method commit
        sub1 calling method tpc_finish
        sub2 calling method tpc_begin
        sub2 calling method commit
        sub2 calling method tpc_finish
        sub3 calling method abort
        sub1 calling method commit_sub
        sub2 calling method commit_sub
        sub2 calling method tpc_vote
        sub1 calling method tpc_vote
        sub1 calling method tpc_finish
        sub2 calling method tpc_finish
        """

        # add it
        self.sub1.modify()

        get_transaction().commit(1)

        # add another
        self.sub2.modify()

        get_transaction().commit(1)

        assert self.sub1._p_jar.ctpc_vote == 0
        assert self.sub1._p_jar.ctpc_finish > 0

        # add another before we do the entire txn commit
        self.sub3.modify()

        # abort the sub transaction
        get_transaction().abort(1)

        # commit the container transaction
        get_transaction().commit()

        assert self.sub3._p_jar.cabort == 1
        assert self.sub1._p_jar.ccommit_sub == 1
        assert self.sub1._p_jar.ctpc_finish > 1

    # repeat adding in a nonsub trans jars

    def testNSJTransactionCommit(self):

        self.nosub1.modify()

        get_transaction().commit()

        assert self.nosub1._p_jar.ctpc_finish == 1

    def testNSJTransactionAbort(self):

        self.nosub1.modify()

        get_transaction().abort()

        assert self.nosub1._p_jar.ctpc_finish == 0
        assert self.nosub1._p_jar.cabort == 1

    # XXX
    def BUGtestNSJSubTransactionCommitAbort(self):
        """
        this reveals a bug in transaction.py
        the nosub jar should not have tpc_finish
        called on it till the containing txn
        ends.

        sub calling method commit
        nosub calling method tpc_begin
        sub calling method tpc_finish
        nosub calling method tpc_finish
        nosub calling method abort
        sub calling method abort_sub
        """

        self.sub1.modify(tracing='sub')
        self.nosub1.modify(tracing='nosub')

        get_transaction().commit(1)

        assert self.sub1._p_jar.ctpc_finish == 1

        # bug, non sub trans jars are getting finished
        # in a subtrans
        assert self.nosub1._p_jar.ctpc_finish == 0

        get_transaction().abort()

        assert self.nosub1._p_jar.cabort == 1
        assert self.sub1._p_jar.cabort_sub == 1

    def testNSJSubTransactionCommitCommit(self):

        self.sub1.modify()
        self.nosub1.modify()

        get_transaction().commit(1)

        assert self.nosub1._p_jar.ctpc_vote == 0

        get_transaction().commit()

        #assert self.nosub1._p_jar.ccommit_sub == 0
        assert self.nosub1._p_jar.ctpc_vote == 1
        assert self.sub1._p_jar.ccommit_sub == 1
        assert self.sub1._p_jar.ctpc_vote == 1


    def testNSJMultipleSubTransactionCommitCommit(self):
        """
        sub1 calling method tpc_begin
        sub1 calling method commit
        sub1 calling method tpc_finish
        nosub calling method tpc_begin
        nosub calling method tpc_finish
        sub2 calling method tpc_begin
        sub2 calling method commit
        sub2 calling method tpc_finish
        nosub calling method tpc_begin
        nosub calling method commit
        sub1 calling method commit_sub
        sub2 calling method commit_sub
        sub1 calling method tpc_vote
        nosub calling method tpc_vote
        sub2 calling method tpc_vote
        sub2 calling method tpc_finish
        nosub calling method tpc_finish
        sub1 calling method tpc_finish
        """

        # add it
        self.sub1.modify()

        get_transaction().commit(1)

        # add another
        self.nosub1.modify()

        get_transaction().commit(1)

        assert self.sub1._p_jar.ctpc_vote == 0
        assert self.nosub1._p_jar.ctpc_vote == 0
        assert self.sub1._p_jar.ctpc_finish > 0

        # add another before we do the entire txn commit
        self.sub2.modify()

        # commit the container transaction
        get_transaction().commit()

        # we did an implicit sub commit
        assert self.sub2._p_jar.ccommit_sub == 1
        assert self.sub1._p_jar.ctpc_finish > 1

    ### Failure Mode Tests
    #
    # ok now we do some more interesting
    # tests that check the implementations
    # error handling by throwing errors from
    # various jar methods
    ###

    # first the recoverable errors

    def testExceptionInAbort(self):

        self.sub1._p_jar = SubTransactionJar(errors='abort')

        self.nosub1.modify()
        self.sub1.modify(nojar=1)
        self.sub2.modify()

        try:
            get_transaction().abort()
        except TestTxnException: pass

        assert self.nosub1._p_jar.cabort == 1
        assert self.sub2._p_jar.cabort == 1

    def testExceptionInCommit(self):

        self.sub1._p_jar = SubTransactionJar(errors='commit')

        self.nosub1.modify()
        self.sub1.modify(nojar=1)

        try:
            get_transaction().commit()
        except TestTxnException: pass

        assert self.nosub1._p_jar.ctpc_finish == 0
        assert self.nosub1._p_jar.ccommit == 1
        assert self.nosub1._p_jar.ctpc_abort == 1
        assert Transaction.hosed == 0

    def testExceptionInTpcVote(self):

        self.sub1._p_jar = SubTransactionJar(errors='tpc_vote')

        self.nosub1.modify()
        self.sub1.modify(nojar=1)

        try:
            get_transaction().commit()
        except TestTxnException: pass

        assert self.nosub1._p_jar.ctpc_finish == 0
        assert self.nosub1._p_jar.ccommit == 1
        assert self.nosub1._p_jar.ctpc_abort == 1
        assert self.sub1._p_jar.ctpc_abort == 1
        assert Transaction.hosed == 0

    def testExceptionInTpcBegin(self):
        """
        ok this test reveals a bug in the TM.py
        as the nosub tpc_abort there is ignored.

        nosub calling method tpc_begin
        nosub calling method commit
        sub calling method tpc_begin
        sub calling method abort
        sub calling method tpc_abort
        nosub calling method tpc_abort
        """
        self.sub1._p_jar = SubTransactionJar(errors='tpc_begin')

        self.nosub1.modify()
        self.sub1.modify(nojar=1)

        try:
            get_transaction().commit()
        except TestTxnException: pass

        assert self.nosub1._p_jar.ctpc_abort == 1
        assert self.sub1._p_jar.ctpc_abort == 1

    def testExceptionInTpcAbort(self):

        self.sub1._p_jar = SubTransactionJar(
                                errors=('tpc_abort', 'tpc_vote'))

        self.nosub1.modify()
        self.sub1.modify(nojar=1)

        try:
            get_transaction().commit()
        except TestTxnException: pass

        assert self.nosub1._p_jar.ctpc_abort == 1
        assert Transaction.hosed == 0

    ### More Failure modes...
    # now we mix in some sub transactions
    ###

    def testExceptionInSubCommitSub(self):
        """
        this tests exhibits some odd behavior,
        nothing thats technically incorrect...

        basically it seems non deterministic, even
        stranger the behavior seems dependent on what
        values i test after the fact... very odd,
        almost relativistic.

        in-retrospect this is from the fact that
        dictionaries are used to store jars at some point

        """

        self.sub1.modify()

        get_transaction().commit(1)

        self.nosub1.modify()

        self.sub2._p_jar = SubTransactionJar(errors='commit_sub')
        self.sub2.modify(nojar=1)

        get_transaction().commit(1)

        self.sub3.modify()

        try:
            get_transaction().commit()
        except TestTxnException: pass


        # odd this doesn't seem to be entirely deterministic..
        if self.sub1._p_jar.ccommit_sub:
            assert self.sub1._p_jar.ctpc_abort == 1
        else:
            assert self.sub1._p_jar.cabort_sub == 1

        if self.sub3._p_jar.ccommit_sub:
            assert self.sub3._p_jar.ctpc_abort == 1
        else:
            assert self.sub3._p_jar.cabort_sub == 1

        assert self.sub2._p_jar.ctpc_abort == 1
        assert self.nosub1._p_jar.ctpc_abort == 1

    def testExceptionInSubAbortSub(self):
        self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
        self.sub1.modify(nojar=1)
        get_transaction().commit(1)

        self.nosub1.modify()
        self.sub2._p_jar = SubTransactionJar(errors='abort_sub')
        self.sub2.modify(nojar=1)
        get_transaction().commit(1)

        self.sub3.modify()

        try:
            get_transaction().commit()
        except TestTxnException, err:
            pass
        else:
            self.fail("expected transaction to fail")

        # The last commit failed.  If the commit_sub() method was
        # called, then tpc_abort() should be called to abort the
        # actual transaction.  If not, then calling abort_sub() is
        # sufficient.
        if self.sub3._p_jar.ccommit_sub == 1:
            self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
        else:
            self.assertEqual(self.sub3._p_jar.cabort_sub, 1)

    # last test, check the hosing mechanism

    def testHoserStoppage(self):
        # XXX We should consult ZConfig to decide whether we can get into a
        # hosed state or not.
        return

        # It's hard to test the "hosed" state of the database, where
        # hosed means that a failure occurred in the second phase of
        # the two phase commit.  It's hard because the database can
        # recover from such an error if it occurs during the very first
        # tpc_finish() call of the second phase.

        for obj in self.sub1, self.sub2:
            j = HoserJar(errors='tpc_finish')
            j.reset()
            obj._p_jar = j
            obj.modify(nojar=1)

        try:
            get_transaction().commit()
        except TestTxnException:
            pass

        self.assert_(Transaction.hosed)

        self.sub2.modify()

        try:
            get_transaction().commit()
        except Transaction.POSException.TransactionError:
            pass
        else:
            self.fail("Hosed Application didn't stop commits")


class DataObject:

    def __init__(self, nost=0):
        self.nost= nost
        self._p_jar = None

    def modify(self, nojar=0, tracing=0):
        if not nojar:
            if self.nost:
                self._p_jar = NoSubTransactionJar(tracing=tracing)
            else:
                self._p_jar = SubTransactionJar(tracing=tracing)
        get_transaction().register(self)

class TestTxnException(Exception):
    pass

class BasicJar:

    def __init__(self, errors=(), tracing=0):
        if not isinstance(errors, TupleType):
            errors = errors,
        self.errors = errors
        self.tracing = tracing
        self.cabort = 0
        self.ccommit = 0
        self.ctpc_begin = 0
        self.ctpc_abort = 0
        self.ctpc_vote = 0
        self.ctpc_finish = 0
        self.cabort_sub = 0
        self.ccommit_sub = 0

    def __repr__(self):
        return "<jar %X %s>" % (id(self), self.errors)

    def check(self, method):
        if self.tracing:
            print '%s calling method %s'%(str(self.tracing),method)

        if method in self.errors:
            raise TestTxnException("error %s" % method)

    ## basic jar txn interface

    def abort(self, *args):
        self.check('abort')
        self.cabort += 1

    def commit(self, *args):
        self.check('commit')
        self.ccommit += 1

    def tpc_begin(self, txn, sub=0):
        self.check('tpc_begin')
        self.ctpc_begin += 1

    def tpc_vote(self, *args):
        self.check('tpc_vote')
        self.ctpc_vote += 1

    def tpc_abort(self, *args):
        self.check('tpc_abort')
        self.ctpc_abort += 1

    def tpc_finish(self, *args):
        self.check('tpc_finish')
        self.ctpc_finish += 1

class SubTransactionJar(BasicJar):

    def abort_sub(self, txn):
        self.check('abort_sub')
        self.cabort_sub = 1

    def commit_sub(self, txn):
        self.check('commit_sub')
        self.ccommit_sub = 1

class NoSubTransactionJar(BasicJar):
    pass

class HoserJar(BasicJar):

    # The HoserJars coordinate their actions via the class variable
    # committed.  The check() method will only raise its exception
    # if committed > 0.

    committed = 0

    def reset(self):
        # Calling reset() on any instance will reset the class variable.
        HoserJar.committed = 0

    def check(self, method):
        if HoserJar.committed > 0:
            BasicJar.check(self, method)

    def tpc_finish(self, *args):
        self.check('tpc_finish')
        self.ctpc_finish += 1
        HoserJar.committed += 1


def test_suite():

    return unittest.makeSuite(TransactionTests)

if __name__ == '__main__':
    unittest.TextTestRunner().run(test_suite())


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testUtils.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the routines to convert between long and 64-bit strings"""

import random
import unittest

NUM = 100

from ZODB.utils import U64, p64, u64

class TestUtils(unittest.TestCase):

    small = [random.randrange(1, 1L<<32, int=long)
             for i in range(NUM)]
    large = [random.randrange(1L<<32, 1L<<64, int=long)
             for i in range(NUM)]
    all = small + large

    def checkLongToStringToLong(self):
        for num in self.all:
            s = p64(num)
            n = U64(s)
            self.assertEquals(num, n, "U64() failed")
            n2 = u64(s)
            self.assertEquals(num, n2, "u64() failed")

    def checkKnownConstants(self):
        self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
        self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
        self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
        self.assertEquals(U64("\000\000\000\000\000\000\000\001"), 1)
        self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)
        self.assertEquals(U64("\000\000\000\001\000\000\000\000"), 1L<<32)

def test_suite():
    return unittest.makeSuite(TestUtils, 'check')

if __name__ == "__main__":
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testZODB.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes

import unittest

import ZODB
import ZODB.FileStorage
from ZODB.PersistentMapping import PersistentMapping
from ZODB.POSException import ReadConflictError, ConflictError
from ZODB.tests.StorageTestBase import removefs
from Persistence import Persistent

class P(Persistent):
    pass

class Independent(Persistent):

    def _p_independent(self):
        return 1

class DecoyIndependent(Persistent):

    def _p_independent(self):
        return 0

class ZODBTests(unittest.TestCase):

    def setUp(self):
        self._storage = ZODB.FileStorage.FileStorage(
            'ZODBTests.fs', create=1)
        self._db = ZODB.DB(self._storage)

    def populate(self):
        get_transaction().begin()
        conn = self._db.open()
        root = conn.root()
        root['test'] = pm = PersistentMapping()
        for n in range(100):
            pm[n] = PersistentMapping({0: 100 - n})
        get_transaction().note('created test data')
        get_transaction().commit()
        conn.close()

    def tearDown(self):
        self._db.close()
        removefs("ZODBTests.fs")

    def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
        self.populate()
        get_transaction().begin()
        get_transaction().note('duplication')
        # Duplicate the 'test' object.
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            assert len(ob) > 10, 'Insufficient test data'
            try:
                import tempfile
                f = tempfile.TemporaryFile()
                ob._p_jar.exportFile(ob._p_oid, f)
                assert f.tell() > 0, 'Did not export correctly'
                f.seek(0)
                new_ob = ob._p_jar.importFile(f)
                root[dup_name] = new_ob
                f.close()
                if abort_it:
                    get_transaction().abort()
                else:
                    get_transaction().commit()
            except:
                get_transaction().abort()
                raise
        finally:
            conn.close()
        get_transaction().begin()
        # Verify the duplicate.
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            try:
                ob2 = root[dup_name]
            except KeyError:
                if abort_it:
                    # Passed the test.
                    return
                else:
                    raise
            else:
                if abort_it:
                    assert 0, 'Did not abort duplication'
            l1 = list(ob.items())
            l1.sort()
            l2 = list(ob2.items())
            l2.sort()
            l1 = map(lambda (k, v): (k, v[0]), l1)
            l2 = map(lambda (k, v): (k, v[0]), l2)
            assert l1 == l2, 'Duplicate did not match'
            assert ob._p_oid != ob2._p_oid, 'Did not duplicate'
            assert ob._p_jar == ob2._p_jar, 'Not same connection'
            oids = {}
            for v in ob.values():
                oids[v._p_oid] = 1
            for v in ob2.values():
                assert not oids.has_key(v._p_oid), (
                    'Did not fully separate duplicate from original')
            get_transaction().commit()
        finally:
            conn.close()

    def checkExportImportAborted(self):
        self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')

    def checkVersionOnly(self):
        # Make sure the changes to make empty transactions a no-op
        # still allow things like abortVersion().  This should work
        # because abortVersion() calls tpc_begin() itself.
        conn = self._db.open("version")
        try:
            r = conn.root()
            r[1] = 1
            get_transaction().commit()
        finally:
            conn.close()
        self._db.abortVersion("version")
        get_transaction().commit()

    def checkResetCache(self):
        # The cache size after a reset should be 0 and the GC attributes
        # ought to be linked to it rather than the old cache.
        self.populate()
        conn = self._db.open()
        conn.root()
        self.assert_(len(conn._cache) > 0)  # Precondition
        conn._resetCache()
        self.assertEqual(len(conn._cache), 0)
        self.assert_(conn._incrgc == conn._cache.incrgc)
        self.assert_(conn.cacheGC == conn._cache.incrgc)

    def checkLocalTransactions(self):
        # Test of transactions that apply to only the connection,
        # not the thread.
        conn1 = self._db.open()
        conn2 = self._db.open()
        try:
            conn1.setLocalTransaction()
            conn2.setLocalTransaction()
            r1 = conn1.root()
            r2 = conn2.root()
            if r1.has_key('item'):
                del r1['item']
                conn1.getTransaction().commit()
            r1.get('item')
            r2.get('item')
            r1['item'] = 1
            conn1.getTransaction().commit()
            self.assertEqual(r1['item'], 1)
            # r2 has not seen a transaction boundary,
            # so it should be unchanged.
            self.assertEqual(r2.get('item'), None)
            conn2.sync()
            # Now r2 is updated.
            self.assertEqual(r2['item'], 1)

            # Now, for good measure, send an update in the other direction.
            r2['item'] = 2
            conn2.getTransaction().commit()
            self.assertEqual(r1['item'], 1)
            self.assertEqual(r2['item'], 2)
            conn1.sync()
            conn2.sync()
            self.assertEqual(r1['item'], 2)
            self.assertEqual(r2['item'], 2)
        finally:
            conn1.close()
            conn2.close()

    def checkReadConflict(self):
        self.obj = P()
        self.readConflict()

    def readConflict(self, shouldFail=1):
        # Two transactions run concurrently.  Each reads some object,
        # then one commits and the other tries to read an object
        # modified by the first.  This read should fail with a conflict
        # error because the object state read is not necessarily
        # consistent with the objects read earlier in the transaction.

        conn = self._db.open()
        conn.setLocalTransaction()
        r1 = conn.root()
        r1["p"] = self.obj
        self.obj.child1 = P()
        conn.getTransaction().commit()

        # start a new transaction with a new connection
        cn2 = self._db.open()
        # start a new transaction with the other connection
        cn2.setLocalTransaction()
        r2 = cn2.root()

        self.assertEqual(r1._p_serial, r2._p_serial)

        self.obj.child2 = P()
        conn.getTransaction().commit()

        # resume the transaction using cn2
        obj = r2["p"]
        # An attempt to access obj should fail, because r2 was read
        # earlier in the transaction and obj was modified by the othe
        # transaction.
        if shouldFail:
            self.assertRaises(ReadConflictError, lambda: obj.child1)
        else:
            # make sure that accessing the object succeeds
            obj.child1
        cn2.getTransaction().abort()

    def checkReadConflictIgnored(self):
        # Test that an application that catches a read conflict and
        # continues can not commit the transaction later.
        root = self._db.open().root()
        root["real_data"] = real_data = PersistentMapping()
        root["index"] = index = PersistentMapping()

        real_data["a"] = PersistentMapping({"indexed_value": 0})
        real_data["b"] = PersistentMapping({"indexed_value": 1})
        index[1] = PersistentMapping({"b": 1})
        index[0] = PersistentMapping({"a": 1})
        get_transaction().commit()

        # load some objects from one connection
        cn2 = self._db.open()
        cn2.setLocalTransaction()
        r2 = cn2.root()
        real_data2 = r2["real_data"]
        index2 = r2["index"]

        real_data["b"]["indexed_value"] = 0
        del index[1]["b"]
        index[0]["b"] = 1
        get_transaction().commit()

        del real_data2["a"]
        try:
            del index2[0]["a"]
        except ReadConflictError:
            # This is the crux of the test.  Ignore the error.
            pass
        else:
            self.fail("No conflict occurred")

        # real_data2 still ready to commit
        self.assert_(real_data2._p_changed)

        # index2 values not ready to commit
        self.assert_(not index2._p_changed)
        self.assert_(not index2[0]._p_changed)
        self.assert_(not index2[1]._p_changed)

        self.assertRaises(ConflictError, cn2.getTransaction().commit)
        get_transaction().abort()

    def checkIgnoreReadConflict(self):
        # Test that an application that catches a read conflict and
        # continues can not commit the transaction later.
        root = self._db.open().root()
        root["real_data"] = real_data = PersistentMapping()
        root["index"] = index = PersistentMapping()

        real_data["a"] = PersistentMapping({"indexed_value": 0})
        real_data["b"] = PersistentMapping({"indexed_value": 1})
        index[1] = PersistentMapping({"b": 1})
        index[0] = PersistentMapping({"a": 1})
        get_transaction().commit()

        # load some objects from one connection
        cn2 = self._db.open()
        cn2.setLocalTransaction()
        r2 = cn2.root()
        real_data2 = r2["real_data"]
        index2 = r2["index"]

        real_data["b"]["indexed_value"] = 0
        del index[1]["b"]
        index[0]["b"] = 1
        get_transaction().commit()

        del real_data2["a"]
        try:
            del index2[0]["a"]
        except ReadConflictError, obj:
            # The point of this test is to make sure that the conflict
            # can be ignored.
            obj.ignore()
        else:
            self.fail("No conflict occurred")

        # real_data2 still ready to commit
        self.assert_(real_data2._p_changed)

        # index2 values not ready to commit
        self.assert_(not index2._p_changed)
        self.assert_(not index2[0]._p_changed)
        self.assert_(not index2[1]._p_changed)

        # The commit should succeed despite the ReadConflictError, because
        # we explicitly said to ignore it.
        cn2.getTransaction().commit()
        get_transaction().abort()

    def checkIndependent(self):
        self.obj = Independent()
        self.readConflict(shouldFail=0)

    def checkNotIndependent(self):
        self.obj = DecoyIndependent()
        self.readConflict()

def test_suite():
    return unittest.makeSuite(ZODBTests, 'check')


=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testfsIndex.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest, sys
from ZODB.fsIndex import fsIndex
from ZODB.utils import p64


class Test(unittest.TestCase):

    def testInserts(self):
        index=fsIndex()

        for i in range(200):
            index[p64(i*1000)]=(i*1000L+1)

        for i in range(0,200):
            self.assertEqual((i,index[p64(i*1000)]), (i,(i*1000L+1)))

        self.assertEqual(len(index), 200)

        key=p64(2000)

        self.assertEqual(index.get(key), 2001)

        key=p64(2001)
        self.assertEqual(index.get(key), None)
        self.assertEqual(index.get(key, ''), '')

        # self.failUnless(len(index._data) > 1)

    def testUpdate(self):
        index=fsIndex()
        d={}

        for i in range(200):
            d[p64(i*1000)]=(i*1000L+1)

        index.update(d)

        for i in range(400,600):
            d[p64(i*1000)]=(i*1000L+1)

        index.update(d)

        for i in range(100, 500):
            d[p64(i*1000)]=(i*1000L+2)

        index.update(d)

        self.assertEqual(index.get(p64(2000)), 2001)
        self.assertEqual(index.get(p64(599000)), 599001)
        self.assertEqual(index.get(p64(399000)), 399002)
        self.assertEqual(len(index), 600)


def test_suite():
    loader=unittest.TestLoader()
    return loader.loadTestsFromTestCase(Test)

if __name__=='__main__':
    unittest.TextTestRunner().run(test_suite())




More information about the Zope3-Checkins mailing list