[Checkins] SVN: gocept.zeoraid/trunk/ Started work on blob support.
Extended FailingStorage to support blobs.
Christian Theune
ct at gocept.com
Mon Jan 28 09:19:33 EST 2008
Log message for revision 83282:
Started work on blob support. Extended FailingStorage to support blobs.
Implemented basic test for blob functionality.
Changed:
U gocept.zeoraid/trunk/ROADMAP.txt
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/ROADMAP.txt
===================================================================
--- gocept.zeoraid/trunk/ROADMAP.txt 2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/ROADMAP.txt 2008-01-28 14:19:32 UTC (rev 83282)
@@ -28,12 +28,19 @@
- Re-check API usage and definition for ZODB 3.8 as our base.
+ - Ensure that blob-caching parameters are equal for all clientstorages
+
+ - Provide RAID-aware blob storage implementation that ignores requests on a
+ shared file system that were handled already and are consistent.
+
Feature-completeness
--------------------
- Rebuild storage using the copy mechanism in ZODB to get all historic
records completely. (Only rebuild completely, not incrementally)
+ - Rebuild/recover with blobs!
+
- Create a limit for the transaction rate when recovering so that the
recovery doesn't clog up the live servers.
@@ -49,6 +56,9 @@
- XXX pack may never be run while a storage is recovering.
+ - XXX Blobs: Hard links created for the multiple backend storages need to be tracked
+ and cleaned up.
+
Future
======
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-28 14:19:32 UTC (rev 83282)
@@ -7,6 +7,9 @@
import threading
import time
import logging
+import tempfile
+import os
+import os.path
import zope.interface
@@ -348,15 +351,40 @@
# IBlobStorage
+ # XXX degradation tests
@storeBlob_38_compatible
@ensure_writable
def storeBlob(self, oid, oldserial, data, blob, transaction):
"""Stores data that has a BLOB attached."""
- # XXX
+ if transaction is not self._transaction:
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
+ def get_blob_data():
+ # Client storages expect to be the only ones operating on the blob
+ # file. We need to create individual appearances of the original
+ # file so that they can move the file to their cache location.
+ yield (oid, oldserial, data, blob, '', transaction)
+ base_dir = tempfile.mkdtemp(dir=os.path.dirname(blob))
+ copies = 0
+ while True:
+ # We need to create a new directory to make sure that
+ # atomicity of file creation is preserved.
+ copies += 1
+ new_blob = os.path.join(base_dir, '%i.blob' % copies)
+ os.link(blob, new_blob)
+ yield (oid, oldserial, data, new_blob, '', transaction)
+
+ self._write_lock.acquire()
+ try:
+ self._apply_all_storages('storeBlob', get_blob_data)
+ return self._tid
+ finally:
+ self._write_lock.release()
+
+ # XXX degradation tests
def loadBlob(self, oid, serial):
"""Return the filename of the Blob data for this OID and serial."""
- # XXX
+ return self._apply_single_storage('loadBlob', (oid, serial))
def temporaryDirectory(self):
"""Return a directory that should be used for uncommitted blob data.
@@ -489,6 +517,7 @@
# Handle StorageErrors first, otherwise they would be swallowed
# when POSErrors are.
reliable = False
+ raise
except (ZODB.POSException.POSError,
transaction.interfaces.TransactionError), e:
# These exceptions are valid answers from the storage. They don't
@@ -520,15 +549,37 @@
@ensure_open_storage
def _apply_all_storages(self, method_name, args=(), kw={},
expect_connected=True):
- """Calls the given method on all optimal backend storages in order."""
+ """Calls the given method on all optimal backend storages in order.
+
+ `args` can be given as an n-tuple with the positional arguments that
+ should be passed to each storage.
+
+ Alternatively `args` can be a callable that returns an iterable. The
+ N-th item of the iterable is expected to be a tuple, passed to the
+ N-th storage.
+
+ """
results = []
exceptions = []
+
+ if callable(args):
+ argument_iterable = args()
+ else:
+ # Provide a fallback if `args` is given as a simple tuple.
+ static_arguments = args
+ def dummy_generator():
+ while True:
+ yield static_arguments
+ argument_iterable = dummy_generator()
+
for name in self.storages_optimal[:]:
try:
+ args = argument_iterable.next()
reliable, result = self.__apply_storage(
name, method_name, args, kw, expect_connected)
except Exception, e:
exceptions.append(e)
+ raise
else:
if reliable:
results.append(result)
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py 2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py 2008-01-28 14:19:32 UTC (rev 83282)
@@ -10,43 +10,65 @@
import ZODB.config
import ZODB.FileStorage
+import zope.proxy
+
class Opener(ZODB.config.BaseConfig):
def open(self):
- return FailingStorage(self.name)
+ blob_dir = tempfile.mkdtemp()
+ file_handle, file_name = tempfile.mkstemp()
+ fs = ZODB.FileStorage.FileStorage(file_name)
+ return FailingStorage(blob_dir, fs)
def failing_method(name):
"""Produces a method that can be made to fail."""
def fail(self, *args, **kw):
if name == self._fail:
+ self._fail = None
raise Exception()
- return getattr(ZODB.FileStorage.FileStorage, name)(self, *args, **kw)
+ if hasattr(ZODB.blob.BlobStorage, name):
+ original_method = getattr(ZODB.blob.BlobStorage, name).fget(self)
+ else:
+ original_method = getattr(zope.proxy.getProxiedObject(self), name)
+ return original_method(*args, **kw)
return fail
-class FailingStorage(ZODB.FileStorage.FileStorage):
+class FailingStorage(ZODB.blob.BlobStorage):
- _fail = None
+ __slots__ = ('_fail',) + ZODB.blob.BlobStorage.__slots__
- def __init__(self, name):
- self.name = name
- file_handle, file_name = tempfile.mkstemp()
- ZODB.FileStorage.FileStorage.__init__(self, file_name)
+ def __init__(self, base_directory, storage):
+ ZODB.blob.BlobStorage.__init__(
+ self, base_directory, storage)
+ self._fail = None
+ @zope.proxy.non_overridable
def close(self):
- ZODB.FileStorage.FileStorage.close(self)
- self.cleanup()
+ if self._fail == 'open':
+ self._fail = None
+ raise Exception()
+ zope.proxy.getProxiedObject(self).close()
+ zope.proxy.getProxiedObject(self).cleanup()
+ # XXX rmtree blobdir
+ @zope.proxy.non_overridable
def getExtensionMethods(self):
return dict(fail=None)
- history = failing_method('history')
- loadSerial = failing_method('loadSerial')
+ # Create a set of stub methods that have to be made to fail but are set as
+ # non-data descriptors on the proxy object.
+ __stub_methods__ = ['history', 'loadSerial', 'close', 'getSize',
+ 'pack', 'tpc_abort', 'tpc_finish']
+ for name in __stub_methods__:
+ method = zope.proxy.non_overridable(failing_method(name))
+ locals()[name] = method
+ @zope.proxy.non_overridable
def fail(self, method_name):
- if method_name in ['history', 'loadSerial']:
+ if method_name in self.__stub_methods__:
# Those methods are copied/referenced by the server code, we can't
# rebind them here.
self._fail = method_name
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-28 14:19:32 UTC (rev 83282)
@@ -59,7 +59,6 @@
zconf = forker.ZEOConfig(('', port))
zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
zconf, port)
-
self._servers.append(adminaddr)
self._storages.append(ZEOOpener(zport, storage='1',
min_disconnect_poll=0.5, wait=1,
@@ -124,6 +123,7 @@
# Ensure compatibility
gocept.zeoraid.compatibility.setup()
+ self._blob_dirs = []
self._servers = []
self._storages = []
for i in xrange(self.backend_count):
@@ -134,9 +134,12 @@
<failingstorage 1>
</failingstorage>""",
zconf, port)
+ blob_dir = tempfile.mkdtemp()
+ self._blob_dirs.append(blob_dir)
self._servers.append(adminaddr)
self._storages.append(ZEOOpener(zport, storage='1',
cache_size=12,
+ blob_dir=blob_dir,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60))
self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
@@ -631,7 +634,22 @@
self._storage.tpc_begin(t)
self.assertEquals('optimal', self._storage.raid_status())
+ def test_blob_usage(self):
+ oid = self._storage.new_oid()
+ handle, blob_file_name = tempfile.mkstemp()
+ open(blob_file_name, 'w').write('I am a happy blob.')
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.storeBlob(
+ oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ stored_file_name = self._storage.loadBlob(
+ oid, self._storage.lastTransaction())
+ self.assertEquals('I am a happy blob.',
+ open(stored_file_name, 'r').read())
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
More information about the Checkins
mailing list