[Zodb-checkins] SVN: ZODB/trunk/src/ Added direct blob support to FileStorage. This is to provide more
Jim Fulton
jim at zope.com
Tue Dec 16 16:28:38 EST 2008
Log message for revision 94133:
Added direct blob support to FileStorage. This is to provide more
efficient packing (and undo).
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
U ZODB/trunk/src/ZODB/FileStorage/fspack.py
U ZODB/trunk/src/ZODB/FileStorage/interfaces.py
U ZODB/trunk/src/ZODB/blob.py
U ZODB/trunk/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/CHANGES.txt 2008-12-16 21:28:38 UTC (rev 94133)
@@ -22,6 +22,14 @@
XXX There are known issues with this implementation that need to be
sorted out before it is "released".
+3.9.0a9 (2008-12-??)
+====================
+
+New Features
+------------
+
+- FileStorage now supports blobs directly.
+
3.9.0a8 (2008-12-15)
====================
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-12-16 21:28:38 UTC (rev 94133)
@@ -16,34 +16,36 @@
$Revision: 1.16 $
"""
+from cPickle import Pickler, Unpickler, loads
+from persistent.TimeStamp import TimeStamp
+from struct import pack, unpack
+from types import StringType
+from zc.lockfile import LockFile
+from ZODB.FileStorage.format import CorruptedDataError
+from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
+from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
+from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
+from ZODB.FileStorage.fspack import FileStoragePacker
+from ZODB.fsIndex import fsIndex
+from ZODB import BaseStorage, ConflictResolution, POSException
+from ZODB.loglevels import BLATHER
+from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
+from ZODB.utils import p64, u64, z64
+
import base64
-from cPickle import Pickler, Unpickler, loads
import errno
+import logging
import os
import sys
import time
-import logging
-from types import StringType
-from struct import pack, unpack
+import ZODB.blob
+import ZODB.interfaces
+import zope.interface
+import ZODB.utils
# Not all platforms have fsync
fsync = getattr(os, "fsync", None)
-import zope.interface
-import ZODB.interfaces
-from ZODB import BaseStorage, ConflictResolution, POSException
-from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
-from persistent.TimeStamp import TimeStamp
-from zc.lockfile import LockFile
-from ZODB.utils import p64, u64, cp, z64
-from ZODB.FileStorage.fspack import FileStoragePacker
-from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
-from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
-from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
-from ZODB.FileStorage.format import CorruptedDataError
-from ZODB.loglevels import BLATHER
-from ZODB.fsIndex import fsIndex
-
packed_version = "FS21"
logger = logging.getLogger('ZODB.FileStorage')
@@ -86,9 +88,12 @@
def __init__(self, afile):
self._file = afile
-class FileStorage(BaseStorage.BaseStorage,
- ConflictResolution.ConflictResolvingStorage,
- FileStorageFormatter):
+class FileStorage(
+ FileStorageFormatter,
+ ZODB.blob.BlobStorageMixin,
+ ConflictResolution.ConflictResolvingStorage,
+ BaseStorage.BaseStorage,
+ ):
zope.interface.implements(
ZODB.interfaces.IStorage,
@@ -102,7 +107,7 @@
_pack_is_in_progress = False
def __init__(self, file_name, create=False, read_only=False, stop=None,
- quota=None, pack_gc=True, packer=None):
+ quota=None, pack_gc=True, packer=None, blob_dir=None):
if read_only:
self._is_read_only = True
@@ -198,6 +203,20 @@
self._quota = quota
+ self.blob_dir = blob_dir
+ if blob_dir:
+ self._blob_init(blob_dir)
+ zope.interface.alsoProvides(self,
+ ZODB.interfaces.IBlobStorageRestoreable)
+ else:
+ self._blob_init_no_blobs()
+
+ def copyTransactionsFrom(self, other):
+ if self.blob_dir:
+ return ZODB.blob.BlobStorageMixin.copyTransactionsFrom(self, other)
+ else:
+ return BaseStorage.BaseStorage.copyTransactionsFrom(self, other)
+
def _initIndex(self, index, tindex):
self._index=index
self._tindex=tindex
@@ -654,7 +673,7 @@
h.descr = descr
h.ext = ext
self._file.write(h.asString())
- cp(self._tfile, self._file, dlen)
+ ZODB.utils.cp(self._tfile, self._file, dlen)
self._file.write(p64(tl))
self._file.flush()
except:
@@ -695,11 +714,13 @@
self._pos = self._nextpos
self._index.update(self._tindex)
self._ltid = tid
+ self._blob_tpc_finish()
def _abort(self):
if self._nextpos:
self._file.truncate(self._pos)
self._nextpos=0
+ self._blob_tpc_abort()
def _undoDataInfo(self, oid, pos, tpos):
"""Return the tid, data pointer, and data for the oid record at pos
@@ -920,6 +941,18 @@
# Don't fail right away. We may be redeemed later!
failures[h.oid] = v
else:
+
+ if self.blob_dir and not p and prev:
+ up, userial = self._loadBackTxn(h.oid, prev)
+ if ZODB.blob.is_blob_record(up):
+ # We're undoing a blob modification operation.
+ # We have to copy the blob data
+ tmp = ZODB.utils.mktemp(dir=self.fshelper.temp_dir)
+ ZODB.utils.cp(
+ self.openCommittedBlobFile(h.oid, userial),
+ open(tmp, 'wb'))
+ self._blob_storeblob(h.oid, self._tid, tmp)
+
new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
# TODO: This seek shouldn't be necessary, but some other
@@ -988,15 +1021,7 @@
# Our default packer is built around the original packer. We
# simply adapt the old interface to the new. We don't really
# want to invest much in the old packer, at least for now.
- p = FileStoragePacker(
- storage._file.name,
- stop,
- storage._lock_acquire,
- storage._lock_release,
- storage._commit_lock_acquire,
- storage._commit_lock_release,
- storage.getSize(),
- gc)
+ p = FileStoragePacker(storage, referencesf, stop, gc)
opos = p.pack()
if opos is None:
return None
@@ -1032,6 +1057,12 @@
if gc is None:
gc = self._pack_gc
+ oldpath = self._file_name + ".old"
+ if os.path.exists(oldpath):
+ os.remove(oldpath)
+ if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
+ ZODB.blob.remove_committed_dir(self.blob_dir + ".old")
+
have_commit_lock = False
try:
pack_result = None
@@ -1043,13 +1074,10 @@
return
have_commit_lock = True
opos, index = pack_result
- oldpath = self._file_name + ".old"
self._lock_acquire()
try:
self._file.close()
try:
- if os.path.exists(oldpath):
- os.remove(oldpath)
os.rename(self._file_name, oldpath)
except Exception:
self._file = open(self._file_name, 'r+b')
@@ -1061,6 +1089,9 @@
self._initIndex(index, self._tindex)
self._pos = opos
self._save_index()
+
+ if self.blob_dir:
+ self._move_unpacked_blobs()
finally:
self._lock_release()
finally:
@@ -1070,6 +1101,68 @@
self._pack_is_in_progress = False
self._lock_release()
+ def _move_unpacked_blobs(self):
+ # Move any blobs linked or copied while packing to the
+ # pack dir, which will become the old dir
+ lblob_dir = len(self.blob_dir)
+ fshelper = self.fshelper
+ old = self.blob_dir+'.old'
+ os.mkdir(old, 0777)
+
+ # Helper to clean up dirs left empty after moving things to old
+ def maybe_remove_empty_dir_containing(path):
+ path = os.path.dirname(path)
+ if len(path) <= lblob_dir:
+ return
+ if not os.listdir(path):
+ os.rmdir(path)
+ maybe_remove_empty_dir_containing(path)
+
+ # Helper that moves an oid dir or revision file to the old dir.
+ def move(path):
+ dest = os.path.dirname(old+path[lblob_dir:])
+ if not os.path.exists(dest):
+ os.makedirs(dest, 0700)
+ os.rename(path, old+path[lblob_dir:])
+ maybe_remove_empty_dir_containing(path)
+
+ # First step: "remove" oids or revisions by moving them to .old
+ # (Later, when we add an option to not keep old files, we'll
+ # be able to simply remove.)
+ for line in open(os.path.join(self.blob_dir, '.removed')):
+ line = line.strip().decode('hex')
+
+ if len(line) == 8:
+ # oid is garbage, re/move dir
+ path = fshelper.getPathForOID(line)
+ if not os.path.exists(path):
+ # Hm, already gone. Odd.
+ continue
+ move(path)
+ continue
+
+ if len(line) != 16:
+ raise ValueError("Bad record in ", self.blob_dir, '.removed')
+
+ oid, tid = line[:8], line[8:]
+ path = fshelper.getBlobFilename(oid, tid)
+ if not os.path.exists(path):
+ # Hm, already gone. Odd.
+ continue
+ move(path)
+
+ # Second step, copy remaining files.
+ link_or_copy = ZODB.blob.link_or_copy
+ for path, dir_names, file_names in os.walk(self.blob_dir):
+ for file_name in file_names:
+ if not file_name.endswith('.blob'):
+ continue
+ file_path = os.path.join(path, file_name)
+ dest = os.path.dirname(old+file_path[lblob_dir:])
+ if not os.path.exists(dest):
+ os.makedirs(dest, 0700)
+ link_or_copy(file_path, old+file_path[lblob_dir:])
+
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)
@@ -1469,7 +1562,7 @@
name, oname)
o = open(oname,'wb')
file.seek(pos)
- cp(file, o, file_size-pos)
+ ZODB.utils.cp(file, o, file_size-pos)
o.close()
break
except:
Modified: ZODB/trunk/src/ZODB/FileStorage/fspack.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/fspack.py 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/fspack.py 2008-12-16 21:28:38 UTC (rev 94133)
@@ -24,16 +24,15 @@
a backpointer after that time.
"""
-import os
-
-from ZODB.serialize import referencesf
+from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
from ZODB.utils import p64, u64, z64
-from ZODB.fsIndex import fsIndex
-from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
-from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+import logging
+import os
+import ZODB.blob
+import ZODB.fsIndex
import ZODB.POSException
-import logging
logger = logging.getLogger(__name__)
@@ -147,7 +146,7 @@
class GC(FileStorageFormatter):
- def __init__(self, file, eof, packtime, gc):
+ def __init__(self, file, eof, packtime, gc, referencesf):
self._file = file
self._name = file.name
self.eof = eof
@@ -155,8 +154,10 @@
self.gc = gc
# packpos: position of first txn header after pack time
self.packpos = None
- self.oid2curpos = fsIndex() # maps oid to current data record position
+ # {oid -> current data record position}:
+ self.oid2curpos = ZODB.fsIndex.fsIndex()
+
# The set of reachable revisions of each object.
#
# This set as managed using two data structures. The first is
@@ -166,12 +167,14 @@
# second is a dictionary mapping objects to lists of
# positions; it is used to handle the same number of objects
# for which we must keep multiple revisions.
- self.reachable = fsIndex()
+ self.reachable = ZODB.fsIndex.fsIndex()
self.reach_ex = {}
# keep ltid for consistency checks during initial scan
self.ltid = z64
+ self.referencesf = referencesf
+
def isReachable(self, oid, pos):
"""Return 1 if revision of `oid` at `pos` is reachable."""
@@ -319,7 +322,7 @@
while dh.back:
dh = self._read_data_header(dh.back)
if dh.plen:
- return referencesf(self._file.read(dh.plen))
+ return self.referencesf(self._file.read(dh.plen))
else:
return []
@@ -332,7 +335,16 @@
# current_size is the storage's _pos. All valid data at the start
# lives before that offset (there may be a checkpoint transaction in
# progress after it).
- def __init__(self, path, stop, la, lr, cla, clr, current_size, gc=True):
+ def __init__(self, storage, referencesf, stop, gc=True):
+ self._storage = storage
+ if storage.blob_dir:
+ self.pack_blobs = True
+ self.blob_removed = open(
+ os.path.join(storage.blob_dir, '.removed'), 'w')
+ else:
+ self.pack_blobs = False
+
+ path = storage._file.name
self._name = path
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
@@ -342,24 +354,24 @@
self._path = path
self._stop = stop
self.locked = False
- self.file_end = current_size
+ self.file_end = storage.getSize()
- self.gc = GC(self._file, self.file_end, self._stop, gc)
+ self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf)
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
- self._lock_acquire = la
- self._lock_release = lr
- self._commit_lock_acquire = cla
- self._commit_lock_release = clr
+ self._lock_acquire = storage._lock_acquire
+ self._lock_release = storage._lock_release
+ self._commit_lock_acquire = storage._commit_lock_acquire
+ self._commit_lock_release = storage._commit_lock_release
# The packer will use several indexes.
# index: oid -> pos
# tindex: oid -> pos, for current txn
# oid2tid: not used by the packer
- self.index = fsIndex()
+ self.index = ZODB.fsIndex.fsIndex()
self.tindex = {}
self.oid2tid = {}
self.toid2tid = {}
@@ -465,18 +477,6 @@
return pos, new_pos
- def fetchBackpointer(self, oid, back):
- """Return data and refs backpointer `back` to object `oid.
-
- If `back` is 0 or ultimately resolves to 0, return None
- and None. In this case, the transaction undoes the object
- creation.
- """
- if back == 0:
- return None
- data, tid = self._loadBackTxn(oid, back, 0)
- return data
-
def copyDataRecords(self, pos, th):
"""Copy any current data records between pos and tend.
@@ -492,8 +492,24 @@
while pos < tend:
h = self._read_data_header(pos)
if not self.gc.isReachable(h.oid, pos):
+ if self.pack_blobs:
+ # We need to find out if this is a blob, so get the data:
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ data = self.fetchDataViaBackpointer(h.oid, h.back)
+ if data and ZODB.blob.is_blob_record(data):
+ # We need to remove the blob record. Maybe we
+ # need to remove oid:
+ if h.oid not in self.gc.reachable:
+ self.blob_removed.write(h.oid.encode('hex')+'\n')
+ else:
+ self.blob_removed.write(
+ (h.oid+h.tid).encode('hex')+'\n')
+
pos += h.recordlen()
continue
+
pos += h.recordlen()
# If we are going to copy any data, we need to copy
@@ -510,16 +526,25 @@
if h.plen:
data = self._file.read(h.plen)
else:
- # If a current record has a backpointer, fetch
- # refs and data from the backpointer. We need
- # to write the data in the new record.
- data = self.fetchBackpointer(h.oid, h.back)
+ data = self.fetchDataViaBackpointer(h.oid, h.back)
self.writePackedDataRecord(h, data, new_tpos)
new_pos = self._tfile.tell()
return new_tpos, pos
+ def fetchDataViaBackpointer(self, oid, back):
+ """Return the data for oid via backpointer back
+
+ If `back` is 0 or ultimately resolves to 0, return None.
+ In this case, the transaction undoes the object
+ creation.
+ """
+ if back == 0:
+ return None
+ data, tid = self._loadBackTxn(oid, back, 0)
+ return data
+
def writePackedDataRecord(self, h, data, new_tpos):
# Update the header to reflect current information, then write
# it to the output file.
@@ -575,7 +600,7 @@
if h.plen:
data = self._file.read(h.plen)
else:
- data = self.fetchBackpointer(h.oid, h.back)
+ data = self.fetchDataViaBackpointer(h.oid, h.back)
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
Modified: ZODB/trunk/src/ZODB/FileStorage/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/interfaces.py 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/FileStorage/interfaces.py 2008-12-16 21:28:38 UTC (rev 94133)
@@ -20,17 +20,34 @@
The new file will have the same name as the old file with
'.pack' appended. (The packer can get the old file name via
- storage._file.name.)
+ storage._file.name.) If blobs are supported, and the storage's
+ blob_dir attribute is not None or empty, then a .removed file
+ must be created in the blob directory. This file contains records
+ of the form:
+ (oid+serial).encode('hex')+'\n'
+
+ or, of the form:
+
+ oid.encode('hex')+'\n'
+
+
If packing is unnecessary, or would not change the file, then
- None is returned, otherwise a tule is returned with:
+ no pack or removed files are created and None is returned,
+ otherwise a tuple is returned with:
- the size of the packed file, and
- the packed index
If and only if packing was necessary (non-None) and there was
- no error, then the commit lock must be acquired.
+ no error, then the commit lock must be acquired. In addition,
+ it is up to FileStorage to:
+
+ - Rename the .pack file, and
+
+ - process the blob_dir/.removed file by removing the blobs
+ corresponding to the file records.
"""
class IFileStorage(zope.interface.Interface):
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/blob.py 2008-12-16 21:28:38 UTC (rev 94133)
@@ -483,7 +483,15 @@
continue
yield oid, path
+class NoBlobsFileSystemHelper:
+ @property
+ def temp_dir(self):
+ raise TypeError("Blobs are not supported")
+
+ getPathForOID = getBlobFilename = temp_dir
+
+
class BlobStorageError(Exception):
"""The blob storage encountered an invalid state."""
@@ -575,53 +583,31 @@
class BlobStorageMixin(object):
"""A mix-in to help storages support blobs."""
- zope.interface.implements(ZODB.interfaces.IBlobStorage)
-
- def __init__(self, blob_dir, layout='automatic'):
+ def _blob_init(self, blob_dir, layout='automatic'):
# XXX Log warning if storage is ClientStorage
self.fshelper = FilesystemHelper(blob_dir, layout)
self.fshelper.create()
self.fshelper.checkSecure()
self.dirty_oids = []
- def temporaryDirectory(self):
- return self.fshelper.temp_dir
+ def _blob_init_no_blobs(self):
+ self.fshelper = NoBlobsFileSystemHelper()
+ self.dirty_oids = []
- @non_overridable
- def _storeblob(self, oid, serial, blobfilename):
- self._lock_acquire()
- try:
- self.fshelper.getPathForOID(oid, create=True)
- targetname = self.fshelper.getBlobFilename(oid, serial)
- rename_or_copy_blob(blobfilename, targetname)
+ def _blob_tpc_abort(self):
+ """Blob cleanup to be called from subclass tpc_abort
+ """
+ while self.dirty_oids:
+ oid, serial = self.dirty_oids.pop()
+ clean = self.fshelper.getBlobFilename(oid, serial)
+ if os.path.exists(clean):
+ remove_committed(clean)
- # if oid already in there, something is really hosed.
- # The underlying storage should have complained anyway
- self.dirty_oids.append((oid, serial))
- finally:
- self._lock_release()
-
- @non_overridable
- def storeBlob(self, oid, oldserial, data, blobfilename, version,
- transaction):
- """Stores data that has a BLOB attached."""
- assert not version, "Versions aren't supported."
- serial = self.store(oid, oldserial, data, '', transaction)
- self._storeblob(oid, serial, blobfilename)
-
- return self._tid
-
- @non_overridable
- def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
- transaction):
- """Write blob data already committed in a separate database
+ def _blob_tpc_finish(self):
+ """Blob cleanup to be called from subclass tpc_finish
"""
- self.restore(oid, serial, data, '', prev_txn, transaction)
- self._storeblob(oid, serial, blobfilename)
+ self.dirty_oids = []
- return self._tid
-
- @non_overridable
def copyTransactionsFrom(self, other):
for trans in other.iterator():
self.tpc_begin(trans, trans.tid, trans.status)
@@ -646,23 +632,6 @@
self.tpc_vote(trans)
self.tpc_finish(trans)
- @non_overridable
- def blob_tpc_finish(self):
- """Blob cleanup to be called from subclass tpc_finish
- """
- self.dirty_oids = []
-
- @non_overridable
- def blob_tpc_abort(self):
- """Blob cleanup to be called from subclass tpc_abort
- """
- while self.dirty_oids:
- oid, serial = self.dirty_oids.pop()
- clean = self.fshelper.getBlobFilename(oid, serial)
- if os.path.exists(clean):
- remove_committed(clean)
-
- @non_overridable
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
@@ -671,7 +640,6 @@
raise POSKeyError("No blob file", oid, serial)
return filename
- @non_overridable
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
if blob is None:
@@ -679,8 +647,42 @@
else:
return BlobFile(blob_filename, 'r', blob)
+ def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
+ transaction):
+ """Write blob data already committed in a separate database
+ """
+ self.restore(oid, serial, data, '', prev_txn, transaction)
+ self._blob_storeblob(oid, serial, blobfilename)
-class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
+ return self._tid
+
+ def _blob_storeblob(self, oid, serial, blobfilename):
+ self._lock_acquire()
+ try:
+ self.fshelper.getPathForOID(oid, create=True)
+ targetname = self.fshelper.getBlobFilename(oid, serial)
+ rename_or_copy_blob(blobfilename, targetname)
+
+ # if oid already in there, something is really hosed.
+ # The underlying storage should have complained anyway
+ self.dirty_oids.append((oid, serial))
+ finally:
+ self._lock_release()
+
+ def storeBlob(self, oid, oldserial, data, blobfilename, version,
+ transaction):
+ """Stores data that has a BLOB attached."""
+ assert not version, "Versions aren't supported."
+ serial = self.store(oid, oldserial, data, '', transaction)
+ self._blob_storeblob(oid, serial, blobfilename)
+
+ return self._tid
+
+ def temporaryDirectory(self):
+ return self.fshelper.temp_dir
+
+
+class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
zope.interface.implements(ZODB.interfaces.IBlobStorage)
@@ -690,13 +692,14 @@
__slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
'_blobs_pack_is_in_progress', )
+
def __new__(self, base_directory, storage, layout='automatic'):
return SpecificationDecoratorBase.__new__(self, storage)
def __init__(self, base_directory, storage, layout='automatic'):
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
- BlobStorageMixin.__init__(self, base_directory, layout)
+ self._blob_init(base_directory, layout)
try:
supportsUndo = storage.supportsUndo
except AttributeError:
@@ -722,7 +725,7 @@
# providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy
getProxiedObject(self).tpc_finish(*arg, **kw)
- self.blob_tpc_finish()
+ self._blob_tpc_finish()
@non_overridable
def tpc_abort(self, *arg, **kw):
@@ -730,7 +733,7 @@
# providing an _abort method because methods found on the proxied object
# aren't rebound to the proxy
getProxiedObject(self).tpc_abort(*arg, **kw)
- self.blob_tpc_abort()
+ self._blob_tpc_abort()
@non_overridable
def _packUndoing(self, packtime, referencesf):
@@ -856,6 +859,12 @@
return undo_serial, keys
+for name, v in BlobStorageMixin.__dict__.items():
+ if isinstance(v, type(BlobStorageMixin.__dict__['storeBlob'])):
+ assert name not in BlobStorage.__dict__
+ setattr(BlobStorage, name, non_overridable(v))
+del name, v
+
copied = logging.getLogger('ZODB.blob.copied').debug
def rename_or_copy_blob(f1, f2, chmod=True):
"""Try to rename f1 to f2, fallback to copy.
@@ -894,9 +903,12 @@
filename = os.path.join(dirpath, filename)
remove_committed(filename)
shutil.rmtree(path)
+
+ link_or_copy = shutil.copy
else:
remove_committed = os.remove
remove_committed_dir = shutil.rmtree
+ link_or_copy = os.link
def is_blob_record(record):
Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py 2008-12-16 20:59:00 UTC (rev 94132)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py 2008-12-16 21:28:38 UTC (rev 94133)
@@ -14,6 +14,7 @@
import os, unittest
import transaction
import ZODB.FileStorage
+import ZODB.tests.testblob
import ZODB.tests.util
import zope.testing.setupstack
from ZODB import POSException
@@ -547,6 +548,40 @@
>>> db.close()
"""
+def pack_with_open_blob_files():
+ """
+ Make sure packing works while there are open blob files.
+
+ >>> fs = ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')
+ >>> db = ZODB.DB(fs)
+ >>> tm1 = transaction.TransactionManager()
+ >>> conn1 = db.open(tm1)
+ >>> import ZODB.blob
+ >>> conn1.root()[1] = ZODB.blob.Blob()
+ >>> conn1.add(conn1.root()[1])
+ >>> conn1.root()[1].open('w').write('some data')
+ >>> tm1.commit()
+
+ >>> tm2 = transaction.TransactionManager()
+ >>> conn2 = db.open(tm2)
+ >>> f = conn1.root()[1].open()
+ >>> conn1.root()[2] = ZODB.blob.Blob()
+ >>> conn1.add(conn1.root()[2])
+ >>> conn1.root()[2].open('w').write('some more data')
+
+ >>> db.pack()
+ >>> f.read()
+ 'some data'
+
+ >>> tm1.commit()
+ >>> conn2.sync()
+ >>> conn2.root()[2].open().read()
+ 'some more data'
+
+ >>> db.close()
+ """
+
+
def test_suite():
from zope.testing import doctest
@@ -558,6 +593,13 @@
suite.addTest(doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown))
+ suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
+ 'BlobFileStorage',
+ lambda name, blob_dir:
+ ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir),
+ test_blob_storage_recovery=True,
+ test_packing=True,
+ ))
return suite
if __name__=='__main__':
More information about the Zodb-checkins
mailing list