[Zodb-checkins] SVN: ZODB/trunk/ - Merged ctheune-blob-merge-branch to trunk.
Christian Theune
ct at gocept.com
Wed Nov 29 10:30:37 EST 2006
Log message for revision 71330:
- Merged ctheune-blob-merge-branch to trunk.
Changed:
U ZODB/trunk/NEWS.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
A ZODB/trunk/src/ZODB/Blobs/
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/DB.py
U ZODB/trunk/src/ZODB/ExportImport.py
U ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
U ZODB/trunk/src/ZODB/component.xml
U ZODB/trunk/src/ZODB/config.py
A ZODB/trunk/src/ZODB/tests/loggingsupport.py
U ZODB/trunk/src/ZODB/tests/testConfig.py
U ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
U ZODB/trunk/src/ZODB/utils.py
_U ZODB/trunk/src/zope/
-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/NEWS.txt 2006-11-29 15:30:36 UTC (rev 71330)
@@ -25,7 +25,16 @@
Clean up weird import dance with ZODB. This is unnecessary since the
transaction module stopped being imported in ZODB/__init__.py in rev 39622.
+Blobs
+-----
+- (3.8a1) Added new blob feature. See the ZODB/Blobs directory for
+ documentation.
+
+ ZODB now handles (reasonably) large binary objects efficiently. Blobs are
+ useful for data ranging from a few kilobytes to at least several hundred
+ megabytes.
+
+
What's new on ZODB 3.7b2?
=========================
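The NEWS entry above only points at the ZODB/Blobs directory for documentation; as a quick orientation, here is a minimal sketch of the blob API as the tests further down in this checkin exercise it. The payload string and the 'root' mapping are placeholders, not part of this commit:

from ZODB.Blobs.Blob import Blob

blob = Blob()
f = blob.open('w')    # file handle backed by an uncommitted temp file
f.write('some large chunk of binary data')
f.close()

# The blob is then stored like any other persistent object, provided the
# storage provides IBlobStorage (e.g. a BlobStorage-wrapped FileStorage or
# a ClientStorage opened with blob_dir):
#   root['attachment'] = blob
#   transaction.commit()
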
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -27,6 +27,7 @@
import types
import logging
+from zope.interface import implements
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
@@ -35,7 +36,10 @@
from ZEO.zrpc.client import ConnectionManager
from ZODB import POSException
+from ZODB import utils
from ZODB.loglevels import BLATHER
+from ZODB.Blobs.interfaces import IBlobStorage
+from ZODB.Blobs.Blob import FilesystemHelper
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
@@ -93,6 +97,7 @@
tpc_begin().
"""
+ implements(IBlobStorage)
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
@@ -106,7 +111,8 @@
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
- username='', password='', realm=None):
+ username='', password='', realm=None,
+ blob_dir=None):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
@@ -177,6 +183,11 @@
password -- string with plaintext password to be used
when authenticated.
+ realm -- not documented.
+
+ blob_dir -- directory path for blob data. 'blob data' is data that
+ is retrieved via the loadBlob API.
+
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
@@ -303,6 +314,18 @@
# is executing.
self._lock = threading.Lock()
+ # XXX need to check for POSIX-ness here
+ if blob_dir is not None:
+ self.fshelper = FilesystemHelper(blob_dir)
+ self.fshelper.create()
+ self.fshelper.checkSecure()
+ else:
+ self.fshelper = None
+
+ # Initialize locks
+ self.blob_status_lock = threading.Lock()
+ self.blob_status = {}
+
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
@@ -866,6 +889,118 @@
self._tbuf.store(oid, version, data)
return self._check_serials()
+ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+ """Storage API: store a blob object."""
+ serials = self.store(oid, serial, data, version, txn)
+ blobfile = open(blobfilename, "rb")
+ while True:
+ chunk = blobfile.read(1<<16)
+ # even if the blobfile is completely empty, we need to call
+ # storeBlob at least once in order to be able to call
+ # storeBlobEnd successfully.
+ self._server.storeBlob(oid, serial, chunk, version, id(txn))
+ if not chunk:
+ self._server.storeBlobEnd(oid, serial, data, version, id(txn))
+ break
+ blobfile.close()
+ os.unlink(blobfilename)
+ return serials
+
+ def _do_load_blob(self, oid, serial, version):
+ """Do the actual loading from the RPC server."""
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ if self._server is None:
+ raise ClientDisconnected()
+
+ targetpath = self.fshelper.getPathForOID(oid)
+ if not os.path.exists(targetpath):
+ os.makedirs(targetpath, 0700)
+
+ # We write to a temporary file first, so we do not accidentally
+ # allow half-baked copies of this blob to be loaded
+ tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
+ tempfile = os.fdopen(tempfd, 'wb')
+
+ offset = 0
+ while True:
+ chunk = self._server.loadBlob(oid, serial, version, offset)
+ if not chunk:
+ break
+ offset += len(chunk)
+ tempfile.write(chunk)
+
+ tempfile.close()
+ # XXX will fail on Windows if file is open
+ os.rename(tempfilename, blob_filename)
+ return blob_filename
+
+ def loadBlob(self, oid, serial, version):
+ """Loading a blob has to know about loading the same blob
+ from another thread as the same time.
+
+ 1. Check if the blob is downloaded already
+ 2. Check whether it is currently beeing downloaded
+ 2a. Wait for other download to finish, return
+ 3. If not beeing downloaded, start download
+ """
+ if self.fshelper is None:
+ raise POSException.Unsupported("No blob cache directory is "
+ "configured.")
+
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ # Case 1: Blob is available already, just use it
+ if os.path.exists(blob_filename):
+ log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
+ utils.tid_repr(serial)), level=BLATHER)
+ return blob_filename
+
+ # Case 2,3: Blob might still be downloading or not there yet
+
+ # Try to get or create a lock for the downloading of this blob,
+ # identified by its oid and serial
+ lock_key = (oid, serial)
+
+ # We need to make the check for an existing lock and the possible
+ # creation of a new one atomic, so there is another lock:
+ self.blob_status_lock.acquire()
+ try:
+ if not self.blob_status.has_key(lock_key):
+ self.blob_status[lock_key] = self.getBlobLock()
+ lock = self.blob_status[lock_key]
+ finally:
+ self.blob_status_lock.release()
+
+ # We acquire the lock to either start downloading, or wait
+ # for another download to finish
+ lock.acquire()
+ try:
+ # If there was another download that is finished by now,
+ # we just take the result.
+ if os.path.exists(blob_filename):
+ log2("Found blob %s/%s in cache after it was downloaded "
+ "from another thread." % (utils.oid_repr(oid),
+ utils.tid_repr(serial)), level=BLATHER)
+ return blob_filename
+
+ # Otherwise we download and use that
+ return self._do_load_blob(oid, serial, version)
+ finally:
+ # When done we remove the download lock ...
+ lock.release()
+
+ # The status information isn't needed anymore either, but we have
+ # to use the second lock here as well, so that removing the status
+ # entry stays atomic with its creation (see above)
+ self.blob_status_lock.acquire()
+ try:
+ del self.blob_status[lock_key]
+ finally:
+ self.blob_status_lock.release()
+
+ def getBlobLock(self):
+ # indirection to support unit testing
+ return threading.Lock()
+
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
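
For clarity, here is a small self-contained sketch (not part of the commit) of the locking pattern ClientStorage.loadBlob uses above: one registry lock (blob_status_lock) protects a dict of per-(oid, serial) locks (blob_status), and the per-key lock serializes the actual download. The class and argument names below are illustrative:

import threading

class DownloadCoordinator:
    """Serialize downloads of the same key across threads."""

    def __init__(self):
        self._registry_lock = threading.Lock()   # plays the role of blob_status_lock
        self._downloads = {}                     # plays the role of blob_status

    def fetch(self, key, is_cached, do_download):
        if is_cached(key):                       # case 1: already in the cache
            return
        # Atomically look up or create the per-key download lock.
        self._registry_lock.acquire()
        try:
            lock = self._downloads.setdefault(key, threading.Lock())
        finally:
            self._registry_lock.release()
        # Case 2/2a: if another thread holds the lock, this waits for it.
        lock.acquire()
        try:
            if not is_cached(key):               # case 3: we do the download
                do_download(key)
        finally:
            lock.release()
            # The per-key lock is no longer needed; drop it under the
            # registry lock so creation and removal stay atomic.
            self._registry_lock.acquire()
            try:
                self._downloads.pop(key, None)
            finally:
                self._registry_lock.release()
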
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -220,6 +220,12 @@
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
+ def storeBlobEnd(self, oid, serial, data, version, id):
+ self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+
+ def storeBlob(self, oid, serial, chunk, version, id):
+ self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
@@ -262,6 +268,9 @@
def load(self, oid, version):
return self.rpc.call('load', oid, version)
+ def loadBlob(self, oid, serial, version, offset):
+ return self.rpc.call('loadBlob', oid, serial, version, offset)
+
def getSerial(self, oid):
return self.rpc.call('getSerial', oid)
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -42,20 +42,24 @@
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr
+from ZODB.utils import u64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
+
logger = logging.getLogger('ZEO.StorageServer')
+
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label = "" # default label used for logging.
+
def set_label():
"""Internal helper to reset the logging label (e.g. after fork())."""
global _label
_label = "%s" % os.getpid()
+
def log(message, level=logging.INFO, label=None, exc_info=False):
"""Internal helper to log a message."""
label = label or _label
@@ -63,9 +67,11 @@
message = "(%s) %s" % (label, message)
logger.log(level, message, exc_info=exc_info)
+
class StorageServerError(StorageError):
"""Error reported when an unpicklable exception is raised."""
+
class ZEOStorage:
"""Proxy to underlying storage for a single remote client."""
@@ -93,6 +99,9 @@
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
+ self.blob_transfer = {}
+ self.blob_log = []
+ self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -154,8 +163,7 @@
record_iternext = getattr(self.storage, 'record_iternext', None)
if record_iternext is not None:
self.record_iternext = record_iternext
-
-
+
try:
fn = self.storage.getExtensionMethods
except AttributeError:
@@ -460,6 +468,38 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
+ def storeBlobEnd(self, oid, serial, data, version, id):
+ key = (oid, id)
+ if key not in self.blob_transfer:
+ raise Exception, "Can't finish a non-started Blob"
+ tempname, tempfile = self.blob_transfer.pop(key)
+ tempfile.close()
+ self.blob_log.append((oid, serial, data, tempname, version))
+
+ def storeBlob(self, oid, serial, chunk, version, id):
+ # XXX check that underlying storage supports blobs
+ key = (oid, id)
+ if key not in self.blob_transfer:
+ tempname = mktemp()
+ tempfile = open(tempname, "wb")
+ self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
+ else:
+ tempname, tempfile = self.blob_transfer[key]
+
+ tempfile.write(chunk)
+
+ def loadBlob(self, oid, serial, version, offset):
+ key = (oid, serial)
+ if not key in self.blob_loads:
+ self.blob_loads[key] = \
+ open(self.storage.loadBlob(oid, serial, version))
+ blobdata = self.blob_loads[key]
+ blobdata.seek(offset)
+ chunk = blobdata.read(4096)
+ if not chunk:
+ del self.blob_loads[key]
+ return chunk
+
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
@@ -602,6 +642,13 @@
# load oid, serial, data, version
if not self._store(*loader.load()):
break
+
+ # Blob support
+ while self.blob_log:
+ oid, oldserial, data, blobfilename, version = self.blob_log.pop()
+ self.storage.storeBlob(oid, oldserial, data, blobfilename,
+ version, self.transaction,)
+
resp = self._thunk()
if delay is not None:
delay.reply(resp)
@@ -919,6 +966,7 @@
if conn.obj in cl:
cl.remove(conn.obj)
+
class StubTimeoutThread:
def begin(self, client):
@@ -987,11 +1035,13 @@
else:
time.sleep(howlong)
+
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
t.start()
return t.delay
+
class SlowMethodThread(threading.Thread):
"""Thread to run potentially slow storage methods.
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -23,6 +23,7 @@
import tempfile
import time
import unittest
+import shutil
# ZODB test support
import ZODB
@@ -141,14 +142,16 @@
self._pids = [pid]
self._servers = [adminaddr]
self._conf_path = path
+ self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
- wait_timeout=60)
+ wait_timeout=60, blob_dir=self.blob_cache_dir)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
self._storage.close()
os.remove(self._conf_path)
+ shutil.rmtree(self.blob_cache_dir)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
@@ -210,7 +213,6 @@
def getConfig(self):
return """<mappingstorage 1/>"""
-
class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Make sure a heartbeat is being sent and that it does no harm
@@ -395,6 +397,139 @@
ConnectionInvalidationOnReconnect,
]
+class BlobAdaptedFileStorageTests(GenericTests):
+ """ZEO backed by a BlobStorage-adapted FileStorage."""
+ def setUp(self):
+ self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
+ self.filestorage = tempfile.mktemp()
+ super(BlobAdaptedFileStorageTests, self).setUp()
+
+ def tearDown(self):
+ super(BlobAdaptedFileStorageTests, self).tearDown()
+ shutil.rmtree(self.blobdir)
+
+ def getConfig(self):
+ return """
+ <blobstorage 1>
+ blob-dir %s
+ <filestorage 2>
+ path %s
+ </filestorage>
+ </blobstorage>
+ """ % (self.blobdir, self.filestorage)
+
+ def checkStoreBlob(self):
+ from ZODB.utils import oid_repr, tid_repr
+ from ZODB.Blobs.Blob import Blob
+ from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
+ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+ handle_serials
+ import transaction
+
+ somedata = 'a' * 10
+
+ blob = Blob()
+ bd_fh = blob.open('w')
+ bd_fh.write(somedata)
+ bd_fh.close()
+ tfname = bd_fh.name
+ oid = self._storage.new_oid()
+ data = zodb_pickle(blob)
+ self.assert_(os.path.exists(tfname))
+
+ t = transaction.Transaction()
+ try:
+ self._storage.tpc_begin(t)
+ r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+ r2 = self._storage.tpc_vote(t)
+ revid = handle_serials(oid, r1, r2)
+ self._storage.tpc_finish(t)
+ except:
+ self._storage.tpc_abort(t)
+ raise
+ self.assert_(not os.path.exists(tfname))
+ filename = os.path.join(self.blobdir, oid_repr(oid),
+ tid_repr(revid) + BLOB_SUFFIX)
+ self.assert_(os.path.exists(filename))
+ self.assertEqual(somedata, open(filename).read())
+
+ def checkLoadBlob(self):
+ from ZODB.Blobs.Blob import Blob
+ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+ handle_serials
+ import transaction
+
+ version = ''
+ somedata = 'a' * 10
+
+ blob = Blob()
+ bd_fh = blob.open('w')
+ bd_fh.write(somedata)
+ bd_fh.close()
+ tfname = bd_fh.name
+ oid = self._storage.new_oid()
+ data = zodb_pickle(blob)
+
+ t = transaction.Transaction()
+ try:
+ self._storage.tpc_begin(t)
+ r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+ r2 = self._storage.tpc_vote(t)
+ serial = handle_serials(oid, r1, r2)
+ self._storage.tpc_finish(t)
+ except:
+ self._storage.tpc_abort(t)
+ raise
+
+
+ class Dummy:
+ def __init__(self):
+ self.acquired = 0
+ self.released = 0
+ def acquire(self):
+ self.acquired += 1
+ def release(self):
+ self.released += 1
+
+ class statusdict(dict):
+ def __init__(self):
+ self.added = []
+ self.removed = []
+
+ def __setitem__(self, k, v):
+ self.added.append(k)
+ super(statusdict, self).__setitem__(k, v)
+
+ def __delitem__(self, k):
+ self.removed.append(k)
+ super(statusdict, self).__delitem__(k)
+
+ # ensure that we do locking properly
+ filename = self._storage.fshelper.getBlobFilename(oid, serial)
+ thestatuslock = self._storage.blob_status_lock = Dummy()
+ thebloblock = Dummy()
+
+ def getBlobLock():
+ return thebloblock
+
+ # override getBlobLock to test that locking is performed
+ self._storage.getBlobLock = getBlobLock
+ thestatusdict = self._storage.blob_status = statusdict()
+
+ filename = self._storage.loadBlob(oid, serial, version)
+
+ self.assertEqual(thestatuslock.acquired, 2)
+ self.assertEqual(thestatuslock.released, 2)
+
+ self.assertEqual(thebloblock.acquired, 1)
+ self.assertEqual(thebloblock.released, 1)
+
+ self.assertEqual(thestatusdict.added, [(oid, serial)])
+ self.assertEqual(thestatusdict.removed, [(oid, serial)])
+
+test_classes = [FileStorageTests, MappingStorageTests,
+ BlobAdaptedFileStorageTests]
+
def test_suite():
suite = unittest.TestSuite()
for klass in test_classes:
Copied: ZODB/trunk/src/ZODB/Blobs (from rev 71329, ZODB/branches/blob-merge-branch/src/ZODB/Blobs)
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/Connection.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -20,6 +20,8 @@
import tempfile
import threading
import warnings
+import os
+import shutil
from time import time
from persistent import PickleCache
@@ -27,6 +29,8 @@
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
+from ZODB.Blobs.interfaces import IBlob, IBlobStorage
+from ZODB.Blobs.BlobStorage import BlobStorage
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
@@ -39,8 +43,11 @@
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
+from ZODB.POSException import Unsupported
+from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
+from ZODB import utils
global_reset_counter = 0
@@ -591,7 +598,29 @@
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
- s = self._storage.store(oid, serial, p, self._version, transaction)
+
+ # This is a workaround for calling IBlob.providedBy(obj). Calling
+ # Interface.providedBy on an object about to be stored can
+ # inadvertently set the '__providedBy__' and '__implemented__'
+ # attributes on the object. This interferes with storing the object
+ # by requesting that the values of these attributes be stored with
+ # the ZODB.
+ providedBy = getattr(obj, '__providedBy__', None)
+ if providedBy is not None and IBlob in providedBy:
+ if not IBlobStorage.providedBy(self._storage):
+ raise Unsupported(
+ "Storing Blobs in %s is not supported." %
+ repr(self._storage))
+ s = self._storage.storeBlob(oid, serial, p,
+ obj._p_blob_uncommitted,
+ self._version, transaction)
+ # we invalidate the object here in order to ensure that the next
+ # attribute access unghostifies it, which will cause its blob data
+ # to be reattached "cleanly"
+ obj._p_invalidate()
+ else:
+ s = self._storage.store(oid, serial, p, self._version,
+ transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
@@ -801,7 +830,7 @@
if self._invalidatedCache:
- raise ReadConflictError()
+ raise ReadConflictError()
if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")):
@@ -830,6 +859,13 @@
self._reader.setGhostState(obj, p)
obj._p_serial = serial
+ # Blob support
+ providedBy = getattr(obj, '__providedBy__', None)
+ if providedBy is not None and IBlob in providedBy:
+ obj._p_blob_uncommitted = None
+ obj._p_blob_data = \
+ self._storage.loadBlob(obj._p_oid, serial, self._version)
+
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
@@ -1049,8 +1085,9 @@
def savepoint(self):
if self._savepoint_storage is None:
- self._savepoint_storage = TmpStore(self._version,
- self._normal_storage)
+ # XXX what to do about IBlobStorages?
+ tmpstore = TmpStore(self._version, self._normal_storage)
+ self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
self._creating.clear()
@@ -1082,7 +1119,7 @@
self._storage = self._normal_storage
self._savepoint_storage = None
- self._log.debug("Commiting savepoints of size %s", src.getSize())
+ self._log.debug("Committing savepoints of size %s", src.getSize())
oids = src.index.keys()
# Copy invalidating and creating info from temporary storage:
@@ -1091,10 +1128,20 @@
for oid in oids:
data, serial = src.load(oid, src)
- s = self._storage.store(oid, serial, data,
- self._version, transaction)
+ try:
+ blobfilename = src.loadBlob(oid, serial, self._version)
+ except POSKeyError:
+ s = self._storage.store(oid, serial, data,
+ self._version, transaction)
+ else:
+ s = self._storage.storeBlob(oid, serial, data, blobfilename,
+ self._version, transaction)
+ # we invalidate the object here in order to ensure that the next
+ # attribute access unghostifies it, which will cause its blob data
+ # to be reattached "cleanly"
+ self.invalidate(s, {oid:True})
self._handle_serial(s, oid, change=False)
-
src.close()
def _abort_savepoint(self):
@@ -1137,9 +1184,14 @@
def rollback(self):
self.datamanager._rollback(self.state)
+BLOB_SUFFIX = ".blob"
+BLOB_DIRTY = "store"
+
class TmpStore:
"""A storage-like thing to support savepoints."""
+ implements(IBlobStorage)
+
def __init__(self, base_version, storage):
self._storage = storage
for method in (
@@ -1149,6 +1201,10 @@
setattr(self, method, getattr(storage, method))
self._base_version = base_version
+ tmpdir = os.environ.get('ZODB_BLOB_TEMPDIR')
+ if tmpdir is None:
+ tmpdir = tempfile.mkdtemp()
+ self._blobdir = tmpdir
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
@@ -1162,6 +1218,7 @@
def close(self):
self._file.close()
+ shutil.rmtree(self._blobdir)
def load(self, oid, version):
pos = self.index.get(oid)
@@ -1193,6 +1250,37 @@
self.position += l + len(header)
return serial
+ def storeBlob(self, oid, serial, data, blobfilename, version,
+ transaction):
+ serial = self.store(oid, serial, data, version, transaction)
+ assert isinstance(serial, str) # XXX in theory serials could be
+ # something else
+
+ targetpath = self._getBlobPath(oid)
+ if not os.path.exists(targetpath):
+ os.makedirs(targetpath, 0700)
+
+ targetname = self._getCleanFilename(oid, serial)
+ os.rename(blobfilename, targetname)
+
+ def loadBlob(self, oid, serial, version):
+ """Return the filename where the blob file can be found.
+ """
+ filename = self._getCleanFilename(oid, serial)
+ if not os.path.exists(filename):
+ raise POSKeyError, "Not an existing blob."
+ return filename
+
+ def _getBlobPath(self, oid):
+ return os.path.join(self._blobdir,
+ utils.oid_repr(oid)
+ )
+
+ def _getCleanFilename(self, oid, tid):
+ return os.path.join(self._getBlobPath(oid),
+ "%s%s" % (utils.tid_repr(tid),
+ BLOB_SUFFIX,)
+ )
def reset(self, position, index):
self._file.truncate(position)
self.position = position
@@ -1206,3 +1294,4 @@
# a copy of the index here. An alternative would be to ensure that
# all callers pass copies. As is, our callers do not make copies.
self.index = index.copy()
+
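
Both TmpStore above and the FilesystemHelper used by ClientStorage lay blobs out the same way on disk: one directory per oid, one file per revision named after the tid. A hedged helper showing that layout (BLOB_SUFFIX mirrors the constant defined above; the blob_path function itself is illustrative):

import os
from ZODB.utils import oid_repr, tid_repr

BLOB_SUFFIX = ".blob"

def blob_path(blob_dir, oid, tid):
    # one directory per oid, one <tid>.blob file per committed revision
    return os.path.join(blob_dir, oid_repr(oid), tid_repr(tid) + BLOB_SUFFIX)
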
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/DB.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -232,10 +232,10 @@
# Setup storage
self._storage=storage
storage.registerDB(self, None)
- if not hasattr(storage,'tpc_vote'):
+ if not hasattr(storage, 'tpc_vote'):
storage.tpc_vote = lambda *args: None
try:
- storage.load(z64,'')
+ storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
from persistent.mapping import PersistentMapping
Modified: ZODB/trunk/src/ZODB/ExportImport.py
===================================================================
--- ZODB/trunk/src/ZODB/ExportImport.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/ExportImport.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -13,13 +13,16 @@
##############################################################################
"""Support for database export and import."""
+import os
+
from cStringIO import StringIO
from cPickle import Pickler, Unpickler
from tempfile import TemporaryFile
import logging
-from ZODB.POSException import ExportError
-from ZODB.utils import p64, u64
+from ZODB.POSException import ExportError, POSKeyError
+from ZODB.utils import p64, u64, cp, mktemp
+from ZODB.Blobs.interfaces import IBlobStorage
from ZODB.serialize import referencesf
logger = logging.getLogger('ZODB.ExportImport')
@@ -49,6 +52,21 @@
else:
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
+ # Blob support
+ if not IBlobStorage.providedBy(self._storage):
+ continue
+ try:
+ blobfilename = self._storage.loadBlob(oid,
+ serial, self._version)
+ except POSKeyError: # Looks like this is not a blob
+ continue
+
+ f.write(blob_begin_marker)
+ f.write(p64(os.stat(blobfilename).st_size))
+ blobdata = open(blobfilename, "rb")
+ cp(blobdata, f)
+ blobdata.close()
+
f.write(export_end_marker)
return f
@@ -113,17 +131,20 @@
version = self._version
while 1:
- h = f.read(16)
- if h == export_end_marker:
+ header = f.read(16)
+ if header == export_end_marker:
break
- if len(h) != 16:
+ if len(header) != 16:
raise ExportError("Truncated export file")
- l = u64(h[8:16])
- p = f.read(l)
- if len(p) != l:
+
+ # Extract header information
+ ooid = header[:8]
+ length = u64(header[8:16])
+ data = f.read(length)
+
+ if len(data) != length:
raise ExportError("Truncated export file")
- ooid = h[:8]
if oids:
oid = oids[ooid]
if isinstance(oid, tuple):
@@ -132,7 +153,21 @@
oids[ooid] = oid = self._storage.new_oid()
return_oid_list.append(oid)
- pfile = StringIO(p)
+ # Blob support
+ blob_begin = f.read(len(blob_begin_marker))
+ if blob_begin == blob_begin_marker:
+ # Copy the blob data to a temporary file
+ # and remember the name
+ blob_len = u64(f.read(8))
+ blob_filename = mktemp()
+ blob_file = open(blob_filename, "wb")
+ cp(f, blob_file, blob_len)
+ blob_file.close()
+ else:
+ f.seek(-len(blob_begin_marker),1)
+ blob_filename = None
+
+ pfile = StringIO(data)
unpickler = Unpickler(pfile)
unpickler.persistent_load = persistent_load
@@ -142,12 +177,17 @@
pickler.dump(unpickler.load())
pickler.dump(unpickler.load())
- p = newp.getvalue()
+ data = newp.getvalue()
- self._storage.store(oid, None, p, version, transaction)
+ if blob_filename is not None:
+ self._storage.storeBlob(oid, None, data, blob_filename,
+ version, transaction)
+ else:
+ self._storage.store(oid, None, data, version, transaction)
export_end_marker = '\377'*16
+blob_begin_marker = '\000BLOBSTART'
class Ghost(object):
__slots__ = ("oid",)
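
After this change an export stream carries an optional raw blob section directly after each pickle record. A hedged reader sketch, assuming a well-formed export stream; the markers and 8-byte length encoding are taken from the code above, while the function name is illustrative:

from ZODB.utils import u64

export_end_marker = '\377' * 16
blob_begin_marker = '\000BLOBSTART'

def iter_export_records(f):
    # yields (oid, pickle_data, blob_bytes_or_None) for each record
    while True:
        header = f.read(16)
        if header == export_end_marker:
            return
        if len(header) != 16:
            raise ValueError("Truncated export file")
        oid, length = header[:8], u64(header[8:16])
        data = f.read(length)
        blob = None
        marker = f.read(len(blob_begin_marker))
        if marker == blob_begin_marker:
            blob = f.read(u64(f.read(8)))
        else:
            f.seek(-len(marker), 1)   # rewind; this record has no blob section
        yield oid, data, blob
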
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -628,7 +628,7 @@
finally:
self._lock_release()
- def store(self, oid, serial, data, version, transaction):
+ def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
@@ -651,12 +651,12 @@
pnv = h.pnv
cached_tid = h.tid
- if serial != cached_tid:
+ if oldserial != cached_tid:
rdata = self.tryToResolveConflict(oid, cached_tid,
- serial, data)
+ oldserial, data)
if rdata is None:
raise POSException.ConflictError(
- oid=oid, serials=(cached_tid, serial), data=data)
+ oid=oid, serials=(cached_tid, oldserial), data=data)
else:
data = rdata
@@ -686,7 +686,7 @@
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
- if old and serial != cached_tid:
+ if old and oldserial != cached_tid:
return ConflictResolution.ResolvedSerial
else:
return self._tid
Modified: ZODB/trunk/src/ZODB/component.xml
===================================================================
--- ZODB/trunk/src/ZODB/component.xml 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/component.xml 2006-11-29 15:30:36 UTC (rev 71330)
@@ -65,6 +65,11 @@
<sectiontype name="zeoclient" datatype=".ZEOClient"
implements="ZODB.storage">
<multikey name="server" datatype="socket-connection-address" required="yes"/>
+ <key name="blob-dir" required="no">
+ <description>
+ Path name to the blob storage directory.
+ </description>
+ </key>
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
@@ -189,4 +194,18 @@
</description>
</sectiontype>
+ <sectiontype name="blobstorage" datatype=".BlobStorage"
+ implements="ZODB.storage">
+ <key name="blob-dir" required="yes">
+ <description>
+ Path name to the blob storage directory.
+ </description>
+ </key>
+ <section type="ZODB.storage" name="*" attribute="base"/>
+ </sectiontype>
+
+
+
+
+
</component>
Modified: ZODB/trunk/src/ZODB/config.py
===================================================================
--- ZODB/trunk/src/ZODB/config.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/config.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -86,7 +86,7 @@
self.config = config
self.name = config.getSectionName()
- def open(self):
+ def open(self, database_name='unnamed', databases=None):
"""Open and return the storage object."""
raise NotImplementedError
@@ -134,6 +134,14 @@
read_only=self.config.read_only,
quota=self.config.quota)
+class BlobStorage(BaseConfig):
+
+ def open(self):
+ from ZODB.Blobs.BlobStorage import BlobStorage
+ base = self.config.base.open()
+ return BlobStorage(self.config.blob_dir, base)
+
+
class ZEOClient(BaseConfig):
def open(self):
@@ -143,6 +151,7 @@
L = [server.address for server in self.config.server]
return ClientStorage(
L,
+ blob_dir=self.config.blob_dir,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
Copied: ZODB/trunk/src/ZODB/tests/loggingsupport.py (from rev 71329, ZODB/branches/blob-merge-branch/src/ZODB/tests/loggingsupport.py)
Modified: ZODB/trunk/src/ZODB/tests/testConfig.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConfig.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/tests/testConfig.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -102,6 +102,9 @@
# an elaborate comment explaining this instead. Go ahead,
# grep for 9.
from ZEO.ClientStorage import ClientDisconnected
+ import ZConfig
+ from ZODB.config import getDbSchema
+ from StringIO import StringIO
cfg = """
<zodb>
<zeoclient>
@@ -110,9 +113,26 @@
</zeoclient>
</zodb>
"""
+ config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
+ self.assertEqual(config.database.config.storage.config.blob_dir,
+ None)
self.assertRaises(ClientDisconnected, self._test, cfg)
+ cfg = """
+ <zodb>
+ <zeoclient>
+ blob-dir /tmp
+ server localhost:56897
+ wait false
+ </zeoclient>
+ </zodb>
+ """
+ config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
+ self.assertEqual(config.database.config.storage.config.blob_dir,
+ '/tmp')
+ self.assertRaises(ClientDisconnected, self._test, cfg)
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBConfigTest))
Modified: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt 2006-11-29 15:30:36 UTC (rev 71330)
@@ -193,3 +193,4 @@
InvalidSavepointRollbackError
>>> transaction.abort()
+
Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py 2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/utils.py 2006-11-29 15:30:36 UTC (rev 71330)
@@ -16,11 +16,13 @@
import time
import struct
from struct import pack, unpack
-from binascii import hexlify
+from binascii import hexlify, unhexlify
import cPickle as pickle
from cStringIO import StringIO
import weakref
import warnings
+from tempfile import mkstemp
+import os
from persistent.TimeStamp import TimeStamp
@@ -82,21 +84,34 @@
U64 = u64
-def cp(f1, f2, l):
+def cp(f1, f2, length=None):
+ """Copy all data from one file to another.
+
+ It copies the data from the current position of the input file (f1)
+ appending it to the current position of the output file (f2).
+
+ It copies at most 'length' bytes. If 'length' isn't given, it copies
+ until the end of the input file.
+ """
read = f1.read
write = f2.write
n = 8192
- while l > 0:
- if n > l:
- n = l
- d = read(n)
- if not d:
+ if length is None:
+ old_pos = f1.tell()
+ f1.seek(0,2)
+ length = f1.tell()
+ f1.seek(old_pos)
+
+ while length > 0:
+ if n > length:
+ n = length
+ data = read(n)
+ if not data:
break
- write(d)
- l = l - len(d)
+ write(data)
+ length -= len(data)
-
def newTimeStamp(old=None,
TimeStamp=TimeStamp,
time=time.time, gmtime=time.gmtime):
@@ -120,6 +135,13 @@
else:
return repr(oid)
+def repr_to_oid(repr):
+ if repr.startswith("0x"):
+ repr = repr[2:]
+ as_bin = unhexlify(repr)
+ as_bin = "\x00"*(8-len(as_bin)) + as_bin
+ return as_bin
+
serial_repr = oid_repr
tid_repr = serial_repr
@@ -265,3 +287,12 @@
# We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values()
+
+
+def mktemp(dir=None):
+ """Create a temp file, known by name, in a semi-secure manner."""
+ handle, filename = mkstemp(dir=dir)
+ os.close(handle)
+ return filename
+
+
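
The extended cp() helper now works without an explicit length: it copies from the input file's current position through EOF, appending at the output file's current position. A small usage sketch (the filenames are placeholders):

from ZODB.utils import cp

src = open('payload.bin', 'rb')   # placeholder input file
dst = open('copy.bin', 'wb')
cp(src, dst)                      # copy everything from src's current position
src.seek(0)
cp(src, dst, 100)                 # copy at most 100 more bytes
src.close()
dst.close()
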
Property changes on: ZODB/trunk/src/zope
___________________________________________________________________
Name: svn:externals
- interface svn://svn.zope.org/repos/main/Zope3/tags/Zope-3.2.0/src/zope/interface
proxy svn://svn.zope.org/repos/main/Zope3/tags/Zope-3.2.0/src/zope/proxy
testing svn://svn.zope.org/repos/main/zope.testing/trunk/src/zope/testing
+ interface svn://svn.zope.org/repos/main/Zope3/trunk/src/zope/interface
proxy svn://svn.zope.org/repos/main/Zope3/trunk/src/zope/proxy
testing svn://svn.zope.org/repos/main/zope.testing/trunk/src/zope/testing