[Zope3-checkins] CVS: Zope3/src/zodb/tests - __init__.py:1.1.2.1 test_connection.py:1.1.2.1 test_timestamp.py:1.1.2.1 test_txn.py:1.1.2.1 test_utils.py:1.1.2.1 test_zodb.py:1.1.2.1 undo.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:30:53 -0500
Update of /cvs-repository/Zope3/src/zodb/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/tests
Added Files:
Tag: NameGeddon-branch
__init__.py test_connection.py test_timestamp.py test_txn.py
test_utils.py test_zodb.py undo.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zodb/tests/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zodb/tests/test_connection.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
from persistence import Persistent
from transaction.tests.abstestIDataManager import IDataManagerTests
from zodb.storage.mapping import DB
from zodb.ztransaction import Transaction
class P(Persistent):
    """Bare Persistent subclass; ConnectionTests.setUp stores one as ``self.obj``."""
class ConnectionTests(IDataManagerTests):
    """Run the inherited IDataManagerTests with a connection from DB()."""

    def setUp(self):
        """Open a database connection and set the fixture attributes."""
        # NOTE(review): these attributes are presumably what the inherited
        # IDataManagerTests read -- confirm against that base class.
        self.txn_factory = Transaction
        self.obj = P()
        self.db = DB()
        self.datamgr = self.db.open()

    def tearDown(self):
        """Close the connection first, then the database that produced it."""
        self.datamgr.close()
        self.db.close()

    def get_transaction(self):
        """Return the base class's transaction with a user and a note set."""
        txn = super(ConnectionTests, self).get_transaction()
        txn.setUser('IDataManagerTests')
        txn.note('dummy note')
        return txn

    def test_cacheGC(self):
        # Smoke test: only checks that cacheGC() can be called without raising.
        self.datamgr.cacheGC()
def test_suite():
    """Return a suite of ConnectionTests (default method-name prefix)."""
    suite = unittest.makeSuite(ConnectionTests)
    return suite
=== Added File Zope3/src/zodb/tests/test_timestamp.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the TimeStamp utility type"""
import time
import unittest
from zodb.timestamp import TimeStamp
EPSILON = 0.000001
class TimeStampTests(unittest.TestCase):
    """Tests for the TimeStamp type.

    Test methods use the 'check' prefix (see test_suite()).  They carry
    comments rather than docstrings so verbose runs keep showing the
    method names.
    """

    def checkYMDTimeStamp(self):
        self._check_ymd(2001, 6, 3)

    def _check_ymd(self, yr, mo, dy):
        """Assert that TimeStamp(yr, mo, dy) is midnight on that date.

        yr, mo, dy -- year, month and day passed to the constructor
        """
        ts = TimeStamp(yr, mo, dy)
        self.assertEqual(ts.year(), yr)
        self.assertEqual(ts.month(), mo)
        self.assertEqual(ts.day(), dy)
        # Omitted time fields must default to zero.
        self.assertEqual(ts.hour(), 0)
        self.assertEqual(ts.minute(), 0)
        self.assertEqual(ts.second(), 0)
        # Round-trip through the epoch value and compare the UTC date.
        t = time.gmtime(ts.timeTime())
        self.assertEqual(yr, t[0])
        self.assertEqual(mo, t[1])
        self.assertEqual(dy, t[2])

    def checkFullTimeStamp(self):
        t = time.gmtime()
        ts = TimeStamp(*t[:6])
        # mktime() reads the tuple as local time, so time.timezone is added
        # to timeTime() before comparing.  Both sides are floats, so compare
        # within EPSILON (as done for seconds below) instead of requiring
        # exact equality.
        delta = ts.timeTime() + time.timezone - time.mktime(t)
        self.assert_(abs(delta) < EPSILON)
        self.assertEqual(ts.year(), t[0])
        self.assertEqual(ts.month(), t[1])
        self.assertEqual(ts.day(), t[2])
        self.assertEqual(ts.hour(), t[3])
        self.assertEqual(ts.minute(), t[4])
        self.assert_(abs(ts.second() - t[5]) < EPSILON)

    def checkRawTimestamp(self):
        # A TimeStamp rebuilt from raw() must equal the original.
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        ts2 = TimeStamp(ts1.raw())
        self.assertEqual(ts1, ts2)
        self.assertEqual(ts1.timeTime(), ts2.timeTime())
        self.assertEqual(ts1.year(), ts2.year())
        self.assertEqual(ts1.month(), ts2.month())
        self.assertEqual(ts1.day(), ts2.day())
        self.assertEqual(ts1.hour(), ts2.hour())
        self.assertEqual(ts1.minute(), ts2.minute())
        self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)

    def checkDictKey(self):
        # Two stamps differing only in the year must be distinct dict keys.
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        ts2 = TimeStamp(2000, *t[1:6])
        d = {}
        d[ts1] = 1
        d[ts2] = 2
        self.assertEqual(len(d), 2)

    def checkCompare(self):
        ts1 = TimeStamp(1972, 6, 27)
        ts2 = TimeStamp(1971, 12, 12)
        self.assert_(ts1 > ts2)
        self.assert_(ts2 <= ts1)

    def checkLaterThan(self):
        # The assertions here and in checkTimeStamp expect a.laterThan(b)
        # to be strictly greater than b: a itself when a is already later,
        # otherwise some stamp after b.
        t = time.gmtime()
        ts = TimeStamp(*t[:6])
        ts2 = ts.laterThan(ts)
        self.assert_(ts2 > ts)

    # XXX should test for bogus inputs to TimeStamp constructor

    def checkTimeStamp(self):
        # Alternate test suite
        t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
##        self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
        # Expected 8-byte raw form of the stamp above.
        self.assertEqual(t.raw(), '\x03B9H\x15UUU')
        self.assertEqual(TimeStamp('\x03B9H\x15UUU'), t)
        self.assertEqual(t.year(), 2002)
        self.assertEqual(t.month(), 1)
        self.assertEqual(t.day(), 23)
        self.assertEqual(t.hour(), 10)
        self.assertEqual(t.minute(), 48)
        # second() is only close to 5 (see EPSILON above), hence round().
        self.assertEqual(round(t.second()), 5)
        self.assertEqual(t.timeTime(), 1011782885)
        t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
##        self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
        # Rich comparisons, positive then negative.
        self.assert_(t == t)
        self.assert_(t != t1)
        self.assert_(t < t1)
        self.assert_(t <= t1)
        self.assert_(t1 >= t)
        self.assert_(t1 > t)
        self.failIf(t == t1)
        self.failIf(t != t)
        self.failIf(t > t1)
        self.failIf(t >= t1)
        self.failIf(t1 < t)
        self.failIf(t1 <= t)
        self.assertEqual(cmp(t, t), 0)
        self.assertEqual(cmp(t, t1), -1)
        self.assertEqual(cmp(t1, t), 1)
        self.assertEqual(t1.laterThan(t), t1)
        self.assert_(t.laterThan(t1) > t1)
        self.assertEqual(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))
def test_suite():
    """Return a suite of the TimeStampTests methods prefixed 'check'."""
    suite = unittest.makeSuite(TimeStampTests, 'check')
    return suite
=== Added File Zope3/src/zodb/tests/test_txn.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""High-level tests of the transaction interface"""
import os
import tempfile
import unittest
from transaction import get_transaction
import zodb
from zodb.db import DB
from zodb.storage.file import FileStorage
from zodb.storage.tests.minpo import MinPO
from zodb.interfaces import RollbackError
class TransactionTestBase(unittest.TestCase):
    """Fixture giving each test the root of a fresh FileStorage database."""

    def setUp(self):
        """Create a FileStorage at a temporary path and expose its root."""
        self.fs_path = tempfile.mktemp()
        self.fs = FileStorage(self.fs_path)
        db = DB(self.fs)
        conn = db.open()
        self.root = conn.root()

    def tearDown(self):
        """Abort the current transaction, close the storage, remove files.

        Nested try/finally keeps the later cleanup steps running when an
        earlier one raises, so a failing abort() or close() cannot leave
        the storage open or its files on disk.
        """
        try:
            get_transaction().abort()
        finally:
            try:
                self.fs.close()
            finally:
                self._remove_storage_files()

    def _remove_storage_files(self):
        """Delete the storage file and whichever side files exist."""
        for ext in '', '.index', '.lock', '.tmp':
            path = self.fs_path + ext
            if os.path.exists(path):
                os.unlink(path)
class BasicTests:
    """Commit/abort scenarios run against ``self.root``.

    Each test takes ``subtrans``: when true the step under test uses
    savepoint() instead of commit().
    """

    def _commit_or_savepoint(self, subtrans):
        """Take a savepoint if subtrans is true, otherwise commit."""
        txn = get_transaction()
        if subtrans:
            txn.savepoint()
        else:
            txn.commit()

    def testSingleCommit(self, subtrans=None):
        self.root["a"] = MinPO("a")
        self._commit_or_savepoint(subtrans)
        self.assertEqual(self.root["a"].value, "a")

    def testMultipleCommits(self, subtrans=None):
        obj = MinPO("a")
        self.root["a"] = obj
        get_transaction().commit()
        obj.extra_attr = MinPO("b")
        self._commit_or_savepoint(subtrans)
        # Drop the local reference; the checks below go through the root.
        del obj
        stored = self.root["a"]
        self.assertEqual(stored.value, "a")
        self.assertEqual(stored.extra_attr, MinPO("b"))

    def testCommitAndAbort(self, subtrans=None):
        obj = MinPO("a")
        self.root["a"] = obj
        self._commit_or_savepoint(subtrans)
        obj.extra_attr = MinPO("b")
        get_transaction().abort()
        del obj
        if not subtrans:
            # The committed object stays; only the later attribute is gone.
            stored = self.root["a"]
            self.assertEqual(stored.value, "a")
            self.failIf(hasattr(stored, 'extra_attr'))
        else:
            # Nothing was committed, so the abort removes "a" as well.
            self.failIf("a" in self.root)
class SubtransTests(BasicTests):
    """Re-run each BasicTests scenario in savepoint mode, then commit."""

    def wrap_test(self, klass, meth_name):
        """Run klass.meth_name with subtrans enabled, then commit.

        klass -- class on which the test method is looked up
        meth_name -- name of the test method to run
        """
        unbound_method = getattr(klass, meth_name)
        # The second positional argument is ``subtrans``; 1 selects
        # savepoint() over commit() inside the wrapped test.
        unbound_method(self, 1)
        get_transaction().commit()

    # Plain methods rather than lambdas bound to class attributes, so
    # tracebacks show real function names.

    def testSubSingleCommit(self):
        self.wrap_test(BasicTests, "testSingleCommit")

    def testSubMultipleCommits(self):
        self.wrap_test(BasicTests, "testMultipleCommits")

    def testSubCommitAndAbort(self):
        self.wrap_test(BasicTests, "testCommitAndAbort")
class AllTests(TransactionTestBase, SubtransTests):
    """All transaction tests, plus a savepoint/rollback scenario."""

    def testSavepointAndRollback(self):
        root = self.root

        root["a"] = MinPO()
        # Bound but never rolled back, exactly as in the original test.
        sp_after_a = get_transaction().savepoint()
        root["b"] = MinPO()
        sp_after_b = get_transaction().savepoint()
        self.assertEqual(len(root), 2)
        for key in "a", "b":
            self.assert_(key in root)

        # Rolling back the second savepoint must leave only "a".
        sp_after_b.rollback()
        self.assertEqual(len(root), 1)
        self.assert_("a" in root)
        self.failIf("b" in root)

        root["c"] = MinPO()
        sp_after_c = get_transaction().savepoint()
        root["d"] = MinPO()
        sp_after_d = get_transaction().savepoint()
        # After rolling back the earlier savepoint, rolling back the later
        # one must raise RollbackError.
        sp_after_c.rollback()
        self.assertRaises(RollbackError, sp_after_d.rollback)

        root["e"] = MinPO()
        # Bound but never rolled back, exactly as in the original test.
        sp_after_e = get_transaction().savepoint()
        root["f"] = MinPO()
        sp_after_f = get_transaction().savepoint()
        root["g"] = MinPO()
        sp_after_f.rollback()
        root["h"] = MinPO()

        self.assertEqual(len(root), 3)
        for key in "a", "e", "h":
            self.assert_(key in root)
        for key in "b", "c", "d", "f", "g":
            self.failIf(key in root)
def test_suite():
    """Return a suite of AllTests (default method-name prefix)."""
    suite = unittest.makeSuite(AllTests)
    return suite
def main():
    """Run the suite from test_suite() with a text test runner."""
    unittest.TextTestRunner().run(test_suite())

# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
=== Added File Zope3/src/zodb/tests/test_utils.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the routines to convert between long and 64-bit strings"""
import random
import unittest
NUM = 100
from zodb.utils import p64, u64
class TestUtils(unittest.TestCase):
small = [random.randrange(1, 1L<<32, int=long)
for i in range(NUM)]
large = [random.randrange(1L<<32, 1L<<64, int=long)
for i in range(NUM)]
all = small + large
def checkLongToStringToLong(self):
for num in self.all:
s = p64(num)
n2 = u64(s)
self.assertEquals(num, n2, "u64() failed")
def checkKnownConstants(self):
self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)
def test_suite():
    """Return a suite of the TestUtils methods prefixed 'check'."""
    suite = unittest.makeSuite(TestUtils, 'check')
    return suite
if __name__ == "__main__":
    # The tests use the 'check' prefix, so the default loader would find none.
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = "check"
    unittest.main(testLoader=check_loader)
=== Added File Zope3/src/zodb/tests/test_zodb.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os
import unittest
import tempfile
import ZODB.DB, ZODB.FileStorage
from zodb.utils import u64
from zodb.tests.undo import TransactionalUndoDB
from persistence.dict import PersistentDict
from transaction import get_transaction
_fsname = tempfile.mktemp() + ".fs"
class ExportImportTests:
    """Mixin checking that exportFile()/importFile() duplicate an object.

    The class it is mixed into must provide ``self._db``, a ``populate()``
    method that stores the test data under root['test'], and the unittest
    assertion methods.
    """

    def duplicate(self, abort_it, dup_name):
        """Export root['test'] to a temp file and import it as root[dup_name].

        abort_it -- if true, abort the transaction instead of committing
        dup_name -- root key under which the imported copy is stored

        Any exception aborts the transaction and is re-raised.
        """
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            self.assert_(len(ob) > 10, 'Insufficient test data')
            try:
                f = tempfile.TemporaryFile()
                # Close the temp file even when export or import fails.
                try:
                    ob._p_jar.exportFile(ob._p_oid, f)
                    self.assert_(f.tell() > 0, 'Did not export correctly')
                    f.seek(0)
                    new_ob = ob._p_jar.importFile(f)
                    root[dup_name] = new_ob
                finally:
                    f.close()
                if abort_it:
                    get_transaction().abort()
                else:
                    get_transaction().commit()
            except Exception:
                get_transaction().abort()
                raise
        finally:
            conn.close()

    def verify(self, abort_it, dup_name):
        """Check the outcome of duplicate() on a fresh connection.

        abort_it -- whether duplicate() aborted; root[dup_name] must then
                    be missing
        dup_name -- root key the duplicate was stored under
        """
        get_transaction().begin()
        # Verify the duplicate.
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            try:
                ob2 = root[dup_name]
            except KeyError:
                if abort_it:
                    # Passed the test.
                    return
                else:
                    raise
            else:
                if abort_it:
                    oid = ob2._p_oid
                    if oid is not None:
                        oid = u64(oid)
                    # Diagnostics go into the failure message rather than
                    # being printed to stdout.
                    self.fail("Did not abort duplication: %s %s %s %s"
                              % (oid, ob2.__class__, ob2._p_state, ob2))
            l1 = list(ob.items())
            l1.sort()
            l2 = list(ob2.items())
            l2.sort()
            # Compare only the value stored under key 0 of each item;
            # the item objects themselves are separate copies.
            l1 = [(k, v[0]) for k, v in l1]
            l2 = [(k, v[0]) for k, v in l2]
            self.assertEqual(l1, l2, 'Duplicate did not match')
            self.assert_(ob._p_oid != ob2._p_oid, 'Did not duplicate')
            self.assertEqual(ob._p_jar, ob2._p_jar, 'Not same connection')
            # No oid of the original's values may reappear in the copy.
            oids = {}
            for v in ob.values():
                oids[v._p_oid] = 1
            for v in ob2.values():
                self.assert_(v._p_oid not in oids,
                             'Did not fully separate duplicate from original')
            get_transaction().commit()
        finally:
            conn.close()

    def checkDuplicate(self, abort_it=False, dup_name='test_duplicate'):
        self.populate()
        get_transaction().begin()
        get_transaction().note('duplication')
        self.duplicate(abort_it, dup_name)
        self.verify(abort_it, dup_name)

    def checkDuplicateAborted(self):
        self.checkDuplicate(abort_it=True, dup_name='test_duplicate_aborted')
class ZODBTests(ExportImportTests, TransactionalUndoDB,
                unittest.TestCase):
    """Export/import and undo tests run against a FileStorage database."""

    def setUp(self):
        """Create the database at _fsname and open the shared connection."""
        # Use the renamed lowercase modules, matching the other test
        # modules; imported here so this fixture does not rely on the old
        # ZODB package.
        # NOTE(review): the module-level "import ZODB.DB, ZODB.FileStorage"
        # still names the pre-rename package and needs the same update.
        # NOTE(review): assumes FileStorage accepts create=1 the way
        # ZODB.FileStorage.DB did -- confirm.
        from zodb.db import DB
        from zodb.storage.file import FileStorage
        self._db = DB(FileStorage(_fsname, create=1))
        self._conn = self._db.open()
        self._root = self._conn.root()

    def populate(self):
        """Commit 100 small PersistentDicts under root['test']."""
        get_transaction().begin()
        conn = self._db.open()
        # Close the connection even when populating fails.
        try:
            root = conn.root()
            root['test'] = pm = PersistentDict()
            for n in range(100):
                pm[n] = PersistentDict({0: 100 - n})
            get_transaction().note('created test data')
            get_transaction().commit()
        finally:
            conn.close()

    def checkModifyGhost(self):
        self.populate()
        root = self._db.open().root()
        o = root["test"][5]
        # NOTE(review): _p_deactivate() presumably turns o into a ghost, so
        # the assignment below modifies a ghost -- confirm.
        o._p_activate()
        o._p_deactivate()
        o.anattr = "anattr"
        self.assert_(o._p_changed)
        get_transaction().commit()
        self.assert_(not o._p_changed)
        # NOTE(review): deleting _p_changed presumably discards the loaded
        # state, so the read below comes from storage -- confirm.
        del o._p_changed
        self.assertEqual(o.anattr, "anattr")

    def tearDown(self):
        """Close the database and remove the storage file and side files."""
        # Remove the files even when close() raises.
        try:
            self._db.close()
        finally:
            for ext in '', '.old', '.tmp', '.lock', '.index':
                path = _fsname + ext
                if os.path.exists(path):
                    os.remove(path)
def test_suite():
    """Return a suite of the ZODBTests methods prefixed 'check'."""
    suite = unittest.makeSuite(ZODBTests, 'check')
    return suite
if __name__ == '__main__':
    # Allow running this module directly as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())
=== Added File Zope3/src/zodb/tests/undo.py === (467/567 lines abridged)
"""Check transactional undo performed via the database."""
import time
import unittest
from zodb import POSException
from zodb.ztransaction import Transaction
from zodb.utils import u64, p64, z64
from zodb.db import DB
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
from persistence import Persistent
from transaction import get_transaction
class C(Persistent):
    """Bare Persistent subclass with no behavior of its own."""
class TransactionalUndoDB(unittest.TestCase):
def checkSimpleTransactionalUndo(self):
obj = MinPO(23)
self._root["obj"] = obj
get_transaction().note("23")
get_transaction().commit()
obj.value = 24
get_transaction().note("24")
get_transaction().commit()
obj.value = 25
get_transaction().note("25")
get_transaction().commit()
info = self._db.undoInfo()
tid = info[0]['id']
self._db.undo(tid)
get_transaction().commit()
self._conn.sync()
self.assertEqual(obj.value, 24)
info = self._db.undoInfo()
tid = info[2]['id']
self._db.undo(tid)
get_transaction().commit()
self._conn.sync()
self.assertEqual(obj.value, 23)
# Undo object creation
info = self._db.undoInfo()
[-=- -=- -=- 467 lines omitted -=- -=- -=-]
tid = info[base + j]['id']
s.transactionalUndo(tid, t)
s.tpc_vote(t)
s.tpc_finish(t)
for i in range(BATCHES):
undo(i)
# There are now (2 + OBJECTS) * BATCHES transactions:
# BATCHES original transactions, followed by
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter = s.iterator()
offset = 0
eq = self.assertEqual
for i in range(BATCHES):
txn = iter[offset]
offset += 1
tid = p64(i + 1)
eq(txn.tid, tid)
L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
L2 = [(oid, revid, None) for _tid, oid, revid in orig
if _tid == tid]
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
txn = iter[offset]
offset += 1
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
txn = iter[offset]
offset += 1
# The undos are performed in reverse order.
otid = p64(BATCHES - i)
L1 = [(rec.oid, rec.data_txn) for rec in txn]
L2 = [(oid, otid) for _tid, oid, revid in orig
if _tid == otid]
L1.sort()
L2.sort()
eq(L1, L2)
self.assertRaises(IndexError, iter.__getitem__, offset)