[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/ Refactored loadBlob to use only a single round-trip to the server. Now,

Jim Fulton jim at zope.com
Thu May 17 16:39:49 EDT 2007


Log message for revision 75820:
  Refactored loadBlob to use only a single round-trip to the server.  Now,
  the client sends a request to the server and the server sends the data
  back using a series of one-way calls and, after queuing the calls,
  queues the send-request return.  The result is that by the time the
  request from loadBlob has returned, the data has been sent from the
  server. As with storeBlob, the send from the server to the client uses
  iteration to avoid loading the blob into memory.
  
  Changed the loadBlob locking strategy to allow multiple ZEO clients to
  share a blob cache directory.
  

Changed:
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ClientStub.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/TransactionBuffer.py
  U   ZODB/branches/jim-zeo-blob/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -330,10 +330,6 @@
         else:
             self.fshelper = None
 
-        # Initialize locks
-        self.blob_status_lock = threading.Lock()
-        self.blob_status = {}
-
         # Decide whether to use non-temporary files
         if client is not None:
             dir = var or os.getcwd()
@@ -902,6 +898,8 @@
         else:
             self._server.storeBlob(
                 oid, serial, data, blobfilename, version, txn)
+            if blobfilename is not None:
+                self._tbuf.storeBlob(oid, blobfilename)
         return serials
 
     def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
@@ -923,7 +921,27 @@
                 utils.tid_repr(serial)), level=BLATHER)
             return True
         return False
+
+    def recieveBlobStart(self, oid, serial):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        assert not os.path.exists(blob_filename)
+        assert os.path.exists(blob_filename+'.lock')
+        blob_filename += '.dl'
+        assert not os.path.exists(blob_filename)
+        f = open(blob_filename, 'w')
+        f.close()
+
+    def recieveBlobChunk(self, oid, serial, chunk):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
+        assert os.path.exists(blob_filename)
+        f = open(blob_filename, 'a')
+        f.write(chunk)
+        f.close()
         
+    def recieveBlobStop(self, oid, serial):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        os.rename(blob_filename+'.dl', blob_filename)
+        
     def loadBlob(self, oid, serial):
 
         # Load a blob.  If it isn't present and we have a shared blob
@@ -943,16 +961,25 @@
             # here, it's not anywahere.
             return None
 
+        # First, we'll create the directory for this oid, if it doesn't exist. 
+        targetpath = self.fshelper.getPathForOID(oid)
+        if not os.path.exists(targetpath):
+            try:
+                os.makedirs(targetpath, 0700)
+            except OSError:
+                # We might have lost a race.  If so, the directory
+                # must exist now
+                assert os.path.exists(targetpath)
+
         # OK, it's not here and we (or someone) needs to get it.  We
         # want to avoid getting it multiple times.  We want to avoid
         # getting it multiple times even accross separate client
         # processes on the same machine. We'll use file locking.
 
+        lockfilename = blob_filename+'.lock'
         try:
-            lock = ZODB.lock_file.LockFile(blob_filename+'.lock')
-        except Exception:
-            # TODO: fic LockFile so that it raises consistent
-            # exceptions accross platforms.
+            lock = ZODB.lock_file.LockFile(lockfilename)
+        except ZODB.lock_file.LockError:
 
             # Someone is already downloading the Blob. Wait for the
             # lock to be freed.  How long should we be willing to wait?
@@ -961,18 +988,22 @@
             while 1:
                 time.sleep(0.1)
                 try:
-                    lock = ZODB.lock_file.LockFile(blob_filename+'.lock')
-                except Exception:
+                    lock = ZODB.lock_file.LockFile(lockfilename)
+                except ZODB.lock_file.LockError:
                     pass
                 else:
                     # We have the lock. We should be able to get the file now.
                     lock.close()
+                    try:
+                        os.remove(lockfilename)
+                    except OSError:
+                        pass
                     break
             
-                if self._have_blob(blob_filename, oid, serial):
-                    return blob_filename
+            if self._have_blob(blob_filename, oid, serial):
+                return blob_filename
 
-            raise AssertionError("Can't find downloaded blob file.")
+            return None
 
         try:
             # We got the lock, so it's our job to download it.  First,
@@ -991,15 +1022,15 @@
             if self._have_blob(blob_filename, oid, serial):
                 return blob_filename
 
-            raise AssertionError("Can't find downloaded blob file.")
+            return None
 
         finally:
             lock.close()
+            try:
+                os.remove(lockfilename)
+            except OSError:
+                pass
 
-    def getBlobLock(self):
-        # indirection to support unit testing
-        return threading.Lock()
-
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:
@@ -1121,6 +1152,20 @@
                 if s != ResolvedSerial:
                     assert s == tid, (s, tid)
                     self._cache.store(oid, version, s, None, data)
+
+        
+        if self.fshelper is not None:
+            blobs = self._tbuf.blobs
+            while blobs:
+                oid, blobfilename = blobs.pop()
+                targetpath = self.fshelper.getPathForOID(oid)
+                if not os.path.exists(targetpath):
+                    os.makedirs(targetpath, 0700)
+                os.rename(blobfilename,
+                          self.fshelper.getBlobFilename(oid, tid),
+                          )
+
+                    
         self._tbuf.clear()
 
     def undo(self, trans_id, txn):

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStub.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ClientStub.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ClientStub.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -60,3 +60,18 @@
 
     def info(self, arg):
         self.rpc.callAsync('info', arg)
+
+    def storeBlob(self, oid, serial, blobfilename):
+
+        def store():
+            yield ('recieveBlobStart', (oid, serial))
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('recieveBlobChunk', (oid, serial, chunk, ))
+            f.close()
+            yield ('recieveBlobStop', (oid, serial))
+
+        self.rpc.callAsyncIterator(store())

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -240,7 +240,6 @@
                     break
                 yield ('storeBlobChunk', (chunk, ))
             f.close()
-            os.remove(blobfilename)
             yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
 
         self.rpc.callAsyncIterator(store())
@@ -291,8 +290,8 @@
     def load(self, oid, version):
         return self.rpc.call('load', oid, version)
 
-    def loadBlob(self, oid, serial, version, offset):
-        return self.rpc.call('loadBlob', oid, serial, version, offset)
+    def sendBlob(self, oid, serial):
+        return self.rpc.call('sendBlob', oid, serial)
 
     def getTid(self, oid):
         return self.rpc.call('getTid', oid)

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -106,7 +106,6 @@
         self.auth_realm = auth_realm
         self.blob_tempfile = None
         self.blob_log = []
-        self.blob_loads = {}
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -526,7 +525,6 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
 
-
     def storeBlobStart(self):
         assert self.blob_tempfile is None
         self.blob_tempfile = tempfile.mkstemp(
@@ -550,17 +548,8 @@
                                 filename)
         self.blob_log.append((oid, serial, data, filename, version))
 
-    def loadBlob(self, oid, serial, version, offset):
-        key = (oid, serial)
-        if not key in self.blob_loads:
-            self.blob_loads[key] = \
-                    open(self.storage.loadBlob(oid, serial, version))
-        blobdata = self.blob_loads[key]
-        blobdata.seek(offset)
-        chunk = blobdata.read(4096)
-        if not chunk:
-            del self.blob_loads[key]
-        return chunk
+    def sendBlob(self, oid, serial):
+        self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
 
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/TransactionBuffer.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/TransactionBuffer.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/TransactionBuffer.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -59,12 +59,14 @@
         self.closed = 0
         self.count = 0
         self.size = 0
+        self.blobs = []
         # It's safe to use a fast pickler because the only objects
         # stored are builtin types -- strings or None.
         self.pickler = cPickle.Pickler(self.file, 1)
         self.pickler.fast = 1
 
     def close(self):
+        self.clear()
         self.lock.acquire()
         try:
             self.closed = 1
@@ -82,6 +84,9 @@
         finally:
             self.lock.release()
 
+    def storeBlob(self, oid, blobfilename):
+        self.blobs.append((oid, blobfilename))
+
     def _store(self, oid, version, data):
         """Store oid, version, data for later retrieval"""
         if self.closed:
@@ -113,6 +118,10 @@
             self.file.seek(0)
             self.count = 0
             self.size = 0
+            while self.blobs:
+                oid, serial, blobfilename = self.blobs.pop()
+                if os.path.exists(blobfilename):
+                    os.remove(blobfilename)
         finally:
             self.lock.release()
 

Modified: ZODB/branches/jim-zeo-blob/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/tests/testZEO.py	2007-05-17 20:39:40 UTC (rev 75819)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/tests/testZEO.py	2007-05-17 20:39:48 UTC (rev 75820)
@@ -22,6 +22,7 @@
 import signal
 import socket
 import tempfile
+import threading
 import time
 import unittest
 import shutil
@@ -520,93 +521,101 @@
             self._storage.tpc_abort(t)
             raise
 
-        filename = self._storage.loadBlob(oid, serial, version)
+        filename = self._storage.loadBlob(oid, serial)
         self.assertEquals(somedata, open(filename, 'rb').read())
 
-
 class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
     """ZEO backed by a BlobStorage-adapted FileStorage."""
 
     def setUp(self):
-        self.blobdir = tempfile.mkdtemp()  # This is the blob directory on the ZEO server
+        self.blobdir = tempfile.mkdtemp()  # blob directory on ZEO server
         self.filestorage = tempfile.mktemp()
         super(BlobAdaptedFileStorageTests, self).setUp()
 
-    def checkLoadBlobLocks(self):
+    def checkStoreAndLoadBlob(self):
+        from ZODB.utils import oid_repr, tid_repr
         from ZODB.Blobs.Blob import Blob
+        from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
         from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
              handle_serials
         import transaction
 
-        version = ''
-        somedata = 'a' * 10
-
+        somedata_path = os.path.join(self.blob_cache_dir, 'somedata')
+        somedata = open(somedata_path, 'w+b')
+        for i in range(1000000):
+            somedata.write("%s\n")
+        somedata.seek(0)
+        
         blob = Blob()
         bd_fh = blob.open('w')
-        bd_fh.write(somedata)
+        ZODB.utils.cp(somedata, bd_fh)
         bd_fh.close()
         tfname = bd_fh.name
         oid = self._storage.new_oid()
         data = zodb_pickle(blob)
+        self.assert_(os.path.exists(tfname))
 
         t = transaction.Transaction()
         try:
             self._storage.tpc_begin(t)
             r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
             r2 = self._storage.tpc_vote(t)
-            serial = handle_serials(oid, r1, r2)
+            revid = handle_serials(oid, r1, r2)
             self._storage.tpc_finish(t)
         except:
             self._storage.tpc_abort(t)
             raise
 
+        # The uncommitted data file should have been removed
+        self.assert_(not os.path.exists(tfname))
 
-        class Dummy:
-            def __init__(self):
-                self.acquired = 0
-                self.released = 0
-            def acquire(self):
-                self.acquired += 1
-            def release(self):
-                self.released += 1
-
-        class statusdict(dict):
-            def __init__(self):
-                self.added = []
-                self.removed = []
+        def check_data(path):
+            self.assert_(os.path.exists(path))
+            f = open(path, 'rb')
+            somedata.seek(0) 
+            d1 = d2 = 1
+            while d1 or d2:
+                d1 = f.read(8096)
+                d2 = somedata.read(8096)
+                self.assertEqual(d1, d2)
                 
-            def __setitem__(self, k, v):
-                self.added.append(k)
-                super(statusdict, self).__setitem__(k, v)
+        
+        # The file should have been copied to the server:
+        filename = os.path.join(self.blobdir, oid_repr(oid),
+                                tid_repr(revid) + BLOB_SUFFIX)
+        check_data(filename)
 
-            def __delitem__(self, k):
-                self.removed.append(k)
-                super(statusdict, self).__delitem__(k)
+        # It should also be in the cache:
+        filename = os.path.join(self.blob_cache_dir, oid_repr(oid),
+                                tid_repr(revid) + BLOB_SUFFIX)
+        check_data(filename)
 
-        # ensure that we do locking properly
-        filename = self._storage.fshelper.getBlobFilename(oid, serial)
-        thestatuslock = self._storage.blob_status_lock = Dummy()
-        thebloblock = Dummy()
+        # If we remove it from the cache and call loadBlob, it should
+        # come back. We can do this in many threads.  We'll instrument
+        # the method that is used to request data from teh server to
+        # verify that it is only called once.
 
-        def getBlobLock():
-            return thebloblock
+        sendBlob_org = ZEO.ServerStub.StorageServer.sendBlob
+        calls = []
+        def sendBlob(self, oid, serial):
+            calls.append((oid, serial))
+            sendBlob_org(self, oid, serial)
 
-        # override getBlobLock to test that locking is performed
-        self._storage.getBlobLock = getBlobLock
-        thestatusdict = self._storage.blob_status = statusdict()
-
-        filename = self._storage.loadBlob(oid, serial, version)
-
-        self.assertEqual(thestatuslock.acquired, 2)
-        self.assertEqual(thestatuslock.released, 2)
+        os.remove(filename)
+        returns = []
+        threads = [
+            threading.Thread(
+               target=lambda :
+                      returns.append(self._storage.loadBlob(oid, revid))
+               )
+            for i in range(10)
+            ]
+        [thread.start() for thread in threads]
+        [thread.join() for thread in threads]
+        [self.assertEqual(r, filename) for r in returns]        
+        check_data(filename)
         
-        self.assertEqual(thebloblock.acquired, 1)
-        self.assertEqual(thebloblock.released, 1)
 
-        self.assertEqual(thestatusdict.added, [(oid, serial)])
-        self.assertEqual(thestatusdict.removed, [(oid, serial)])
-
-
 class BlobWritableCacheTests(GenericTests, CommonBlobTests):
 
     def setUp(self):



More information about the Zodb-checkins mailing list