[Zope-Checkins] CVS: StandaloneZODB/ZODB/tests - testDB.py:1.1 testCache.py:1.2 testTransaction.py:1.7
Jeremy Hylton
jeremy@zope.com
Mon, 15 Apr 2002 14:55:12 -0400
Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv3778
Modified Files:
testCache.py testTransaction.py
Added Files:
testDB.py
Log Message:
Add a bunch more tests of the LRU cache mechanism and of DB methods.
Also, reformat a test in testTransaction.
=== Added File StandaloneZODB/ZODB/tests/testDB.py ===
import time
import unittest
import ZODB
import ZODB.MappingStorage
from ZODB.tests.MinPO import MinPO
class DBTests(unittest.TestCase):
    """Exercise ZODB.DB methods against a DB built on a MappingStorage."""

    def setUp(self):
        """Build a new DB over a new MappingStorage and keep it on self.db."""
        self.db = ZODB.DB(ZODB.MappingStorage.MappingStorage())

    def tearDown(self):
        """Close the DB that setUp created."""
        self.db.close()

    def dowork(self, version=''):
        """Commit a linked chain of MinPO objects through one connection.

        Opens a connection for *version*, stores MinPO(0) in the root under
        the current time.time() value, then 25 times hangs a new MinPO off
        the previous one via its ``value`` attribute, committing after each
        step.  Finally prints the root's items and closes the connection.
        """
        # NOTE(review): the archived copy of this file lost its indentation;
        # the extent of the loop body below is reconstructed -- confirm
        # against the repository.
        conn = self.db.open(version)
        root = conn.root()
        node = MinPO(0)
        root[time.time()] = node
        get_transaction().commit()
        for step in range(25):
            node.value = MinPO(step)
            get_transaction().commit()
            # descend so the next iteration extends the chain
            node = node.value
        # parenthesised form prints exactly what ``print r.items()`` does
        print(root.items())
        conn.close()

    # make sure the basic methods are callable

    def testSets(self):
        # Only the setters with non-trivial implementations are called;
        # the test passes as long as none of them raises.
        db = self.db
        # deprecated
        db.setCacheDeactivateAfter(12)
        db.setCacheSize(15)
        # deprecated
        db.setVersionCacheDeactivateAfter(12)
        db.setVersionCacheSize(15)
def test_suite():
    # Collect the DBTests methods found by unittest.makeSuite's default
    # prefix into one suite and hand it to the caller.
    suite = unittest.makeSuite(DBTests)
    return suite
=== StandaloneZODB/ZODB/tests/testCache.py 1.1 => 1.2 ===
objects in memory under the assumption that they may be used again.
"""
+from __future__ import nested_scopes
import random
import time
@@ -12,17 +13,20 @@
import ZODB
import ZODB.MappingStorage
+from ZODB.cPickleCache import PickleCache
+from ZODB.POSException import ConflictError
from ZODB.PersistentMapping import PersistentMapping
from ZODB.tests.MinPO import MinPO
from ZODB.utils import p64
+from Persistence import Persistent
+
class CacheTestBase(unittest.TestCase):
def setUp(self):
store = ZODB.MappingStorage.MappingStorage()
self.db = ZODB.DB(store,
- cache_size = self.CACHE_SIZE,
- cache_deactivate_after = self.CACHE_DEACTIVATE_AFTER)
+ cache_size = self.CACHE_SIZE)
self.conns = []
def tearDown(self):
@@ -33,12 +37,12 @@
NUM_COLLECTIONS = 10
MAX_OBJECTS = 100
CACHE_SIZE = 20
- CACHE_DEACTIVATE_AFTER = 5
def noodle_new_connection(self):
"""Do some reads and writes on a new connection."""
c = self.db.open()
+ self.conns.append(c)
self.noodle_connection(c)
def noodle_connection(self, c):
@@ -106,5 +110,180 @@
# XXX same for the get and invalidate methods
+ def checkLRUitems(self):
+ # get a cache
+ c = self.conns[0]._cache
+ c.lru_items()
+
+ def checkClassItems(self):
+ c = self.conns[0]._cache
+ c.klass_items()
+
+class LRUCacheTests(CacheTestBase):
+
+ def checkLRU(self):
+ # verify the LRU behavior of the cache
+ CACHE_SIZE = 5
+ self.db.setCacheSize(CACHE_SIZE)
+ c = self.db.open()
+ r = c.root()
+ l = [None] * 10
+ for i in range(10):
+ l[i] = r[i] = MinPO(i)
+ # the root is the only thing in the cache, because all the
+ # other objects are new
+ self.assertEqual(len(c._cache), 1)
+ get_transaction().commit()
+ # commit() will register the objects, placing them in the cache.
+ # at the end of commit, the cache will be reduced down to CACHE_SIZE
+ # items
+ self.assertEqual(c._cache.ringlen(), CACHE_SIZE)
+ x = c._cache.get(p64(0), None)
+ self.assertEqual(x._p_changed, None) # the root is ghosted
+ for i in range(len(l)):
+ if i < CACHE_SIZE:
+ self.assertEqual(l[i]._p_changed, None)
+ else:
+ self.assertEqual(l[i]._p_changed, 0)
+
+ def checkSize(self):
+ self.assertEqual(self.db.cacheSize(), 0)
+ self.assertEqual(self.db.cacheDetailSize(), [])
+
+ CACHE_SIZE = 10
+ self.db.setCacheSize(CACHE_SIZE)
+
+ CONNS = 3
+ for i in range(CONNS):
+ self.noodle_new_connection()
+
+ self.assertEquals(self.db.cacheSize(), CACHE_SIZE * CONNS)
+ details = self.db.cacheDetailSize()
+ self.assertEquals(len(details), CONNS)
+ for d in details:
+ self.assertEquals(d['ngsize'], CACHE_SIZE)
+ # the root is also in the cache as ghost, because
+ # the connection holds a reference to it
+ self.assertEquals(d['size'], CACHE_SIZE + 1)
+
+ def checkDetail(self):
+ CACHE_SIZE = 10
+ self.db.setCacheSize(CACHE_SIZE)
+
+ CONNS = 3
+ for i in range(CONNS):
+ self.noodle_new_connection()
+
+ for klass, count in self.db.cacheDetail():
+ if klass.endswith('PersistentMapping'):
+ # one root per connection
+ self.assertEqual(count, CONNS)
+ if klass.endswith('MinPO'):
+ self.assertEqual(count, CONNS * CACHE_SIZE)
+
+ for details in self.db.cacheExtremeDetail():
+ # one dict per object. keys:
+ if details['klass'].endswith('PersistentMapping'):
+ self.assertEqual(details['state'], None)
+ else:
+ self.assert_(details['klass'].endswith('MinPO'))
+ self.assertEqual(details['state'], 0)
+
+class StubDataManager:
+ def setklassstate(self, object):
+ pass
+
+class StubObject(Persistent):
+ pass
+
+class CacheErrors(unittest.TestCase):
+
+ def setUp(self):
+ self.jar = StubDataManager()
+ self.cache = PickleCache(self.jar)
+
+ def checkGetBogusKey(self):
+ self.assertRaises(KeyError, self.cache.get, p64(0))
+ try:
+ self.cache[12]
+ except KeyError:
+ pass
+ else:
+ self.fail("expected KeyError")
+ try:
+ self.cache[12] = 12
+ except TypeError:
+ pass
+ else:
+            self.fail("expected TypeError")
+ try:
+ del self.cache[12]
+ except TypeError:
+ pass
+ else:
+ self.fail("expected TypeError")
+
+ def checkBogusObject(self):
+ def add(key, obj):
+ self.cache[key] = obj
+
+ key = p64(2)
+ # value isn't persistent
+ self.assertRaises(TypeError, add, key, 12)
+
+ o = StubObject()
+ # o._p_oid == None
+ self.assertRaises(ValueError, add, key, o)
+
+ o._p_oid = key
+ # o._p_jar == None
+ self.assertRaises(Exception, add, key, o)
+
+ o._p_jar = self.jar
+ self.cache[key] = o
+ # make sure it can be added multiple times
+ self.cache[key] = o
+
+ # same object, different keys
+ self.assertRaises(ValueError, add, p64(0), o)
+
+ def checkTwoCaches(self):
+ jar2 = StubDataManager()
+ cache2 = PickleCache(jar2)
+
+ o = StubObject()
+ key = o._p_oid = p64(1)
+ o._p_jar = jar2
+
+ cache2[key] = o
+
+ try:
+ self.cache[key] = o
+ except ValueError:
+ pass
+ else:
+ self.fail("expected ValueError because object already in cache")
+
+ def checkReadOnlyAttrsWhenCached(self):
+ o = StubObject()
+ key = o._p_oid = p64(1)
+ o._p_jar = self.jar
+ self.cache[key] = o
+ try:
+ o._p_oid = p64(2)
+ except ValueError:
+ pass
+ else:
+ self.fail("expect that you can't change oid of cached object")
+ try:
+ del o._p_jar
+ except ValueError:
+ pass
+ else:
+ self.fail("expect that you can't delete jar of cached object")
+
def test_suite():
- return unittest.makeSuite(DBMethods, 'check')
+ s = unittest.makeSuite(DBMethods, 'check')
+ s.addTest(unittest.makeSuite(LRUCacheTests, 'check'))
+ s.addTest(unittest.makeSuite(CacheErrors, 'check'))
+ return s
=== StandaloneZODB/ZODB/tests/testTransaction.py 1.6 => 1.7 ===
def modify(self, nojar=0, tracing=0):
-
if not nojar:
-
if self.nost:
self._p_jar = NoSubTransactionJar(tracing=tracing)
-
else:
self._p_jar = SubTransactionJar(tracing=tracing)
-
- else: pass
-
get_transaction().register(self)
-
-
-class TestTxnException(Exception): pass
+class TestTxnException(Exception):
+ pass
class BasicJar: