[Zodb-checkins] SVN: ZODB/trunk/src/ Added support for copying and recovery of blob storages:

Jim Fulton jim at zope.com
Sat Oct 18 13:34:25 EDT 2008


Log message for revision 92360:
  Added support for copying and recovery of blob storages:
  
  - Added a helper function, ZODB.blob.is_blob_record for testing whether
    a data record is for a blob.  This can be used when iterating over a
    storage to detect blob records so that blob data can be copied.
  
    In the future, we may want to build this into a blob-aware
    iteration interface, so that records get blob file attributes
    automatically.
  
  - Added the IBlobStorageRestoreable interface for blob storages
    that support recovery via a restoreBlob method.
  
  - Updated ZODB.blob.BlobStorage to implement
    IBlobStorageRestoreable and to have a copyTransactionsFrom method
    that also copies blob data.
  
  Also removed the version argument from the history method.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZODB/blob.py
  U   ZODB/trunk/src/ZODB/interfaces.py
  U   ZODB/trunk/src/ZODB/tests/IteratorStorage.py
  U   ZODB/trunk/src/ZODB/tests/testblob.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2008-10-18 17:34:22 UTC (rev 92359)
+++ ZODB/trunk/src/CHANGES.txt	2008-10-18 17:34:24 UTC (rev 92360)
@@ -8,6 +8,23 @@
 New Features
 ------------
 
+- Added support for copying and recovery of blob storages:
+
+  - Added a helper function, ZODB.blob.is_blob_record for testing whether
+    a data record is for a blob.  This can be used when iterating over a
+    storage to detect blob records so that blob data can be copied.
+
+    In the future, we may want to build this into a blob-aware
+    iteration interface, so that records get blob file attributes
+    automatically.
+
+  - Added the IBlobStorageRestoreable interfaces for blob storages
+    that support recovery via a restoreBlob method.
+
+  - Updated ZODB.blob.BlobStorage to implement
+    IBlobStorageRestoreable and to have a copyTransactionsFrom method
+    that also copies blob data.
+
 - New `ClientStorage` configuration option `drop_cache_rather_verify`.
   If this option is true then the ZEO client cache is dropped instead of
   the long (unoptimized) verification. For large caches, setting this

Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2008-10-18 17:34:22 UTC (rev 92359)
+++ ZODB/trunk/src/ZODB/blob.py	2008-10-18 17:34:24 UTC (rev 92360)
@@ -14,6 +14,7 @@
 """Blobs
 """
 
+import cPickle
 import base64
 import logging
 import os
@@ -443,6 +444,10 @@
         self.__supportsUndo = supportsUndo
         self._blobs_pack_is_in_progress = False
 
+        if ZODB.interfaces.IStorageRestoreable.providedBy(storage):
+            zope.interface.alsoProvides(self,
+                                        ZODB.interfaces.IBlobStorageRestoreable)
+
     @non_overridable
     def temporaryDirectory(self):
         return self.fshelper.temp_dir
@@ -452,14 +457,9 @@
         normal_storage = getProxiedObject(self)
         return '<BlobStorage proxy for %r at %s>' % (normal_storage,
                                                      hex(id(self)))
+
     @non_overridable
-    def storeBlob(self, oid, oldserial, data, blobfilename, version,
-                  transaction):
-        """Stores data that has a BLOB attached."""
-        serial = self.store(oid, oldserial, data, version, transaction)
-        assert isinstance(serial, str) # XXX in theory serials could be 
-                                       # something else
-
+    def _storeblob(self, oid, serial, blobfilename):
         self._lock_acquire()
         try:
             targetpath = self.fshelper.getPathForOID(oid)
@@ -474,9 +474,53 @@
             self.dirty_oids.append((oid, serial))
         finally:
             self._lock_release()
+            
+    @non_overridable
+    def storeBlob(self, oid, oldserial, data, blobfilename, version,
+                  transaction):
+        """Stores data that has a BLOB attached."""
+        assert not version, "Versions aren't supported."
+        serial = self.store(oid, oldserial, data, '', transaction)
+        self._storeblob(oid, serial, blobfilename)
+
         return self._tid
 
     @non_overridable
+    def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
+                    transaction):
+        """Write blob data already committed in a separate database
+        """
+        self.restore(oid, serial, data, '', prev_txn, transaction)
+        self._storeblob(oid, serial, blobfilename)
+
+        return self._tid
+
+    @non_overridable
+    def copyTransactionsFrom(self, other):
+        for trans in other.iterator():
+            self.tpc_begin(trans, trans.tid, trans.status)
+            for record in trans:
+                blobfilename = None
+                if is_blob_record(record.data):
+                    try:
+                        blobfilename = other.loadBlob(record.oid, record.tid)
+                    except POSKeyError:
+                        pass
+                if blobfilename is not None:
+                    fd, name = tempfile.mkstemp(
+                        suffix='.tmp', dir=self.fshelper.temp_dir)
+                    os.close(fd)
+                    utils.cp(open(blobfilename), open(name, 'wb'))
+                    self.restoreBlob(record.oid, record.tid, record.data,
+                                     name, record.data_txn, trans)
+                else:
+                    self.restore(record.oid, record.tid, record.data,
+                                 '', record.data_txn, trans)
+
+            self.tpc_vote(trans)
+            self.tpc_finish(trans)
+
+    @non_overridable
     def tpc_finish(self, *arg, **kw):
         # We need to override the base storage's tpc_finish instead of
         # providing a _finish method because methods found on the proxied 
@@ -692,3 +736,13 @@
 else:
     remove_committed = os.remove
     remove_committed_dir = shutil.rmtree
+
+
+def is_blob_record(record):
+    """Check whether a database record is a blob record.
+
+    This is primarily intended to be used when copying data from one
+    storage to another.
+    
+    """
+    return cPickle.loads(record) is ZODB.blob.Blob

Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py	2008-10-18 17:34:22 UTC (rev 92359)
+++ ZODB/trunk/src/ZODB/interfaces.py	2008-10-18 17:34:24 UTC (rev 92360)
@@ -446,7 +446,7 @@
         This is used soley for informational purposes.
         """
 
-    def history(oid, version='', size=1):
+    def history(oid, size=1):
         """Return a sequence of history information dictionaries.
 
         Up to size objects (including no objects) may be returned.
@@ -1009,7 +1009,15 @@
         If Blobs use this, then commits can be performed with a simple rename.
         """
 
+class IBlobStorageRestoreable(IBlobStorage, IStorageRestoreable):
 
+    def storeBlob(oid, serial, data, blobfilename, prev_txn, transaction):
+        """Write blob data already committed in a separate database
+
+        See the restore and storeBlob methods.
+        """
+
+
 class BlobError(Exception):
     pass
 

Modified: ZODB/trunk/src/ZODB/tests/IteratorStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-10-18 17:34:22 UTC (rev 92359)
+++ ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-10-18 17:34:24 UTC (rev 92360)
@@ -25,8 +25,8 @@
 from transaction import Transaction
 
 import itertools
+import ZODB.blob
 
-
 class IteratorCompare:
 
     def iter_verify(self, txniter, revids, val0):
@@ -214,6 +214,18 @@
                 eq(rec1.oid,     rec2.oid)
                 eq(rec1.tid,  rec2.tid)
                 eq(rec1.data,    rec2.data)
+                if ZODB.blob.is_blob_record(rec1.data):
+                    try:
+                        fn1 = storage1.loadBlob(rec1.oid, rec1.tid)
+                    except ZODB.POSException.POSKeyError:
+                        self.assertRaises(
+                            ZODB.POSException.POSKeyError,
+                            storage2.loadBlob, rec1.oid, rec1.tid)
+                    else:
+                        fn2 = storage2.loadBlob(rec1.oid, rec1.tid)
+                        self.assert_(fn1 != fn2)
+                        eq(open(fn1).read(), open(fn2).read())
+                
             # Make sure there are no more records left in rec1 and rec2,
             # meaning they were the same length.
             # Additionally, check that we're backwards compatible to the

Modified: ZODB/trunk/src/ZODB/tests/testblob.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testblob.py	2008-10-18 17:34:22 UTC (rev 92359)
+++ ZODB/trunk/src/ZODB/tests/testblob.py	2008-10-18 17:34:24 UTC (rev 92360)
@@ -12,10 +12,15 @@
 #
 ##############################################################################
 
-import base64, os, re, shutil, stat, sys, tempfile, unittest
+import base64, os, re, shutil, stat, sys, tempfile, unittest, random, struct
 import time
+
+import ZODB.tests.IteratorStorage
+
 from zope.testing import doctest, renormalizing
+import zope.testing.setupstack
 import ZODB.tests.util
+import ZODB.interfaces
 
 from StringIO import StringIO
 from pickle import Pickler
@@ -275,6 +280,49 @@
 
         database.close()
 
+
+class RecoveryBlobStorage(unittest.TestCase,
+                          ZODB.tests.IteratorStorage.IteratorDeepCompare):
+
+    def setUp(self):
+        self.globs = {}
+        zope.testing.setupstack.setUpDirectory(self)
+        self._storage = BlobStorage(
+            'src_blobs', ZODB.FileStorage.FileStorage("Source.fs", create=True))
+        self._dst = BlobStorage(
+            'dest_blobs', ZODB.FileStorage.FileStorage("Dest.fs", create=True))
+
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+        zope.testing.setupstack.tearDown(self)
+
+    # Requires a setUp() that creates a self._dst destination storage
+    def testSimpleBlobRecovery(self):
+        self.assert_(
+            ZODB.interfaces.IBlobStorageRestoreable.providedBy(self._storage)
+            )
+        db = DB(self._storage)
+        conn = db.open()
+        conn.root()[1] = ZODB.blob.Blob()
+        transaction.commit()
+        conn.root()[2] = ZODB.blob.Blob()
+        conn.root()[2].open('w').write('some data')
+        transaction.commit()
+        conn.root()[3] = ZODB.blob.Blob()
+        conn.root()[3].open('w').write(
+            (''.join(struct.pack(">I", random.randint(0, (1<<32)-1))
+                     for i in range(random.randint(10000,20000)))
+             )[:-random.randint(1,4)]
+            )
+        transaction.commit()
+        conn.root()[2] = ZODB.blob.Blob()
+        conn.root()[2].open('w').write('some other data')
+        transaction.commit()
+        self._dst.copyTransactionsFrom(self._storage)
+        self.compare(self._storage, self._dst)
+    
+
 def gc_blob_removes_uncommitted_data():
     """
     >>> from ZODB.blob import Blob
@@ -540,6 +588,22 @@
     >>> os.unlink(storagefile+".tmp")
 """
 
+def is_blob_record():
+    """
+    >>> fs = FileStorage('Data.fs')
+    >>> bs = ZODB.blob.BlobStorage('blobs', fs)
+    >>> db = DB(bs)
+    >>> conn = db.open()
+    >>> conn.root()['blob'] = ZODB.blob.Blob()
+    >>> transaction.commit()
+    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(0), '')[0])
+    False
+    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(1), '')[0])
+    True
+
+    >>> db.close()
+    """
+
 def setUp(test):
     ZODB.tests.util.setUp(test)
     def rmtree(path):
@@ -575,6 +639,7 @@
         ))
     suite.addTest(unittest.makeSuite(BlobCloneTests))
     suite.addTest(unittest.makeSuite(BlobUndoTests))
+    suite.addTest(unittest.makeSuite(RecoveryBlobStorage))
 
     return suite
 



More information about the Zodb-checkins mailing list