[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/locking/ Fixed bug a
timeout bug when locking an object.
Michael Kerrin
michael.kerrin at openapp.biz
Tue Jan 3 14:38:32 EST 2006
Log message for revision 41113:
Fixed bug a timeout bug when locking an object.
Changed:
U Zope3/trunk/src/zope/app/locking/storage.py
U Zope3/trunk/src/zope/app/locking/tests.py
-=-
Modified: Zope3/trunk/src/zope/app/locking/storage.py
===================================================================
--- Zope3/trunk/src/zope/app/locking/storage.py 2006-01-03 19:15:42 UTC (rev 41112)
+++ Zope3/trunk/src/zope/app/locking/storage.py 2006-01-03 19:38:32 UTC (rev 41113)
@@ -111,6 +111,9 @@
"""
Set the current lock for an object.
"""
+ ## call cleanup first so that if there is already a lock that
+ ## has timed out for the object then we don't delete it.
+ self.cleanup()
keyref = IKeyReference(object)
self.locks[keyref] = lock
pid = lock.principal_id
@@ -119,7 +122,6 @@
value = self.timeouts.get(ts, [])
value.append(keyref)
self.timeouts[ts] = value
- self.cleanup()
def delLock(self, object):
"""
@@ -130,7 +132,7 @@
def cleanup(self):
# We occasionally want to clean up expired locks to keep them
- # from accumulating over time and slowing things down.
+ # from accumulating over time and slowing things down.
for key in self.timeouts.keys(max=int(timefunc())):
for keyref in self.timeouts[key]:
if self.locks.get(keyref, None) is not None:
Modified: Zope3/trunk/src/zope/app/locking/tests.py
===================================================================
--- Zope3/trunk/src/zope/app/locking/tests.py 2006-01-03 19:15:42 UTC (rev 41112)
+++ Zope3/trunk/src/zope/app/locking/tests.py 2006-01-03 19:38:32 UTC (rev 41113)
@@ -17,13 +17,25 @@
$Id:$
"""
-import sys, unittest
+import sys, unittest, time
from zope.component.tests.placelesssetup import PlacelessSetup
import zope.event
from zope.testing import doctest
from transaction import abort
+from zope.interface import Interface
+from zope.app.testing import ztapi
+from zope.app.file.file import File
+from zope.app.folder.folder import Folder
+from zope.app.locking.interfaces import ILockable, ILockTracker
+from zope.app.locking.adapter import LockingAdapterFactory
+from zope.app.locking.adapter import LockingPathAdapter
+from zope.app.locking.storage import ILockStorage, PersistentLockStorage
+from zope.app.traversing.interfaces import IPathAdapter
+from zope.app.keyreference.interfaces import IKeyReference
+from zope.security.testing import Principal, Participation
+
class FakeModule:
def __init__(self, dict):
self.__dict = dict
@@ -55,22 +67,75 @@
return cmp(id(self.object), id(other.object))
+class TestLockStorage(unittest.TestCase):
+ def setUp(self):
+ super(TestLockStorage, self).setUp()
+
+ ztapi.provideAdapter(Interface, IKeyReference, FakeKeyReference)
+ ztapi.provideAdapter(Interface, ILockable, LockingAdapterFactory)
+ ztapi.provideAdapter(None, IPathAdapter, LockingPathAdapter,
+ "locking")
+
+ self.storage = storage = PersistentLockStorage()
+ ztapi.provideUtility(ILockStorage, storage)
+ ztapi.provideUtility(ILockTracker, storage)
+
+ def tearDown(self):
+ super(TestLockStorage, self).tearDown()
+ del self.storage
+
+ def test_timeout(self):
+ # fake time function to avoid a time.sleep in tests
+ def faketime(t):
+ zope.app.locking.storage.timefunc = lambda : t
+
+ ## test the cleanup of timedout locks.
+ content = File('some content', 'text/plain')
+ lockable = ILockable(content)
+ participation = Participation(Principal('michael'))
+ zope.security.management.newInteraction(participation)
+ now = time.time()
+ faketime(now)
+ lockinfo = lockable.lock(timeout = 1)
+ lockinfo.created = now
+ # two seconds pass
+ faketime(now + 2.0)
+ ## now lockable.locked is False since the lock has timed out
+ self.assertEqual(lockable.locked(), False)
+ ## since lockable.locked is False lockable.lock should succeed
+ ## assume this is done 3 seconds after the first lock
+ lockinfo = lockable.lock(timeout = 20)
+ lockinfo.created = now + 3.0
+ faketime(now + 4.0) # let time pass
+ self.assertEqual(lockable.locked(), True)
+ zope.security.management.endInteraction()
+ # reset the time function
+ zope.app.locking.storage.timefunc = time.time
+
+ def test_folder_lock(self):
+ folder = Folder()
+ file = File('some content', 'text/plain')
+ folder['file'] = file
+
+ participation = Participation(Principal('michael'))
+ zope.security.management.newInteraction(participation)
+
+ lockablefolder = ILockable(folder)
+ lockablefolder.lock()
+ self.assertEqual(lockablefolder.locked(), True)
+ lockablefile = ILockable(folder['file'])
+ self.assertEqual(lockablefile.locked(), False)
+
+ zope.security.management.endInteraction()
+
def setUp(test):
ps.setUp()
dict = test.globs
dict.clear()
- dict['__name__'] = name
+ dict['__name__'] = name
sys.modules[name] = FakeModule(dict)
- from zope.app.testing import ztapi
- from zope.interface import Interface
- from zope.app.locking.interfaces import ILockable, ILockTracker
- from zope.app.locking.adapter import LockingAdapterFactory
- from zope.app.locking.adapter import LockingPathAdapter
- from zope.app.locking.storage import ILockStorage, PersistentLockStorage
- from zope.app.traversing.interfaces import IPathAdapter
-
ztapi.provideAdapter(Interface, IKeyReference, FakeKeyReference)
ztapi.provideAdapter(Interface, ILockable, LockingAdapterFactory)
ztapi.provideAdapter(None, IPathAdapter, LockingPathAdapter,
@@ -94,10 +159,15 @@
zope.event.subscribers.pop()
zope.app.locking.storage.timefunc = time.time
+
def test_suite():
- return doctest.DocFileSuite('README.txt', setUp=setUp, tearDown=tearDown,
- optionflags=(doctest.ELLIPSIS)
- )
+ suite = unittest.TestSuite()
+ suite.addTest(doctest.DocFileSuite('README.txt', setUp=setUp,
+ tearDown=tearDown,
+ optionflags=(doctest.ELLIPSIS)
+ ))
+ suite.addTest(unittest.makeSuite(TestLockStorage))
+ return suite
if __name__ == '__main__':
More information about the Zope3-Checkins
mailing list