[Zope-Checkins] CVS: Products/Sessions/tests - testSessionDataManager.py:1.4
Chris McDonough
chrism@zope.com
Tue, 13 Nov 2001 17:58:47 -0500
Update of /cvs-repository/Products/Sessions/tests
In directory cvs.zope.org:/tmp/cvs-serv4386
Modified Files:
testSessionDataManager.py
Log Message:
Rework the multithreaded tests and re-enable the foreign-object test (testForeignObject and its _foreignAdd helper).
=== Products/Sessions/tests/testSessionDataManager.py 1.3 => 1.4 ===
stuff = {}
-def _getApp():
-
- app = stuff.get('app', None)
- if not app:
+def _getDB():
+ db = stuff.get('db')
+ if not db:
ds = DemoStorage(quota=(1<<20))
db = ZODB.DB(ds, pool_size=60)
conn = db.open()
root = conn.root()
app = Application()
root['Application']= app
+ _populate(app)
get_transaction().commit()
- stuff['app'] = app
- stuff['conn'] = conn
stuff['db'] = db
- return app
+ conn.close()
+ return db
-def _openApp():
- conn = stuff['db'].open()
- root = conn.root()
- app = root['Application']
- return conn, app
-
-def _delApp():
+def _delDB():
get_transaction().abort()
- stuff['conn'].close()
- del stuff['conn']
- del stuff['app']
del stuff['db']
-def f(sdo):
- pass
-
class Foo(Acquisition.Implicit): pass
-class TestBase(TestCase):
- def setUp(self):
- self.app = makerequest.makerequest(_getApp())
- timeout = self.timeout = 1
-
-
- # Try to work around some testrunner snafus
- if 1 and __name__ is not '__main__':
+def _populate(app):
+ bidmgr = BrowserIdManager(idmgr_name)
+ tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
+ toc = TransientObjectContainer(toc_name, title='Temporary '
+ 'Transient Object Container', timeout_mins=20)
+ session_data_manager=SessionDataManager(id='session_data_manager',
+ path='/'+tf_name+'/'+toc_name, title='Session Data Manager')
- bidmgr = BrowserIdManager(idmgr_name)
+ try: app._delObject(idmgr_name)
+ except AttributeError: pass
- tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
+ try: app._delObject(tf_name)
+ except AttributeError: pass
- toc = TransientObjectContainer(toc_name, title='Temporary '
- 'Transient Object Container', timeout_mins=20)
+ try: app._delObject('session_data_manager')
+ except AttributeError: pass
- session_data_manager=SessionDataManager(id='session_data_manager',
- path='/'+tf_name+'/'+toc_name, title='Session Data Manager')
+ app._setObject(idmgr_name, bidmgr)
- try: self.app._delObject(idmgr_name)
- except AttributeError: pass
+ app._setObject('session_data_manager', session_data_manager)
- try: self.app._delObject(tf_name)
- except AttributeError: pass
-
- try: self.app._delObject('session_data_manager')
- except AttributeError: pass
-
- self.app._setObject(idmgr_name, bidmgr)
- self.app._setObject(tf_name, tf)
-
- get_transaction().commit()
-
- self.app.temp_folder._setObject(toc_name, toc)
- self.app._setObject('session_data_manager', session_data_manager)
- get_transaction().commit()
+ app._setObject(tf_name, tf)
+ get_transaction().commit()
- # leans on the fact that these things exist by app init
+ app.temp_folder._setObject(toc_name, toc)
+ get_transaction().commit()
-
-## admin = self.app.acl_users.getUser('admin')
-## if admin is None:
-## raise "Need to define an 'admin' user before running these tests"
-## admin = admin.__of__(self.app.acl_users)
-## self.app.session_data_manager.changeOwnership(admin)
+class TestBase(TestCase):
+ def setUp(self):
+ db = _getDB()
+ conn = db.open()
+ root = conn.root()
+ self.app = makerequest.makerequest(root['Application'])
+ timeout = self.timeout = 1
def tearDown(self):
- get_transaction().abort()
- #self.app._p_jar.close()
- #self.app = None
- _delApp()
+ _delDB()
del self.app
class TestSessionManager(TestBase):
@@ -270,15 +243,14 @@
sd.set('foo', 'bar')
assert get_transaction().commit(1) == None
- # Why would this have failed?? Not sure what it was meant to test
- #def testForeignObject(self):
- # self.assertRaises(InvalidObjectReference, self._foreignAdd)
-
- #def _foreignAdd(self):
- # ob = self.app.session_data_manager
- # sd = self.app.session_data_manager.getSessionData()
- # sd.set('foo', ob)
- # get_transaction().commit()
+ def testForeignObject(self):
+ self.assertRaises(InvalidObjectReference, self._foreignAdd)
+
+ def _foreignAdd(self):
+ ob = self.app.session_data_manager
+ sd = self.app.session_data_manager.getSessionData()
+ sd.set('foo', ob)
+ get_transaction().commit()
def testAqWrappedObjectsFail(self):
a = Foo()
@@ -288,7 +260,7 @@
sd.set('foo', aq_wrapped)
self.assertRaises(UnpickleableError, get_transaction().commit)
-class TestMultiThread(TestBase):
+class TestMultiThread(TestCase):
def testNonOverlappingSids(self):
readers = []
writers = []
@@ -298,20 +270,13 @@
writeout = []
numreaders = 20
numwriters = 5
- #rlock = threading.Lock()
- rlock = DumboLock()
sdm_name = 'session_data_manager'
+ db = _getDB()
for i in range(numreaders):
- mgr = getattr(self.app, idmgr_name)
- sid = mgr._getNewToken()
- app = aq_base(self.app)
- thread = ReaderThread(sid, app, readiters, sdm_name, rlock)
+ thread = ReaderThread(db, readiters, sdm_name)
readers.append(thread)
for i in range(numwriters):
- mgr = getattr(self.app, idmgr_name)
- sid = mgr._getNewToken()
- app = aq_base(self.app)
- thread = WriterThread(sid, app, writeiters, sdm_name, rlock)
+ thread = WriterThread(db, writeiters, sdm_name)
writers.append(thread)
for thread in readers:
thread.start()
@@ -334,18 +299,13 @@
writeout = []
numreaders = 20
numwriters = 5
- #rlock = threading.Lock()
- rlock = DumboLock()
sdm_name = 'session_data_manager'
- mgr = getattr(self.app, idmgr_name)
- sid = mgr._getNewToken()
+ db = _getDB()
for i in range(numreaders):
- app = aq_base(self.app)
- thread = ReaderThread(sid, app, readiters, sdm_name, rlock)
+ thread = ReaderThread(db, readiters, sdm_name)
readers.append(thread)
for i in range(numwriters):
- app = aq_base(self.app)
- thread = WriterThread(sid, app, writeiters, sdm_name, rlock)
+ thread = WriterThread(db, writeiters, sdm_name)
writers.append(thread)
for thread in readers:
thread.start()
@@ -359,134 +319,60 @@
for thread in readers:
assert thread.out == [], thread.out
-class ReaderThread(threading.Thread):
- def __init__(self, sid, app, iters, sdm_name, rlock):
- self.sid = sid
- self.conn, self.app = _openApp()
+class BaseReaderWriter(threading.Thread):
+ def __init__(self, db, iters, sdm_name):
+ self.conn = db.open()
+ self.app = self.conn.root()['Application']
+ self.app = makerequest.makerequest(self.app)
+ token = self.app.browser_id_manager._getNewToken()
+ self.app.REQUEST.session_token_ = token
self.iters = iters
self.sdm_name = sdm_name
self.out = []
- self.rlock = rlock
- #print "Reader SID %s" % sid
threading.Thread.__init__(self)
- def run1(self):
- try:
- self.rlock.acquire("Reader 1")
- self.app = self.conn.root()['Application']
- self.app = makerequest.makerequest(self.app)
- self.app.REQUEST.session_token_ = self.sid
- session_data_manager = getattr(self.app, self.sdm_name)
- data = session_data_manager.getSessionData(create=1)
- t = time.time()
- data[t] = 1
- get_transaction().commit()
- self.rlock.release("Reader 1")
- for i in range(self.iters):
- self.rlock.acquire("Reader 2")
- try:
- data = session_data_manager.getSessionData()
- except KeyError: # Ugh
- raise ConflictError
- if not data.has_key(t): self.out.append(1)
- self.rlock.release("Reader 2")
- time.sleep(whrandom.choice(range(3)))
- self.rlock.acquire("Reader 3")
- get_transaction().commit()
- self.rlock.release("Reader 3")
- finally:
- self.rlock.release("Reader catchall")
- try:
- self.conn.close()
- except AttributeError: pass # ugh
-
def run(self):
-
i = 0
-
- while 1:
- try:
- self.run1()
- return
- except ConflictError:
- i = i + 1
- #print "conflict %d" % i
- if i > 3: raise
- pass
-
-class WriterThread(threading.Thread):
- def __init__(self, sid, app, iters, sdm_name, rlock):
- self.sid = sid
- self.conn, self.app = _openApp()
- self.iters = iters
- self.sdm_name = sdm_name
- self.rlock = rlock
- #print "Writer SID %s" % sid
- threading.Thread.__init__(self)
-
- def run1(self):
try:
- self.rlock.acquire("Writer 1")
- self.app = self.conn.root()['Application']
- self.app = makerequest.makerequest(self.app)
- self.app.REQUEST.session_token_ = self.sid
- session_data_manager = getattr(self.app, self.sdm_name)
- self.rlock.release("Writer 1")
- for i in range(self.iters):
- self.rlock.acquire("Writer 2")
+ while 1:
try:
- data = session_data_manager.getSessionData()
- except KeyError: # Ugh
- raise ConflictError
- data[time.time()] = 1
- n = whrandom.choice(range(8))
- self.rlock.release("Writer 2")
- time.sleep(n)
- self.rlock.acquire("Writer 3")
- if n % 2 == 0:
- get_transaction().commit()
- else:
- get_transaction().abort()
- self.rlock.release("Writer 3")
+ self.run1()
+ return
+ except ConflictError:
+ i = i + 1
+ print "conflict %d" % i
+ if i > 3: raise
finally:
- self.rlock.release("Writer Catchall")
- try:
- self.conn.close()
- except AttributeError: pass # ugh
-
- def run(self):
-
- i = 0
+ self.conn.close()
+ del self.app
+
+class ReaderThread(BaseReaderWriter):
+ def run1(self):
+ session_data_manager = getattr(self.app, self.sdm_name)
+ data = session_data_manager.getSessionData(create=1)
+ t = time.time()
+ data[t] = 1
+ get_transaction().commit()
+ for i in range(self.iters):
+ data = session_data_manager.getSessionData()
+ if not data.has_key(t):
+ self.out.append(1)
+ time.sleep(whrandom.choice(range(3)))
+ get_transaction().commit()
- while 1:
- try:
- self.run1()
- return
- except ConflictError:
- i = i + 1
- #print "conflict %d" % i
- if i > 3: raise
- pass
-
+class WriterThread(BaseReaderWriter):
+ def run1(self):
+ session_data_manager = getattr(self.app, self.sdm_name)
+ for i in range(self.iters):
+ data = session_data_manager.getSessionData()
+ data[time.time()] = 1
+ n = whrandom.choice(range(3))
+ time.sleep(n)
+ if n % 2 == 0:
+ get_transaction().commit()
+ else:
+ get_transaction().abort()
-class DumboLock:
- def __init__(self):
- self.lock = threading.Lock()
- self._locked_ = 0
- def acquire(self, msg):
- #print "Acquiring lock %s" % msg
- #self.lock.acquire()
- #if self._locked_ == 1:
- # print "already locked?!"
- self._locked_ = 1
- def release(self, msg):
- #print "Releasing lock %s" % msg
- #if self._locked_ == 0:
- # print "already released?!"
- # return
- #self.lock.release()
- self._locked_ = 0
-
def test_suite():
test_datamgr = makeSuite(TestSessionManager, 'test')
test_multithread = makeSuite(TestMultiThread, 'test')