[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_extent.py:1.3
Casey Duncan
casey at zope.com
Tue Feb 10 01:25:38 EST 2004
Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv8526/tests
Modified Files:
test_extent.py
Log Message:
Begin class extent map implementation
=== Packages/pypes/pypes/tests/test_extent.py 1.2 => 1.3 ===
--- Packages/pypes/pypes/tests/test_extent.py:1.2 Mon Feb 9 01:20:12 2004
+++ Packages/pypes/pypes/tests/test_extent.py Tue Feb 10 01:25:37 2004
@@ -167,18 +167,18 @@
def __init__(self):
self._listeners = Set()
- def registerListener(obj, method_name, message_type):
+ def registerListener(self, obj, method_name, message_type):
self._listeners.add((obj, method_name, message_type))
- def unregisterListener(obj, method_name, message_type):
+ def unregisterListener(self, obj, method_name, message_type):
self._listeners.remove((obj, method_name, message_type))
- def send(message):
+ def send(self, message):
for obj, method, msg_type in self._listeners:
if isinstance(message, msg_type):
- getattr(obj. method)(message)
+ getattr(obj, method)(message)
- def hasReg(obj, method_name, message_type):
+ def hasReg(self, obj, method_name, message_type):
return (obj, method_name, message_type) in self._listeners
class DummyIdentityService:
@@ -188,22 +188,40 @@
self._objs = {}
self._conn = conn
- def register(obj):
+ def register(self, obj):
from pypes import services
- from pypes.identity import IdRegisteredEvent
- obj._pypesid_ = self._nextid
- self._objs[self._nextid] = obj
+ from pypes.identity import IdRegisteredMessage
+ ident = obj._pypes_id_ = self._nextid
+ self._objs[id] = obj
self._nextid += 1
- services.event(obj).send(IdRegisteredEvent(obj, obj._pypesid_))
+ services.event(self._conn).send(IdRegisteredMessage(obj, ident))
- def remove(obj):
+ def remove(self, obj):
from pypes import services
- from pypes.identity import IdUnregisteredEvent
- id = obj._pypesid_
+ from pypes.identity import IdUnregisteredMessage
+ ident = obj._pypes_id_
del self._objs[id]
- del obj._pypesid_
- services.event(obj).send(IdUnregisteredEvent(obj, id))
+ del obj._pypes_id_
+ services.event(self._conn).send(IdUnregisteredMessage(obj, ident))
+
+class TestExtent:
+
+ def __init__(self, key, keyset, setmap):
+ self.key = key
+ self._keys = keyset
+ self._idsets = setmap
+ def __len__(self):
+ length = 0
+ for key in self._keys.keys():
+ length += len(self._idsets[key])
+ return length
+
+ def __contains__(self, obj):
+ for key in self._keys.keys():
+ if obj in self._idsets[key]:
+ return True
+ return False
class TestClassExtentMap(unittest.TestCase):
@@ -216,33 +234,61 @@
addService(self.conn, 'event', self.event)
self.identity = DummyIdentityService(self.conn)
addService(self.conn, 'identity', self.identity)
- orig_extent_factory = ClassExtentMap._extent_factory
- ClassExtentMap._extent_factory = TestExtent
self.cemap = ClassExtentMap(self.conn)
- ClassExtentMap._extent_factory = orig_extent_factory
+ self.cemap._extent_factory = TestExtent
def _makeObj(self, cls, *args, **kw):
obj = cls(*args, **kw)
self.identity.register(obj)
return obj
- def xtestInterface(self):
+ def testInterface(self):
from pypes.interfaces import IExtentMap
self.failUnless(verifyObject(IExtentMap, self.cemap))
- def xtestIsListeningForIdEvents(self):
- from pypes.identity import IdRegisteredEvent, IdUnregisteredEvent
+ def testIsListeningForIdEvents(self):
+ from pypes.identity import IdRegisteredMessage, IdUnregisteredMessage
self.failUnless(self.event.hasReg(
- self.cemap, '_notifyIdRegistered', IdRegisteredEvent))
+ self.cemap, '_notifyIdRegistered', IdRegisteredMessage))
self.failUnless(self.event.hasReg(
- self.cemap, '_notifyIdUnregistered', IdUnregisteredEvent))
+ self.cemap, '_notifyIdUnregistered', IdUnregisteredMessage))
- def xtestAddSimpleObject(self):
- obj = BarClass()
- self.cemap.add(obj)
- extent = self.cemap[BarClass]
+ def testAddObject(self):
+ obj = self._makeObj(FooClass)
+ extent = self.cemap[FooClass]
+ classes = [ex.key for ex in self.cemap]
+ self.assertEqual(classes, [FooClass])
self.assertEqual(len(extent), 1)
self.failUnless(obj in extent)
+
+ def xtestAddObjectMultipleBases(self):
+ obj = self._makeObj(SplatClass)
+ classes = [ex.key for ex in self.cemap]
+ self.assertEqual(sort(classes), sort([BarClass, BazClass, SplatClass]))
+ for cls in classes:
+ self.assertEqual(len(self.cemap[cls]), 1)
+
+ def testAddManyObjects1Class(self):
+ objs = [self._makeObj(BarClass) for i in range(10)]
+ extent = self.cemap[BarClass]
+ classes = [ex.key for ex in self.cemap]
+ self.assertEqual(classes, [BarClass])
+ self.assertEqual(len(extent), len(objs))
+ for ob in objs:
+ self.failUnless(ob in extent)
+
+ def xtestAddManyObjectsMultipleClasses(self):
+ objs = []
+ for i in range(5):
+ for cls in [FooClass, BarClass, BazClass, SplatClass]:
+ objs.append(self._makeObj(cls))
+ classes = [ex.key for ex in self.cemap]
+ self.assertEqual(
+ sort(classes), sort([FooClass, BarClass, BazClass, SplatClass]))
+ self.assertEqual(len(self.cemap[FooClass]), 5)
+ self.assertEqual(len(self.cemap[BarClass]), 15)
+ self.assertEqual(len(self.cemap[BazClass]), 10)
+ self.assertEqual(len(self.cemap[SplatClass]), 10)
if __name__ == '__main__':
unittest.main()
More information about the Zope-CVS
mailing list