[Zope-Checkins] CVS: Zope/lib/python/Products/Sessions/tests - testSessionDataManager.py:1.7
Chris McDonough
chrism@zope.com
Sat, 17 Nov 2001 17:46:36 -0500
Update of /cvs-repository/Zope/lib/python/Products/Sessions/tests
In directory cvs.zope.org:/tmp/cvs-serv14333
Modified Files:
testSessionDataManager.py
Log Message:
Broke stress tests out from unit tests.
=== Zope/lib/python/Products/Sessions/tests/testSessionDataManager.py 1.6 => 1.7 ===
def testBadExternalSDCPath(self):
- self.app.session_data_manager.REQUEST['REQUEST_METHOD'] = 'GET' # fake out webdav
+ # fake out webdav
+ self.app.session_data_manager.REQUEST['REQUEST_METHOD'] = 'GET'
self.app.session_data_manager.setContainerPath('/fudgeffoloo')
try:
self.app.session_data_manager.getSessionData()
@@ -260,123 +261,9 @@
sd.set('foo', aq_wrapped)
self.assertRaises(UnpickleableError, get_transaction().commit)
-class TestMultiThread(TestCase):
- def testNonOverlappingSids(self):
- readers = []
- writers = []
- readiters = 20
- writeiters = 5
- readout = []
- writeout = []
- numreaders = 20
- numwriters = 5
- sdm_name = 'session_data_manager'
- db = _getDB()
- for i in range(numreaders):
- thread = ReaderThread(db, readiters, sdm_name)
- readers.append(thread)
- for i in range(numwriters):
- thread = WriterThread(db, writeiters, sdm_name)
- writers.append(thread)
- for thread in readers:
- thread.start()
- time.sleep(0.1)
- for thread in writers:
- thread.start()
- time.sleep(0.1)
- while threading.activeCount() > 1:
- time.sleep(1)
-
- for thread in readers:
- assert thread.out == [], thread.out
-
- def testOverlappingSids(self):
- readers = []
- writers = []
- readiters = 20
- writeiters = 5
- readout = []
- writeout = []
- numreaders = 20
- numwriters = 5
- sdm_name = 'session_data_manager'
- db = _getDB()
- for i in range(numreaders):
- thread = ReaderThread(db, readiters, sdm_name)
- readers.append(thread)
- for i in range(numwriters):
- thread = WriterThread(db, writeiters, sdm_name)
- writers.append(thread)
- for thread in readers:
- thread.start()
- time.sleep(0.1)
- for thread in writers:
- thread.start()
- time.sleep(0.1)
- while threading.activeCount() > 1:
- time.sleep(1)
-
- for thread in readers:
- assert thread.out == [], thread.out
-
-class BaseReaderWriter(threading.Thread):
- def __init__(self, db, iters, sdm_name):
- self.conn = db.open()
- self.app = self.conn.root()['Application']
- self.app = makerequest.makerequest(self.app)
- token = self.app.browser_id_manager._getNewBrowserId()
- self.app.REQUEST.browser_id_ = token
- self.iters = iters
- self.sdm_name = sdm_name
- self.out = []
- threading.Thread.__init__(self)
-
- def run(self):
- i = 0
- try:
- while 1:
- try:
- self.run1()
- return
- except ConflictError:
- i = i + 1
- #print "conflict %d" % i
- if i > 3: raise
- finally:
- self.conn.close()
- del self.app
-
-class ReaderThread(BaseReaderWriter):
- def run1(self):
- session_data_manager = getattr(self.app, self.sdm_name)
- data = session_data_manager.getSessionData(create=1)
- t = time.time()
- data[t] = 1
- get_transaction().commit()
- for i in range(self.iters):
- data = session_data_manager.getSessionData()
- if not data.has_key(t):
- self.out.append(1)
- time.sleep(whrandom.choice(range(3)))
- get_transaction().commit()
-
-class WriterThread(BaseReaderWriter):
- def run1(self):
- session_data_manager = getattr(self.app, self.sdm_name)
- for i in range(self.iters):
- data = session_data_manager.getSessionData()
- data[time.time()] = 1
- n = whrandom.choice(range(3))
- time.sleep(n)
- if n % 2 == 0:
- get_transaction().commit()
- else:
- get_transaction().abort()
-
def test_suite():
test_datamgr = makeSuite(TestSessionManager, 'test')
- test_multithread = makeSuite(TestMultiThread, 'test')
- suite = TestSuite((test_datamgr, test_multithread))
+ suite = TestSuite((test_datamgr,))
return suite
if __name__ == '__main__':