[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_graph.py:1.8
test_identity.py:1.6
Casey Duncan
casey at zope.com
Mon Feb 9 16:14:59 EST 2004
Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv11692/pypes/tests
Modified Files:
test_graph.py test_identity.py
Log Message:
dos -> unix line endings
=== Packages/pypes/pypes/tests/test_graph.py 1.7 => 1.8 ===
--- Packages/pypes/pypes/tests/test_graph.py:1.7 Fri Jan 30 00:35:39 2004
+++ Packages/pypes/pypes/tests/test_graph.py Mon Feb 9 16:14:58 2004
@@ -1,726 +1,726 @@
-##############################################################################
-#
-# Copyright (c) 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Event service tests
-
-$Id$"""
-
-import unittest
-import ZODB
-from random import randint
-from zope.interface.verify import verifyObject
-from persistent import Persistent
-from common import PypesTestCase, PypesPersistentTest, TestClass, sort
-from pypes import graph
-
-
-class TestDirectedGraph(unittest.TestCase):
-
- def _newObj(self):
- try:
- self.t += 1
- except AttributeError:
- self.t = 0L
- return self.t
-
- def setUp(self):
- self.graph = graph.DirectedGraph()
-
- def testInterface(self):
- from pypes.interfaces import IGraph, IGraphNodes, IGraphEdges
- self.failUnless(verifyObject(IGraph, self.graph))
- self.failUnless(verifyObject(IGraphNodes, self.graph.nodes))
- self.failUnless(verifyObject(IGraphEdges, self.graph.edges))
-
- def testAddNodes(self):
- objs = [self._newObj() for i in range(10)]
- for o in objs:
- self.failUnless(self.graph.nodes.add(o))
- for o in objs:
- self.failIf(self.graph.nodes.add(o))
- self.assertEqual(len(objs), len(self.graph.nodes))
- for o in objs:
- self.failUnless(o in self.graph.nodes)
-
- def testAddEdges(self):
- objs = [self._newObj() for i in range(10)]
- edges = []
- for s in range(len(objs)):
- for t in range(len(objs) - s):
- edges.append((objs[s], objs[t]))
- for source, target in edges:
- self.failUnless(self.graph.edges.add(source, target))
- self.failUnless(source in self.graph.nodes)
- self.failUnless(target in self.graph.nodes)
- for source, target in edges:
- self.failIf(self.graph.edges.add(source, target))
- for pair in edges:
- self.failUnless(pair in self.graph.edges)
-
- def testAddAndGetValue(self):
- edges = [(self._newObj(), self._newObj()) for i in range(10)]
- data = 0
- for source, target in edges:
- self.graph.edges.add(source, target, data)
- data += 1
- data = 0
- for source, target in edges:
- self.assertEqual(self.graph.edges.get(source, target), data)
- data += 1
-
- def testAddPreservesValue(self):
- source, target = self._newObj(), self._newObj()
- self.graph.edges.add(source, target, 42)
- self.graph.edges.add(source, target)
- self.assertEqual(self.graph.edges.get(source, target), 42)
-
- def testGetValueFails(self):
- from pypes.exceptions import GraphValueError
- source, target = self._newObj(), self._newObj()
- self.graph.edges.add(source, target)
- self.assertRaises(
- GraphValueError, self.graph.edges.get, source, target)
-
- def testGetValueDefault(self):
- source, target = self._newObj(), self._newObj()
- self.graph.edges.add(source, target)
- self.assertEqual(
- self.graph.edges.get(source, target, 'yadda'), 'yadda')
-
- def testRemoveEdges(self):
- edges = [(self._newObj(), self._newObj()) for i in range(10)]
- for source, target in edges:
- self.graph.edges.add(source, target)
- for edge in edges:
- self.graph.edges.remove(*edge)
- self.failIf(edge in self.graph.edges)
- self.failIf(self.graph.edges)
-
- def testRemoveEdgeLeavesNodes(self):
- source, target = self._newObj(), self._newObj()
- self.graph.edges.add(source, target)
- self.graph.edges.remove(source, target)
- self.failUnless(source in self.graph.nodes)
- self.failUnless(target in self.graph.nodes)
-
- def testRemoveEdgeFails(self):
- from pypes.exceptions import GraphLookupError
- source, target = self._newObj(), self._newObj()
- self.assertRaises(
- GraphLookupError, self.graph.edges.remove, source, target)
-
- def testRemoveEdgeRemovesValue(self):
- from pypes.exceptions import GraphLookupError
- source, target = self._newObj(), self._newObj()
- self.graph.edges.add(source, target, 101)
- self.graph.edges.remove(source, target)
- self.assertRaises(
- GraphLookupError, self.graph.edges.get, source, target)
-
- def testRemoveNode(self):
- objs = [self._newObj() for i in range(10)]
- for o in objs:
- self.graph.nodes.add(o)
- for o in objs:
- self.graph.nodes.remove(o)
- self.failIf(o in self.graph.nodes)
- self.failIf(self.graph.nodes)
-
- def testRemoveNodeRemovesEdges(self):
- obj = [self._newObj() for i in range(5)]
- self.graph.edges.add(obj[0], obj[1])
- self.graph.edges.add(obj[0], obj[2])
- self.graph.edges.add(obj[1], obj[4])
- self.graph.edges.add(obj[3], obj[1])
- self.graph.edges.add(obj[4], obj[0])
- self.graph.nodes.remove(obj[4])
- self.failIf((obj[4], obj[0]) in self.graph.edges)
- self.failIf((obj[1], obj[4]) in self.graph.edges)
- self.failUnless((obj[0], obj[1]) in self.graph.edges)
- self.failUnless((obj[0], obj[2]) in self.graph.edges)
- self.failUnless((obj[3], obj[1]) in self.graph.edges)
- self.graph.nodes.remove(obj[0])
- self.failIf((obj[0], obj[1]) in self.graph.edges)
- self.failIf((obj[0], obj[2]) in self.graph.edges)
- self.failUnless((obj[3], obj[1]) in self.graph.edges)
- self.assertEqual(len(self.graph.edges), 1)
- self.assertEqual(len(self.graph.nodes), 3)
-
- def testRemoveNodeFails(self):
- from pypes.exceptions import GraphLookupError
- self.assertRaises(
- GraphLookupError, self.graph.nodes.remove, self._newObj())
-
- def testGetValueFails(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.assertRaises(
- GraphLookupError, self.graph.edges.get, ob1, ob2)
- self.graph.edges.add(ob1, ob2)
- self.assertRaises(
- GraphLookupError, self.graph.edges.get, ob1, ob3)
- self.assertRaises(
- GraphLookupError, self.graph.edges.get, ob3, ob2)
- self.assertRaises(
- GraphLookupError, self.graph.edges.get, ob2, ob1)
-
- def testAddReplacesValue(self):
- from pypes.exceptions import GraphValueError
- ob1, ob2 = self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.assertRaises(
- GraphValueError, self.graph.edges.get, ob1, ob2)
- self.graph.edges.add(ob1, ob2, 'data!')
- self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
- self.graph.edges.add(ob1, ob2)
- self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
-
- def testSetValue(self):
- ob1, ob2 = self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.set(ob1, ob2, '!atad')
- self.assertEqual(self.graph.edges.get(ob1, ob2), '!atad')
-
- def testSetValueFails(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2 = self._newObj(), self._newObj()
- self.assertRaises(
- GraphLookupError, self.graph.edges.set, ob1, ob2, 'sorry')
-
- def testEdgesContains(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob2, ob3)
- self.failUnless((ob1, ob2) in self.graph.edges)
- self.failIf((ob2, ob1) in self.graph.edges)
- self.failUnless((ob2, ob3) in self.graph.edges)
- self.failIf((ob1, ob3) in self.graph.edges)
-
- def testIterNodes(self):
- objs = [self._newObj() for i in range(10)]
- for o in objs:
- self.graph.nodes.add(o)
- fromiter = list(self.graph.nodes)
- objs.sort()
- fromiter.sort()
- self.assertEqual(objs, fromiter)
-
- def testIterEdges(self):
- edges = [(self._newObj(), self._newObj()) for i in range(10)]
- for source, target in edges:
- self.graph.edges.add(source, target)
- fromiter = list(self.graph.edges)
- edges.sort()
- fromiter.sort()
- self.assertEqual(edges, fromiter)
-
- def testIterValues(self):
- edges = [(self._newObj(), self._newObj()) for i in range(10)]
- value = 0
- for source, target in edges:
- self.graph.edges.add(source, target, value)
- value += 1
- values = list(self.graph.edges.iterValues())
- value = 0
- for source, target in edges:
- self.failUnless(value in values)
- value += 1
-
- def testIterItems(self):
- edges = [(self._newObj(), self._newObj()) for i in range(10)]
- value = 0
- for source, target in edges:
- self.graph.edges.add(source, target, value)
- value += 1
- items = list(self.graph.edges.iterItems())
- value = 0
- for source, target in edges:
- self.failUnless(((source, target), value) in items)
- value += 1
-
- def testIterTopologicalSimple(self):
- objs = [self._newObj() for i in range(5)]
- for i in range(4):
- self.graph.edges.add(objs[i], objs[i+1])
- objs.reverse()
- self.assertEqual(list(self.graph.nodes.iterTopological()), objs)
-
- def testIterTopologicalWithCycle(self):
- from pypes.exceptions import GraphCycleError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob2, ob3)
- self.graph.edges.add(ob3, ob1)
- self.assertRaises(
- GraphCycleError, list, self.graph.nodes.iterTopological())
-
- def testIterTopologicalNonTree(self):
- objs = [self._newObj() for i in range(7)]
- self.graph.edges.add(objs[0], objs[1])
- self.graph.edges.add(objs[1], objs[2])
- self.graph.edges.add(objs[2], objs[5])
- self.graph.edges.add(objs[3], objs[0])
- self.graph.edges.add(objs[3], objs[1])
- self.graph.edges.add(objs[3], objs[2])
- self.graph.edges.add(objs[3], objs[4])
- self.graph.edges.add(objs[3], objs[5])
- self.graph.edges.add(objs[3], objs[6])
- self.graph.edges.add(objs[4], objs[5])
- self.graph.edges.add(objs[6], objs[4])
- tsort = list(self.graph.nodes.iterTopological())
- self.assertEqual(len(tsort), len(objs))
- self.failUnless(objs[5] is tsort[0])
- self.failUnless(objs[3] is tsort[-1])
- idx = tsort.index
- self.failUnless(idx(objs[0]) > idx(objs[1]))
- self.failUnless(idx(objs[1]) > idx(objs[2]))
- self.failUnless(idx(objs[6]) > idx(objs[4]))
-
- def testIterTopologicalDisconnected(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.nodes.add(ob1)
- self.graph.nodes.add(ob2)
- self.graph.nodes.add(ob3)
- tsort = list(self.graph.nodes.iterTopological())
- self.assertEqual(len(tsort), 3)
- self.failUnless(ob1 in tsort)
- self.failUnless(ob2 in tsort)
- self.failUnless(ob3 in tsort)
-
- ## XXX Need to add transitiveClosure tests
-
- def testDegree(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob1, ob1)
- self.graph.edges.add(ob2, ob1)
- self.assertEqual(self.graph.nodes.degree(ob1), 3)
- self.assertEqual(self.graph.nodes.degree(ob2), 1)
- self.assertEqual(self.graph.nodes.degree(ob3), 0)
-
- def testTargets(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob1, ob1)
- self.graph.edges.add(ob2, ob1)
- n = list(self.graph.nodes.targets(ob1))
- objs = [ob1, ob2, ob3]
- n.sort()
- objs.sort()
- self.assertEqual(n, objs)
- self.assertEqual(list(self.graph.nodes.targets(ob2)), [ob1])
- self.failIf(self.graph.nodes.targets(ob3))
-
- def testSources(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob3, ob2)
- self.graph.edges.add(ob1, ob1)
- self.graph.edges.add(ob2, ob1)
- self.assertEqual(list(self.graph.nodes.sources(ob1)), [ob1, ob2])
- self.assertEqual(list(self.graph.nodes.sources(ob2)), [ob1, ob3])
- self.failIf(self.graph.nodes.sources(ob3))
-
- def testTargetsTransitive(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob2, ob1)
- self.graph.edges.add(ob2, ob3)
- self.graph.edges.add(self._newObj(), self._newObj())
- r = list(self.graph.nodes.targets(ob1, 1))
- objs = [ob1, ob2, ob3]
- r.sort()
- objs.sort()
- self.assertEqual(r, objs)
- r = list(self.graph.nodes.targets(ob2, 1))
- r.sort()
- self.assertEqual(r, objs)
- self.failIf(self.graph.nodes.targets(ob3, 1))
-
- def testSourcesTransitive(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob2, ob1)
- self.graph.edges.add(ob2, ob3)
- self.graph.edges.add(self._newObj(), self._newObj())
- self.assertEqual(list(self.graph.nodes.sources(ob3, 1)), [ob1, ob2])
- self.assertEqual(list(self.graph.nodes.sources(ob2, 1)), [ob1, ob2])
- self.assertEqual(list(self.graph.nodes.sources(ob1, 1)), [ob1, ob2])
-
- def _makeMatrix(self):
- nodes = {}
- for y in range(5):
- for x in range(5):
- n = nodes[x,y] = self._newObj()
- if x:
- if (x + y) % 2:
- self.graph.edges.add(nodes[x-1, y], n, 1)
- else:
- self.graph.edges.add(n, nodes[x-1, y], 1)
- if y:
- if (x + y) % 2:
- self.graph.edges.add(n, nodes[x, y-1], 1)
- else:
- self.graph.edges.add(nodes[x, y-1], n, 1)
- elif not x:
- self.graph.nodes.add(n)
- return nodes
-
- def testShortestPathMatrixWithoutValues(self):
- nodes = self._makeMatrix()
- result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4])
- self.failIf(result is None)
- i, x1, y1 = 0, 0, 0
- path = [nodes[x1, y1]]
- while x1 < 4 or y1 < 4:
- if i % 2:
- y1 += 1
- else:
- x1 += 1
- path.append(nodes[x1, y1])
- i += 1
- self.assertEqual(result, path)
-
- def testShortestPathUnreachable(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob3, ob2)
- self.assertEqual(self.graph.shortestPath(ob1, ob3), None)
-
- def testShortestPathWithValues(self):
- objs = [self._newObj() for i in range(5)]
- self.graph.edges.add(objs[0], objs[1], 10)
- self.graph.edges.add(objs[0], objs[2], 2)
- self.graph.edges.add(objs[2], objs[1], 10)
- self.graph.edges.add(objs[2], objs[3], 1)
- self.graph.edges.add(objs[3], objs[4], 1)
- self.graph.edges.add(objs[4], objs[1], 3)
- self.assertEqual(
- self.graph.shortestPath(objs[0], objs[1], 1),
- [objs[0], objs[2], objs[3], objs[4], objs[1]])
-
- def testShortestPathMatrixWithValues(self):
- nodes = self._makeMatrix()
- result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4], 1)
- self.failIf(result is None)
- i, x1, y1 = 0, 0, 0
- path = [nodes[x1, y1]]
- while x1 < 4 or y1 < 4:
- if i % 2:
- y1 += 1
- else:
- x1 += 1
- path.append(nodes[x1, y1])
- i += 1
- self.assertEqual(result, path)
-
- def testShortestPathWithValuesUnreachable(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2, 3)
- self.graph.edges.add(ob3, ob2, 4)
- self.assertEqual(self.graph.shortestPath(ob1, ob3, 1), None)
-
- def testShortestPathWithBogusSource(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob3, ob2, 3)
- self.assertRaises(
- GraphLookupError, self.graph.shortestPath, ob1, ob2)
-
- def testShortestPathWithBogusTarget(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob3, ob2, 3)
- self.assertRaises(
- GraphLookupError, self.graph.shortestPath, ob2, ob1)
-
- def testAllPathsSimple(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob2, ob3)
- paths = list(self.graph.allPaths())
- expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
- self.assertEqual(sort(paths), sort(expected))
-
- def testAllPathsSimpleWithSource(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob2, ob3)
- paths = list(self.graph.allPaths(source=ob1))
- expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3]]
- self.assertEqual(sort(paths), sort(expected))
- paths = list(self.graph.allPaths(source=ob2))
- self.assertEqual(paths, [[ob2, ob3]])
- paths = list(self.graph.allPaths(source=ob3))
- self.assertEqual(paths, [])
-
- def testAllPathsSimpleWithTarget(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob2, ob3)
- paths = list(self.graph.allPaths(target=ob1))
- self.assertEqual(paths, [])
- paths = list(self.graph.allPaths(target=ob2))
- self.assertEqual(paths, [[ob1, ob2]])
- paths = list(self.graph.allPaths(target=ob3))
- expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
- self.assertEqual(sort(paths), sort(expected))
-
- def testAllPathsSimpleWithSourceAndTarget(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob2, ob3)
- paths = list(self.graph.allPaths(ob1, ob2))
- self.assertEqual(paths, [[ob1, ob2]])
- paths = list(self.graph.allPaths(ob1, ob3))
- expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
- paths = list(self.graph.allPaths(ob2, ob3))
- self.assertEqual(paths, [[ob2, ob3]])
- paths = list(self.graph.allPaths(ob2, ob1))
- self.assertEqual(paths, [])
-
- def testAllPathsWithCycle(self):
- from pypes.graph import cycle
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob2, ob1)
- self.graph.edges.add(ob3, ob1)
- paths = list(self.graph.allPaths())
- expected = [[ob1, ob2], cycle([ob1, ob2, ob1]),
- [ob2, ob1], cycle([ob2, ob1, ob2]),
- [ob3, ob1], [ob3, ob1, ob2], cycle([ob3, ob1, ob2, ob1])]
- paths.sort()
- expected.sort()
- self.assertEqual(paths, expected)
- for i in range(len(paths)):
- self.failUnless(type(paths[i]) is type(expected[i]))
-
- def testAllPathsWithBogusSource(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob3, ob2)
- self.assertRaises(GraphLookupError, self.graph.allPaths(ob1, ob2).next)
-
- def testAllPathsWithBogusTarget(self):
- from pypes.exceptions import GraphLookupError
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob3, ob2)
- self.assertRaises(GraphLookupError, self.graph.allPaths(ob3, ob1).next)
-
- def testAllPathsFullyConnected(self):
- objs = [self._newObj() for i in range(3)]
- for s in objs:
- for t in objs:
- self.graph.edges.add(s, t)
- allpaths = list(self.graph.allPaths())
- count = 0
- for i in objs:
- for j in objs:
- self.failUnless([i, j] in allpaths)
- count += 1
- if j is not i:
- for k in objs:
- self.failUnless([i, j, k] in allpaths)
- count += 1
- if k not in (j, i):
- for m in objs:
- self.failUnless([i, j, k, m] in allpaths)
- count += 1
- self.assertEqual(count, len(allpaths))
-
- def testAllPathsUnconnected(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.nodes.add(ob1)
- self.graph.nodes.add(ob2)
- self.graph.nodes.add(ob3)
- self.assertEqual(list(self.graph.allPaths()), [])
-
-
-class TestDirectedIdGraph(PypesTestCase, TestDirectedGraph):
-
- def setUp(self):
- from pypes.graph import DirectedIdGraph
- self.graph = DirectedIdGraph()
- PypesTestCase.setUp(self)
-
- def testAddNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- self.assertRaises(IdentityError, self.graph.nodes.add, ob)
-
- def testRemoveNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- self.assertRaises(IdentityError, self.graph.nodes.remove, ob)
-
- def testContainsNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- try:
- ob in self.graph.nodes
- except IdentityError:
- pass
- except:
- raise
- else:
- self.fail()
-
- def testDegreeNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- self.assertRaises(IdentityError, self.graph.nodes.degree, ob)
-
- def testSourcesNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- self.assertRaises(IdentityError, self.graph.nodes.sources, ob)
-
- def testTargetsNodeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob = TestClass()
- self.assertRaises(IdentityError, self.graph.nodes.targets, ob)
-
- def testAddEdgeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.edges.add, ob1, ob2)
- self.assertRaises(IdentityError, self.graph.edges.add, ob2, ob1)
-
- def testRemoveEdgeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.edges.remove, ob1, ob2)
- self.assertRaises(IdentityError, self.graph.edges.remove, ob2, ob1)
-
- def testGetEdgeValueUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.edges.get, ob1, ob2)
- self.assertRaises(IdentityError, self.graph.edges.get, ob2, ob1)
-
- def testSetEdgeValueUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.edges.set, ob1, ob2, 0)
- self.assertRaises(IdentityError, self.graph.edges.set, ob2, ob1, 0)
-
- def testContainsEdgeUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- try:
- (ob1, ob2) in self.graph.edges
- except IdentityError:
- pass
- except:
- raise
- else:
- self.fail()
- try:
- (ob2, ob1) in self.graph.edges
- except IdentityError:
- pass
- except:
- raise
- else:
- self.fail()
-
- def testShortestPathUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.shortestPath, ob1, ob2)
- self.assertRaises(IdentityError, self.graph.shortestPath, ob2, ob1)
-
- def testAllPathsUnregistered(self):
- from pypes.exceptions import IdentityError
- ob1, ob2 = TestClass(), self._newObj()
- self.assertRaises(IdentityError, self.graph.allPaths(ob1, ob2).next)
-
-
-class TestPQueue(unittest.TestCase):
-
- def setUp(self):
- from pypes.graph import PQueue
- self.q = PQueue()
-
- def testInsertAndPop(self):
- from random import randint
- values = [randint(0, 10000) for i in range(10)]
- for i in values:
- self.q.insert(str(i), i)
- values.sort()
- while self.q:
- k, m = self.q.popMin()
- v = values.pop(0)
- self.assertEqual(k, str(v))
- self.assertEqual(m, v)
- k, m = self.q.popMax()
- v = values.pop()
- self.assertEqual(k, str(v))
- self.assertEqual(m, v)
-
-class TestDirectedIdGraphPersistence(PypesPersistentTest):
-
- def setUp(self):
- from pypes.graph import DirectedIdGraph
- super(TestDirectedIdGraphPersistence, self).setUp()
- self.root = self.db.open().root()
- self.root['g'] = self.graph = DirectedIdGraph()
- get_transaction().commit()
-
- def testBasicPersistence(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- self.graph.edges.add(ob1, ob2)
- self.graph.edges.add(ob1, ob3)
- self.graph.edges.add(ob2, ob1)
- self.graph.edges.add(ob2, ob3)
- get_transaction().commit()
-
- g2 = self.db.open().root()['g']
- self.assertEqual(len(g2.nodes), 3)
- self.assertEqual(len(g2.edges), 4)
- self.failUnless((ob1, ob2) in g2.edges)
- self.failUnless((ob1, ob3) in g2.edges)
- self.failUnless((ob2, ob1) in g2.edges)
- self.failUnless((ob2, ob3) in g2.edges)
-
- def testConcurrentAddNodes(self):
- ob1, ob2= self._newObj(), self._newObj()
- g2 = self.db.open().root()['g']
- list(g2.nodes); list(g2.edges)
-
- self.graph.nodes.add(ob1)
- get_transaction().commit()
-
- g2.nodes.add(ob2)
- get_transaction().commit()
-
- def testConcurrentAddEdges(self):
- ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
- g2 = self.db.open().root()['g']
- list(g2.nodes); list(g2.edges)
-
- self.graph.edges.add(ob1, ob2)
- g2.edges.add(ob1, ob2)
- get_transaction().commit()
-
- g2.edges.add(ob2, ob3)
- get_transaction().commit()
-
-if __name__ == '__main__':
- unittest.main()
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Event service tests
+
+$Id$"""
+
+import unittest
+import ZODB
+from random import randint
+from zope.interface.verify import verifyObject
+from persistent import Persistent
+from common import PypesTestCase, PypesPersistentTest, TestClass, sort
+from pypes import graph
+
+
+class TestDirectedGraph(unittest.TestCase):
+
+ def _newObj(self):
+ try:
+ self.t += 1
+ except AttributeError:
+ self.t = 0L
+ return self.t
+
+ def setUp(self):
+ self.graph = graph.DirectedGraph()
+
+ def testInterface(self):
+ from pypes.interfaces import IGraph, IGraphNodes, IGraphEdges
+ self.failUnless(verifyObject(IGraph, self.graph))
+ self.failUnless(verifyObject(IGraphNodes, self.graph.nodes))
+ self.failUnless(verifyObject(IGraphEdges, self.graph.edges))
+
+ def testAddNodes(self):
+ objs = [self._newObj() for i in range(10)]
+ for o in objs:
+ self.failUnless(self.graph.nodes.add(o))
+ for o in objs:
+ self.failIf(self.graph.nodes.add(o))
+ self.assertEqual(len(objs), len(self.graph.nodes))
+ for o in objs:
+ self.failUnless(o in self.graph.nodes)
+
+ def testAddEdges(self):
+ objs = [self._newObj() for i in range(10)]
+ edges = []
+ for s in range(len(objs)):
+ for t in range(len(objs) - s):
+ edges.append((objs[s], objs[t]))
+ for source, target in edges:
+ self.failUnless(self.graph.edges.add(source, target))
+ self.failUnless(source in self.graph.nodes)
+ self.failUnless(target in self.graph.nodes)
+ for source, target in edges:
+ self.failIf(self.graph.edges.add(source, target))
+ for pair in edges:
+ self.failUnless(pair in self.graph.edges)
+
+ def testAddAndGetValue(self):
+ edges = [(self._newObj(), self._newObj()) for i in range(10)]
+ data = 0
+ for source, target in edges:
+ self.graph.edges.add(source, target, data)
+ data += 1
+ data = 0
+ for source, target in edges:
+ self.assertEqual(self.graph.edges.get(source, target), data)
+ data += 1
+
+ def testAddPreservesValue(self):
+ source, target = self._newObj(), self._newObj()
+ self.graph.edges.add(source, target, 42)
+ self.graph.edges.add(source, target)
+ self.assertEqual(self.graph.edges.get(source, target), 42)
+
+ def testGetValueFails(self):
+ from pypes.exceptions import GraphValueError
+ source, target = self._newObj(), self._newObj()
+ self.graph.edges.add(source, target)
+ self.assertRaises(
+ GraphValueError, self.graph.edges.get, source, target)
+
+ def testGetValueDefault(self):
+ source, target = self._newObj(), self._newObj()
+ self.graph.edges.add(source, target)
+ self.assertEqual(
+ self.graph.edges.get(source, target, 'yadda'), 'yadda')
+
+ def testRemoveEdges(self):
+ edges = [(self._newObj(), self._newObj()) for i in range(10)]
+ for source, target in edges:
+ self.graph.edges.add(source, target)
+ for edge in edges:
+ self.graph.edges.remove(*edge)
+ self.failIf(edge in self.graph.edges)
+ self.failIf(self.graph.edges)
+
+ def testRemoveEdgeLeavesNodes(self):
+ source, target = self._newObj(), self._newObj()
+ self.graph.edges.add(source, target)
+ self.graph.edges.remove(source, target)
+ self.failUnless(source in self.graph.nodes)
+ self.failUnless(target in self.graph.nodes)
+
+ def testRemoveEdgeFails(self):
+ from pypes.exceptions import GraphLookupError
+ source, target = self._newObj(), self._newObj()
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.remove, source, target)
+
+ def testRemoveEdgeRemovesValue(self):
+ from pypes.exceptions import GraphLookupError
+ source, target = self._newObj(), self._newObj()
+ self.graph.edges.add(source, target, 101)
+ self.graph.edges.remove(source, target)
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.get, source, target)
+
+ def testRemoveNode(self):
+ objs = [self._newObj() for i in range(10)]
+ for o in objs:
+ self.graph.nodes.add(o)
+ for o in objs:
+ self.graph.nodes.remove(o)
+ self.failIf(o in self.graph.nodes)
+ self.failIf(self.graph.nodes)
+
+ def testRemoveNodeRemovesEdges(self):
+ obj = [self._newObj() for i in range(5)]
+ self.graph.edges.add(obj[0], obj[1])
+ self.graph.edges.add(obj[0], obj[2])
+ self.graph.edges.add(obj[1], obj[4])
+ self.graph.edges.add(obj[3], obj[1])
+ self.graph.edges.add(obj[4], obj[0])
+ self.graph.nodes.remove(obj[4])
+ self.failIf((obj[4], obj[0]) in self.graph.edges)
+ self.failIf((obj[1], obj[4]) in self.graph.edges)
+ self.failUnless((obj[0], obj[1]) in self.graph.edges)
+ self.failUnless((obj[0], obj[2]) in self.graph.edges)
+ self.failUnless((obj[3], obj[1]) in self.graph.edges)
+ self.graph.nodes.remove(obj[0])
+ self.failIf((obj[0], obj[1]) in self.graph.edges)
+ self.failIf((obj[0], obj[2]) in self.graph.edges)
+ self.failUnless((obj[3], obj[1]) in self.graph.edges)
+ self.assertEqual(len(self.graph.edges), 1)
+ self.assertEqual(len(self.graph.nodes), 3)
+
+ def testRemoveNodeFails(self):
+ from pypes.exceptions import GraphLookupError
+ self.assertRaises(
+ GraphLookupError, self.graph.nodes.remove, self._newObj())
+
+ def testGetValueFails(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.get, ob1, ob2)
+ self.graph.edges.add(ob1, ob2)
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.get, ob1, ob3)
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.get, ob3, ob2)
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.get, ob2, ob1)
+
+ def testAddReplacesValue(self):
+ from pypes.exceptions import GraphValueError
+ ob1, ob2 = self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.assertRaises(
+ GraphValueError, self.graph.edges.get, ob1, ob2)
+ self.graph.edges.add(ob1, ob2, 'data!')
+ self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
+ self.graph.edges.add(ob1, ob2)
+ self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
+
+ def testSetValue(self):
+ ob1, ob2 = self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.set(ob1, ob2, '!atad')
+ self.assertEqual(self.graph.edges.get(ob1, ob2), '!atad')
+
+ def testSetValueFails(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2 = self._newObj(), self._newObj()
+ self.assertRaises(
+ GraphLookupError, self.graph.edges.set, ob1, ob2, 'sorry')
+
+ def testEdgesContains(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob2, ob3)
+ self.failUnless((ob1, ob2) in self.graph.edges)
+ self.failIf((ob2, ob1) in self.graph.edges)
+ self.failUnless((ob2, ob3) in self.graph.edges)
+ self.failIf((ob1, ob3) in self.graph.edges)
+
+ def testIterNodes(self):
+ objs = [self._newObj() for i in range(10)]
+ for o in objs:
+ self.graph.nodes.add(o)
+ fromiter = list(self.graph.nodes)
+ objs.sort()
+ fromiter.sort()
+ self.assertEqual(objs, fromiter)
+
+ def testIterEdges(self):
+ edges = [(self._newObj(), self._newObj()) for i in range(10)]
+ for source, target in edges:
+ self.graph.edges.add(source, target)
+ fromiter = list(self.graph.edges)
+ edges.sort()
+ fromiter.sort()
+ self.assertEqual(edges, fromiter)
+
+ def testIterValues(self):
+ edges = [(self._newObj(), self._newObj()) for i in range(10)]
+ value = 0
+ for source, target in edges:
+ self.graph.edges.add(source, target, value)
+ value += 1
+ values = list(self.graph.edges.iterValues())
+ value = 0
+ for source, target in edges:
+ self.failUnless(value in values)
+ value += 1
+
+ def testIterItems(self):
+ edges = [(self._newObj(), self._newObj()) for i in range(10)]
+ value = 0
+ for source, target in edges:
+ self.graph.edges.add(source, target, value)
+ value += 1
+ items = list(self.graph.edges.iterItems())
+ value = 0
+ for source, target in edges:
+ self.failUnless(((source, target), value) in items)
+ value += 1
+
+ def testIterTopologicalSimple(self):
+ objs = [self._newObj() for i in range(5)]
+ for i in range(4):
+ self.graph.edges.add(objs[i], objs[i+1])
+ objs.reverse()
+ self.assertEqual(list(self.graph.nodes.iterTopological()), objs)
+
+ def testIterTopologicalWithCycle(self):
+ from pypes.exceptions import GraphCycleError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob2, ob3)
+ self.graph.edges.add(ob3, ob1)
+ self.assertRaises(
+ GraphCycleError, list, self.graph.nodes.iterTopological())
+
+ def testIterTopologicalNonTree(self):
+ objs = [self._newObj() for i in range(7)]
+ self.graph.edges.add(objs[0], objs[1])
+ self.graph.edges.add(objs[1], objs[2])
+ self.graph.edges.add(objs[2], objs[5])
+ self.graph.edges.add(objs[3], objs[0])
+ self.graph.edges.add(objs[3], objs[1])
+ self.graph.edges.add(objs[3], objs[2])
+ self.graph.edges.add(objs[3], objs[4])
+ self.graph.edges.add(objs[3], objs[5])
+ self.graph.edges.add(objs[3], objs[6])
+ self.graph.edges.add(objs[4], objs[5])
+ self.graph.edges.add(objs[6], objs[4])
+ tsort = list(self.graph.nodes.iterTopological())
+ self.assertEqual(len(tsort), len(objs))
+ self.failUnless(objs[5] is tsort[0])
+ self.failUnless(objs[3] is tsort[-1])
+ idx = tsort.index
+ self.failUnless(idx(objs[0]) > idx(objs[1]))
+ self.failUnless(idx(objs[1]) > idx(objs[2]))
+ self.failUnless(idx(objs[6]) > idx(objs[4]))
+
+ def testIterTopologicalDisconnected(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.nodes.add(ob1)
+ self.graph.nodes.add(ob2)
+ self.graph.nodes.add(ob3)
+ tsort = list(self.graph.nodes.iterTopological())
+ self.assertEqual(len(tsort), 3)
+ self.failUnless(ob1 in tsort)
+ self.failUnless(ob2 in tsort)
+ self.failUnless(ob3 in tsort)
+
+ ## XXX Need to add transitiveClosure tests
+
+ def testDegree(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob1, ob1)
+ self.graph.edges.add(ob2, ob1)
+ self.assertEqual(self.graph.nodes.degree(ob1), 3)
+ self.assertEqual(self.graph.nodes.degree(ob2), 1)
+ self.assertEqual(self.graph.nodes.degree(ob3), 0)
+
+ def testTargets(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob1, ob1)
+ self.graph.edges.add(ob2, ob1)
+ n = list(self.graph.nodes.targets(ob1))
+ objs = [ob1, ob2, ob3]
+ n.sort()
+ objs.sort()
+ self.assertEqual(n, objs)
+ self.assertEqual(list(self.graph.nodes.targets(ob2)), [ob1])
+ self.failIf(self.graph.nodes.targets(ob3))
+
+ def testSources(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob3, ob2)
+ self.graph.edges.add(ob1, ob1)
+ self.graph.edges.add(ob2, ob1)
+ self.assertEqual(list(self.graph.nodes.sources(ob1)), [ob1, ob2])
+ self.assertEqual(list(self.graph.nodes.sources(ob2)), [ob1, ob3])
+ self.failIf(self.graph.nodes.sources(ob3))
+
+ def testTargetsTransitive(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob2, ob1)
+ self.graph.edges.add(ob2, ob3)
+ self.graph.edges.add(self._newObj(), self._newObj())
+ r = list(self.graph.nodes.targets(ob1, 1))
+ objs = [ob1, ob2, ob3]
+ r.sort()
+ objs.sort()
+ self.assertEqual(r, objs)
+ r = list(self.graph.nodes.targets(ob2, 1))
+ r.sort()
+ self.assertEqual(r, objs)
+ self.failIf(self.graph.nodes.targets(ob3, 1))
+
+ def testSourcesTransitive(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob2, ob1)
+ self.graph.edges.add(ob2, ob3)
+ self.graph.edges.add(self._newObj(), self._newObj())
+ self.assertEqual(list(self.graph.nodes.sources(ob3, 1)), [ob1, ob2])
+ self.assertEqual(list(self.graph.nodes.sources(ob2, 1)), [ob1, ob2])
+ self.assertEqual(list(self.graph.nodes.sources(ob1, 1)), [ob1, ob2])
+
+ def _makeMatrix(self):
+ nodes = {}
+ for y in range(5):
+ for x in range(5):
+ n = nodes[x,y] = self._newObj()
+ if x:
+ if (x + y) % 2:
+ self.graph.edges.add(nodes[x-1, y], n, 1)
+ else:
+ self.graph.edges.add(n, nodes[x-1, y], 1)
+ if y:
+ if (x + y) % 2:
+ self.graph.edges.add(n, nodes[x, y-1], 1)
+ else:
+ self.graph.edges.add(nodes[x, y-1], n, 1)
+ elif not x:
+ self.graph.nodes.add(n)
+ return nodes
+
+ def testShortestPathMatrixWithoutValues(self):
+ nodes = self._makeMatrix()
+ result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4])
+ self.failIf(result is None)
+ i, x1, y1 = 0, 0, 0
+ path = [nodes[x1, y1]]
+ while x1 < 4 or y1 < 4:
+ if i % 2:
+ y1 += 1
+ else:
+ x1 += 1
+ path.append(nodes[x1, y1])
+ i += 1
+ self.assertEqual(result, path)
+
+ def testShortestPathUnreachable(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob3, ob2)
+ self.assertEqual(self.graph.shortestPath(ob1, ob3), None)
+
+ def testShortestPathWithValues(self):
+ objs = [self._newObj() for i in range(5)]
+ self.graph.edges.add(objs[0], objs[1], 10)
+ self.graph.edges.add(objs[0], objs[2], 2)
+ self.graph.edges.add(objs[2], objs[1], 10)
+ self.graph.edges.add(objs[2], objs[3], 1)
+ self.graph.edges.add(objs[3], objs[4], 1)
+ self.graph.edges.add(objs[4], objs[1], 3)
+ self.assertEqual(
+ self.graph.shortestPath(objs[0], objs[1], 1),
+ [objs[0], objs[2], objs[3], objs[4], objs[1]])
+
+ def testShortestPathMatrixWithValues(self):
+ nodes = self._makeMatrix()
+ result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4], 1)
+ self.failIf(result is None)
+ i, x1, y1 = 0, 0, 0
+ path = [nodes[x1, y1]]
+ while x1 < 4 or y1 < 4:
+ if i % 2:
+ y1 += 1
+ else:
+ x1 += 1
+ path.append(nodes[x1, y1])
+ i += 1
+ self.assertEqual(result, path)
+
+ def testShortestPathWithValuesUnreachable(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2, 3)
+ self.graph.edges.add(ob3, ob2, 4)
+ self.assertEqual(self.graph.shortestPath(ob1, ob3, 1), None)
+
+ def testShortestPathWithBogusSource(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob3, ob2, 3)
+ self.assertRaises(
+ GraphLookupError, self.graph.shortestPath, ob1, ob2)
+
+ def testShortestPathWithBogusTarget(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob3, ob2, 3)
+ self.assertRaises(
+ GraphLookupError, self.graph.shortestPath, ob2, ob1)
+
+ def testAllPathsSimple(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob2, ob3)
+ paths = list(self.graph.allPaths())
+ expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
+ self.assertEqual(sort(paths), sort(expected))
+
+ def testAllPathsSimpleWithSource(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob2, ob3)
+ paths = list(self.graph.allPaths(source=ob1))
+ expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3]]
+ self.assertEqual(sort(paths), sort(expected))
+ paths = list(self.graph.allPaths(source=ob2))
+ self.assertEqual(paths, [[ob2, ob3]])
+ paths = list(self.graph.allPaths(source=ob3))
+ self.assertEqual(paths, [])
+
+ def testAllPathsSimpleWithTarget(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob2, ob3)
+ paths = list(self.graph.allPaths(target=ob1))
+ self.assertEqual(paths, [])
+ paths = list(self.graph.allPaths(target=ob2))
+ self.assertEqual(paths, [[ob1, ob2]])
+ paths = list(self.graph.allPaths(target=ob3))
+ expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
+ self.assertEqual(sort(paths), sort(expected))
+
+ def testAllPathsSimpleWithSourceAndTarget(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob2, ob3)
+ paths = list(self.graph.allPaths(ob1, ob2))
+ self.assertEqual(paths, [[ob1, ob2]])
+ paths = list(self.graph.allPaths(ob1, ob3))
+ expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
+ paths = list(self.graph.allPaths(ob2, ob3))
+ self.assertEqual(paths, [[ob2, ob3]])
+ paths = list(self.graph.allPaths(ob2, ob1))
+ self.assertEqual(paths, [])
+
+ def testAllPathsWithCycle(self):
+ from pypes.graph import cycle
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob2, ob1)
+ self.graph.edges.add(ob3, ob1)
+ paths = list(self.graph.allPaths())
+ expected = [[ob1, ob2], cycle([ob1, ob2, ob1]),
+ [ob2, ob1], cycle([ob2, ob1, ob2]),
+ [ob3, ob1], [ob3, ob1, ob2], cycle([ob3, ob1, ob2, ob1])]
+ paths.sort()
+ expected.sort()
+ self.assertEqual(paths, expected)
+ for i in range(len(paths)):
+ self.failUnless(type(paths[i]) is type(expected[i]))
+
+ def testAllPathsWithBogusSource(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob3, ob2)
+ self.assertRaises(GraphLookupError, self.graph.allPaths(ob1, ob2).next)
+
+ def testAllPathsWithBogusTarget(self):
+ from pypes.exceptions import GraphLookupError
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob3, ob2)
+ self.assertRaises(GraphLookupError, self.graph.allPaths(ob3, ob1).next)
+
+ def testAllPathsFullyConnected(self):
+ objs = [self._newObj() for i in range(3)]
+ for s in objs:
+ for t in objs:
+ self.graph.edges.add(s, t)
+ allpaths = list(self.graph.allPaths())
+ count = 0
+ for i in objs:
+ for j in objs:
+ self.failUnless([i, j] in allpaths)
+ count += 1
+ if j is not i:
+ for k in objs:
+ self.failUnless([i, j, k] in allpaths)
+ count += 1
+ if k not in (j, i):
+ for m in objs:
+ self.failUnless([i, j, k, m] in allpaths)
+ count += 1
+ self.assertEqual(count, len(allpaths))
+
+ def testAllPathsUnconnected(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.nodes.add(ob1)
+ self.graph.nodes.add(ob2)
+ self.graph.nodes.add(ob3)
+ self.assertEqual(list(self.graph.allPaths()), [])
+
+
+class TestDirectedIdGraph(PypesTestCase, TestDirectedGraph):
+
+ def setUp(self):
+ from pypes.graph import DirectedIdGraph
+ self.graph = DirectedIdGraph()
+ PypesTestCase.setUp(self)
+
+ def testAddNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ self.assertRaises(IdentityError, self.graph.nodes.add, ob)
+
+ def testRemoveNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ self.assertRaises(IdentityError, self.graph.nodes.remove, ob)
+
+ def testContainsNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ try:
+ ob in self.graph.nodes
+ except IdentityError:
+ pass
+ except:
+ raise
+ else:
+ self.fail()
+
+ def testDegreeNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ self.assertRaises(IdentityError, self.graph.nodes.degree, ob)
+
+ def testSourcesNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ self.assertRaises(IdentityError, self.graph.nodes.sources, ob)
+
+ def testTargetsNodeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob = TestClass()
+ self.assertRaises(IdentityError, self.graph.nodes.targets, ob)
+
+ def testAddEdgeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.edges.add, ob1, ob2)
+ self.assertRaises(IdentityError, self.graph.edges.add, ob2, ob1)
+
+ def testRemoveEdgeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.edges.remove, ob1, ob2)
+ self.assertRaises(IdentityError, self.graph.edges.remove, ob2, ob1)
+
+ def testGetEdgeValueUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.edges.get, ob1, ob2)
+ self.assertRaises(IdentityError, self.graph.edges.get, ob2, ob1)
+
+ def testSetEdgeValueUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.edges.set, ob1, ob2, 0)
+ self.assertRaises(IdentityError, self.graph.edges.set, ob2, ob1, 0)
+
+ def testContainsEdgeUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ try:
+ (ob1, ob2) in self.graph.edges
+ except IdentityError:
+ pass
+ except:
+ raise
+ else:
+ self.fail()
+ try:
+ (ob2, ob1) in self.graph.edges
+ except IdentityError:
+ pass
+ except:
+ raise
+ else:
+ self.fail()
+
+ def testShortestPathUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.shortestPath, ob1, ob2)
+ self.assertRaises(IdentityError, self.graph.shortestPath, ob2, ob1)
+
+ def testAllPathsUnregistered(self):
+ from pypes.exceptions import IdentityError
+ ob1, ob2 = TestClass(), self._newObj()
+ self.assertRaises(IdentityError, self.graph.allPaths(ob1, ob2).next)
+
+
+class TestPQueue(unittest.TestCase):
+
+ def setUp(self):
+ from pypes.graph import PQueue
+ self.q = PQueue()
+
+ def testInsertAndPop(self):
+ from random import randint
+ values = [randint(0, 10000) for i in range(10)]
+ for i in values:
+ self.q.insert(str(i), i)
+ values.sort()
+ while self.q:
+ k, m = self.q.popMin()
+ v = values.pop(0)
+ self.assertEqual(k, str(v))
+ self.assertEqual(m, v)
+ k, m = self.q.popMax()
+ v = values.pop()
+ self.assertEqual(k, str(v))
+ self.assertEqual(m, v)
+
+class TestDirectedIdGraphPersistence(PypesPersistentTest):
+
+ def setUp(self):
+ from pypes.graph import DirectedIdGraph
+ super(TestDirectedIdGraphPersistence, self).setUp()
+ self.root = self.db.open().root()
+ self.root['g'] = self.graph = DirectedIdGraph()
+ get_transaction().commit()
+
+ def testBasicPersistence(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ self.graph.edges.add(ob1, ob2)
+ self.graph.edges.add(ob1, ob3)
+ self.graph.edges.add(ob2, ob1)
+ self.graph.edges.add(ob2, ob3)
+ get_transaction().commit()
+
+ g2 = self.db.open().root()['g']
+ self.assertEqual(len(g2.nodes), 3)
+ self.assertEqual(len(g2.edges), 4)
+ self.failUnless((ob1, ob2) in g2.edges)
+ self.failUnless((ob1, ob3) in g2.edges)
+ self.failUnless((ob2, ob1) in g2.edges)
+ self.failUnless((ob2, ob3) in g2.edges)
+
+ def testConcurrentAddNodes(self):
+ ob1, ob2= self._newObj(), self._newObj()
+ g2 = self.db.open().root()['g']
+ list(g2.nodes); list(g2.edges)
+
+ self.graph.nodes.add(ob1)
+ get_transaction().commit()
+
+ g2.nodes.add(ob2)
+ get_transaction().commit()
+
+ def testConcurrentAddEdges(self):
+ ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+ g2 = self.db.open().root()['g']
+ list(g2.nodes); list(g2.edges)
+
+ self.graph.edges.add(ob1, ob2)
+ g2.edges.add(ob1, ob2)
+ get_transaction().commit()
+
+ g2.edges.add(ob2, ob3)
+ get_transaction().commit()
+
+if __name__ == '__main__':
+ unittest.main()
=== Packages/pypes/pypes/tests/test_identity.py 1.5 => 1.6 ===
--- Packages/pypes/pypes/tests/test_identity.py:1.5 Sun Feb 8 22:56:22 2004
+++ Packages/pypes/pypes/tests/test_identity.py Mon Feb 9 16:14:58 2004
@@ -1,637 +1,637 @@
-##############################################################################
-#
-# Copyright (c) 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Identity service tests
-
-$Id$"""
-
-import ZODB
-from ZODB.POSException import ConflictError
-import unittest
-from random import randint
-from zope.interface.verify import verifyObject
-from persistent import Persistent
-from BTrees.OOBTree import OOTreeSet, difference
-from types import TypeType
-from common \
- import PypesTestCase, TestClass, PersistenceTest, PypesPersistentTest
-
-class TestIdentityService(unittest.TestCase):
-
- def setUp(self):
- from pypes.identity import IdentityService
- self.identity = IdentityService()
-
- def testInterfaces(self):
- from pypes.interfaces import IPersistentService, IIdentityService
- self.failUnless(verifyObject(IPersistentService, self.identity))
- self.failUnless(verifyObject(IIdentityService, self.identity))
-
- def testIdTypeReturnsType(self):
- self.failUnless(isinstance(self.identity.idType(), TypeType))
-
- def testIdIsCorrectType(self):
- ident = self.identity.register(TestClass())
- self.failUnless(isinstance(ident, self.identity.idType()))
-
- def testIdIsTrue(self):
- self.failUnless(self.identity.register(TestClass()))
-
- def testIdsUniqueAndOrderable(self):
- ids = OOTreeSet()
- register = self.identity.register
- for i in xrange(1000):
- self.failUnless(ids.insert(register(TestClass())))
- self.assertEqual(len(ids), 1000)
-
- def testRegisterFailure(self):
- from pypes.exceptions import IdentityError
- self.assertRaises(
- IdentityError, self.identity.register, None)
-
- def testDuplicateRegistrationFails(self):
- from pypes.exceptions import IdentityError
- o = TestClass()
- self.identity.register(o)
- self.assertRaises(
- IdentityError, self.identity.register, o)
-
- def testHasId(self):
- ids = []
- register = self.identity.register
- for i in xrange(1000):
- ids.append(register(TestClass()))
- ids.sort()
- hasId = self.identity.hasId
- for i in ids:
- self.failUnless(hasId(i))
-
- def testNotHasId(self):
- from pypes.identity import IdentityService
- altidentity = IdentityService()
- register = self.identity.register
- ids = OOTreeSet()
- for i in xrange(500):
- ids.insert(register(TestClass()))
- hasId = self.identity.hasId
- for i in xrange(500):
- ident = altidentity.register(TestClass())
- self.failUnless(
- bool(hasId(ident)) == bool(ids.has_key(ident)))
- self.failIf(hasId(0))
-
- def testHasIdWrongType(self):
- self.assertRaises(TypeError, self.identity.hasId, None)
- self.assertRaises(TypeError, self.identity.hasId, TestClass)
- self.assertRaises(TypeError, self.identity.hasId, TestClass())
-
- def testContains(self):
- objs = []
- register = self.identity.register
- for i in xrange(1000):
- o = TestClass()
- objs.append(o)
- register(o)
- while objs:
- o = objs.pop(randint(0, len(objs) - 1))
- self.failUnless(o in self.identity)
-
- def testNotContains(self):
- register = self.identity.register
- for i in xrange(333):
- self.failIf(TestClass() in self.identity)
- for i in xrange(333):
- register(TestClass())
- for i in xrange(333):
- self.failIf(TestClass() in self.identity)
-
- def testRemoveId(self):
- from pypes.exceptions import IdentityKeyError
- from pypes.identity import pypesid
- o = TestClass()
- self.failIf(o in self.identity)
- ident = self.identity.register(o)
- self.failUnless(o in self.identity)
- self.identity.removeId(ident)
- self.failIf(o in self.identity)
- self.assertRaises(IdentityKeyError, self.identity.removeId, ident)
- self.failUnless(pypesid(o) is None)
-
- def testRemoveObject(self):
- from pypes.exceptions import IdentityError
- from pypes.identity import pypesid
- o = TestClass()
- self.failIf(o in self.identity)
- self.identity.register(o)
- self.failUnless(o in self.identity)
- self.identity.remove(o)
- self.failIf(o in self.identity)
- self.assertRaises(IdentityError, self.identity.remove, o)
- self.failUnless(pypesid(o) is None)
-
- def testGetObject(self):
- ids = []
- objs = {}
- register = self.identity.register
- for i in xrange(1000):
- o = TestClass()
- ident = register(o)
- ids.append(ident)
- objs[ident] = o
- o.id = ident
- ids.sort()
- getObject = self.identity.getObject
- for ident in ids:
- self.assertEqual(getObject(ident).id, ident)
- self.failUnless(getObject(ident) is objs[ident])
-
- def testGetObjectFails(self):
- from pypes.identity import IdentityService
- from pypes.exceptions import IdentityKeyError
- altidentity = IdentityService()
- ident = self.identity.register(TestClass())
- badident = altidentity.register(TestClass())
- while ident == badident:
- badident = altidentity.register(TestClass())
- self.assertRaises(IdentityKeyError, self.identity.getObject, badident)
-
- def testLen(self):
- self.failIf(self.identity)
- ident = self.identity.register(TestClass())
- self.assertEqual(len(self.identity), 1)
- self.identity.removeId(ident)
- self.assertEqual(len(self.identity), 0)
- for i in xrange(100):
- self.identity.register(TestClass())
- self.assertEqual(len(self.identity), i + 1)
-
- def testIterate(self):
- objs = {}
- register = self.identity.register
- for i in xrange(1000):
- o = TestClass()
- objs[o] = register(o)
- for o in self.identity:
- del objs[o]
- self.failIf(objs)
-
- def testIterateIds(self):
- ids = {}
- register = self.identity.register
- for i in xrange(1000):
- ids[register(TestClass())] = None
- for i in self.identity.iterIds():
- del ids[i]
- self.failIf(ids)
-
- def testIdSet(self):
- self.failIf(self.identity.idSet())
- ids = OOTreeSet()
- register = self.identity.register
- for i in xrange(100):
- ids.insert(register(TestClass()))
- self.failIf(difference(OOTreeSet(self.identity.idSet()), ids))
-
-class TestListener(object):
-
- def __init__(self):
- self.received = []
-
- def receive(self, msg):
- self.received.append(msg)
-
-class TestIdentityServiceEvents(PypesTestCase):
-
- def testIdRegisteredEvent(self):
- from pypes import services
- from pypes.identity import pypesid, IdRegisteredMessage
- event = services.event(self.conn)
- tl = TestListener()
- event.registerListener(tl, 'receive', IdRegisteredMessage)
- o = self._newObj()
- msg, = tl.received
- self.failUnless(msg.object is o)
- self.failUnless(msg.id is pypesid(o))
-
- def testIdRegisteredEventMultiple(self):
- from pypes import services
- from pypes.identity import IdRegisteredMessage
- event = services.event(self.conn)
- tl = TestListener()
- event.registerListener(tl, 'receive', IdRegisteredMessage)
- objs = [self._newObj() for i in range(10)]
- self.assertEqual(len(tl.received), 10)
- for o in objs:
- self.identity.remove(o)
- self.assertEqual(len(tl.received), 10)
-
- def testIdUnregisteredEvent(self):
- from pypes import services
- from pypes.identity import pypesid, IdUnregisteredMessage
- event = services.event(self.conn)
- tl = TestListener()
- event.registerListener(tl, 'receive', IdUnregisteredMessage)
- o = self._newObj()
- id = pypesid(o)
- self.assertEqual(len(tl.received), 0)
- self.identity.remove(o)
- msg, = tl.received
- self.failUnless(msg.object is o)
- self.failUnless(msg.id is id)
-
- def testIdUnregisteredEventMultiple(self):
- from pypes import services
- from pypes.identity import IdUnregisteredMessage
- event = services.event(self.conn)
- tl = TestListener()
- event.registerListener(tl, 'receive', IdUnregisteredMessage)
- objs = [self._newObj() for i in range(10)]
- self.assertEqual(len(tl.received), 0)
- for o in objs:
- self.identity.remove(o)
- self.assertEqual(len(tl.received), 10)
-
- def testListenForIdEventsFunc(self):
- from pypes import services
- from pypes.identity import listenForIdEvents
- from pypes.identity import IdRegisteredMessage, IdUnregisteredMessage
- event = services.event(self.conn)
- tl = TestListener()
- listenForIdEvents(tl, self.conn, 'receive', 'receive')
- self.assertEqual(len(tl.received), 0)
- obj = self._newObj()
- self.assertEqual(len(tl.received), 1)
- self.identity.remove(obj)
- msg1, msg2 = tl.received
- self.failUnless(msg1.object is obj)
- self.failUnless(isinstance(msg1, IdRegisteredMessage))
- self.failUnless(msg2.object is obj)
- self.failUnless(isinstance(msg2, IdUnregisteredMessage))
-
-class TestIdentityServicePersistence(PersistenceTest):
-
- def testBasicPersistence(self):
- from pypes.identity import IdentityService, pypesid
- r1 = self.db.open().root()
- ids = r1['t'] = IdentityService()
- obj = TestClass()
- obj.name = 'test'
- objid = ids.register(obj)
- get_transaction().commit()
-
- r2 = self.db.open().root()
- ids2 = r2['t']
- obj = ids2.getObject(objid)
- self.assertEqual(ids._p_serial, ids._p_serial)
- self.failUnless(isinstance(obj, TestClass))
- self.assertEqual(obj.name, 'test')
- self.assertEqual(pypesid(obj), objid)
-
- def testConcurrentRegistration(self):
- from pypes.identity import IdentityService
- r1 = self.db.open().root()
- ids1 = r1['t'] = IdentityService()
- get_transaction().commit()
-
- r2 = self.db.open().root()
- ids2 = r2['t']
- list(ids2); list(ids2.iterIds()) # Make sure its loaded
-
- ids1.register(TestClass())
- get_transaction().commit()
-
- ids2.register(TestClass())
- get_transaction().commit()
-
-class TestIdentitySet(PypesTestCase):
-
- def setUp(self):
- from pypes.identity import IdentitySet
- PypesTestCase.setUp(self)
- self.set = IdentitySet()
-
- def testInterfaces(self):
- from pypes.interfaces import IIdentitySet
- self.failUnless(verifyObject(IIdentitySet, self.set))
-
- def testFromUniqueSequence(self):
- from pypes.identity import IdentitySet
- objs = [self._newObj() for i in xrange(100)]
- set = IdentitySet(objs)
- for o in objs:
- self.failUnless(o in set)
-
- def testFromNonUniqueSequence(self):
- from pypes.identity import IdentitySet
- objs = [self._newObj() for i in xrange(10)]
- objs.extend(objs)
- del objs[7:13]
- set = IdentitySet(objs)
- seen = {}
- for o in objs:
- self.failIf(o not in set)
- seen[o] = None
- seen = seen.keys()
- seen.sort()
- set = list(set)
- set.sort()
- self.assertEqual(seen, set)
-
- def testAddUnique(self):
- objs = []
- for i in xrange(100):
- o = self._newObj()
- self.failUnless(self.set.add(o))
- objs.append(o)
- for o in objs:
- self.failUnless(o in self.set)
- self.assertEqual(len(objs), len(self.set))
-
- def testAddDupes(self):
- for i in xrange(100):
- o = self._newObj()
- self.failUnless(self.set.add(o))
- self.failIf(self.set.add(o))
- self.failUnless(o in self.set)
- self.assertEqual(len(self.set), 100)
-
- def testInsertUnregistered(self):
- from pypes.exceptions import IdentityError
- self.assertRaises(IdentityError, self.set.add, TestClass())
-
- def testRemove(self):
- objs = []
- for i in xrange(100):
- o = self._newObj()
- self.set.add(o)
- objs.append(o)
- while objs:
- o = objs.pop()
- self.set.remove(o)
- self.failIf(o in self.set)
- self.assertEqual(len(objs), len(self.set))
-
- def testRemoveNonMember(self):
- from pypes.exceptions import SetLookupError
- self.assertRaises(SetLookupError, self.set.remove, self._newObj())
-
- def testUpdate(self):
- objs = [self._newObj() for i in xrange(100)]
- self.set.update(objs)
- for o in objs:
- self.failUnless(o in self.set)
- objs2 = [self._newObj() for i in xrange(100)]
- self.set.update(objs2)
- for o in objs + objs2:
- self.failUnless(o in self.set)
-
- def testUpdateDupes(self):
- objs = [self._newObj() for i in xrange(100)]
- self.set.update(objs)
- objs += [self._newObj() for i in xrange(100)]
- objs.sort()
- self.set.update(objs)
- #self.assertEqual(len(self.set), 20)
-
- def testUpdateUnregistered(self):
- from pypes.exceptions import IdentityError
- objs = [self._newObj() for i in xrange(10)]
- objs += [TestClass() for i in xrange(10)]
- self.assertRaises(IdentityError, self.set.update, objs)
-
- def testIter(self):
- objs = {}
- for i in xrange(100):
- o = self._newObj()
- objs[o] = None
- self.set.update(objs.keys())
- for o in self.set:
- del objs[o]
- self.failIf(objs)
-
- def testIterUnregisteredYieldsNone(self):
- from pypes.services import identity
- o = self._newObj()
- self.set.add(o)
- identity(self.conn).remove(o)
- self.failUnless(iter(self.set).next() is None)
-
- def testUnion(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- all = objs1 + objs2
- objs2 += objs1[:3]
- objs1 += objs2[:3]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- union = set1.union(set2)
- self.failIf(union is set1)
- self.failIf(union is set2)
- self.failUnless(len(union), 20)
- for o in all:
- self.failUnless(o in union)
-
- def testOrUnion(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- expected = objs1 + objs2
- objs2 += objs1[:3]
- objs1 += objs2[:3]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- union = set1 | set2
- self.failIf(union is set1)
- self.failIf(union is set2)
- self.failUnless(len(union), len(expected))
- for o in expected:
- self.failUnless(o in union)
-
- def testDifference(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- objs2 += objs1[-3:]
- expected = objs1[:-3]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- diff = set1.difference(set2)
- self.failIf(diff is set1)
- self.failIf(diff is set2)
- self.failUnless(len(diff), len(expected))
- for o in expected:
- self.failUnless(o in diff)
-
- def testMinusDifference(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- objs2 += objs1[-3:]
- expected = objs1[:-3]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- diff = set1 - set2
- self.failIf(diff is set1)
- self.failIf(diff is set2)
- self.failUnless(len(diff), len(expected))
- for o in expected:
- self.failUnless(o in diff)
-
- def testDiffNoOverlap(self):
- from pypes.identity import IdentitySet
- set1 = IdentitySet([self._newObj() for i in xrange(10)])
- set2 = IdentitySet([self._newObj() for i in xrange(10)])
- self.failUnless(set1.difference(set2) == set1)
-
- def testIntersection(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- objs2 += objs1[-4:]
- expected = objs1[-4:]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- sect = set1.intersection(set2)
- self.failIf(sect is set1)
- self.failIf(sect is set2)
- self.failUnless(len(sect), len(expected))
- for o in expected:
- self.failUnless(o in sect)
-
- def testAndIntersection(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- objs2 += objs1[-4:]
- expected = objs1[-4:]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- sect = set1 & set2
- self.failIf(sect is set1)
- self.failIf(sect is set2)
- self.failUnless(len(sect), len(expected))
- for o in expected:
- self.failUnless(o in sect)
-
- def testEmptyIntersection(self):
- from pypes.identity import IdentitySet
- set1 = IdentitySet([self._newObj() for i in xrange(10)])
- set2 = IdentitySet([self._newObj() for i in xrange(10)])
- self.failIf(set1.intersection(set2))
-
- def testIsSuperAndSubset(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = objs1[2:-2]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- self.failUnless(set2.issubset(set1))
- self.failIf(set1.issubset(set2))
- self.failUnless(set1.issuperset(set2))
- self.failIf(set2.issuperset(set1))
-
- def testIsNotSuperAndSubset(self):
- from pypes.identity import IdentitySet
- objs1 = [self._newObj() for i in xrange(10)]
- objs2 = [self._newObj() for i in xrange(10)]
- objs2 += objs1[2:-2]
- set1 = IdentitySet(objs1)
- set2 = IdentitySet(objs2)
- self.failIf(set2.issubset(set1))
- self.failIf(set1.issubset(set2))
- self.failIf(set1.issuperset(set2))
- self.failIf(set2.issuperset(set1))
-
- def testEquivilanceIsSuperAndSubset(self):
- from pypes.identity import IdentitySet
- objs = [self._newObj() for i in xrange(10)]
- set1 = IdentitySet(objs)
- set2 = IdentitySet(objs)
- self.failUnless(set1.issubset(set2))
- self.failUnless(set2.issubset(set1))
- self.failUnless(set1.issuperset(set2))
- self.failUnless(set2.issuperset(set1))
- self.assertEqual(set1, set2)
-
-class TestIdentitySetPersistence(PypesPersistentTest):
-
- def testSetPersistence(self):
- from pypes.identity import IdentitySet
- self.root['objs'] = [self._newObj() for i in range(10)]
- set = IdentitySet(self.root['objs'])
- self.root['set'] = set
- get_transaction().commit()
-
- r2 = self.db.open().root()
- for obj in r2['objs']:
- self.failUnless(obj in r2['set'])
-
- def testConcurrentAdd(self):
- from pypes.identity import IdentitySet
-
- set1 = self.root['set'] = IdentitySet()
- get_transaction().commit()
-
- r2 = self.db.open().root()
- set2 = r2['set']
- list(set2) # Make sure its loaded
-
- set1.add(self._newObj())
- get_transaction().commit()
-
- set2.add(self._newObj())
- get_transaction().commit()
-
- set3 = self.db.open().root()['set']
- self.assertEqual(len(set3), 2)
-
- def testDuplicateAddFails(self):
- from pypes.identity import IdentitySet
- set1 = self.root['set'] = IdentitySet()
- get_transaction().commit()
-
- r2 = self.db.open().root()
- set2 = r2['set']
- list(set2) # Make sure its loaded
-
- obj = self._newObj()
- set1.add(obj)
- get_transaction().commit()
-
- set2.add(obj)
- self.assertRaises(ConflictError, get_transaction().commit)
-
- set3 = self.db.open().root()['set']
- self.assertEqual(len(set3), 1)
-
- def testConcurrentUpdate(self):
- from pypes.identity import IdentitySet
- set1 = self.root['set'] = IdentitySet()
- get_transaction().commit()
-
- r2 = self.db.open().root()
- set2 = r2['set']
- list(set2) # Make sure its loaded
-
- set1.update([self._newObj() for i in range(10)])
- get_transaction().commit()
-
- set2.update([self._newObj() for i in range(10)])
- get_transaction().commit()
-
- set3 = self.db.open().root()['set']
- self.assertEqual(len(set3), 20)
-
-if __name__ == '__main__':
- unittest.main()
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Identity service tests
+
+$Id$"""
+
+import ZODB
+from ZODB.POSException import ConflictError
+import unittest
+from random import randint
+from zope.interface.verify import verifyObject
+from persistent import Persistent
+from BTrees.OOBTree import OOTreeSet, difference
+from types import TypeType
+from common \
+ import PypesTestCase, TestClass, PersistenceTest, PypesPersistentTest
+
+class TestIdentityService(unittest.TestCase):
+
+ def setUp(self):
+ from pypes.identity import IdentityService
+ self.identity = IdentityService()
+
+ def testInterfaces(self):
+ from pypes.interfaces import IPersistentService, IIdentityService
+ self.failUnless(verifyObject(IPersistentService, self.identity))
+ self.failUnless(verifyObject(IIdentityService, self.identity))
+
+ def testIdTypeReturnsType(self):
+ self.failUnless(isinstance(self.identity.idType(), TypeType))
+
+ def testIdIsCorrectType(self):
+ ident = self.identity.register(TestClass())
+ self.failUnless(isinstance(ident, self.identity.idType()))
+
+ def testIdIsTrue(self):
+ self.failUnless(self.identity.register(TestClass()))
+
+ def testIdsUniqueAndOrderable(self):
+ ids = OOTreeSet()
+ register = self.identity.register
+ for i in xrange(1000):
+ self.failUnless(ids.insert(register(TestClass())))
+ self.assertEqual(len(ids), 1000)
+
+ def testRegisterFailure(self):
+ from pypes.exceptions import IdentityError
+ self.assertRaises(
+ IdentityError, self.identity.register, None)
+
+ def testDuplicateRegistrationFails(self):
+ from pypes.exceptions import IdentityError
+ o = TestClass()
+ self.identity.register(o)
+ self.assertRaises(
+ IdentityError, self.identity.register, o)
+
+ def testHasId(self):
+ ids = []
+ register = self.identity.register
+ for i in xrange(1000):
+ ids.append(register(TestClass()))
+ ids.sort()
+ hasId = self.identity.hasId
+ for i in ids:
+ self.failUnless(hasId(i))
+
+ def testNotHasId(self):
+ from pypes.identity import IdentityService
+ altidentity = IdentityService()
+ register = self.identity.register
+ ids = OOTreeSet()
+ for i in xrange(500):
+ ids.insert(register(TestClass()))
+ hasId = self.identity.hasId
+ for i in xrange(500):
+ ident = altidentity.register(TestClass())
+ self.failUnless(
+ bool(hasId(ident)) == bool(ids.has_key(ident)))
+ self.failIf(hasId(0))
+
+ def testHasIdWrongType(self):
+ self.assertRaises(TypeError, self.identity.hasId, None)
+ self.assertRaises(TypeError, self.identity.hasId, TestClass)
+ self.assertRaises(TypeError, self.identity.hasId, TestClass())
+
+ def testContains(self):
+ objs = []
+ register = self.identity.register
+ for i in xrange(1000):
+ o = TestClass()
+ objs.append(o)
+ register(o)
+ while objs:
+ o = objs.pop(randint(0, len(objs) - 1))
+ self.failUnless(o in self.identity)
+
+ def testNotContains(self):
+ register = self.identity.register
+ for i in xrange(333):
+ self.failIf(TestClass() in self.identity)
+ for i in xrange(333):
+ register(TestClass())
+ for i in xrange(333):
+ self.failIf(TestClass() in self.identity)
+
+ def testRemoveId(self):
+ from pypes.exceptions import IdentityKeyError
+ from pypes.identity import pypesid
+ o = TestClass()
+ self.failIf(o in self.identity)
+ ident = self.identity.register(o)
+ self.failUnless(o in self.identity)
+ self.identity.removeId(ident)
+ self.failIf(o in self.identity)
+ self.assertRaises(IdentityKeyError, self.identity.removeId, ident)
+ self.failUnless(pypesid(o) is None)
+
+ def testRemoveObject(self):
+ from pypes.exceptions import IdentityError
+ from pypes.identity import pypesid
+ o = TestClass()
+ self.failIf(o in self.identity)
+ self.identity.register(o)
+ self.failUnless(o in self.identity)
+ self.identity.remove(o)
+ self.failIf(o in self.identity)
+ self.assertRaises(IdentityError, self.identity.remove, o)
+ self.failUnless(pypesid(o) is None)
+
+ def testGetObject(self):
+ ids = []
+ objs = {}
+ register = self.identity.register
+ for i in xrange(1000):
+ o = TestClass()
+ ident = register(o)
+ ids.append(ident)
+ objs[ident] = o
+ o.id = ident
+ ids.sort()
+ getObject = self.identity.getObject
+ for ident in ids:
+ self.assertEqual(getObject(ident).id, ident)
+ self.failUnless(getObject(ident) is objs[ident])
+
+ def testGetObjectFails(self):
+ from pypes.identity import IdentityService
+ from pypes.exceptions import IdentityKeyError
+ altidentity = IdentityService()
+ ident = self.identity.register(TestClass())
+ badident = altidentity.register(TestClass())
+ while ident == badident:
+ badident = altidentity.register(TestClass())
+ self.assertRaises(IdentityKeyError, self.identity.getObject, badident)
+
+ def testLen(self):
+ self.failIf(self.identity)
+ ident = self.identity.register(TestClass())
+ self.assertEqual(len(self.identity), 1)
+ self.identity.removeId(ident)
+ self.assertEqual(len(self.identity), 0)
+ for i in xrange(100):
+ self.identity.register(TestClass())
+ self.assertEqual(len(self.identity), i + 1)
+
+ def testIterate(self):
+ objs = {}
+ register = self.identity.register
+ for i in xrange(1000):
+ o = TestClass()
+ objs[o] = register(o)
+ for o in self.identity:
+ del objs[o]
+ self.failIf(objs)
+
+ def testIterateIds(self):
+ ids = {}
+ register = self.identity.register
+ for i in xrange(1000):
+ ids[register(TestClass())] = None
+ for i in self.identity.iterIds():
+ del ids[i]
+ self.failIf(ids)
+
+ def testIdSet(self):
+ self.failIf(self.identity.idSet())
+ ids = OOTreeSet()
+ register = self.identity.register
+ for i in xrange(100):
+ ids.insert(register(TestClass()))
+ self.failIf(difference(OOTreeSet(self.identity.idSet()), ids))
+
+class TestListener(object):
+
+ def __init__(self):
+ self.received = []
+
+ def receive(self, msg):
+ self.received.append(msg)
+
+class TestIdentityServiceEvents(PypesTestCase):
+
+ def testIdRegisteredEvent(self):
+ from pypes import services
+ from pypes.identity import pypesid, IdRegisteredMessage
+ event = services.event(self.conn)
+ tl = TestListener()
+ event.registerListener(tl, 'receive', IdRegisteredMessage)
+ o = self._newObj()
+ msg, = tl.received
+ self.failUnless(msg.object is o)
+ self.failUnless(msg.id is pypesid(o))
+
+ def testIdRegisteredEventMultiple(self):
+ from pypes import services
+ from pypes.identity import IdRegisteredMessage
+ event = services.event(self.conn)
+ tl = TestListener()
+ event.registerListener(tl, 'receive', IdRegisteredMessage)
+ objs = [self._newObj() for i in range(10)]
+ self.assertEqual(len(tl.received), 10)
+ for o in objs:
+ self.identity.remove(o)
+ self.assertEqual(len(tl.received), 10)
+
+ def testIdUnregisteredEvent(self):
+ from pypes import services
+ from pypes.identity import pypesid, IdUnregisteredMessage
+ event = services.event(self.conn)
+ tl = TestListener()
+ event.registerListener(tl, 'receive', IdUnregisteredMessage)
+ o = self._newObj()
+ id = pypesid(o)
+ self.assertEqual(len(tl.received), 0)
+ self.identity.remove(o)
+ msg, = tl.received
+ self.failUnless(msg.object is o)
+ self.failUnless(msg.id is id)
+
+ def testIdUnregisteredEventMultiple(self):
+ from pypes import services
+ from pypes.identity import IdUnregisteredMessage
+ event = services.event(self.conn)
+ tl = TestListener()
+ event.registerListener(tl, 'receive', IdUnregisteredMessage)
+ objs = [self._newObj() for i in range(10)]
+ self.assertEqual(len(tl.received), 0)
+ for o in objs:
+ self.identity.remove(o)
+ self.assertEqual(len(tl.received), 10)
+
+ def testListenForIdEventsFunc(self):
+ from pypes import services
+ from pypes.identity import listenForIdEvents
+ from pypes.identity import IdRegisteredMessage, IdUnregisteredMessage
+ event = services.event(self.conn)
+ tl = TestListener()
+ listenForIdEvents(tl, self.conn, 'receive', 'receive')
+ self.assertEqual(len(tl.received), 0)
+ obj = self._newObj()
+ self.assertEqual(len(tl.received), 1)
+ self.identity.remove(obj)
+ msg1, msg2 = tl.received
+ self.failUnless(msg1.object is obj)
+ self.failUnless(isinstance(msg1, IdRegisteredMessage))
+ self.failUnless(msg2.object is obj)
+ self.failUnless(isinstance(msg2, IdUnregisteredMessage))
+
+class TestIdentityServicePersistence(PersistenceTest):
+
+ def testBasicPersistence(self):
+ from pypes.identity import IdentityService, pypesid
+ r1 = self.db.open().root()
+ ids = r1['t'] = IdentityService()
+ obj = TestClass()
+ obj.name = 'test'
+ objid = ids.register(obj)
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ ids2 = r2['t']
+ obj = ids2.getObject(objid)
+ self.assertEqual(ids._p_serial, ids._p_serial)
+ self.failUnless(isinstance(obj, TestClass))
+ self.assertEqual(obj.name, 'test')
+ self.assertEqual(pypesid(obj), objid)
+
+ def testConcurrentRegistration(self):
+ from pypes.identity import IdentityService
+ r1 = self.db.open().root()
+ ids1 = r1['t'] = IdentityService()
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ ids2 = r2['t']
+ list(ids2); list(ids2.iterIds()) # Make sure its loaded
+
+ ids1.register(TestClass())
+ get_transaction().commit()
+
+ ids2.register(TestClass())
+ get_transaction().commit()
+
+class TestIdentitySet(PypesTestCase):
+
+ def setUp(self):
+ from pypes.identity import IdentitySet
+ PypesTestCase.setUp(self)
+ self.set = IdentitySet()
+
+ def testInterfaces(self):
+ from pypes.interfaces import IIdentitySet
+ self.failUnless(verifyObject(IIdentitySet, self.set))
+
+ def testFromUniqueSequence(self):
+ from pypes.identity import IdentitySet
+ objs = [self._newObj() for i in xrange(100)]
+ set = IdentitySet(objs)
+ for o in objs:
+ self.failUnless(o in set)
+
+ def testFromNonUniqueSequence(self):
+ from pypes.identity import IdentitySet
+ objs = [self._newObj() for i in xrange(10)]
+ objs.extend(objs)
+ del objs[7:13]
+ set = IdentitySet(objs)
+ seen = {}
+ for o in objs:
+ self.failIf(o not in set)
+ seen[o] = None
+ seen = seen.keys()
+ seen.sort()
+ set = list(set)
+ set.sort()
+ self.assertEqual(seen, set)
+
+ def testAddUnique(self):
+ objs = []
+ for i in xrange(100):
+ o = self._newObj()
+ self.failUnless(self.set.add(o))
+ objs.append(o)
+ for o in objs:
+ self.failUnless(o in self.set)
+ self.assertEqual(len(objs), len(self.set))
+
+ def testAddDupes(self):
+ for i in xrange(100):
+ o = self._newObj()
+ self.failUnless(self.set.add(o))
+ self.failIf(self.set.add(o))
+ self.failUnless(o in self.set)
+ self.assertEqual(len(self.set), 100)
+
+ def testInsertUnregistered(self):
+ from pypes.exceptions import IdentityError
+ self.assertRaises(IdentityError, self.set.add, TestClass())
+
+ def testRemove(self):
+ objs = []
+ for i in xrange(100):
+ o = self._newObj()
+ self.set.add(o)
+ objs.append(o)
+ while objs:
+ o = objs.pop()
+ self.set.remove(o)
+ self.failIf(o in self.set)
+ self.assertEqual(len(objs), len(self.set))
+
+ def testRemoveNonMember(self):
+ from pypes.exceptions import SetLookupError
+ self.assertRaises(SetLookupError, self.set.remove, self._newObj())
+
+ def testUpdate(self):
+ objs = [self._newObj() for i in xrange(100)]
+ self.set.update(objs)
+ for o in objs:
+ self.failUnless(o in self.set)
+ objs2 = [self._newObj() for i in xrange(100)]
+ self.set.update(objs2)
+ for o in objs + objs2:
+ self.failUnless(o in self.set)
+
+ def testUpdateDupes(self):
+ objs = [self._newObj() for i in xrange(100)]
+ self.set.update(objs)
+ objs += [self._newObj() for i in xrange(100)]
+ objs.sort()
+ self.set.update(objs)
+ #self.assertEqual(len(self.set), 20)
+
+ def testUpdateUnregistered(self):
+ from pypes.exceptions import IdentityError
+ objs = [self._newObj() for i in xrange(10)]
+ objs += [TestClass() for i in xrange(10)]
+ self.assertRaises(IdentityError, self.set.update, objs)
+
+ def testIter(self):
+ objs = {}
+ for i in xrange(100):
+ o = self._newObj()
+ objs[o] = None
+ self.set.update(objs.keys())
+ for o in self.set:
+ del objs[o]
+ self.failIf(objs)
+
+ def testIterUnregisteredYieldsNone(self):
+ from pypes.services import identity
+ o = self._newObj()
+ self.set.add(o)
+ identity(self.conn).remove(o)
+ self.failUnless(iter(self.set).next() is None)
+
+ def testUnion(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ all = objs1 + objs2
+ objs2 += objs1[:3]
+ objs1 += objs2[:3]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ union = set1.union(set2)
+ self.failIf(union is set1)
+ self.failIf(union is set2)
+ self.failUnless(len(union), 20)
+ for o in all:
+ self.failUnless(o in union)
+
+ def testOrUnion(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ expected = objs1 + objs2
+ objs2 += objs1[:3]
+ objs1 += objs2[:3]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ union = set1 | set2
+ self.failIf(union is set1)
+ self.failIf(union is set2)
+ self.failUnless(len(union), len(expected))
+ for o in expected:
+ self.failUnless(o in union)
+
+ def testDifference(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ objs2 += objs1[-3:]
+ expected = objs1[:-3]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ diff = set1.difference(set2)
+ self.failIf(diff is set1)
+ self.failIf(diff is set2)
+ self.failUnless(len(diff), len(expected))
+ for o in expected:
+ self.failUnless(o in diff)
+
+ def testMinusDifference(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ objs2 += objs1[-3:]
+ expected = objs1[:-3]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ diff = set1 - set2
+ self.failIf(diff is set1)
+ self.failIf(diff is set2)
+ self.failUnless(len(diff), len(expected))
+ for o in expected:
+ self.failUnless(o in diff)
+
+ def testDiffNoOverlap(self):
+ from pypes.identity import IdentitySet
+ set1 = IdentitySet([self._newObj() for i in xrange(10)])
+ set2 = IdentitySet([self._newObj() for i in xrange(10)])
+ self.failUnless(set1.difference(set2) == set1)
+
+ def testIntersection(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ objs2 += objs1[-4:]
+ expected = objs1[-4:]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ sect = set1.intersection(set2)
+ self.failIf(sect is set1)
+ self.failIf(sect is set2)
+ self.failUnless(len(sect), len(expected))
+ for o in expected:
+ self.failUnless(o in sect)
+
+ def testAndIntersection(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ objs2 += objs1[-4:]
+ expected = objs1[-4:]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ sect = set1 & set2
+ self.failIf(sect is set1)
+ self.failIf(sect is set2)
+ self.failUnless(len(sect), len(expected))
+ for o in expected:
+ self.failUnless(o in sect)
+
+ def testEmptyIntersection(self):
+ from pypes.identity import IdentitySet
+ set1 = IdentitySet([self._newObj() for i in xrange(10)])
+ set2 = IdentitySet([self._newObj() for i in xrange(10)])
+ self.failIf(set1.intersection(set2))
+
+ def testIsSuperAndSubset(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = objs1[2:-2]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ self.failUnless(set2.issubset(set1))
+ self.failIf(set1.issubset(set2))
+ self.failUnless(set1.issuperset(set2))
+ self.failIf(set2.issuperset(set1))
+
+ def testIsNotSuperAndSubset(self):
+ from pypes.identity import IdentitySet
+ objs1 = [self._newObj() for i in xrange(10)]
+ objs2 = [self._newObj() for i in xrange(10)]
+ objs2 += objs1[2:-2]
+ set1 = IdentitySet(objs1)
+ set2 = IdentitySet(objs2)
+ self.failIf(set2.issubset(set1))
+ self.failIf(set1.issubset(set2))
+ self.failIf(set1.issuperset(set2))
+ self.failIf(set2.issuperset(set1))
+
+ def testEquivilanceIsSuperAndSubset(self):
+ from pypes.identity import IdentitySet
+ objs = [self._newObj() for i in xrange(10)]
+ set1 = IdentitySet(objs)
+ set2 = IdentitySet(objs)
+ self.failUnless(set1.issubset(set2))
+ self.failUnless(set2.issubset(set1))
+ self.failUnless(set1.issuperset(set2))
+ self.failUnless(set2.issuperset(set1))
+ self.assertEqual(set1, set2)
+
+class TestIdentitySetPersistence(PypesPersistentTest):
+
+ def testSetPersistence(self):
+ from pypes.identity import IdentitySet
+ self.root['objs'] = [self._newObj() for i in range(10)]
+ set = IdentitySet(self.root['objs'])
+ self.root['set'] = set
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ for obj in r2['objs']:
+ self.failUnless(obj in r2['set'])
+
+ def testConcurrentAdd(self):
+ from pypes.identity import IdentitySet
+
+ set1 = self.root['set'] = IdentitySet()
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ set2 = r2['set']
+ list(set2) # Make sure its loaded
+
+ set1.add(self._newObj())
+ get_transaction().commit()
+
+ set2.add(self._newObj())
+ get_transaction().commit()
+
+ set3 = self.db.open().root()['set']
+ self.assertEqual(len(set3), 2)
+
+ def testDuplicateAddFails(self):
+ from pypes.identity import IdentitySet
+ set1 = self.root['set'] = IdentitySet()
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ set2 = r2['set']
+ list(set2) # Make sure its loaded
+
+ obj = self._newObj()
+ set1.add(obj)
+ get_transaction().commit()
+
+ set2.add(obj)
+ self.assertRaises(ConflictError, get_transaction().commit)
+
+ set3 = self.db.open().root()['set']
+ self.assertEqual(len(set3), 1)
+
+ def testConcurrentUpdate(self):
+ from pypes.identity import IdentitySet
+ set1 = self.root['set'] = IdentitySet()
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ set2 = r2['set']
+ list(set2) # Make sure its loaded
+
+ set1.update([self._newObj() for i in range(10)])
+ get_transaction().commit()
+
+ set2.update([self._newObj() for i in range(10)])
+ get_transaction().commit()
+
+ set3 = self.db.open().root()['set']
+ self.assertEqual(len(set3), 20)
+
+if __name__ == '__main__':
+ unittest.main()
More information about the Zope-CVS
mailing list