[Zope-Checkins] CVS: ZODB3/BDBStorage/tests - test_autopack.py:1.6.2.1 test_whitebox.py:1.4.2.1

Barry Warsaw barry@wooz.org
Tue, 7 Jan 2003 14:36:24 -0500


Update of /cvs-repository/ZODB3/BDBStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv23965/BDBStorage/tests

Added Files:
      Tag: ZODB3-3_1-branch
	test_autopack.py test_whitebox.py 
Log Message:
Sync'ing with the trunk for BDBStorage, these are new files.



=== Added File ZODB3/BDBStorage/tests/test_autopack.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import os
import time
import unittest

from ZODB import DB
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from Persistence import Persistent

import BDBStorage
if BDBStorage.is_available:
    from BDBStorage.BDBFullStorage import BDBFullStorage
    from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
    from BDBStorage.BerkeleyBase import BerkeleyConfig
else:
    # Sigh
    class FakeBaseClass: pass
    BDBFullStorage = BDBMinimalStorage = FakeBaseClass

from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase

ZERO = '\0'*8

class C(Persistent):
    pass



class TestAutopackBase(BerkeleyTestBase):
    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds, 6 seconds into the past, no classic packs
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 0
        return config

    def _wait_for_next_autopack(self):
        storage = self._storage
        # BAW: this uses a non-public interface
        packtime = storage._autopacker._nextcheck
        while packtime == storage._autopacker._nextcheck:
            time.sleep(1)

    def _mk_dbhome(self, dir):
        # Create the storage
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir, config=self._config())
        except:
            self._zap_dbhome(dir)
            raise


class TestAutopack(TestAutopackBase):
    ConcreteStorage = BDBFullStorage

    def checkAutopack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait for
        # another autopack operation.  Then verify that the first two
        # revisions have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        time.sleep(10)
        self._wait_for_next_autopack()
        # The first two revisions should now be gone, but the third should
        # still exist because it's the current revision, and we haven't done a
        # classic pack.
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        unless(storage.loadSerial(oid, revid3))



class TestAutomaticClassicPack(TestAutopackBase):
    ConcreteStorage = BDBFullStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds, 6 seconds into the past, no classic packs
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 1
        return config

    def checkAutomaticClassicPack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait for
        # another autopack operation.  Then verify that the first two
        # revisions have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        time.sleep(10)
        self._wait_for_next_autopack()
        # The first two revisions should now be gone, but the third should
        # still exist because it's the current revision, and we haven't done a
        # classic pack.
        raises(KeyError, storage.loadSerial, oid, revid1)
        raises(KeyError, storage.loadSerial, oid, revid2)
        raises(KeyError, storage.loadSerial, oid, revid3)

    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store an object that's reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 <> oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink it, which should still leave obj1 and obj2 alive
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Do an explicit full pack to right now to collect all the old
        # revisions and the cycle.
        storage.pack(time.time(), referencesf)
        # And it should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')



class TestMinimalPack(TestAutopackBase):
    ConcreteStorage = BDBMinimalStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds
        config.frequency = 3
        return config

    def checkRootUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store an object that's reachable from the root
        obj = C()
        obj.value = 999
        root.obj = obj
        txn = get_transaction()
        txn.note('root -> obj')
        txn.commit()
        oid = obj._p_oid
        assert oid
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid, ''))
        # Now unlink it
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj')
        txn.commit()
        # The object should be gone due to reference counting
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid, '')

    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store an object that's reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 <> oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink it, which should still leave obj1 and obj2 alive
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # But the next autopack should collect both obj1 and obj2
        self._wait_for_next_autopack()
        # And it should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')



def test_suite():
    suite = unittest.TestSuite()
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(TestAutopack, 'check'))
        suite.addTest(unittest.makeSuite(TestAutomaticClassicPack, 'check'))
        suite.addTest(unittest.makeSuite(TestMinimalPack, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File ZODB3/BDBStorage/tests/test_whitebox.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

# Whitebox testing of storage implementation details.

import unittest

from ZODB.utils import U64
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle

import BDBStorage
if BDBStorage.is_available:
    from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
    from BDBStorage.BDBFullStorage import BDBFullStorage
else:
    # Sigh
    class FakeBaseClass: pass
    BDBFullStorage = BDBMinimalStorage = FakeBaseClass

from BDBStorage.tests.ZODBTestBase import ZODBTestBase
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase

from Persistence import Persistent

ZERO = '\0'*8



class Object(Persistent):
    pass



class WhiteboxLowLevelMinimal(BerkeleyTestBase):
    ConcreteStorage = BDBMinimalStorage

    def checkTableConsistencyAfterCommit(self):
        unless = self.failIf
        eq = self.assertEqual
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=11)
        revid2 = self._dostore(oid, revid=revid1, data=12)
        revid3 = self._dostore(oid, revid=revid2, data=13)
        # First off, there should be no entries in the pending table
        unless(self._storage._pending.keys())
        # Also, there should be no entries in the oids table
        unless(self._storage._oids.keys())
        # Now, there should be exactly one oid in the serials table, and
        # exactly one record for that oid in the table too.
        oids = {}
        c = self._storage._serials.cursor()
        try:
            rec = c.first()
            while rec:
                oid, serial = rec
                oids.setdefault(oid, []).append(serial)
                rec = c.next()
        finally:
            c.close()
        eq(len(oids), 1)
        eq(len(oids[oids.keys()[0]]), 1)
        # There should now be exactly one entry in the pickles table.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        key, data = pickles[0]
        poid = key[:8]
        pserial = key[8:]
        eq(oid, poid)
        eq(revid3, pserial)
        obj = zodb_unpickle(data)
        eq(obj.value, 13)
        # Now verify the refcounts table, which should be empty because the
        # stored object isn't referenced by any other objects.
        eq(len(self._storage._refcounts.keys()), 0)



class WhiteboxHighLevelMinimal(ZODBTestBase):
    ConcreteStorage = BDBMinimalStorage

    def checkReferenceCounting(self):
        eq = self.assertEqual
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        obj.value = 12
        get_transaction().commit()
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 2)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should have exactly one
        # entry, oid == 0
        keys = self._storage._serials.keys()
        eq(len(keys), 1)
        eq(len(self._storage._serials.items()), 1)
        eq(keys[0], ZERO)
        # The pickles table now should have exactly one revision of the root
        # object, and no revisions of the MinPO object, which should have been
        # collected away.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        rec = pickles[0]
        key = rec[0]
        data = rec[1]
        eq(key[:8], ZERO)
        # And that pickle should have no 'obj' attribute.
        unobj = zodb_unpickle(data)
        self.failIf(hasattr(unobj, 'obj'))
        # Our refcounts table should have no entries in it, because the root
        # object is an island.
        eq(len(self._storage._refcounts.keys()), 0)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)

    def checkRecursiveReferenceCounting(self):
        eq = self.assertEqual
        obj1 = Object()
        obj2 = Object()
        obj3 = Object()
        obj4 = Object()
        self._root.obj = obj1
        obj1.obj = obj2
        obj2.obj = obj3
        obj3.obj = obj4
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 5)
        eq(len(self._storage._pickles.items()), 5)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should have exactly one
        # entry, oid == 0
        keys = self._storage._serials.keys()
        eq(len(keys), 1)
        eq(len(self._storage._serials.items()), 1)
        eq(keys[0], ZERO)
        # The pickles table now should have exactly one revision of the root
        # object, and no revisions of any other objects, which should have
        # been collected away.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        rec = pickles[0]
        key = rec[0]
        data = rec[1]
        eq(key[:8], ZERO)
        # And that pickle should have no 'obj' attribute.
        unobj = zodb_unpickle(data)
        self.failIf(hasattr(unobj, 'obj'))
        # Our refcounts table should have no entries in it, because the root
        # object is an island.
        eq(len(self._storage._refcounts.keys()), 0)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)



class WhiteboxHighLevelFull(ZODBTestBase):
    ConcreteStorage = BDBFullStorage

    def checkReferenceCounting(self):
        eq = self.assertEqual
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 1)
        eq(len(self._storage._pickles.items()), 1)
        # Now store an object
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 3)
        obj.value = 12
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 4)
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 5)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials tabl should still have 2 entries,
        # one for the root object and one for the now unlinked MinPO obj.
        keys = self._storage._serials.keys()
        eq(len(keys), 2)
        eq(len(self._storage._serials.items()), 2)
        eq(keys[0], ZERO)
        # The pickles table should now have 6 entries, broken down like so:
        # - 3 revisions of the root object: the initial database-open
        #   revision, the revision that got its obj attribute set, and the
        #   revision that got its obj attribute deleted.
        # - 3 Three revisions of obj, corresponding to values 11, 12, and 13
        pickles = self._storage._pickles.items()
        eq(len(pickles), 6)
        # Our refcounts table should have one entry in it for the MinPO that's
        # referenced in an earlier revision of the root object
        eq(len(self._storage._refcounts.keys()), 1)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)



def test_suite():
    suite = unittest.TestSuite()
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(WhiteboxLowLevelMinimal, 'check'))
        suite.addTest(unittest.makeSuite(WhiteboxHighLevelMinimal, 'check'))
        suite.addTest(unittest.makeSuite(WhiteboxHighLevelFull, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')