[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Reimplemented the ZEO Blob protocol:

Jim Fulton jim at zope.com
Fri May 18 14:02:22 EDT 2007

Log message for revision 75844:
  Reimplemented the ZEO Blob protocol:
  - Avoid more than one round-trip call when loading blobs via copy from
    the server.
  - Avoid loading large amounts of blob data into memory.  The old
    storeBlob implementation was likely to queue blob data faster than
    it could be sent, leading to a large memory footprint for the
    queue. Now, iterators are used to read data from files only when the
    network layer is ready to send it.
  - Fixed storeBlob to move the input file to the blob cache (when not
    sharing the blob directory with the server).
  - Extended the loadBlob locking model to work with multiple processes
    by using file locks rather than threading locks.  A common
    configuration is to use a client process per core, so that a machine
    is likely to have many client processes and it should be possible
    for the client processes to share a common blob cache.

  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/ClientStub.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/TransactionBuffer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
--- ZODB/trunk/src/ZEO/ClientStorage.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -21,6 +21,7 @@
 import cPickle
 import os
 import socket
+import sys
 import tempfile
 import threading
 import time
@@ -35,6 +36,7 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
+import ZODB.lock_file
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
@@ -329,10 +331,6 @@
             self.fshelper = None
-        # Initialize locks
-        self.blob_status_lock = threading.Lock()
-        self.blob_status = {}
         # Decide whether to use non-temporary files
         if client is not None:
             dir = var or os.getcwd()
@@ -896,9 +894,13 @@
         """Storage API: store a blob object."""
         serials = self.store(oid, serial, data, version, txn)
         if self.blob_cache_writable:
-            self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+            self._storeBlob_shared(
+                oid, serial, data, blobfilename, version, txn)
-            self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+            self._server.storeBlob(
+                oid, serial, data, blobfilename, version, txn)
+            if blobfilename is not None:
+                self._tbuf.storeBlob(oid, blobfilename)
         return serials
     def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
@@ -908,121 +910,145 @@
         fd, target = self.fshelper.blob_mkstemp(oid, serial)
-        os.rename(filename, target)
+        if sys.platform == 'win32':
+            # On windows, we can't rename to an existing file.  That's
+            # OK.  We don't care what file we get as long as it is
+            # unique.  We'll just keep trying until the rename suceeds.
+            os.remove(target)
+            i = 0
+            while 1:
+                try:
+                    os.rename(filename, target + str(i))
+                except OSError:
+                    i += 1
+                else:
+                    break
+            target += str(i)
+        else:
+            os.rename(filename, target)
         # Now tell the server where we put it
-        self._server.storeBlobShared(oid, serial, data,
-                                     os.path.basename(target), version, id(txn))
+        self._server.storeBlobShared(
+            oid, serial, data,
+            os.path.basename(target), version, id(txn))
-    def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
-        """Version of storeBlob() that copies the data over the ZEO protocol."""
-        blobfile = open(blobfilename, "rb")
-        while True:
-            chunk = blobfile.read(1<<16)
-            # even if the blobfile is completely empty, we need to call
-            # storeBlob at least once in order to be able to call
-            # storeBlobEnd successfully.
-            self._server.storeBlob(oid, serial, chunk, version, id(txn))
-            if not chunk:
-                self._server.storeBlobEnd(oid, serial, data, version, id(txn))
-                break
-        blobfile.close()
-        os.unlink(blobfilename)
+    def _have_blob(self, blob_filename, oid, serial):
+        if os.path.exists(blob_filename):
+            log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
+                utils.tid_repr(serial)), level=BLATHER)
+            return True
+        return False
-    def _do_load_blob(self, oid, serial, version):
-        """Do the actual loading from the RPC server."""
+    def recieveBlobStart(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
-        if self._server is None:
-            raise ClientDisconnected()
+        assert not os.path.exists(blob_filename)
+        assert os.path.exists(blob_filename+'.lock')
+        blob_filename += '.dl'
+        assert not os.path.exists(blob_filename)
+        f = open(blob_filename, 'wb')
+        f.close()
-        targetpath = self.fshelper.getPathForOID(oid)
-        if not os.path.exists(targetpath):
-            os.makedirs(targetpath, 0700)
+    def recieveBlobChunk(self, oid, serial, chunk):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
+        assert os.path.exists(blob_filename)
+        f = open(blob_filename, 'ab')
+        f.write(chunk)
+        f.close()
+    def recieveBlobStop(self, oid, serial):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        os.rename(blob_filename+'.dl', blob_filename)
+    def loadBlob(self, oid, serial):
-        # We write to a temporary file first, so we do not accidentally 
-        # allow half-baked copies of this blob be loaded
-        tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
-        tempfile = os.fdopen(tempfd, 'wb')
-        offset = 0
-        while True:
-            chunk = self._server.loadBlob(oid, serial, version, offset)
-            if not chunk:
-                break
-            offset += len(chunk)
-            tempfile.write(chunk)
-        tempfile.close()
-        # XXX will fail on Windows if file is open
-        os.rename(tempfilename, blob_filename)
-        return blob_filename
-    def loadBlob(self, oid, serial, version):
-        """Loading a blob has to know about loading the same blob
-           from another thread as the same time.
-            1. Check if the blob is downloaded already
-            2. Check whether it is currently beeing downloaded
-            2a. Wait for other download to finish, return 
-            3. If not beeing downloaded, start download
-        """
+        # Load a blob.  If it isn't present and we have a shared blob
+        # directory, then assume that it doesn't exist on the server
+        # and return None.
         if self.fshelper is None:
             raise POSException.Unsupported("No blob cache directory is "
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         # Case 1: Blob is available already, just use it
-        if os.path.exists(blob_filename):
-            log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
-                utils.tid_repr(serial)), level=BLATHER)
+        if self._have_blob(blob_filename, oid, serial):
             return blob_filename
-        # Case 2,3: Blob might still be downloading or not there yet
+        if self.blob_cache_writable:
+            # We're using a server shared cache.  If the file isn't
+            # here, it's not anywahere.
+            return None
-        # Try to get or create a lock for the downloading of this blob, 
-        # identified by it's oid and serial
-        lock_key = (oid, serial)
-        # We need to make the check for an existing lock and the possible
-        # creation of a new one atomic, so there is another lock:
-        self.blob_status_lock.acquire()
+        # First, we'll create the directory for this oid, if it doesn't exist. 
+        targetpath = self.fshelper.getPathForOID(oid)
+        if not os.path.exists(targetpath):
+            try:
+                os.makedirs(targetpath, 0700)
+            except OSError:
+                # We might have lost a race.  If so, the directory
+                # must exist now
+                assert os.path.exists(targetpath)
+        # OK, it's not here and we (or someone) needs to get it.  We
+        # want to avoid getting it multiple times.  We want to avoid
+        # getting it multiple times even accross separate client
+        # processes on the same machine. We'll use file locking.
+        lockfilename = blob_filename+'.lock'
-            if not self.blob_status.has_key(oid):
-                self.blob_status[lock_key] = self.getBlobLock()
-            lock = self.blob_status[lock_key]
-        finally:
-            self.blob_status_lock.release()
+            lock = ZODB.lock_file.LockFile(lockfilename)
+        except ZODB.lock_file.LockError:
-        # We acquire the lock to either start downloading, or wait
-        # for another download to finish
-        lock.acquire()
+            # Someone is already downloading the Blob. Wait for the
+            # lock to be freed.  How long should we be willing to wait?
+            # TODO: maybe find some way to assess download progress.
+            while 1:
+                time.sleep(0.1)
+                try:
+                    lock = ZODB.lock_file.LockFile(lockfilename)
+                except ZODB.lock_file.LockError:
+                    pass
+                else:
+                    # We have the lock. We should be able to get the file now.
+                    lock.close()
+                    try:
+                        os.remove(lockfilename)
+                    except OSError:
+                        pass
+                    break
+            if self._have_blob(blob_filename, oid, serial):
+                return blob_filename
+            return None
-            # If there was another download that is finished by now,
-            # we just take the result.
-            if os.path.exists(blob_filename):
-                log2("Found blob %s/%s in cache after it was downloaded "
-                     "from another thread." % (utils.oid_repr(oid),
-                     utils.tid_repr(serial)), level=BLATHER)
+            # We got the lock, so it's our job to download it.  First,
+            # we'll double check that someone didn't download it while we
+            # were getting the lock:
+            if self._have_blob(blob_filename, oid, serial):
                 return blob_filename
-            # Otherwise we download and use that
-            return self._do_load_blob(oid, serial, version)
+            # Ask the server to send it to us.  When this function
+            # returns, it will have been sent. (The recieving will
+            # have been handled by the asyncore thread.)
+            self._server.sendBlob(oid, serial)
+            if self._have_blob(blob_filename, oid, serial):
+                return blob_filename
+            return None
-            # When done we remove the download lock ...
-            lock.release()
-            # And the status information isn't needed as well,
-            # but we have to use the second lock here as well, to avoid
-            # making the creation of this status lock non-atomic (see above)
-            self.blob_status_lock.acquire()
+            lock.close()
-                del self.blob_status[lock_key]
-            finally:
-                self.blob_status_lock.release()
+                os.remove(lockfilename)
+            except OSError:
+                pass
-    def getBlobLock(self):
-        # indirection to support unit testing
-        return threading.Lock()
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:
@@ -1144,6 +1170,20 @@
                 if s != ResolvedSerial:
                     assert s == tid, (s, tid)
                     self._cache.store(oid, version, s, None, data)
+        if self.fshelper is not None:
+            blobs = self._tbuf.blobs
+            while blobs:
+                oid, blobfilename = blobs.pop()
+                targetpath = self.fshelper.getPathForOID(oid)
+                if not os.path.exists(targetpath):
+                    os.makedirs(targetpath, 0700)
+                os.rename(blobfilename,
+                          self.fshelper.getBlobFilename(oid, tid),
+                          )
     def undo(self, trans_id, txn):

Modified: ZODB/trunk/src/ZEO/ClientStub.py
--- ZODB/trunk/src/ZEO/ClientStub.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ClientStub.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -60,3 +60,18 @@
     def info(self, arg):
         self.rpc.callAsync('info', arg)
+    def storeBlob(self, oid, serial, blobfilename):
+        def store():
+            yield ('recieveBlobStart', (oid, serial))
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('recieveBlobChunk', (oid, serial, chunk, ))
+            f.close()
+            yield ('recieveBlobStop', (oid, serial))
+        self.rpc.callAsyncIterator(store())

Modified: ZODB/trunk/src/ZEO/ServerStub.py
--- ZODB/trunk/src/ZEO/ServerStub.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -13,6 +13,7 @@
 """RPC stubs for interface exported by StorageServer."""
+import os
 import time
@@ -219,12 +220,30 @@
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
-    def storeBlobEnd(self, oid, serial, data, version, id):
-        self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+    def storeBlob(self, oid, serial, data, blobfilename, version, txn):
-    def storeBlob(self, oid, serial, chunk, version, id):
-        self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+        # Store a blob to the server.  We don't want to real all of
+        # the data into memory, so we use a message iterator.  This
+        # allows us to read the blob data as needed.
+        if blobfilename is None:
+            self.rpc.callAsync('storeEmptyBlob',
+                               oid, serial, data, version, id(txn))
+            return
+        def store():
+            yield ('storeBlobStart', ())
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('storeBlobChunk', (chunk, ))
+            f.close()
+            yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
+        self.rpc.callAsyncIterator(store())
     def storeBlobShared(self, oid, serial, data, filename, version, id):
         self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, 
                            version, id)
@@ -271,8 +290,8 @@
     def load(self, oid, version):
         return self.rpc.call('load', oid, version)
-    def loadBlob(self, oid, serial, version, offset):
-        return self.rpc.call('loadBlob', oid, serial, version, offset)
+    def sendBlob(self, oid, serial):
+        return self.rpc.call('sendBlob', oid, serial)
     def getTid(self, oid):
         return self.rpc.call('getTid', oid)

Modified: ZODB/trunk/src/ZEO/StorageServer.py
--- ZODB/trunk/src/ZEO/StorageServer.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -25,6 +25,7 @@
 import logging
 import os
 import sys
+import tempfile
 import threading
 import time
 import warnings
@@ -103,9 +104,8 @@
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
-        self.blob_transfer = {}
+        self.blob_tempfile = None
         self.blob_log = []
-        self.blob_loads = {}
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -525,25 +525,22 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
+    def storeBlobStart(self):
+        assert self.blob_tempfile is None
+        self.blob_tempfile = tempfile.mkstemp(
+            dir=self.storage.temporaryDirectory())
+    def storeBlobChunk(self, chunk):
+        os.write(self.blob_tempfile[0], chunk)
     def storeBlobEnd(self, oid, serial, data, version, id):
-        key = (oid, id)
-        if key not in self.blob_transfer:
-            raise Exception, "Can't finish a non-started Blob"
-        tempname, tempfile = self.blob_transfer.pop(key)
-        tempfile.close()
+        fd, tempname = self.blob_tempfile
+        self.blob_tempfile = None
+        os.close(fd)
         self.blob_log.append((oid, serial, data, tempname, version))
-    def storeBlob(self, oid, serial, chunk, version, id):
-        # XXX check that underlying storage supports blobs
-        key = (oid, id)
-        if key not in self.blob_transfer:
-            tempname = mktemp()
-            tempfile = open(tempname, "wb")
-            # XXX Force close and remove them when Storage closes
-            self.blob_transfer[key] = (tempname, tempfile)
-        else:
-            tempname, tempfile = self.blob_transfer[key]
-        tempfile.write(chunk)
+    def storeEmptyBlob(self, oid, serial, data, version, id):
+        self.blob_log.append((oid, serial, data, None, version))
     def storeBlobShared(self, oid, serial, data, filename, version, id):
         # Reconstruct the full path from the filename in the OID directory
@@ -551,17 +548,8 @@
         self.blob_log.append((oid, serial, data, filename, version))
-    def loadBlob(self, oid, serial, version, offset):
-        key = (oid, serial)
-        if not key in self.blob_loads:
-            self.blob_loads[key] = \
-                    open(self.storage.loadBlob(oid, serial, version))
-        blobdata = self.blob_loads[key]
-        blobdata.seek(offset)
-        chunk = blobdata.read(4096)
-        if not chunk:
-            del self.blob_loads[key]
-        return chunk
+    def sendBlob(self, oid, serial):
+        self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.

Modified: ZODB/trunk/src/ZEO/TransactionBuffer.py
--- ZODB/trunk/src/ZEO/TransactionBuffer.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/TransactionBuffer.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -59,12 +59,14 @@
         self.closed = 0
         self.count = 0
         self.size = 0
+        self.blobs = []
         # It's safe to use a fast pickler because the only objects
         # stored are builtin types -- strings or None.
         self.pickler = cPickle.Pickler(self.file, 1)
         self.pickler.fast = 1
     def close(self):
+        self.clear()
             self.closed = 1
@@ -82,6 +84,9 @@
+    def storeBlob(self, oid, blobfilename):
+        self.blobs.append((oid, blobfilename))
     def _store(self, oid, version, data):
         """Store oid, version, data for later retrieval"""
         if self.closed:
@@ -113,6 +118,10 @@
             self.count = 0
             self.size = 0
+            while self.blobs:
+                oid, serial, blobfilename = self.blobs.pop()
+                if os.path.exists(blobfilename):
+                    os.remove(blobfilename)

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2007-05-18 18:02:21 UTC (rev 75844)
@@ -22,6 +22,7 @@
 import signal
 import socket
 import tempfile
+import threading
 import time
 import unittest
 import shutil
@@ -520,93 +521,101 @@
-        filename = self._storage.loadBlob(oid, serial, version)
+        filename = self._storage.loadBlob(oid, serial)
         self.assertEquals(somedata, open(filename, 'rb').read())
 class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
     """ZEO backed by a BlobStorage-adapted FileStorage."""
     def setUp(self):
-        self.blobdir = tempfile.mkdtemp()  # This is the blob directory on the ZEO server
+        self.blobdir = tempfile.mkdtemp()  # blob directory on ZEO server
         self.filestorage = tempfile.mktemp()
         super(BlobAdaptedFileStorageTests, self).setUp()
-    def checkLoadBlobLocks(self):
+    def checkStoreAndLoadBlob(self):
+        from ZODB.utils import oid_repr, tid_repr
         from ZODB.Blobs.Blob import Blob
+        from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
         from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
         import transaction
-        version = ''
-        somedata = 'a' * 10
+        somedata_path = os.path.join(self.blob_cache_dir, 'somedata')
+        somedata = open(somedata_path, 'w+b')
+        for i in range(1000000):
+            somedata.write("%s\n" % i)
+        somedata.seek(0)
         blob = Blob()
         bd_fh = blob.open('w')
-        bd_fh.write(somedata)
+        ZODB.utils.cp(somedata, bd_fh)
         tfname = bd_fh.name
         oid = self._storage.new_oid()
         data = zodb_pickle(blob)
+        self.assert_(os.path.exists(tfname))
         t = transaction.Transaction()
             r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
             r2 = self._storage.tpc_vote(t)
-            serial = handle_serials(oid, r1, r2)
+            revid = handle_serials(oid, r1, r2)
+        # The uncommitted data file should have been removed
+        self.assert_(not os.path.exists(tfname))
-        class Dummy:
-            def __init__(self):
-                self.acquired = 0
-                self.released = 0
-            def acquire(self):
-                self.acquired += 1
-            def release(self):
-                self.released += 1
-        class statusdict(dict):
-            def __init__(self):
-                self.added = []
-                self.removed = []
+        def check_data(path):
+            self.assert_(os.path.exists(path))
+            f = open(path, 'rb')
+            somedata.seek(0) 
+            d1 = d2 = 1
+            while d1 or d2:
+                d1 = f.read(8096)
+                d2 = somedata.read(8096)
+                self.assertEqual(d1, d2)
-            def __setitem__(self, k, v):
-                self.added.append(k)
-                super(statusdict, self).__setitem__(k, v)
+        # The file should have been copied to the server:
+        filename = os.path.join(self.blobdir, oid_repr(oid),
+                                tid_repr(revid) + BLOB_SUFFIX)
+        check_data(filename)
-            def __delitem__(self, k):
-                self.removed.append(k)
-                super(statusdict, self).__delitem__(k)
+        # It should also be in the cache:
+        filename = os.path.join(self.blob_cache_dir, oid_repr(oid),
+                                tid_repr(revid) + BLOB_SUFFIX)
+        check_data(filename)
-        # ensure that we do locking properly
-        filename = self._storage.fshelper.getBlobFilename(oid, serial)
-        thestatuslock = self._storage.blob_status_lock = Dummy()
-        thebloblock = Dummy()
+        # If we remove it from the cache and call loadBlob, it should
+        # come back. We can do this in many threads.  We'll instrument
+        # the method that is used to request data from teh server to
+        # verify that it is only called once.
-        def getBlobLock():
-            return thebloblock
+        sendBlob_org = ZEO.ServerStub.StorageServer.sendBlob
+        calls = []
+        def sendBlob(self, oid, serial):
+            calls.append((oid, serial))
+            sendBlob_org(self, oid, serial)
-        # override getBlobLock to test that locking is performed
-        self._storage.getBlobLock = getBlobLock
-        thestatusdict = self._storage.blob_status = statusdict()
-        filename = self._storage.loadBlob(oid, serial, version)
-        self.assertEqual(thestatuslock.acquired, 2)
-        self.assertEqual(thestatuslock.released, 2)
+        os.remove(filename)
+        returns = []
+        threads = [
+            threading.Thread(
+               target=lambda :
+                      returns.append(self._storage.loadBlob(oid, revid))
+               )
+            for i in range(10)
+            ]
+        [thread.start() for thread in threads]
+        [thread.join() for thread in threads]
+        [self.assertEqual(r, filename) for r in returns]        
+        check_data(filename)
-        self.assertEqual(thebloblock.acquired, 1)
-        self.assertEqual(thebloblock.released, 1)
-        self.assertEqual(thestatusdict.added, [(oid, serial)])
-        self.assertEqual(thestatusdict.removed, [(oid, serial)])
 class BlobWritableCacheTests(GenericTests, CommonBlobTests):
     def setUp(self):

More information about the Zodb-checkins mailing list