[Zodb-checkins] SVN: ZODB/trunk/src/ Added direct blob support to FileStorage. This is to provide more

Jim Fulton jim at zope.com
Tue Dec 16 16:28:38 EST 2008


Log message for revision 94133:
  Added direct blob support to FileStorage. This is to provide more
  efficient packing (and undo).
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/fspack.py
  U   ZODB/trunk/src/ZODB/FileStorage/interfaces.py
  U   ZODB/trunk/src/ZODB/blob.py
  U   ZODB/trunk/src/ZODB/tests/testFileStorage.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/CHANGES.txt	2008-12-16 21:28:38 UTC (rev 94133)
@@ -22,6 +22,14 @@
   XXX There are known issues with this implementation that need to be
   sorted out before it is "released".
 
+3.9.0a9 (2008-12-??)
+====================
+
+New Features
+------------
+
+- FileStorage now supports blobs directly.
+
 3.9.0a8 (2008-12-15)
 ====================
 

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-12-16 21:28:38 UTC (rev 94133)
@@ -16,34 +16,36 @@
 $Revision: 1.16 $
 """
 
+from cPickle import Pickler, Unpickler, loads
+from persistent.TimeStamp import TimeStamp
+from struct import pack, unpack
+from types import StringType
+from zc.lockfile import LockFile
+from ZODB.FileStorage.format import CorruptedDataError
+from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
+from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
+from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
+from ZODB.FileStorage.fspack import FileStoragePacker
+from ZODB.fsIndex import fsIndex
+from ZODB import BaseStorage, ConflictResolution, POSException
+from ZODB.loglevels import BLATHER
+from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
+from ZODB.utils import p64, u64, z64
+
 import base64
-from cPickle import Pickler, Unpickler, loads
 import errno
+import logging
 import os
 import sys
 import time
-import logging
-from types import StringType
-from struct import pack, unpack
+import ZODB.blob
+import ZODB.interfaces
+import zope.interface
+import ZODB.utils
 
 # Not all platforms have fsync
 fsync = getattr(os, "fsync", None)
 
-import zope.interface
-import ZODB.interfaces
-from ZODB import BaseStorage, ConflictResolution, POSException
-from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
-from persistent.TimeStamp import TimeStamp
-from zc.lockfile import LockFile
-from ZODB.utils import p64, u64, cp, z64
-from ZODB.FileStorage.fspack import FileStoragePacker
-from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
-from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
-from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
-from ZODB.FileStorage.format import CorruptedDataError
-from ZODB.loglevels import BLATHER
-from ZODB.fsIndex import fsIndex
-
 packed_version = "FS21"
 
 logger = logging.getLogger('ZODB.FileStorage')
@@ -86,9 +88,12 @@
     def __init__(self, afile):
         self._file = afile
 
-class FileStorage(BaseStorage.BaseStorage,
-                  ConflictResolution.ConflictResolvingStorage,
-                  FileStorageFormatter):
+class FileStorage(
+    FileStorageFormatter,
+    ZODB.blob.BlobStorageMixin,
+    ConflictResolution.ConflictResolvingStorage,
+    BaseStorage.BaseStorage,
+    ):
 
     zope.interface.implements(
         ZODB.interfaces.IStorage,
@@ -102,7 +107,7 @@
     _pack_is_in_progress = False
 
     def __init__(self, file_name, create=False, read_only=False, stop=None,
-                 quota=None, pack_gc=True, packer=None):
+                 quota=None, pack_gc=True, packer=None, blob_dir=None):
 
         if read_only:
             self._is_read_only = True
@@ -198,6 +203,20 @@
 
         self._quota = quota
 
+        self.blob_dir = blob_dir
+        if blob_dir:
+            self._blob_init(blob_dir)
+            zope.interface.alsoProvides(self,
+                                        ZODB.interfaces.IBlobStorageRestoreable)
+        else:
+            self._blob_init_no_blobs()
+
+    def copyTransactionsFrom(self, other):
+        if self.blob_dir:
+            return ZODB.blob.BlobStorageMixin.copyTransactionsFrom(self, other)
+        else:
+            return BaseStorage.BaseStorage.copyTransactionsFrom(self, other)
+
     def _initIndex(self, index, tindex):
         self._index=index
         self._tindex=tindex
@@ -654,7 +673,7 @@
                 h.descr = descr
                 h.ext = ext
                 self._file.write(h.asString())
-                cp(self._tfile, self._file, dlen)
+                ZODB.utils.cp(self._tfile, self._file, dlen)
                 self._file.write(p64(tl))
                 self._file.flush()
             except:
@@ -695,11 +714,13 @@
         self._pos = self._nextpos
         self._index.update(self._tindex)
         self._ltid = tid
+        self._blob_tpc_finish()
 
     def _abort(self):
         if self._nextpos:
             self._file.truncate(self._pos)
             self._nextpos=0
+            self._blob_tpc_abort()
 
     def _undoDataInfo(self, oid, pos, tpos):
         """Return the tid, data pointer, and data for the oid record at pos
@@ -920,6 +941,18 @@
                 # Don't fail right away. We may be redeemed later!
                 failures[h.oid] = v
             else:
+
+                if self.blob_dir and not p and prev:
+                    up, userial = self._loadBackTxn(h.oid, prev)
+                    if ZODB.blob.is_blob_record(up):
+                        # We're undoing a blob modification operation.
+                        # We have to copy the blob data
+                        tmp = ZODB.utils.mktemp(dir=self.fshelper.temp_dir)
+                        ZODB.utils.cp(
+                            self.openCommittedBlobFile(h.oid, userial),
+                            open(tmp, 'wb'))
+                        self._blob_storeblob(h.oid, self._tid, tmp)
+                
                 new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
 
                 # TODO:  This seek shouldn't be necessary, but some other
@@ -988,15 +1021,7 @@
         # Our default packer is built around the original packer.  We
         # simply adapt the old interface to the new.  We don't really
         # want to invest much in the old packer, at least for now.
-        p = FileStoragePacker(
-            storage._file.name,
-            stop,
-            storage._lock_acquire,
-            storage._lock_release,
-            storage._commit_lock_acquire,
-            storage._commit_lock_release,
-            storage.getSize(),
-            gc)
+        p = FileStoragePacker(storage, referencesf, stop, gc)
         opos = p.pack()
         if opos is None:
             return None
@@ -1032,6 +1057,12 @@
         if gc is None:
             gc = self._pack_gc
 
+        oldpath = self._file_name + ".old"
+        if os.path.exists(oldpath):
+            os.remove(oldpath)
+        if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
+            ZODB.blob.remove_committed_dir(self.blob_dir + ".old")
+
         have_commit_lock = False
         try:
             pack_result = None
@@ -1043,13 +1074,10 @@
                 return
             have_commit_lock = True
             opos, index = pack_result
-            oldpath = self._file_name + ".old"
             self._lock_acquire()
             try:
                 self._file.close()
                 try:
-                    if os.path.exists(oldpath):
-                        os.remove(oldpath)
                     os.rename(self._file_name, oldpath)
                 except Exception:
                     self._file = open(self._file_name, 'r+b')
@@ -1061,6 +1089,9 @@
                 self._initIndex(index, self._tindex)
                 self._pos = opos
                 self._save_index()
+
+                if self.blob_dir:
+                    self._move_unpacked_blobs()
             finally:
                 self._lock_release()
         finally:
@@ -1070,6 +1101,68 @@
             self._pack_is_in_progress = False
             self._lock_release()
 
+    def _move_unpacked_blobs(self):
+        # Move any blobs linked or copied while packing to the
+        # pack dir, which will become the old dir
+        lblob_dir = len(self.blob_dir)
+        fshelper = self.fshelper
+        old = self.blob_dir+'.old'
+        os.mkdir(old, 0777)
+
+        # Helper to clean up dirs left empty after moving things to old
+        def maybe_remove_empty_dir_containing(path):
+            path = os.path.dirname(path)
+            if len(path) <= lblob_dir:
+                return
+            if not os.listdir(path):
+                os.rmdir(path)
+                maybe_remove_empty_dir_containing(path)
+
+        # Helper that moves an oid dir or revision file to the old dir.
+        def move(path):
+            dest = os.path.dirname(old+path[lblob_dir:])
+            if not os.path.exists(dest):
+                os.makedirs(dest, 0700)
+            os.rename(path, old+path[lblob_dir:])
+            maybe_remove_empty_dir_containing(path)
+            
+        # First step: "remove" oids or revisions by moving them to .old
+        # (Later, when we add an option to not keep old files, we'll
+        # be able to simply remove.)
+        for line in open(os.path.join(self.blob_dir, '.removed')):
+            line = line.strip().decode('hex')
+
+            if len(line) == 8:
+                # oid is garbage, re/move dir
+                path = fshelper.getPathForOID(line)
+                if not os.path.exists(path):
+                    # Hm, already gone. Odd.
+                    continue
+                move(path)
+                continue
+            
+            if len(line) != 16:
+                raise ValueError("Bad record in ", self.blob_dir, '.removed')
+            
+            oid, tid = line[:8], line[8:]
+            path = fshelper.getBlobFilename(oid, tid)
+            if not os.path.exists(path):
+                # Hm, already gone. Odd.
+                continue
+            move(path)
+            
+        # Second step, copy remaining files.
+        link_or_copy = ZODB.blob.link_or_copy
+        for path, dir_names, file_names in os.walk(self.blob_dir):
+            for file_name in file_names:
+                if not file_name.endswith('.blob'):
+                    continue
+                file_path = os.path.join(path, file_name)
+                dest = os.path.dirname(old+file_path[lblob_dir:])
+                if not os.path.exists(dest):
+                    os.makedirs(dest, 0700)
+                link_or_copy(file_path, old+file_path[lblob_dir:])
+        
     def iterator(self, start=None, stop=None):
         return FileIterator(self._file_name, start, stop)
 
@@ -1469,7 +1562,7 @@
                                name, oname)
                 o = open(oname,'wb')
                 file.seek(pos)
-                cp(file, o, file_size-pos)
+                ZODB.utils.cp(file, o, file_size-pos)
                 o.close()
                 break
     except:

Modified: ZODB/trunk/src/ZODB/FileStorage/fspack.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/fspack.py	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/fspack.py	2008-12-16 21:28:38 UTC (rev 94133)
@@ -24,16 +24,15 @@
 a backpointer after that time.
 """
 
-import os
-
-from ZODB.serialize import referencesf
+from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
 from ZODB.utils import p64, u64, z64
 
-from ZODB.fsIndex import fsIndex
-from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
-from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+import logging
+import os
+import ZODB.blob
+import ZODB.fsIndex
 import ZODB.POSException
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -147,7 +146,7 @@
 
 class GC(FileStorageFormatter):
 
-    def __init__(self, file, eof, packtime, gc):
+    def __init__(self, file, eof, packtime, gc, referencesf):
         self._file = file
         self._name = file.name
         self.eof = eof
@@ -155,8 +154,10 @@
         self.gc = gc
         # packpos: position of first txn header after pack time
         self.packpos = None
-        self.oid2curpos = fsIndex() # maps oid to current data record position
 
+        # {oid -> current data record position}:
+        self.oid2curpos = ZODB.fsIndex.fsIndex()
+
         # The set of reachable revisions of each object.
         #
         # This set as managed using two data structures.  The first is
@@ -166,12 +167,14 @@
         # second is a dictionary mapping objects to lists of
         # positions; it is used to handle the same number of objects
         # for which we must keep multiple revisions.
-        self.reachable = fsIndex()
+        self.reachable = ZODB.fsIndex.fsIndex()
         self.reach_ex = {}
 
         # keep ltid for consistency checks during initial scan
         self.ltid = z64
 
+        self.referencesf = referencesf
+
     def isReachable(self, oid, pos):
         """Return 1 if revision of `oid` at `pos` is reachable."""
 
@@ -319,7 +322,7 @@
         while dh.back:
             dh = self._read_data_header(dh.back)
         if dh.plen:
-            return referencesf(self._file.read(dh.plen))
+            return self.referencesf(self._file.read(dh.plen))
         else:
             return []
 
@@ -332,7 +335,16 @@
     # current_size is the storage's _pos.  All valid data at the start
     # lives before that offset (there may be a checkpoint transaction in
     # progress after it).
-    def __init__(self, path, stop, la, lr, cla, clr, current_size, gc=True):
+    def __init__(self, storage, referencesf, stop, gc=True):
+        self._storage = storage
+        if storage.blob_dir:
+            self.pack_blobs = True
+            self.blob_removed = open(
+                os.path.join(storage.blob_dir, '.removed'), 'w')
+        else:
+            self.pack_blobs = False
+            
+        path = storage._file.name
         self._name = path
         # We open our own handle on the storage so that much of pack can
         # proceed in parallel.  It's important to close this file at every
@@ -342,24 +354,24 @@
         self._path = path
         self._stop = stop
         self.locked = False
-        self.file_end = current_size
+        self.file_end = storage.getSize()
 
-        self.gc = GC(self._file, self.file_end, self._stop, gc)
+        self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf)
 
         # The packer needs to acquire the parent's commit lock
         # during the copying stage, so the two sets of lock acquire
         # and release methods are passed to the constructor.
-        self._lock_acquire = la
-        self._lock_release = lr
-        self._commit_lock_acquire = cla
-        self._commit_lock_release = clr
+        self._lock_acquire = storage._lock_acquire
+        self._lock_release = storage._lock_release
+        self._commit_lock_acquire = storage._commit_lock_acquire
+        self._commit_lock_release = storage._commit_lock_release
 
         # The packer will use several indexes.
         # index: oid -> pos
         # tindex: oid -> pos, for current txn
         # oid2tid: not used by the packer
 
-        self.index = fsIndex()
+        self.index = ZODB.fsIndex.fsIndex()
         self.tindex = {}
         self.oid2tid = {}
         self.toid2tid = {}
@@ -465,18 +477,6 @@
 
         return pos, new_pos
 
-    def fetchBackpointer(self, oid, back):
-        """Return data and refs backpointer `back` to object `oid.
-
-        If `back` is 0 or ultimately resolves to 0, return None
-        and None.  In this case, the transaction undoes the object
-        creation.
-        """
-        if back == 0:
-            return None
-        data, tid = self._loadBackTxn(oid, back, 0)
-        return data
-
     def copyDataRecords(self, pos, th):
         """Copy any current data records between pos and tend.
 
@@ -492,8 +492,24 @@
         while pos < tend:
             h = self._read_data_header(pos)
             if not self.gc.isReachable(h.oid, pos):
+                if self.pack_blobs:
+                    # We need to find out if this is a blob, so get the data:
+                    if h.plen:
+                        data = self._file.read(h.plen)
+                    else:
+                        data = self.fetchDataViaBackpointer(h.oid, h.back)
+                    if data and ZODB.blob.is_blob_record(data):
+                        # We need to remove the blob record. Maybe we
+                        # need to remove oid:
+                        if h.oid not in self.gc.reachable:
+                            self.blob_removed.write(h.oid.encode('hex')+'\n')
+                        else:
+                            self.blob_removed.write(
+                                (h.oid+h.tid).encode('hex')+'\n')
+                
                 pos += h.recordlen()
                 continue
+
             pos += h.recordlen()
 
             # If we are going to copy any data, we need to copy
@@ -510,16 +526,25 @@
             if h.plen:
                 data = self._file.read(h.plen)
             else:
-                # If a current record has a backpointer, fetch
-                # refs and data from the backpointer.  We need
-                # to write the data in the new record.
-                data = self.fetchBackpointer(h.oid, h.back)
+                data = self.fetchDataViaBackpointer(h.oid, h.back)
 
             self.writePackedDataRecord(h, data, new_tpos)
             new_pos = self._tfile.tell()
 
         return new_tpos, pos
 
+    def fetchDataViaBackpointer(self, oid, back):
+        """Return the data for oid via backpointer back
+
+        If `back` is 0 or ultimately resolves to 0, return None.
+        In this case, the transaction undoes the object
+        creation.
+        """
+        if back == 0:
+            return None
+        data, tid = self._loadBackTxn(oid, back, 0)
+        return data
+
     def writePackedDataRecord(self, h, data, new_tpos):
         # Update the header to reflect current information, then write
         # it to the output file.
@@ -575,7 +600,7 @@
             if h.plen:
                 data = self._file.read(h.plen)
             else:
-                data = self.fetchBackpointer(h.oid, h.back)
+                data = self.fetchDataViaBackpointer(h.oid, h.back)
                 if h.back:
                     prev_txn = self.getTxnFromData(h.oid, h.back)
 

Modified: ZODB/trunk/src/ZODB/FileStorage/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/interfaces.py	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/interfaces.py	2008-12-16 21:28:38 UTC (rev 94133)
@@ -20,17 +20,34 @@
 
         The new file will have the same name as the old file with
         '.pack' appended. (The packer can get the old file name via
-        storage._file.name.)
+        storage._file.name.) If blobs are supported, that is, if the
+        storage's blob_dir attribute is not None or empty, then a .removed
+        file must be created in the blob directory. This file contains
+        records of the form:
 
+           (oid+serial).encode('hex')+'\n'
+
+        or, of the form:
+
+           oid.encode('hex')+'\n'
+        
+
         If packing is unnecessary, or would not change the file, then
-        None is returned, otherwise a tule is returned with:
+        no pack or removed files are created and None is returned,
+        otherwise a tuple is returned with:
 
         - the size of the packed file, and
 
         - the packed index
 
         If and only if packing was necessary (non-None) and there was
-        no error, then the commit lock must be acquired.
+        no error, then the commit lock must be acquired.  In addition,
+        it is up to FileStorage to:
+
+        - Rename the .pack file, and
+
+        - process the blob_dir/.removed file by removing the blobs
+          corresponding to the file records.        
         """
 
 class IFileStorage(zope.interface.Interface):

Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/blob.py	2008-12-16 21:28:38 UTC (rev 94133)
@@ -483,7 +483,15 @@
                 continue
             yield oid, path
 
+class NoBlobsFileSystemHelper:
 
+    @property
+    def temp_dir(self):
+        raise TypeError("Blobs are not supported")
+
+    getPathForOID = getBlobFilenamem = temp_dir
+
+
 class BlobStorageError(Exception):
     """The blob storage encountered an invalid state."""
 
@@ -575,53 +583,31 @@
 class BlobStorageMixin(object):
     """A mix-in to help storages support blobs."""
 
-    zope.interface.implements(ZODB.interfaces.IBlobStorage)
-
-    def __init__(self, blob_dir, layout='automatic'):
+    def _blob_init(self, blob_dir, layout='automatic'):
         # XXX Log warning if storage is ClientStorage
         self.fshelper = FilesystemHelper(blob_dir, layout)
         self.fshelper.create()
         self.fshelper.checkSecure()
         self.dirty_oids = []
 
-    def temporaryDirectory(self):
-        return self.fshelper.temp_dir
+    def _blob_init_no_blobs(self):
+        self.fshelper = NoBlobsFileSystemHelper()
+        self.dirty_oids = []
 
-    @non_overridable
-    def _storeblob(self, oid, serial, blobfilename):
-        self._lock_acquire()
-        try:
-            self.fshelper.getPathForOID(oid, create=True)
-            targetname = self.fshelper.getBlobFilename(oid, serial)
-            rename_or_copy_blob(blobfilename, targetname)
+    def _blob_tpc_abort(self):
+        """Blob cleanup to be called from subclass tpc_abort
+        """
+        while self.dirty_oids:
+            oid, serial = self.dirty_oids.pop()
+            clean = self.fshelper.getBlobFilename(oid, serial)
+            if os.path.exists(clean):
+                remove_committed(clean)
 
-            # if oid already in there, something is really hosed.
-            # The underlying storage should have complained anyway
-            self.dirty_oids.append((oid, serial))
-        finally:
-            self._lock_release()
-            
-    @non_overridable
-    def storeBlob(self, oid, oldserial, data, blobfilename, version,
-                  transaction):
-        """Stores data that has a BLOB attached."""
-        assert not version, "Versions aren't supported."
-        serial = self.store(oid, oldserial, data, '', transaction)
-        self._storeblob(oid, serial, blobfilename)
-
-        return self._tid
-
-    @non_overridable
-    def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
-                    transaction):
-        """Write blob data already committed in a separate database
+    def _blob_tpc_finish(self):
+        """Blob cleanup to be called from subclass tpc_finish
         """
-        self.restore(oid, serial, data, '', prev_txn, transaction)
-        self._storeblob(oid, serial, blobfilename)
+        self.dirty_oids = []
 
-        return self._tid
-
-    @non_overridable
     def copyTransactionsFrom(self, other):
         for trans in other.iterator():
             self.tpc_begin(trans, trans.tid, trans.status)
@@ -646,23 +632,6 @@
             self.tpc_vote(trans)
             self.tpc_finish(trans)
 
-    @non_overridable
-    def blob_tpc_finish(self):
-        """Blob cleanup to be called from subclass tpc_finish
-        """
-        self.dirty_oids = []
-
-    @non_overridable
-    def blob_tpc_abort(self):
-        """Blob cleanup to be called from subclass tpc_abort
-        """
-        while self.dirty_oids:
-            oid, serial = self.dirty_oids.pop()
-            clean = self.fshelper.getBlobFilename(oid, serial)
-            if os.path.exists(clean):
-                remove_committed(clean)
-
-    @non_overridable
     def loadBlob(self, oid, serial):
         """Return the filename where the blob file can be found.
         """
@@ -671,7 +640,6 @@
             raise POSKeyError("No blob file", oid, serial)
         return filename
 
-    @non_overridable
     def openCommittedBlobFile(self, oid, serial, blob=None):
         blob_filename = self.loadBlob(oid, serial)
         if blob is None:
@@ -679,8 +647,42 @@
         else:
             return BlobFile(blob_filename, 'r', blob)
 
+    def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
+                    transaction):
+        """Write blob data already committed in a separate database
+        """
+        self.restore(oid, serial, data, '', prev_txn, transaction)
+        self._blob_storeblob(oid, serial, blobfilename)
 
-class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
+        return self._tid
+
+    def _blob_storeblob(self, oid, serial, blobfilename):
+        self._lock_acquire()
+        try:
+            self.fshelper.getPathForOID(oid, create=True)
+            targetname = self.fshelper.getBlobFilename(oid, serial)
+            rename_or_copy_blob(blobfilename, targetname)
+
+            # if oid already in there, something is really hosed.
+            # The underlying storage should have complained anyway
+            self.dirty_oids.append((oid, serial))
+        finally:
+            self._lock_release()
+            
+    def storeBlob(self, oid, oldserial, data, blobfilename, version,
+                  transaction):
+        """Stores data that has a BLOB attached."""
+        assert not version, "Versions aren't supported."
+        serial = self.store(oid, oldserial, data, '', transaction)
+        self._blob_storeblob(oid, serial, blobfilename)
+
+        return self._tid
+
+    def temporaryDirectory(self):
+        return self.fshelper.temp_dir
+
+
+class BlobStorage(SpecificationDecoratorBase):
     """A storage to support blobs."""
 
     zope.interface.implements(ZODB.interfaces.IBlobStorage)
@@ -690,13 +692,14 @@
     __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
                  '_blobs_pack_is_in_progress', )
 
+
     def __new__(self, base_directory, storage, layout='automatic'):
         return SpecificationDecoratorBase.__new__(self, storage)
 
     def __init__(self, base_directory, storage, layout='automatic'):
         # XXX Log warning if storage is ClientStorage
         SpecificationDecoratorBase.__init__(self, storage)
-        BlobStorageMixin.__init__(self, base_directory, layout)
+        self._blob_init(base_directory, layout)
         try:
             supportsUndo = storage.supportsUndo
         except AttributeError:
@@ -722,7 +725,7 @@
         # providing a _finish method because methods found on the proxied 
         # object aren't rebound to the proxy
         getProxiedObject(self).tpc_finish(*arg, **kw)
-        self.blob_tpc_finish()
+        self._blob_tpc_finish()
 
     @non_overridable
     def tpc_abort(self, *arg, **kw):
@@ -730,7 +733,7 @@
         # providing an _abort method because methods found on the proxied object
         # aren't rebound to the proxy
         getProxiedObject(self).tpc_abort(*arg, **kw)
-        self.blob_tpc_abort()
+        self._blob_tpc_abort()
 
     @non_overridable
     def _packUndoing(self, packtime, referencesf):
@@ -856,6 +859,12 @@
         return undo_serial, keys
 
 
+for name, v in BlobStorageMixin.__dict__.items():
+    if isinstance(v, type(BlobStorageMixin.__dict__['storeBlob'])):
+        assert name not in BlobStorage.__dict__
+        setattr(BlobStorage, name, non_overridable(v))
+del name, v
+
 copied = logging.getLogger('ZODB.blob.copied').debug
 def rename_or_copy_blob(f1, f2, chmod=True):
     """Try to rename f1 to f2, fallback to copy.
@@ -894,9 +903,12 @@
                 filename = os.path.join(dirpath, filename)
                 remove_committed(filename)
         shutil.rmtree(path)
+
+    link_or_copy = shutil.copy
 else:
     remove_committed = os.remove
     remove_committed_dir = shutil.rmtree
+    link_or_copy = os.link
 
 
 def is_blob_record(record):

Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-12-16 21:28:38 UTC (rev 94133)
@@ -14,6 +14,7 @@
 import os, unittest
 import transaction
 import ZODB.FileStorage
+import ZODB.tests.testblob
 import ZODB.tests.util
 import zope.testing.setupstack
 from ZODB import POSException
@@ -547,6 +548,40 @@
     >>> db.close()
     """
 
+def pack_with_open_blob_files():
+    """
+    Make sure packing works while there are open blob files.
+
+    >>> fs = ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')
+    >>> db = ZODB.DB(fs)
+    >>> tm1 = transaction.TransactionManager()
+    >>> conn1 = db.open(tm1)
+    >>> import ZODB.blob
+    >>> conn1.root()[1] = ZODB.blob.Blob()
+    >>> conn1.add(conn1.root()[1])
+    >>> conn1.root()[1].open('w').write('some data')
+    >>> tm1.commit()
+    
+    >>> tm2 = transaction.TransactionManager()
+    >>> conn2 = db.open(tm2)
+    >>> f = conn1.root()[1].open()
+    >>> conn1.root()[2] = ZODB.blob.Blob()
+    >>> conn1.add(conn1.root()[2])
+    >>> conn1.root()[2].open('w').write('some more data')
+
+    >>> db.pack()
+    >>> f.read()
+    'some data'
+
+    >>> tm1.commit()
+    >>> conn2.sync()
+    >>> conn2.root()[2].open().read()
+    'some more data'
+
+    >>> db.close()
+    """
+    
+
 def test_suite():
     from zope.testing import doctest
 
@@ -558,6 +593,13 @@
     suite.addTest(doctest.DocTestSuite(
         setUp=zope.testing.setupstack.setUpDirectory,
         tearDown=zope.testing.setupstack.tearDown))
+    suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
+        'BlobFileStorage',
+        lambda name, blob_dir:
+        ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir),
+        test_blob_storage_recovery=True,
+        test_packing=True,
+        ))
     return suite
 
 if __name__=='__main__':



More information about the Zodb-checkins mailing list