[Zope-CVS] CVS: Products/Ape/lib/apelib/tests -
testscanner.py:1.2.2.2
Shane Hathaway
shane at zope.com
Sat Jan 3 15:36:40 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/tests
In directory cvs.zope.org:/tmp/cvs-serv4617/tests
Modified Files:
Tag: ape-0_8-branch
testscanner.py
Log Message:
Changed names in the scanner to make it a little easier to understand.
=== Products/Ape/lib/apelib/tests/testscanner.py 1.2.2.1 => 1.2.2.2 ===
--- Products/Ape/lib/apelib/tests/testscanner.py:1.2.2.1 Sat Dec 20 02:31:07 2003
+++ Products/Ape/lib/apelib/tests/testscanner.py Sat Jan 3 15:36:39 2004
@@ -19,7 +19,7 @@
import unittest
from time import time
-from apelib.zodb3.scanner import ScanControl
+from apelib.zodb3.scanner import PoolScanControl, Scanner
class FakeRepository:
@@ -31,7 +31,8 @@
if repo is not self:
raise AssertionError, "repo must be self"
if str(location) != location:
- raise AssertionError, "location is expected to be a string"
+ raise AssertionError(
+ "location %s is not a string" % repr(location))
# Always report a change
res[source] = 1001
return res
@@ -42,13 +43,17 @@
repo = FakeRepository()
def getPollSources(self, oid):
- return {(self.repo, oid): 10}
+ return {(self.repo, str(oid)): 10}
class ScanControlTests(unittest.TestCase):
def setUp(self):
- ctl = self.ctl = ScanControl()
+ storage = self.storage = FakeStorage()
+ scanner = self.scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
+ ctl = self.ctl = PoolScanControl(storage)
self.conn1 = ctl.newConnection()
self.conn2 = ctl.newConnection()
@@ -84,32 +89,29 @@
class ScannerTests(unittest.TestCase):
def setUp(self):
- ctl = self.ctl = ScanControl()
+ storage = self.storage = FakeStorage()
+ scanner = self.scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
+ ctl = self.ctl = PoolScanControl(storage)
self.conn1 = ctl.newConnection()
self.conn2 = ctl.newConnection()
- self.scanner = ctl.scanner
self.repo = FakeRepository()
def testAddSource(self):
new_sources = {(self.repo, '5'): 0}
- self.scanner.setPollSources(5, new_sources)
+ self.scanner.afterLoad(5, new_sources)
self.assertEqual(len(self.scanner.future), 1)
self.assertEqual(self.scanner.future[5][0], new_sources)
- def testChangeSources(self):
+ def testNoUpdatesWhenNotInvalidating(self):
+ # Don't change current except in scan(), where invalidation
+ # messages are possible.
self.conn1.setOIDs([5])
- old_sources = {(self.repo, '5'): 0}
- self.scanner.setPollSources(5, old_sources)
- self.assertEqual(len(self.scanner.future), 0)
- self.assertEqual(len(self.scanner.current), 1)
- self.assertEqual(self.scanner.current[5], old_sources)
-
- new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
- self.scanner.setPollSources(5, new_sources)
- self.assertEqual(len(self.scanner.future), 0)
- self.assertEqual(len(self.scanner.current), 1)
- self.assertEqual(self.scanner.current[5], new_sources)
+ sources = {(self.repo, '5'): 0}
+ self.scanner.afterLoad(5, sources)
+ self.assertNotEqual(self.scanner.current[5], sources)
def testRemoveOID(self):
self.conn1.setOIDs([5])
@@ -120,7 +122,7 @@
def testScan(self):
self.conn1.setOIDs([5])
new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
- self.scanner.setPollSources(5, new_sources)
+ self.scanner.afterLoad(5, new_sources)
to_invalidate = self.scanner.scan()
self.assertEqual(len(to_invalidate), 1)
@@ -134,17 +136,15 @@
def testFindNewSources(self):
# Verify the scanner calls storage.getSources() and saves the result.
- storage = FakeStorage()
- self.scanner.setStorage(storage)
self.conn1.setOIDs([5])
- expect_sources = storage.getPollSources(5)
+ expect_sources = self.storage.getPollSources(5)
self.assertEqual(self.scanner.current[5], expect_sources)
def testUseCachedSources(self):
# Verify the scanner uses previously cached sources when available.
repo = FakeRepository()
sources = {(repo, '999'): -1}
- self.scanner.setPollSources(5, sources)
+ self.scanner.afterLoad(5, sources)
self.conn1.setOIDs([5])
self.assertEqual(self.scanner.current[5], sources)
@@ -152,10 +152,10 @@
# Verify the scanner sees sources according to transactions.
repo = FakeRepository()
sources = {(repo, '999'): -1}
- self.scanner.setPollSources(5, sources)
+ self.scanner.afterLoad(5, sources)
self.conn1.setOIDs([5])
sources_2 = {(repo, '999'): -2}
- self.scanner.setUncommittedSources(123, 5, sources_2)
+ self.scanner.afterStore(5, 123, sources_2)
# Uncommitted data should have no effect on sources
self.assertEqual(self.scanner.current[5], sources)
self.scanner.afterCommit(123)
@@ -170,7 +170,7 @@
# Verify the scanner ignores sources on transaction abort.
repo = FakeRepository()
sources = {(repo, '999'): -2}
- self.scanner.setUncommittedSources(123, 5, sources)
+ self.scanner.afterStore(5, 123, sources)
self.assertEqual(self.scanner.uncommitted[123][5], sources)
self.scanner.afterAbort(123)
self.assert_(not self.scanner.uncommitted.has_key(123))
More information about the Zope-CVS
mailing list