[Zodb-checkins] SVN: ZODB/branches/ctheune-blobszerocopy/src/Z -
Added `shared writable cache mode` to allow using a shared
writable blob
Christian Theune
ct at gocept.com
Thu Mar 8 15:05:02 EST 2007
Log message for revision 73072:
- Added `shared writable cache mode` to allow using a shared writable blob
area to avoid transferring blobs through ZEO.
- Added test and fix for `getBlobLock` when loading a blob
Changed:
U ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py
U ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py
U ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py
U ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py
U ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml
U ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py
-=-
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py 2007-03-08 20:05:00 UTC (rev 73072)
@@ -112,7 +112,7 @@
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
username='', password='', realm=None,
- blob_dir=None):
+ blob_dir=None, blob_cache_writable=False):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
@@ -188,6 +188,10 @@
blob_dir -- directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
+ blob_cache_writable -- Flag whether the blob_dir is a writable shared
+ filesystem that should be used instead of transferring blob data over
+ zrpc.
+
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
@@ -315,6 +319,8 @@
self._lock = threading.Lock()
# XXX need to check for POSIX-ness here
+ self.blob_dir = blob_dir
+ self.blob_cache_writable = blob_cache_writable
if blob_dir is not None:
self.fshelper = FilesystemHelper(blob_dir)
self.fshelper.create()
@@ -892,6 +898,26 @@
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
+ if self.blob_cache_writable:
+ self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+ else:
+ self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+ return serials
+
+ def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
+ # First, move the blob into the blob directory
+ dir = self.fshelper.getPathForOID(oid)
+ if not os.path.exists(dir):
+ os.mkdir(dir)
+ fd, target = self.fshelper.blob_mkstemp(oid, serial)
+ os.close(fd)
+ os.rename(filename, target)
+ # Now tell the server where we put it
+ self._server.storeBlobShared(oid, serial, data,
+ os.path.basename(target), version, id(txn))
+
+ def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
+ """Version of storeBlob() that copies the data over the ZEO protocol."""
blobfile = open(blobfilename, "rb")
while True:
chunk = blobfile.read(1<<16)
@@ -904,7 +930,6 @@
break
blobfile.close()
os.unlink(blobfilename)
- return serials
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
@@ -999,7 +1024,7 @@
def getBlobLock(self):
# indirection to support unit testing
- return Lock()
+ return threading.Lock()
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py 2007-03-08 20:05:00 UTC (rev 73072)
@@ -226,6 +226,10 @@
def storeBlob(self, oid, serial, chunk, version, id):
self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+ def storeBlobShared(self, oid, serial, data, filename, version, id):
+ self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
+ version, id)
+
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py 2007-03-08 20:05:00 UTC (rev 73072)
@@ -482,12 +482,18 @@
if key not in self.blob_transfer:
tempname = mktemp()
tempfile = open(tempname, "wb")
- self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
+ # XXX Force close and remove them when Storage closes
+ self.blob_transfer[key] = (tempname, tempfile)
else:
tempname, tempfile = self.blob_transfer[key]
+ tempfile.write(chunk)
- tempfile.write(chunk)
-
+ def storeBlobShared(self, oid, serial, data, filename, version, id):
+ # Reconstruct the full path from the filename in the OID directory
+ filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
+ filename)
+ self.blob_log.append((oid, serial, data, filename, version))
+
def loadBlob(self, oid, serial, version, offset):
key = (oid, serial)
if not key in self.blob_loads:
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py 2007-03-08 20:05:00 UTC (rev 73072)
@@ -133,6 +133,9 @@
"""Combine tests from various origins in one class."""
+ blob_cache_writable = False
+ blob_cache_dir = None
+
def setUp(self):
logger.info("setUp() %s", self.id())
port = get_port()
@@ -142,10 +145,12 @@
self._pids = [pid]
self._servers = [adminaddr]
self._conf_path = path
- self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
+ if not self.blob_cache_dir:
+ self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
- wait_timeout=60, blob_dir=self.blob_cache_dir)
+ wait_timeout=60, blob_dir=self.blob_cache_dir,
+ blob_cache_writable=self.blob_cache_writable)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
@@ -397,16 +402,14 @@
ConnectionInvalidationOnReconnect,
]
-class BlobAdaptedFileStorageTests(GenericTests):
- """ZEO backed by a BlobStorage-adapted FileStorage."""
- def setUp(self):
- self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
- self.filestorage = tempfile.mktemp()
- super(BlobAdaptedFileStorageTests, self).setUp()
+class CommonBlobTests:
def tearDown(self):
super(BlobAdaptedFileStorageTests, self).tearDown()
- shutil.rmtree(self.blobdir)
+ if os.path.exists(self.blobdir):
+ # Might be gone already if the super() method deleted
+ # the shared directory. Don't worry.
+ shutil.rmtree(self.blobdir)
def getConfig(self):
return """
@@ -452,7 +455,7 @@
tid_repr(revid) + BLOB_SUFFIX)
self.assert_(os.path.exists(filename))
self.assertEqual(somedata, open(filename).read())
-
+
def checkLoadBlob(self):
from ZODB.Blobs.Blob import Blob
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
@@ -481,7 +484,47 @@
self._storage.tpc_abort(t)
raise
+ filename = self._storage.loadBlob(oid, serial, version)
+ self.assertEquals(somedata, open(filename, 'rb').read())
+
+class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
+ """ZEO backed by a BlobStorage-adapted FileStorage."""
+
+ def setUp(self):
+ self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
+ self.filestorage = tempfile.mktemp()
+ super(BlobAdaptedFileStorageTests, self).setUp()
+
+ def checkLoadBlobLocks(self):
+ from ZODB.Blobs.Blob import Blob
+ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+ handle_serials
+ import transaction
+
+ version = ''
+ somedata = 'a' * 10
+
+ blob = Blob()
+ bd_fh = blob.open('w')
+ bd_fh.write(somedata)
+ bd_fh.close()
+ tfname = bd_fh.name
+ oid = self._storage.new_oid()
+ data = zodb_pickle(blob)
+
+ t = transaction.Transaction()
+ try:
+ self._storage.tpc_begin(t)
+ r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+ r2 = self._storage.tpc_vote(t)
+ serial = handle_serials(oid, r1, r2)
+ self._storage.tpc_finish(t)
+ except:
+ self._storage.tpc_abort(t)
+ raise
+
+
class Dummy:
def __init__(self):
self.acquired = 0
@@ -527,8 +570,18 @@
self.assertEqual(thestatusdict.added, [(oid, serial)])
self.assertEqual(thestatusdict.removed, [(oid, serial)])
+
+class BlobWritableCacheTests(GenericTests, CommonBlobTests):
+
+ def setUp(self):
+ self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
+ self.filestorage = tempfile.mktemp()
+ self.blob_cache_writable = True
+ super(BlobWritableCacheTests, self).setUp()
+
+
test_classes = [FileStorageTests, MappingStorageTests,
- BlobAdaptedFileStorageTests]
+ BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
suite = unittest.TestSuite()
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml 2007-03-08 20:05:00 UTC (rev 73072)
@@ -67,9 +67,18 @@
<multikey name="server" datatype="socket-connection-address" required="yes"/>
<key name="blob-dir" required="no">
<description>
- Path name to the blob storage directory.
+ Path name to the blob cache directory.
</description>
</key>
+ <key name="blob-cache-writable" required="no" default="no"
+ datatype="boolean">
+ <description>
+ Tells whether the cache is a shared writable directory
+ and that the ZEO protocol should not transfer the file
+ but only the filename when committing.
+ </description>
+ </key>
+
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py 2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py 2007-03-08 20:05:00 UTC (rev 73072)
@@ -141,7 +141,7 @@
base = self.config.base.open()
return BlobStorage(self.config.blob_dir, base)
-
+
class ZEOClient(BaseConfig):
def open(self):
@@ -152,6 +152,7 @@
return ClientStorage(
L,
blob_dir=self.config.blob_dir,
+ blob_cache_writable=self.config.blob_cache_writable,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
More information about the Zodb-checkins
mailing list