[Zope-CVS] CVS: Products/AdaptableStorage/tests - Zope2TestBase.py:1.3 testASStorage.py:1.7 testSerialization.py:1.8
Shane Hathaway
shane@zope.com
Thu, 9 Jan 2003 09:34:40 -0500
Update of /cvs-repository/Products/AdaptableStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv19323/tests
Modified Files:
Zope2TestBase.py testASStorage.py testSerialization.py
Log Message:
- Added LoadEvent and StoreEvent, which currently serve only to
clarify the code.
- Added tests of conflict detection.
- Added NoStateFoundError. Classification gateways now raise
NoStateFoundError at the right times so it's possible to detect
attempts to overwrite state with new objects.
- Made it easier for a SQL gateway to use multiple tables by adding a
setupTables() method to SQLGatewayBase.
- Made FieldSchema.addFieldType public.
=== Products/AdaptableStorage/tests/Zope2TestBase.py 1.2 => 1.3 ===
--- Products/AdaptableStorage/tests/Zope2TestBase.py:1.2 Tue Jan 7 10:14:29 2003
+++ Products/AdaptableStorage/tests/Zope2TestBase.py Thu Jan 9 09:34:07 2003
@@ -17,7 +17,7 @@
"""
from Acquisition import aq_base
-from ZODB import Persistent
+from ZODB import Persistent, POSException
from Persistence import PersistentMapping
from OFS.Folder import Folder
from OFS.ObjectManager import ObjectManager
@@ -253,3 +253,15 @@
finally:
conn.close()
+
+ def testNewObjectConflictDetection(self):
+ # Verify a new object won't overwrite existing objects by accident
+ conn = self.db.open()
+ try:
+ app = conn.root()['Application']
+ app.some_attr = 'stuff'
+ app._p_serial = '\0' * 8 # Pretend that it's new
+ self.assertRaises(POSException.ConflictError,
+ get_transaction().commit)
+ finally:
+ conn.close()
=== Products/AdaptableStorage/tests/testASStorage.py 1.6 => 1.7 ===
--- Products/AdaptableStorage/tests/testASStorage.py:1.6 Mon Jan 6 18:17:53 2003
+++ Products/AdaptableStorage/tests/testASStorage.py Thu Jan 9 09:34:07 2003
@@ -17,6 +17,7 @@
"""
import unittest
+from thread import start_new_thread, allocate_lock
import ZODB
from Persistence import PersistentMapping
@@ -27,6 +28,19 @@
from SerialTestBase import SerialTestBase
+def run_in_thread(f):
+ lock = allocate_lock()
+ def run(f=f, lock=lock):
+ try:
+ f()
+ finally:
+ lock.release()
+ lock.acquire()
+ start_new_thread(run, ())
+ lock.acquire()
+ lock.release()
+
+
class ASStorageTests (SerialTestBase, unittest.TestCase):
def setUp(self):
@@ -182,6 +196,62 @@
ob1 = conn1.loadStub(('test',))
self.assertEqual(ob1.strdata, ob.strdata)
self.assertEqual(ob1.items(), ob.items())
+ finally:
+ conn1.close()
+
+
+ def _changeTestRoot(self):
+ conn2 = self.db.open()
+ try:
+ ob2 = conn2.root()['TestRoot']
+ ob2.strdata = 'ghi'
+ get_transaction().commit()
+ finally:
+ conn2.close()
+
+
+ def testConflictDetection(self):
+ ob1 = PersistentMapping()
+ ob1.strdata = 'abc'
+
+ dummy = PersistentMapping()
+
+ conn1 = self.db.open()
+ try:
+ root = conn1.root()
+ get_transaction().begin()
+ root['TestRoot'] = ob1
+ root['TestRoot2'] = dummy
+ get_transaction().commit()
+ ob1.strdata = 'def'
+ run_in_thread(self._changeTestRoot)
+ # Verify that "def" doesn't get written, since it
+ # conflicts with "ghi".
+ self.assertRaises(ZODB.POSException.ConflictError,
+ get_transaction().commit)
+ self.assertEqual(ob1.strdata, "ghi")
+ finally:
+ conn1.close()
+
+
+ def testNewObjectConflictDetection(self):
+ # Verify a new object won't overwrite existing objects by accident
+ ob1 = PersistentMapping()
+ ob1.strdata = 'abc'
+
+ dummy = PersistentMapping()
+
+ conn1 = self.db.open()
+ try:
+ root = conn1.root()
+ get_transaction().begin()
+ root['TestRoot'] = ob1
+ root['TestRoot2'] = dummy
+ get_transaction().commit()
+ ob1.strdata = 'def'
+ ob1._p_serial = '\0' * 8 # Pretend that it's new
+ self.assertRaises(ZODB.POSException.ConflictError,
+ get_transaction().commit)
finally:
conn1.close()
=== Products/AdaptableStorage/tests/testSerialization.py 1.7 => 1.8 ===
--- Products/AdaptableStorage/tests/testSerialization.py:1.7 Tue Dec 31 16:47:52 2002
+++ Products/AdaptableStorage/tests/testSerialization.py Thu Jan 9 09:34:07 2003
@@ -61,8 +61,10 @@
mapper = self.root_mapper.getSubMapper('test_mapper')
event = sfw.SerializationEvent(kos, mapper, ('',), ob)
full_state = mapper.getSerializer().serialize(ob, event)
- event = sfw.MapperEvent(mapper, ('',))
+ event = sfw.StoreEvent(mapper, ('',))
mapper.getGateway().store(event, full_state)
+
+ event = sfw.LoadEvent(mapper, ('',))
full_state, serial = mapper.getGateway().load(event)
ob2 = PersistentMapping()
event = sfw.DeserializationEvent(kos, mapper, ('',), ob2)
@@ -93,15 +95,18 @@
mapper = self.root_mapper.getSubMapper('test_mapper_2')
event = sfw.SerializationEvent(kos, mapper, ('',), ob)
full_state = mapper.getSerializer().serialize(ob, event)
- event = sfw.MapperEvent(mapper, ('',))
+ event = sfw.StoreEvent(mapper, ('',))
mapper.getGateway().store(event, full_state)
+
# Now load the state into a different object
+ event = sfw.LoadEvent(mapper, ('',))
full_state, serial = mapper.getGateway().load(event)
ob2 = PersistentMapping()
event = sfw.DeserializationEvent(kos, mapper, ('',), ob2)
mapper.getSerializer().deserialize(ob2, event, full_state)
self.assertEqual(ob.extra.data, ob2.extra.data)
self.assertEqual(ob.keys(), ob2.keys())
+
# Check that both see the *same* object
self.assert_(ob2['a'] is ob2.extra, (ob2['a'], ob2.extra))
self.assert_(ob2['a'] is not data) # Verify it didn't cheat somehow