[Zope3-checkins] CVS: Zope3/src/zodb/tests - __init__.py:1.2 test_connection.py:1.2 test_timestamp.py:1.2 test_txn.py:1.2 test_utils.py:1.2 test_zodb.py:1.2 undo.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:13:53 -0500
Update of /cvs-repository/Zope3/src/zodb/tests
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/tests
Added Files:
__init__.py test_connection.py test_timestamp.py test_txn.py
test_utils.py test_zodb.py undo.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zodb/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:52 2002
+++ Zope3/src/zodb/tests/__init__.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,2 @@
+#
+# This file is necessary to make this directory a package.
=== Zope3/src/zodb/tests/test_connection.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:52 2002
+++ Zope3/src/zodb/tests/test_connection.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,47 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+
+from persistence import Persistent
+from transaction.tests.abstestIDataManager import IDataManagerTests
+
+from zodb.storage.mapping import DB
+from zodb.ztransaction import Transaction
+
+class P(Persistent):
+ pass
+
+class ConnectionTests(IDataManagerTests):
+
+ def setUp(self):
+ self.db = DB()
+ self.datamgr = self.db.open()
+ self.obj = P()
+ self.txn_factory = Transaction
+
+ def get_transaction(self):
+ t = super(ConnectionTests, self).get_transaction()
+ t.setUser('IDataManagerTests')
+ t.note('dummy note')
+ return t
+
+ def test_cacheGC(self):
+ self.datamgr.cacheGC()
+
+ def tearDown(self):
+ self.datamgr.close()
+ self.db.close()
+
+def test_suite():
+ return unittest.makeSuite(ConnectionTests)
=== Zope3/src/zodb/tests/test_timestamp.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:52 2002
+++ Zope3/src/zodb/tests/test_timestamp.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,133 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the TimeStamp utility type"""
+
+import time
+import unittest
+
+from zodb.timestamp import TimeStamp
+
+EPSILON = 0.000001
+
+class TimeStampTests(unittest.TestCase):
+
+ def checkYMDTimeStamp(self):
+ self._check_ymd(2001, 6, 3)
+
+ def _check_ymd(self, yr, mo, dy):
+ ts = TimeStamp(yr, mo, dy)
+ self.assertEqual(ts.year(), yr)
+ self.assertEqual(ts.month(), mo)
+ self.assertEqual(ts.day(), dy)
+
+ self.assertEquals(ts.hour(), 0)
+ self.assertEquals(ts.minute(), 0)
+ self.assertEquals(ts.second(), 0)
+
+ t = time.gmtime(ts.timeTime())
+ self.assertEquals(yr, t[0])
+ self.assertEquals(mo, t[1])
+ self.assertEquals(dy, t[2])
+
+ def checkFullTimeStamp(self):
+ t = time.gmtime()
+ ts = TimeStamp(*t[:6])
+
+ # XXX floating point comparison
+ self.assertEquals(ts.timeTime() + time.timezone, time.mktime(t))
+
+ self.assertEqual(ts.year(), t[0])
+ self.assertEqual(ts.month(), t[1])
+ self.assertEqual(ts.day(), t[2])
+
+ self.assertEquals(ts.hour(), t[3])
+ self.assertEquals(ts.minute(), t[4])
+ self.assert_(abs(ts.second() - t[5]) < EPSILON)
+
+ def checkRawTimestamp(self):
+ t = time.gmtime()
+ ts1 = TimeStamp(*t[:6])
+ ts2 = TimeStamp(ts1.raw())
+
+ self.assertEquals(ts1, ts2)
+ self.assertEquals(ts1.timeTime(), ts2.timeTime())
+ self.assertEqual(ts1.year(), ts2.year())
+ self.assertEqual(ts1.month(), ts2.month())
+ self.assertEqual(ts1.day(), ts2.day())
+ self.assertEquals(ts1.hour(), ts2.hour())
+ self.assertEquals(ts1.minute(), ts2.minute())
+ self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)
+
+ def checkDictKey(self):
+ t = time.gmtime()
+ ts1 = TimeStamp(*t[:6])
+ ts2 = TimeStamp(2000, *t[1:6])
+
+ d = {}
+ d[ts1] = 1
+ d[ts2] = 2
+
+ self.assertEquals(len(d), 2)
+
+ def checkCompare(self):
+ ts1 = TimeStamp(1972, 6, 27)
+ ts2 = TimeStamp(1971, 12, 12)
+ self.assert_(ts1 > ts2)
+ self.assert_(ts2 <= ts1)
+
+ def checkLaterThan(self):
+ # XXX what does laterThan() do?
+ t = time.gmtime()
+ ts = TimeStamp(*t[:6])
+ ts2 = ts.laterThan(ts)
+ self.assert_(ts2 > ts)
+
+ # XXX should test for bogus inputs to TimeStamp constructor
+
+ def checkTimeStamp(self):
+ # Alternate test suite
+ t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
+## self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
+ self.assertEquals(t.raw(), '\x03B9H\x15UUU')
+ self.assertEquals(TimeStamp('\x03B9H\x15UUU'), t)
+ self.assertEquals(t.year(), 2002)
+ self.assertEquals(t.month(), 1)
+ self.assertEquals(t.day(), 23)
+ self.assertEquals(t.hour(), 10)
+ self.assertEquals(t.minute(), 48)
+ self.assertEquals(round(t.second()), 5)
+ self.assertEquals(t.timeTime(), 1011782885)
+ t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
+## self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
+ self.assert_(t == t)
+ self.assert_(t != t1)
+ self.assert_(t < t1)
+ self.assert_(t <= t1)
+ self.assert_(t1 >= t)
+ self.assert_(t1 > t)
+ self.failIf(t == t1)
+ self.failIf(t != t)
+ self.failIf(t > t1)
+ self.failIf(t >= t1)
+ self.failIf(t1 < t)
+ self.failIf(t1 <= t)
+ self.assertEquals(cmp(t, t), 0)
+ self.assertEquals(cmp(t, t1), -1)
+ self.assertEquals(cmp(t1, t), 1)
+ self.assertEquals(t1.laterThan(t), t1)
+ self.assert_(t.laterThan(t1) > t1)
+ self.assertEquals(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))
+
+def test_suite():
+ return unittest.makeSuite(TimeStampTests, 'check')
=== Zope3/src/zodb/tests/test_txn.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:52 2002
+++ Zope3/src/zodb/tests/test_txn.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,145 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""High-level tests of the transaction interface"""
+
+import os
+import tempfile
+import unittest
+
+from transaction import get_transaction
+
+import zodb
+from zodb.db import DB
+from zodb.storage.file import FileStorage
+from zodb.storage.tests.minpo import MinPO
+from zodb.interfaces import RollbackError
+
+class TransactionTestBase(unittest.TestCase):
+
+ def setUp(self):
+ self.fs_path = tempfile.mktemp()
+ self.fs = FileStorage(self.fs_path)
+ db = DB(self.fs)
+ conn = db.open()
+ self.root = conn.root()
+
+ def tearDown(self):
+ get_transaction().abort()
+ self.fs.close()
+ for ext in '', '.index', '.lock', '.tmp':
+ path = self.fs_path + ext
+ if os.path.exists(path):
+ os.unlink(path)
+
+class BasicTests:
+
+ def testSingleCommit(self, subtrans=None):
+ self.root["a"] = MinPO("a")
+ if subtrans:
+ get_transaction().savepoint()
+ else:
+ get_transaction().commit()
+ self.assertEqual(self.root["a"].value, "a")
+
+ def testMultipleCommits(self, subtrans=None):
+ a = self.root["a"] = MinPO("a")
+ get_transaction().commit()
+ a.extra_attr = MinPO("b")
+ if subtrans:
+ get_transaction().savepoint()
+ else:
+ get_transaction().commit()
+ del a
+ self.assertEqual(self.root["a"].value, "a")
+ self.assertEqual(self.root["a"].extra_attr, MinPO("b"))
+
+ def testCommitAndAbort(self, subtrans=None):
+ a = self.root["a"] = MinPO("a")
+ if subtrans:
+ get_transaction().savepoint()
+ else:
+ get_transaction().commit()
+ a.extra_attr = MinPO("b")
+ get_transaction().abort()
+ del a
+ if subtrans:
+ self.assert_("a" not in self.root)
+ else:
+ self.assertEqual(self.root["a"].value, "a")
+ self.assert_(not hasattr(self.root["a"], 'extra_attr'))
+
+class SubtransTests(BasicTests):
+
+ def wrap_test(self, klass, meth_name):
+ unbound_method = getattr(klass, meth_name)
+ unbound_method(self, 1)
+ get_transaction().commit()
+
+ testSubSingleCommit = lambda self:\
+ self.wrap_test(BasicTests, "testSingleCommit")
+
+ testSubMultipleCommits = lambda self:\
+ self.wrap_test(BasicTests,
+ "testMultipleCommits")
+
+ testSubCommitAndAbort = lambda self:\
+ self.wrap_test(BasicTests,
+ "testCommitAndAbort")
+
+class AllTests(TransactionTestBase, SubtransTests):
+
+ def testSavepointAndRollback(self):
+ self.root["a"] = MinPO()
+ rb1 = get_transaction().savepoint()
+ self.root["b"] = MinPO()
+ rb2 = get_transaction().savepoint()
+ self.assertEqual(len(self.root), 2)
+ self.assert_("a" in self.root)
+ self.assert_("b" in self.root)
+
+ rb2.rollback()
+ self.assertEqual(len(self.root), 1)
+ self.assert_("a" in self.root)
+ self.assert_("b" not in self.root)
+
+ self.root["c"] = MinPO()
+ rb3 = get_transaction().savepoint()
+ self.root["d"] = MinPO()
+ rb4 = get_transaction().savepoint()
+ rb3.rollback()
+ self.assertRaises(RollbackError, rb4.rollback)
+
+ self.root["e"] = MinPO()
+ rb5 = get_transaction().savepoint()
+ self.root["f"] = MinPO()
+ rb6 = get_transaction().savepoint()
+ self.root["g"] = MinPO()
+ rb6.rollback()
+ self.root["h"] = MinPO()
+ self.assertEqual(len(self.root), 3)
+ for name in "a", "e", "h":
+ self.assert_(name in self.root)
+ for name in "b", "c", "d", "f", "g":
+ self.assert_(name not in self.root)
+
+def test_suite():
+ return unittest.makeSuite(AllTests)
+
+def main():
+ tests = test_suite()
+ runner = unittest.TextTestRunner()
+ runner.run(tests)
+
+if __name__ == "__main__":
+ main()
=== Zope3/src/zodb/tests/test_utils.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/tests/test_utils.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,49 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the routines to convert between long and 64-bit strings"""
+
+import random
+import unittest
+
+NUM = 100
+
+from zodb.utils import p64, u64
+
+class TestUtils(unittest.TestCase):
+
+ small = [random.randrange(1, 1L<<32, int=long)
+ for i in range(NUM)]
+ large = [random.randrange(1L<<32, 1L<<64, int=long)
+ for i in range(NUM)]
+ all = small + large
+
+ def checkLongToStringToLong(self):
+ for num in self.all:
+ s = p64(num)
+ n2 = u64(s)
+ self.assertEquals(num, n2, "u64() failed")
+
+ def checkKnownConstants(self):
+ self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
+ self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
+ self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
+ self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)
+
+def test_suite():
+ return unittest.makeSuite(TestUtils, 'check')
+
+if __name__ == "__main__":
+ loader = unittest.TestLoader()
+ loader.testMethodPrefix = "check"
+ unittest.main(testLoader=loader)
=== Zope3/src/zodb/tests/test_zodb.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/tests/test_zodb.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,147 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import os
+import unittest
+import tempfile
+
+from zodb.storage.file import DB
+from zodb.utils import u64
+from zodb.tests.undo import TransactionalUndoDB
+from persistence.dict import PersistentDict
+from transaction import get_transaction
+
+_fsname = tempfile.mktemp() + ".fs"
+
+class ExportImportTests:
+
+ def duplicate(self, abort_it, dup_name):
+ conn = self._db.open()
+ try:
+ root = conn.root()
+ ob = root['test']
+ self.assert_(len(ob) > 10, 'Insufficient test data')
+ try:
+ f = tempfile.TemporaryFile()
+ ob._p_jar.exportFile(ob._p_oid, f)
+ self.assert_(f.tell() > 0, 'Did not export correctly')
+ f.seek(0)
+ new_ob = ob._p_jar.importFile(f)
+ root[dup_name] = new_ob
+ f.close()
+ if abort_it:
+ get_transaction().abort()
+ else:
+ get_transaction().commit()
+ except Exception, err:
+ get_transaction().abort()
+ raise
+ finally:
+ conn.close()
+
+ def verify(self, abort_it, dup_name):
+ get_transaction().begin()
+ # Verify the duplicate.
+ conn = self._db.open()
+ try:
+ root = conn.root()
+ ob = root['test']
+ try:
+ ob2 = root[dup_name]
+ except KeyError:
+ if abort_it:
+ # Passed the test.
+ return
+ else:
+ raise
+ else:
+ if abort_it:
+ oid = ob2._p_oid
+ if oid is not None:
+ oid = u64(oid)
+ print oid, ob2.__class__, ob2._p_state
+ print ob2
+ self.fail("Did not abort duplication")
+ l1 = list(ob.items())
+ l1.sort()
+ l2 = list(ob2.items())
+ l2.sort()
+ l1 = [(k, v[0]) for k, v in l1]
+ l2 = [(k, v[0]) for k, v in l2]
+ self.assertEqual(l1, l2, 'Duplicate did not match')
+ self.assert_(ob._p_oid != ob2._p_oid, 'Did not duplicate')
+ self.assertEqual(ob._p_jar, ob2._p_jar, 'Not same connection')
+ oids = {}
+ for v in ob.values():
+ oids[v._p_oid] = 1
+ for v in ob2.values():
+ self.assert_(v._p_oid not in oids,
+ 'Did not fully separate duplicate from original')
+ get_transaction().commit()
+ finally:
+ conn.close()
+
+ def checkDuplicate(self, abort_it=False, dup_name='test_duplicate'):
+ self.populate()
+ get_transaction().begin()
+ get_transaction().note('duplication')
+ self.duplicate(abort_it, dup_name)
+ self.verify(abort_it, dup_name)
+
+ def checkDuplicateAborted(self):
+ self.checkDuplicate(abort_it=True, dup_name='test_duplicate_aborted')
+
+class ZODBTests(ExportImportTests, TransactionalUndoDB,
+ unittest.TestCase):
+
+ def setUp(self):
+ self._db = DB(_fsname, create=1)
+ self._conn = self._db.open()
+ self._root = self._conn.root()
+
+ def populate(self):
+ get_transaction().begin()
+ conn = self._db.open()
+ root = conn.root()
+ root['test'] = pm = PersistentDict()
+ for n in range(100):
+ pm[n] = PersistentDict({0: 100 - n})
+ get_transaction().note('created test data')
+ get_transaction().commit()
+ conn.close()
+
+ def checkModifyGhost(self):
+ self.populate()
+ root = self._db.open().root()
+ o = root["test"][5]
+ o._p_activate()
+ o._p_deactivate()
+ o.anattr = "anattr"
+ self.assert_(o._p_changed)
+ get_transaction().commit()
+ self.assert_(not o._p_changed)
+ del o._p_changed
+ self.assertEqual(o.anattr, "anattr")
+
+ def tearDown(self):
+ self._db.close()
+ for ext in '', '.old', '.tmp', '.lock', '.index':
+ path = _fsname + ext
+ if os.path.exists(path):
+ os.remove(path)
+
+def test_suite():
+ return unittest.makeSuite(ZODBTests, 'check')
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run(test_suite())
=== Zope3/src/zodb/tests/undo.py 1.1 => 1.2 === (470/570 lines abridged)
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/tests/undo.py Wed Dec 25 09:12:21 2002
@@ -0,0 +1,567 @@
+"""Check transactional undo performed via the database."""
+
+import time
+import unittest
+
+from zodb import interfaces
+from zodb.ztransaction import Transaction
+from zodb.utils import u64, p64, z64
+from zodb.db import DB
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
+
+from persistence import Persistent
+from transaction import get_transaction
+
+class C(Persistent):
+ pass
+
+class TransactionalUndoDB(unittest.TestCase):
+
+ def checkSimpleTransactionalUndo(self):
+ obj = MinPO(23)
+ self._root["obj"] = obj
+ get_transaction().note("23")
+ get_transaction().commit()
+
+ obj.value = 24
+ get_transaction().note("24")
+ get_transaction().commit()
+
+ obj.value = 25
+ get_transaction().note("25")
+ get_transaction().commit()
+
+ info = self._db.undoInfo()
+ tid = info[0]['id']
+ self._db.undo(tid)
+ get_transaction().commit()
+ self._conn.sync()
+ self.assertEqual(obj.value, 24)
+
+ info = self._db.undoInfo()
+ tid = info[2]['id']
+ self._db.undo(tid)
+ get_transaction().commit()
+ self._conn.sync()
+ self.assertEqual(obj.value, 23)
[-=- -=- -=- 470 lines omitted -=- -=- -=-]
+ tid = info[base + j]['id']
+ s.transactionalUndo(tid, t)
+ s.tpc_vote(t)
+ s.tpc_finish(t)
+
+ for i in range(BATCHES):
+ undo(i)
+
+ # There are now (2 + OBJECTS) * BATCHES transactions:
+ # BATCHES original transactions, followed by
+ # OBJECTS * BATCHES modifications, followed by
+ # BATCHES undos
+
+ iter = s.iterator()
+ offset = 0
+
+ eq = self.assertEqual
+
+ for i in range(BATCHES):
+ txn = iter[offset]
+ offset += 1
+
+ tid = p64(i + 1)
+ eq(txn.tid, tid)
+
+ L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
+ L2 = [(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid]
+
+ eq(L1, L2)
+
+ for i in range(BATCHES * OBJECTS):
+ txn = iter[offset]
+ offset += 1
+ eq(len([rec for rec in txn if rec.data_txn is None]), 1)
+
+ for i in range(BATCHES):
+ txn = iter[offset]
+ offset += 1
+
+ # The undos are performed in reverse order.
+ otid = p64(BATCHES - i)
+ L1 = [(rec.oid, rec.data_txn) for rec in txn]
+ L2 = [(oid, otid) for _tid, oid, revid in orig
+ if _tid == otid]
+ L1.sort()
+ L2.sort()
+ eq(L1, L2)
+
+ self.assertRaises(IndexError, iter.__getitem__, offset)