[Zodb-checkins] SVN: ZODB/branches/ctheune-blobszerocopy/src/Z - Added `shared writable cache mode` to allow using a shared writable blob area to avoid transferring blobs through ZEO.

Christian Theune ct at gocept.com
Thu Mar 8 15:05:02 EST 2007


Log message for revision 73072:
   - Added `shared writable cache mode` to allow using a shared writable blob
     area to avoid transferring blobs through ZEO.
   - Added test and fix for `getBlobLock` when loading a blob
  

Changed:
  U   ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py
  U   ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py
  U   ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py
  U   ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py
  U   ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml
  U   ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py

-=-
Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/ClientStorage.py	2007-03-08 20:05:00 UTC (rev 73072)
@@ -112,7 +112,7 @@
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
                  username='', password='', realm=None,
-                 blob_dir=None):
+                 blob_dir=None, blob_cache_writable=False):
         """ClientStorage constructor.
 
         This is typically invoked from a custom_zodb.py file.
@@ -188,6 +188,10 @@
         blob_dir -- directory path for blob data.  'blob data' is data that
             is retrieved via the loadBlob API.
 
+        blob_cache_writable -- Flag whether the blob_dir is a writable shared
+        filesystem that should be used instead of transferring blob data over
+        zrpc.
+
         Note that the authentication protocol is defined by the server
         and is detected by the ClientStorage upon connecting (see
         testConnection() and doAuth() for details).
@@ -315,6 +319,8 @@
         self._lock = threading.Lock()
 
         # XXX need to check for POSIX-ness here
+        self.blob_dir = blob_dir
+        self.blob_cache_writable = blob_cache_writable
         if blob_dir is not None:
             self.fshelper = FilesystemHelper(blob_dir)
             self.fshelper.create()
@@ -892,6 +898,26 @@
     def storeBlob(self, oid, serial, data, blobfilename, version, txn):
         """Storage API: store a blob object."""
         serials = self.store(oid, serial, data, version, txn)
+        if self.blob_cache_writable:
+            self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+        else:
+            self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+        return serials
+
+    def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
+        # First, move the blob into the blob directory
+        dir = self.fshelper.getPathForOID(oid)
+        if not os.path.exists(dir):
+            os.mkdir(dir)
+        fd, target = self.fshelper.blob_mkstemp(oid, serial)
+        os.close(fd)
+        os.rename(filename, target)
+        # Now tell the server where we put it
+        self._server.storeBlobShared(oid, serial, data,
+                                     os.path.basename(target), version, id(txn))
+
+    def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
+        """Version of storeBlob() that copies the data over the ZEO protocol."""
         blobfile = open(blobfilename, "rb")
         while True:
             chunk = blobfile.read(1<<16)
@@ -904,7 +930,6 @@
                 break
         blobfile.close()
         os.unlink(blobfilename)
-        return serials
 
     def _do_load_blob(self, oid, serial, version):
         """Do the actual loading from the RPC server."""
@@ -999,7 +1024,7 @@
 
     def getBlobLock(self):
         # indirection to support unit testing
-        return Lock()
+        return threading.Lock()
 
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""

Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/ServerStub.py	2007-03-08 20:05:00 UTC (rev 73072)
@@ -226,6 +226,10 @@
     def storeBlob(self, oid, serial, chunk, version, id):
         self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
 
+    def storeBlobShared(self, oid, serial, data, filename, version, id):
+        self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, 
+                           version, id)
+
     ##
     # Start two-phase commit for a transaction
     # @param id id used by client to identify current transaction.  The

Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/StorageServer.py	2007-03-08 20:05:00 UTC (rev 73072)
@@ -482,12 +482,18 @@
         if key not in self.blob_transfer:
             tempname = mktemp()
             tempfile = open(tempname, "wb")
-            self.blob_transfer[key] = (tempname, tempfile)   # XXX Force close and remove them when Storage closes
+            # XXX Force close and remove them when Storage closes
+            self.blob_transfer[key] = (tempname, tempfile)
         else:
             tempname, tempfile = self.blob_transfer[key]
+        tempfile.write(chunk)
 
-        tempfile.write(chunk)
- 
+    def storeBlobShared(self, oid, serial, data, filename, version, id):
+        # Reconstruct the full path from the filename in the OID directory
+        filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
+                                filename)
+        self.blob_log.append((oid, serial, data, filename, version))
+
     def loadBlob(self, oid, serial, version, offset):
         key = (oid, serial)
         if not key in self.blob_loads:

Modified: ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZEO/tests/testZEO.py	2007-03-08 20:05:00 UTC (rev 73072)
@@ -133,6 +133,9 @@
 
     """Combine tests from various origins in one class."""
 
+    blob_cache_writable = False
+    blob_cache_dir = None
+
     def setUp(self):
         logger.info("setUp() %s", self.id())
         port = get_port()
@@ -142,10 +145,12 @@
         self._pids = [pid]
         self._servers = [adminaddr]
         self._conf_path = path
-        self.blob_cache_dir = tempfile.mkdtemp()  # This is the blob cache for ClientStorage
+        if not self.blob_cache_dir:
+            self.blob_cache_dir = tempfile.mkdtemp()  # This is the blob cache for ClientStorage
         self._storage = ClientStorage(zport, '1', cache_size=20000000,
                                       min_disconnect_poll=0.5, wait=1,
-                                      wait_timeout=60, blob_dir=self.blob_cache_dir)
+                                      wait_timeout=60, blob_dir=self.blob_cache_dir,
+                                      blob_cache_writable=self.blob_cache_writable)
         self._storage.registerDB(DummyDB(), None)
 
     def tearDown(self):
@@ -397,16 +402,14 @@
                 ConnectionInvalidationOnReconnect,
                ]
 
-class BlobAdaptedFileStorageTests(GenericTests):
-    """ZEO backed by a BlobStorage-adapted FileStorage."""
-    def setUp(self):
-        self.blobdir = tempfile.mkdtemp()  # This is the blob directory on the ZEO server
-        self.filestorage = tempfile.mktemp()
-        super(BlobAdaptedFileStorageTests, self).setUp()
+class CommonBlobTests:
 
     def tearDown(self):
         super(BlobAdaptedFileStorageTests, self).tearDown()
-        shutil.rmtree(self.blobdir)
+        if os.path.exists(self.blobdir):
+            # Might be gone already if the super() method deleted
+            # the shared directory. Don't worry.
+            shutil.rmtree(self.blobdir)
 
     def getConfig(self):
         return """
@@ -452,7 +455,7 @@
                                 tid_repr(revid) + BLOB_SUFFIX)
         self.assert_(os.path.exists(filename))
         self.assertEqual(somedata, open(filename).read())
-        
+
     def checkLoadBlob(self):
         from ZODB.Blobs.Blob import Blob
         from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
@@ -481,7 +484,47 @@
             self._storage.tpc_abort(t)
             raise
 
+        filename = self._storage.loadBlob(oid, serial, version)
+        self.assertEquals(somedata, open(filename, 'rb').read())
 
+
+class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
+    """ZEO backed by a BlobStorage-adapted FileStorage."""
+
+    def setUp(self):
+        self.blobdir = tempfile.mkdtemp()  # This is the blob directory on the ZEO server
+        self.filestorage = tempfile.mktemp()
+        super(BlobAdaptedFileStorageTests, self).setUp()
+
+    def checkLoadBlobLocks(self):
+        from ZODB.Blobs.Blob import Blob
+        from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+             handle_serials
+        import transaction
+
+        version = ''
+        somedata = 'a' * 10
+
+        blob = Blob()
+        bd_fh = blob.open('w')
+        bd_fh.write(somedata)
+        bd_fh.close()
+        tfname = bd_fh.name
+        oid = self._storage.new_oid()
+        data = zodb_pickle(blob)
+
+        t = transaction.Transaction()
+        try:
+            self._storage.tpc_begin(t)
+            r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+            r2 = self._storage.tpc_vote(t)
+            serial = handle_serials(oid, r1, r2)
+            self._storage.tpc_finish(t)
+        except:
+            self._storage.tpc_abort(t)
+            raise
+
+
         class Dummy:
             def __init__(self):
                 self.acquired = 0
@@ -527,8 +570,18 @@
         self.assertEqual(thestatusdict.added, [(oid, serial)])
         self.assertEqual(thestatusdict.removed, [(oid, serial)])
 
+
+class BlobWritableCacheTests(GenericTests, CommonBlobTests):
+
+    def setUp(self):
+        self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
+        self.filestorage = tempfile.mktemp()
+        self.blob_cache_writable = True
+        super(BlobWritableCacheTests, self).setUp()
+
+
 test_classes = [FileStorageTests, MappingStorageTests,
-                BlobAdaptedFileStorageTests]
+                BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
 def test_suite():
     suite = unittest.TestSuite()

Modified: ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZODB/component.xml	2007-03-08 20:05:00 UTC (rev 73072)
@@ -67,9 +67,18 @@
     <multikey name="server" datatype="socket-connection-address" required="yes"/>
     <key name="blob-dir" required="no">
       <description>
-        Path name to the blob storage directory.
+        Path name to the blob cache directory.
       </description>
     </key>
+    <key name="blob-cache-writable" required="no" default="no"
+        datatype="boolean">
+      <description>
+          Tells whether the cache is a shared writable directory.
+          If so, the ZEO protocol does not transfer the blob file
+          but only its filename when committing.
+      </description>
+    </key>
+
     <key name="storage" default="1">
       <description>
         The name of the storage that the client wants to use.  If the

Modified: ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py	2007-03-08 19:38:41 UTC (rev 73071)
+++ ZODB/branches/ctheune-blobszerocopy/src/ZODB/config.py	2007-03-08 20:05:00 UTC (rev 73072)
@@ -141,7 +141,7 @@
         base = self.config.base.open()
         return BlobStorage(self.config.blob_dir, base)
 
-        
+
 class ZEOClient(BaseConfig):
 
     def open(self):
@@ -152,6 +152,7 @@
         return ClientStorage(
             L,
             blob_dir=self.config.blob_dir,
+            blob_cache_writable=self.config.blob_cache_writable,
             storage=self.config.storage,
             cache_size=self.config.cache_size,
             name=self.config.name,



More information about the Zodb-checkins mailing list