[Zodb-checkins] SVN: ZODB/branches/blob-merge-branch/src/Z Factor out blob cache storage into a helper class for use by both ClientStorage and BlobStorage.
Chris McDonough
chrism at plope.com
Mon Feb 27 14:11:08 EST 2006
Log message for revision 65531:
Factor out blob cache storage into a helper class for use by both ClientStorage and BlobStorage.
Changed:
U ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
U ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py
U ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py
U ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py
U ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt
-=-
Modified: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py 2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py 2006-02-27 19:11:08 UTC (rev 65531)
@@ -35,11 +35,11 @@
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
-from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage
+from ZODB.Blobs.Blob import FilesystemHelper
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
@@ -316,17 +316,13 @@
# XXX need to check for POSIX-ness here
if blob_dir is not None:
- if not os.path.exists(blob_dir):
- os.makedirs(blob_dir, 0700)
- log2("Blob cache directory '%s' does not exist. "
- "Created new directory." % self.base_directory,
- level=logging.INFO)
- if (os.stat(blob_dir).st_mode & 077) != 0:
+ self.fshelper = FilesystemHelper(blob_dir)
+ self.fshelper.create()
+ if not self.fshelper.isSecure(blob_dir):
log2('Blob dir %s has insecure mode setting' % blob_dir,
level=logging.WARNING)
-
- self.blob_dir = blob_dir
-
+ else:
+ self.fshelper = None
# Initialize locks
self.blob_status_lock = threading.Lock()
self.blob_status = {}
@@ -929,37 +925,21 @@
os.unlink(blobfilename)
return serials
- def _getBlobPath(self, oid):
- return os.path.join(self.blob_dir,
- utils.oid_repr(oid)
- )
-
- def _getLoadingFilename(self, oid, serial):
- """Generate an intermediate filename for two-phase commit.
- """
- return self._getCleanFilename(oid, serial) + ".loading"
-
- def _getCleanFilename(self, oid, tid):
- return os.path.join(self._getBlobPath(oid),
- "%s%s" % (utils.tid_repr(tid),
- BLOB_SUFFIX,)
- )
-
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
- blob_filename = self._getCleanFilename(oid, serial)
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
if self._server is None:
raise ClientDisconnected()
- targetpath = self._getBlobPath(oid)
+ targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
# We write to a temporary file first, so we do not accidentally
# allow half-baked copies of this blob be loaded
- tempfilename = self._getLoadingFilename(oid, serial)
- tempfile = open(tempfilename, "wb")
-
+ tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
+ tempfile = fdopen(tempfd, 'wb')
+
offset = 0
while True:
chunk = self._server.loadBlob(oid, serial, version, offset)
@@ -982,11 +962,11 @@
2a. Wait for other download to finish, return
3. If not beeing downloaded, start download
"""
- if self.blob_dir is None:
+ if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is "
"configured.")
- blob_filename = self._getCleanFilename(oid, serial)
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if os.path.exists(blob_filename):
return blob_filename
Modified: ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py 2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py 2006-02-27 19:11:08 UTC (rev 65531)
@@ -308,7 +308,7 @@
super(statusdict, self).__delitem__(k)
# ensure that we do locking properly
- filename = self._storage._getCleanFilename(oid, serial)
+ filename = self._storage.fshelper.getBlobFilename(oid, serial)
thestatuslock = self._storage.blob_status_lock = Dummy()
thebloblock = Dummy()
Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py 2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py 2006-02-27 19:11:08 UTC (rev 65531)
@@ -2,6 +2,7 @@
import os
import time
import tempfile
+import logging
from zope.interface import implements
@@ -12,6 +13,8 @@
from transaction.interfaces import IDataManager
from persistent import Persistent
+BLOB_SUFFIX = ".blob"
+
class Blob(Persistent):
implements(IBlob)
@@ -265,3 +268,87 @@
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
+
logger = logging.getLogger('ZODB.Blobs')
_pid = str(os.getpid())

def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
    """Send `msg` to the 'ZODB.Blobs' logger, prefixed with "(subsys) ".

    `subsys` defaults to this process's pid as captured at import time;
    `level` and `exc_info` are forwarded unchanged to Logger.log().
    """
    logger.log(level, "(%s) %s" % (subsys, msg), exc_info=exc_info)
+
class FilesystemHelper:
    """Map blob (oid, tid) pairs to files beneath a base directory.

    Storages that implement IBlobStorage can choose to use this helper
    to generate and parse blob filenames.  This is not a set-in-stone
    interface for all filesystem operations dealing with blobs, and
    storages needn't indirect through it if they want to perform blob
    storage differently.

    The methods below place committed blob data at::

        <base_dir>/<oid_repr(oid)>/<tid_repr(tid)><BLOB_SUFFIX>
    """

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def create(self):
        """Create base_dir with mode 0700 if it does not exist yet."""
        # stat.S_IRWXU == 0700 (owner rwx only); the named constant reads
        # the same on every Python version, unlike a bare octal literal.
        import stat
        if not os.path.exists(self.base_dir):
            os.makedirs(self.base_dir, stat.S_IRWXU)
            log("Blob cache directory '%s' does not exist. "
                "Created new directory." % self.base_dir,
                level=logging.INFO)

    def isSecure(self, path):
        """Return True if `path` grants no permissions to group or other.

        In other words, its POSIX permission bits must be at most 0700.
        ClientStorage logs an "insecure mode" warning when this returns
        False.
        """
        import stat
        # Any group/other bit set (mask 077) makes the path insecure.
        insecure_bits = stat.S_IRWXG | stat.S_IRWXO
        return (os.stat(path).st_mode & insecure_bits) == 0

    def getPathForOID(self, oid):
        """Return the directory where blob data for `oid` is stored."""
        return os.path.join(self.base_dir, utils.oid_repr(oid))

    def getBlobFilename(self, oid, tid):
        """Return the full filename of the committed blob for (oid, tid)."""
        oid_path = self.getPathForOID(oid)
        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
        return os.path.join(oid_path, filename)

    def blob_mkstemp(self, oid, tid):
        """Create a temporary file for blob data of (oid, tid).

        Returns ``(fd, name)`` as produced by ``tempfile.mkstemp``.  The
        file is created in ``self.getPathForOID(oid)`` -- the same
        directory as the committed data -- so it can later be renamed
        into place without a copy.  That directory must already exist
        before this method is called.
        """
        oidpath = self.getPathForOID(oid)
        fd, name = tempfile.mkstemp(suffix='.tmp', prefix=utils.tid_repr(tid),
                                    dir=oidpath)
        return fd, name

    def splitBlobFilename(self, filename):
        """Return the (oid, tid) pair encoded in a blob filename.

        If the filename cannot be recognized as a blob filename (it does
        not end with BLOB_SUFFIX), (None, None) is returned.
        """
        if not filename.endswith(BLOB_SUFFIX):
            return None, None
        path, filename = os.path.split(filename)
        oid = os.path.split(path)[1]

        serial = filename[:-len(BLOB_SUFFIX)]
        oid = utils.repr_to_oid(oid)
        serial = utils.repr_to_oid(serial)
        return oid, serial

    def getOIDsForSerial(self, search_serial):
        """Return all oids that have blob data committed under `search_serial`.

        Entries directly inside base_dir that are not directories are
        ignored rather than causing os.listdir() to fail.
        """
        oids = []
        base_dir = self.base_dir
        for oid_name in os.listdir(base_dir):
            oid_path = os.path.join(base_dir, oid_name)
            if not os.path.isdir(oid_path):
                # Tolerate stray files in the blob directory.
                continue
            for filename in os.listdir(oid_path):
                blob_path = os.path.join(oid_path, filename)
                oid, serial = self.splitBlobFilename(blob_path)
                if search_serial == serial:
                    oids.append(oid)
        return oids
+
Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py 2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py 2006-02-27 19:11:08 UTC (rev 65531)
@@ -23,9 +23,9 @@
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
+from ZODB.Blobs.Blob import BLOB_SUFFIX
+from ZODB.Blobs.Blob import FilesystemHelper
-BLOB_SUFFIX = ".blob"
-
logger = logging.getLogger('ZODB.BlobStorage')
class BlobStorage(ProxyBase):
@@ -33,7 +33,7 @@
implements(IBlobStorage)
- __slots__ = ('base_directory', 'dirty_oids')
+ __slots__ = ('fshelper', 'dirty_oids')
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
@@ -43,7 +43,7 @@
def __init__(self, base_directory, storage):
# TODO Complain if storage is ClientStorage
ProxyBase.__init__(self, storage)
- self.base_directory = base_directory
+ self.fshelper = FilesystemHelper(base_directory)
if not os.path.exists(self.base_directory):
os.makedirs(self.base_directory, 0700)
logger.info("Blob directory '%s' does not exist. "
@@ -64,11 +64,11 @@
self._lock_acquire()
try:
- targetpath = self._getBlobPath(oid)
+ targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
- targetname = self._getCleanFilename(oid, serial)
+ targetname = self.fshelper.getBlobFilename(oid, serial)
os.rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
@@ -78,17 +78,6 @@
self._lock_release()
return self._tid
- def _getBlobPath(self, oid):
- return os.path.join(self.base_directory,
- utils.oid_repr(oid)
- )
-
- def _getCleanFilename(self, oid, tid):
- return os.path.join(self._getBlobPath(oid),
- "%s%s" % (utils.tid_repr(tid),
- BLOB_SUFFIX,)
- )
-
def tpc_finish(self, *arg, **kw):
""" We need to override the base storage's tpc_finish instead of
providing a _finish method because methods found on the proxied object
@@ -103,14 +92,14 @@
getProxiedObject(self).tpc_abort(*arg, **kw)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
- clean = self._getCleanFilename(oid, serial)
+ clean = self.fshelper.getBlobFilename(oid, serial)
if os.exists(clean):
os.unlink(clean)
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
- filename = self._getCleanFilename(oid, serial)
+ filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
@@ -125,17 +114,18 @@
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
- for oid_repr in os.listdir(self.base_directory):
+ base_dir = self.fshelper.base_dir
+ for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
- oid_path = os.path.join(self.base_directory, oid_repr)
+ oid_path = os.path.join(base_dir, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
- whatever, serial = self._splitBlobFilename(filepath)
+ whatever, serial = self.fshelper.splitBlobFilename(filepath)
try:
- fn = self._getCleanFilename(oid, serial)
+ fn = self.fshelper.getBlobFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
@@ -144,9 +134,10 @@
shutil.rmtree(oid_path)
def _packNonUndoing(self, packtime, referencesf):
- for oid_repr in os.listdir(self.base_directory):
+ base_dir = self.fshelper.base_dir
+ for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
- oid_path = os.path.join(self.base_directory, oid_repr)
+ oid_path = os.path.join(base_dir, oid_repr)
exists = True
try:
@@ -193,41 +184,29 @@
orig_size = getProxiedObject(self).getSize()
blob_size = 0
- for oid in os.listdir(self.base_directory):
- for serial in os.listdir(os.path.join(self.base_directory, oid)):
+ base_dir = self.fshelper.base_dir
+ for oid in os.listdir(base_dir):
+ for serial in os.listdir(os.path.join(base_dir, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
- file_path = os.path.join(self.base_directory, oid, serial)
+ file_path = os.path.join(base_dir, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
- def _splitBlobFilename(self, filename):
- """Returns OID, TID for a given blob filename.
-
- If it's not a blob filename, (None, None) is returned.
- """
- if not filename.endswith(BLOB_SUFFIX):
- return None, None
- path, filename = os.path.split(filename)
- oid = os.path.split(path)[1]
-
- serial = filename[:-len(BLOB_SUFFIX)]
- oid = utils.repr_to_oid(oid)
- serial = utils.repr_to_oid(serial)
- return oid, serial
-
def undo(self, serial_id, transaction):
serial, keys = getProxiedObject(self).undo(serial_id, transaction)
self._lock_acquire()
try:
# The old serial_id is given in base64 encoding ...
serial_id = base64.decodestring(serial_id+ '\n')
- for oid in self._getOIDsForSerial(serial_id):
- data, serial_before, serial_after = \
- self.loadBefore(oid, serial_id)
- orig = file(self._getCleanFilename(oid, serial_before), "r")
- new = file(self._getCleanFilename(oid, serial), "w")
+ for oid in self.fshelper.getOIDsForSerial(serial_id):
+ data, serial_before, serial_after = self.loadBefore(oid,
+ serial_id)
+ orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
+ orig = open(orig_fn, "r")
+ new_fn = self.fshelper.getBlobFilename(oid, serial)
+ new = open(new_fn, "wb")
utils.cp(orig, new)
orig.close()
new.close()
@@ -236,14 +215,3 @@
self._lock_release()
return serial, keys
- def _getOIDsForSerial(self, search_serial):
- oids = []
- for oidpath in os.listdir(self.base_directory):
- for filename in os.listdir(os.path.join(self.base_directory,
- oidpath)):
- blob_path = os.path.join(self.base_directory, oidpath,
- filename)
- oid, serial = self._splitBlobFilename(blob_path)
- if search_serial == serial:
- oids.append(oid)
- return oids
Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt 2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt 2006-02-27 19:11:08 UTC (rev 65531)
@@ -83,13 +83,13 @@
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
- >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+ >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
- >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+ >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
@@ -203,13 +203,13 @@
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
- >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+ >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
- >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+ >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
More information about the Zodb-checkins mailing list