[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/
implemented and tested recovery in the Raid storage
Thomas Lotze
tl at gocept.com
Thu Feb 21 08:31:10 EST 2008
Log message for revision 84111:
implemented and tested recovery in the Raid storage
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py 2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py 2008-02-21 13:31:10 UTC (rev 84111)
@@ -84,6 +84,8 @@
yield ('verified',)
+ restorable = hasattr(self.target, 'restore')
+
# Recover from that point on until the target storage has all
# transactions that exist in the source storage at the time of
# finalization. Therefore we need to check continuously for new
@@ -104,8 +106,12 @@
self.target.tpc_begin(txn_info, txn_info.tid, txn_info.status)
for r in txn_info:
- self.target.restore(r.oid, r.tid, r.data, r.version,
- r.data_txn, txn_info)
+ if restorable:
+ self.target.restore(r.oid, r.tid, r.data, r.version,
+ r.data_txn, txn_info)
+ else:
+ self.target.store(r.oid, r.tid, r.data, r.version,
+ txn_info)
self.target.tpc_vote(txn_info)
self.target.tpc_finish(txn_info)
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-02-21 13:31:10 UTC (rev 84111)
@@ -33,6 +33,7 @@
import ZODB.blob
import gocept.zeoraid.interfaces
+import gocept.zeoraid.recovery
logger = logging.getLogger('gocept.zeoraid')
@@ -67,6 +68,7 @@
ZODB.interfaces.IBlobStorage,
ZODB.interfaces.IStorageUndoable,
ZODB.interfaces.IStorageCurrentRecordIteration,
+ ZODB.interfaces.IStorageIteration,
ZEO.interfaces.IServeable,
)
@@ -135,11 +137,9 @@
for degraded_storages in tids.values():
self.storages_degraded.extend(degraded_storages)
- # XXX Degrade storages that don't have the right max OID.
+ # No storage is recovering initially
+ self.storage_recovering = None
- # No storages are recovering initially
- self.storages_recovering = []
-
# IStorage
def close(self):
@@ -423,6 +423,14 @@
"""Iterate over the records in a storage."""
return self._apply_single_storage('record_iternext', (next,))
+ # IStorageIteration
+
+ def iterator(self, start=None, stop=None):
+ """Return an IStorageTransactionInformation iterator."""
+ # XXX This should really include fail-over for iterators over storages
+ # that degrade or recover while this iterator is running.
+ return self._apply_single_storage('iterator', (start, stop))
+
# IServeable
# Note: We opt to not implement lastInvalidations until ClientStorage does.
@@ -449,7 +457,7 @@
# XXX
@ensure_open_storage
def raid_status(self):
- if self.storages_recovering:
+ if self.storage_recovering:
return 'recovering'
if not self.storages_degraded:
return 'optimal'
@@ -460,7 +468,7 @@
# XXX
@ensure_open_storage
def raid_details(self):
- return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
+ return [self.storages_optimal, self.storage_recovering, self.storages_degraded]
# XXX
@ensure_open_storage
@@ -471,11 +479,10 @@
# XXX
@ensure_open_storage
def raid_recover(self, name):
- # XXX: Need to sync `max oid` after recovery
if name not in self.storages_degraded:
return
self.storages_degraded.remove(name)
- self.storages_recovering.append(name)
+ self.storage_recovering = name
t = threading.Thread(target=self._recover_impl, args=(name,))
self._threads.add(t)
t.setDaemon(True)
@@ -613,156 +620,43 @@
raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
def _recover_impl(self, name):
- try:
- # First pass: Transfer all oids without hindering running transactions
- begin = time.time()
- self._recover_first(name)
- end = time.time()
-
- # Second pass: Start the TPC on a reference storage to block other
- # transactions so we can catch up. The second pass should be
- # significantly faster than the first.
- begin = time.time()
- self._recover_second(name)
- end = time.time()
- except Exception:
- # *something* went wrong. Put the storage back to degraded.
- logger.exception('Failure recovering %r: ' % (name,))
- try:
- self._degrade_storage(name)
- except Exception:
- logger.exception(
- 'Failure degrading %r after failed recovery: ' % (name,))
- raise
- raise
-
- def _recover_second(self, name):
- storage = self.storages[name]
- reference_storage = self.storages[self.storages_optimal[0]]
- # Start a transaction on the reference storage to acquire the
- # commit lock and prevent other people from committing in the second phase.
- # XXX This needs to be optimized in a way that the second phase
- # gets re-run as long as possible, only holding the commit lock if
- # no transactions remain that need to be replayed and putting the
- # recovered storage back into the array of optimal storages.
- while 1:
- tm = transaction.TransactionManager()
- t = tm.get()
- last_transaction = storage.lastTransaction()
- reference_storage.tpc_begin(t)
- unrecovered_transactions = self._unrecovered_transactions
- if unrecovered_transactions:
- # We acquired the commit lock and there are transactions that
- # have been committed and were not yet transferred to the
- # recovering storage. We have to try to replay those and then
- # check again. We can remove the commit lock for now.
- self._unrecovered_transactions = {}
- reference_storage.tpc_abort(t)
-
- # RRR: Refactor into its own method?
- tm2 = transaction.TransactionManager()
- t2 = tm2.get()
-
- # Get the unrecovered transactions in the order they were
- # recorded.
- tids = sorted(unrecovered_transactions.keys())
- for tid in tids:
- oids = unrecovered_transactions[tid]
- # We create one transaction for all oids that belong to one
- # transaction.
- storage.tpc_begin(t2, tid=tid)
- for oid in oids:
- data, tid_ = reference_storage.load(oid, '')
- if tid_ > tid:
- # If the current tid of the object is newer
- # than the one we logged, we can ignore it, because
- # there will be another entry for this oid in a
- # later transaction.
- continue
- try:
- oldserial = storage.getTid(oid)
- except ZODB.POSException.POSKeyError:
- # This means that the object is new and didn't have an
- # old transaction yet.
- # XXX Might this also happen with non-undoable storages?
- oldserial = ZODB.utils.z64
- storage.store(oid, oldserial, data, '', t2)
- storage.tpc_vote(t2)
- storage.tpc_finish(t2)
- # /RRR
- else:
- # We acquired the commit lock and no committed transactions
- # are waiting in the log. This means the recovering storage
- # has caught up by now and we can put it into optimal state
- # again.
- self.storages_recovering.remove(name)
- if self._db:
- # We are registered with a database already. We need to
- # re-register the recovered storage to make invalidations
- # pass through.
- self.storages[name].registerDB(self._db)
- self.storages_optimal.append(name)
- # We can also stop logging stores now.
- self._log_stores = False
- reference_storage.tpc_abort(t)
- break
-
- def _recover_first(self, name):
- """The inner loop of the recovery code. Does the actual work."""
- # Re-open storage
storage = self.openers[name].open()
self.storages[name] = storage
- # XXX Bring the storage to the current state. This only copies the
- # current data, so RAID currently supports neither undo nor versions.
- next_oid = None
- tm = transaction.TransactionManager()
- t = tm.get()
- # XXX we assume that the last written transaction actually is consistent. We need
- # a consistency check.
- last_transaction = storage.lastTransaction()
- # This flag starts logging all successful stores and updates those oids
- # in the second pass again.
- max_transaction = self.storages[self.storages_optimal[0]].lastTransaction()
- self._unrecovered_transactions = {}
- self._log_stores = True
- # The init flag allows us to phrase the break condition of the
- # following loop a little bit more elegantly.
- init = True
- while 1:
- if next_oid is None and not init:
- break
+ recovery = gocept.zeoraid.recovery.Recovery(
+ self, storage, self._finalize_recovery)
+ for msg in recovery():
+ logger.info(str(msg))
- init = False
- oid, tid, data, next_oid = self._apply_single_storage(
- 'record_iternext', (next_oid,))
+ def _finalize_recovery(self, storage):
+ self._write_lock.acquire()
+ try:
+ self.storages_optimal.append(self.storage_recovering)
+ self._synchronise_oids()
+ self.storage_recovering = None
+ finally:
+ self._write_lock.release()
- if tid > max_transaction:
- continue
+ def _synchronise_oids(self):
+ # Try allocating the same OID from all storages. This is done by
+ # determining the maximum and making all other storages increase
+ # their OID until they hit the maximum. While any storage yields
+ # an OID above the maximum, we try again with that value.
+ max_oid = None
+ lagging = self.storages_optimal[:]
+ while lagging:
+ storage = lagging.pop()
+ while True:
+ reliable, oid = self.__apply_storage(storage, 'new_oid')
+ if not reliable:
+ break
+ if oid < max_oid:
+ continue
+ if oid > max_oid:
+ max_oid = oid
+ lagging = [s for s in self.storages_optimal
+ if s != storage]
+ break
- if tid <= last_transaction:
- try:
- old_data = storage.loadSerial(oid, tid)
- except ZODB.POSException.POSKeyError:
- pass
- else:
- if old_data == data:
- continue
-
- # There is a newer version of the object available or the existing
- # version was incorrect. Overwrite it with the right data.
- try:
- oldserial = storage.getTid(oid)
- except ZODB.POSException.POSKeyError:
- oldserial = ZODB.utils.z64
-
-
- assert oldserial <= tid, "last_transaction and oldserial are not in-sync"
-
- storage.tpc_begin(t, tid=tid)
- storage.store(oid, oldserial, data, '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-
def _new_tid(self, old_tid):
"""Generates a new TID."""
if old_tid is None:
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-02-21 13:31:10 UTC (rev 84111)
@@ -31,6 +31,7 @@
MTStorage, ReadOnlyStorage, RecoveryStorage
import gocept.zeoraid.storage
+import gocept.zeoraid.tests.test_recovery
from ZEO.ClientStorage import ClientStorage
from ZEO.tests import forker, CommitLockTests, ThreadTests
@@ -1079,6 +1080,23 @@
self._disable_storage(0)
self._storage.getExtensionMethods()
+ def test_recover(self):
+ self._dostore()
+ self._dostore()
+ self._dostore()
+ self._disable_storage(0)
+ self._dostore()
+ self._dostore()
+ self._storage.raid_recover(self._storage.storages_degraded[0])
+ while self._storage.storage_recovering:
+ time.sleep(0.02)
+ self.assertEquals('optimal', self._storage.raid_status())
+ gocept.zeoraid.tests.test_recovery.compare(
+ self, self._backend(0), self._backend(1))
+ self._storage.new_oid()
+ self.assertEquals('optimal', self._storage.raid_status())
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2008-02-21 13:31:10 UTC (rev 84111)
@@ -27,6 +27,25 @@
import gocept.zeoraid.recovery
+def compare(test, source, target):
+ recovery = gocept.zeoraid.recovery.Recovery(
+ source, target, lambda target: None)
+ protocol = list(recovery())
+ test.assertEquals([('verified',), ('recovered',)], protocol[-2:])
+ for source_txn, target_txn in zip(source.iterator(),
+ target.iterator()):
+ # We need not compare the transaction metadata because that has
+ # already been done by the recovery's verification run.
+ source_records = list(source_txn)
+ target_records = list(target_txn)
+ test.assertEquals(len(source_records), len(target_records))
+ for source_record, target_record in zip(source_records,
+ target_records):
+ for name in 'oid', 'tid', 'data', 'version', 'data_txn':
+ test.assertEquals(getattr(source_record, name),
+ getattr(target_record, name))
+
+
class ContinuousStorageIterator(ZODB.tests.StorageTestBase.StorageTestBase):
def setUp(self):
@@ -100,22 +119,7 @@
return tid
def compare(self, source, target):
- recovery = gocept.zeoraid.recovery.Recovery(
- source, target, lambda target: None)
- protocol = list(recovery())
- self.assertEquals([('verified',), ('recovered',)], protocol[-2:])
- for source_txn, target_txn in zip(source.iterator(),
- target.iterator()):
- # We need not compare the transaction metadata because that has
- # already been done by the recovery's verification run.
- source_records = list(source_txn)
- target_records = list(target_txn)
- self.assertEquals(len(source_records), len(target_records))
- for source_record, target_record in zip(source_records,
- target_records):
- for name in 'oid', 'tid', 'data', 'version', 'data_txn':
- self.assertEquals(getattr(source_record, name),
- getattr(target_record, name))
+ compare(self, source, target)
def setUp(self):
self.source = ZODB.FileStorage.FileStorage(tempfile.mktemp())
More information about the Checkins
mailing list