[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/ Changed the storeBlob implementation to use iterators to avoid loading

Jim Fulton jim at zope.com
Tue May 15 18:43:48 EDT 2007


Log message for revision 75784:
  Changed the storeBlob implementation to use iterators to avoid loading
  blobs into memory.
  

Changed:
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py

-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-15 22:43:47 UTC (rev 75784)
@@ -896,9 +896,11 @@
         """Storage API: store a blob object."""
         serials = self.store(oid, serial, data, version, txn)
         if self.blob_cache_writable:
-            self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+            self._storeBlob_shared(
+                oid, serial, data, blobfilename, version, txn)
         else:
-            self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+            self._server.storeBlob(
+                oid, serial, data, blobfilename, version, txn)
         return serials
 
     def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
@@ -910,24 +912,10 @@
         os.close(fd)
         os.rename(filename, target)
         # Now tell the server where we put it
-        self._server.storeBlobShared(oid, serial, data,
-                                     os.path.basename(target), version, id(txn))
+        self._server.storeBlobShared(
+            oid, serial, data,
+            os.path.basename(target), version, id(txn))
 
-    def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
-        """Version of storeBlob() that copies the data over the ZEO protocol."""
-        blobfile = open(blobfilename, "rb")
-        while True:
-            chunk = blobfile.read(1<<16)
-            # even if the blobfile is completely empty, we need to call
-            # storeBlob at least once in order to be able to call
-            # storeBlobEnd successfully.
-            self._server.storeBlob(oid, serial, chunk, version, id(txn))
-            if not chunk:
-                self._server.storeBlobEnd(oid, serial, data, version, id(txn))
-                break
-        blobfile.close()
-        os.unlink(blobfilename)
-
     def _do_load_blob(self, oid, serial, version):
         """Do the actual loading from the RPC server."""
         blob_filename = self.fshelper.getBlobFilename(oid, serial)

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py	2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py	2007-05-15 22:43:47 UTC (rev 75784)
@@ -13,6 +13,7 @@
 ##############################################################################
 """RPC stubs for interface exported by StorageServer."""
 
+import os
 import time
 
 ##
@@ -219,12 +220,31 @@
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
 
-    def storeBlobEnd(self, oid, serial, data, version, id):
-        self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+    def storeBlob(self, oid, serial, data, blobfilename, version, txn):
 
-    def storeBlob(self, oid, serial, chunk, version, id):
-        self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+        # Store a blob to the server.  We don't want to read all of
+        # the data into memory, so we use a message iterator.  This
+        # allows us to read the blob data as needed.
 
+        if blobfilename is None:
+            self.rpc.callAsync('storeEmptyBlob',
+                               oid, serial, data, version, id(txn))
+            return
+
+        def store():
+            yield ('storeBlobStart', ())
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('storeBlobChunk', (chunk, ))
+            f.close()
+            os.remove(blobfilename)
+            yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
+
+        self.rpc.callAsyncIterator(store())
+
     def storeBlobShared(self, oid, serial, data, filename, version, id):
         self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, 
                            version, id)

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py	2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py	2007-05-15 22:43:47 UTC (rev 75784)
@@ -25,6 +25,7 @@
 import logging
 import os
 import sys
+import tempfile
 import threading
 import time
 import warnings
@@ -103,7 +104,7 @@
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
-        self.blob_transfer = {}
+        self.blob_tempfile = None
         self.blob_log = []
         self.blob_loads = {}
         # The authentication protocol may define extra methods.
@@ -525,25 +526,23 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
 
+
+    def storeBlobStart(self):
+        assert self.blob_tempfile is None
+        self.blob_tempfile = tempfile.mkstemp(
+            dir=self.storage.temporaryDirectory())
+        
+    def storeBlobChunk(self, chunk):
+        os.write(self.blob_tempfile[0], chunk)
+
     def storeBlobEnd(self, oid, serial, data, version, id):
-        key = (oid, id)
-        if key not in self.blob_transfer:
-            raise Exception, "Can't finish a non-started Blob"
-        tempname, tempfile = self.blob_transfer.pop(key)
-        tempfile.close()
+        fd, tempname = self.blob_tempfile
+        self.blob_tempfile = None
+        os.close(fd)
         self.blob_log.append((oid, serial, data, tempname, version))
 
-    def storeBlob(self, oid, serial, chunk, version, id):
-        # XXX check that underlying storage supports blobs
-        key = (oid, id)
-        if key not in self.blob_transfer:
-            tempname = mktemp()
-            tempfile = open(tempname, "wb")
-            # XXX Force close and remove them when Storage closes
-            self.blob_transfer[key] = (tempname, tempfile)
-        else:
-            tempname, tempfile = self.blob_transfer[key]
-        tempfile.write(chunk)
+    def storeEmptyBlob(self, oid, serial, data, version, id):
+        self.blob_log.append((oid, serial, data, None, version))
 
     def storeBlobShared(self, oid, serial, data, filename, version, id):
         # Reconstruct the full path from the filename in the OID directory



More information about the Zodb-checkins mailing list