[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ merged and cleaned up dirceu-distributed-write-calls branch, added test
Thomas Lotze
tl at gocept.com
Thu Jan 8 04:08:17 EST 2009
Log message for revision 94611:
merged and cleaned up dirceu-distributed-write-calls branch, added test
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2009-01-08 07:26:31 UTC (rev 94610)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2009-01-08 09:08:16 UTC (rev 94611)
@@ -57,6 +57,31 @@
return check_writable
+class ThreadedApplyStorage(threading.Thread):
+
+ reliable = None
+ result = None
+ exception = None
+
+ def __init__(self, storage_name, method_name, args, kw,
+ expect_connected, apply_storage):
+ super(ThreadedApplyStorage, self).__init__()
+ self.storage_name = storage_name
+ self.method_name = method_name
+ self.args = args
+ self.kw = kw
+ self.expect_connected = expect_connected
+ self.__apply_storage = apply_storage
+
+ def run(self):
+ try:
+ self.reliable, self.result = self.__apply_storage(
+ self.storage_name, self.method_name, self.args,
+ self.kw, self.expect_connected)
+ except Exception, e:
+ self.exception = e
+
+
class RAIDStorage(object):
"""The RAID storage is a drop-in replacement for the client storages that
are configured.
@@ -90,6 +115,9 @@
# for generating new TIDs.
_last_tid = None
+ # Timeout for threaded/parallel operations on backend storages.
+ timeout = 60
+
def __init__(self, name, openers, read_only=False, blob_dir=None,
shared_blob_dir=False):
self.__name__ = name
@@ -169,9 +197,9 @@
del self.storages_optimal[:]
for thread in self._threads:
- # We give all the threads a chance to get done within one second.
+ # We give all the threads a chance to get done quickly.
# This is mostly a convenience for the tests to not annoy.
- thread.join(1)
+ thread.join(5)
def getName(self):
"""The name of the storage."""
@@ -587,10 +615,18 @@
if not self.storages_optimal and fail:
raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
- def __apply_storage(self, name, method_name, args=(), kw={},
+ def __apply_storage(self, storage_name, method_name, args=(), kw={},
expect_connected=True):
+ """Calls a method on a given backend storage.
+
+ Returns a tuple (reliable, result).
+
+ All exceptions raised by this method indicate a user error. Storage
+ failure is signalled by declaring the result unreliable.
+
+ """
# XXX storage might be degraded by now, need to check.
- storage = self.storages[name]
+ storage = self.storages[storage_name]
method = getattr(storage, method_name)
reliable = True
result = None
@@ -613,7 +649,7 @@
reliable = False
if not reliable:
- self._degrade_storage(name)
+ self._degrade_storage(storage_name)
return (reliable, result)
@ensure_open_storage
@@ -642,7 +678,7 @@
ignore_noop=False):
"""Calls the given method on all optimal backend storages in order.
- `args` can be given as an n-tupel with the positional arguments that
+ `args` can be given as an n-tuple with the positional arguments that
should be passed to each storage.
Alternatively `args` can be a callable that returns an iterable. The
@@ -650,9 +686,6 @@
N-th storage.
"""
- results = []
- exceptions = []
-
if callable(args):
argument_iterable = args()
else:
@@ -667,18 +700,32 @@
applicable_storages = [storage for storage in applicable_storages
if storage not in exclude]
- for name in applicable_storages:
- try:
- args = argument_iterable.next()
- reliable, result = self.__apply_storage(
- name, method_name, args, kw, expect_connected)
- except Exception, e:
- exceptions.append(e)
- raise
- else:
- if reliable:
- results.append(result)
+ # Run __apply_storage on all applicable storages in parallel.
+ threads = []
+ for storage_name in applicable_storages:
+ args = argument_iterable.next()
+ t = ThreadedApplyStorage(storage_name, method_name, args, kw,
+ expect_connected, self.__apply_storage)
+ threads.append(t)
+ t.start()
+ # Wait for threads to finish and pick up results.
+ results = []
+ exceptions = []
+ for thread in threads:
+ # XXX The timeout should be calculated such that the total time
+ # spent in this loop doesn't grow with the number of storages.
+ thread.join(self.timeout)
+ if thread.isAlive():
+ # Storage timed out.
+ self._degrade_storage(thread.storage_name)
+ self._threads.add(thread)
+ continue
+ if thread.exception:
+ exceptions.append(thread.exception)
+ elif thread.reliable:
+ results.append(thread.result)
+
# Analyse result consistency.
consistent = True
if exceptions and results:
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2009-01-08 07:26:31 UTC (rev 94610)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2009-01-08 09:08:16 UTC (rev 94611)
@@ -1230,7 +1230,16 @@
self._storage.new_oid()
self.assertEquals('optimal', self._storage.raid_status())
+ def test_timeoutBackend(self):
+ self._storage.timeout = 2
+ def slow_tpc_begin(*args):
+ time.sleep(4)
+ self._backend(0).tpc_begin = slow_tpc_begin
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self.assertEquals('degraded', self._storage.raid_status())
+
class FailingStorageTests(FailingStorageTestBase,
FailingStorageTestSetup):
More information about the Checkins mailing list