[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Reimplemented the ZEO Blob protocol:
Jim Fulton
jim at zope.com
Fri May 18 14:02:22 EDT 2007
Log message for revision 75844:
Reimplemented the ZEO Blob protocol:
- Avoid more than one round-trip call when loading blobs via copy from
the server.
- Avoid loading large amounts of blob data into memory. The old
storeBlob implementation was likely to queue blob data faster than
it could be sent, leading to a large memory footprint for the
queue. Now, iterators are used to read data from files only when the
network layer is ready to send it (see the sketch after this list).
- Fixed storeBlob to move the input file to the blob cache (when not
sharing the blob directory with the server).
- Extended the loadBlob locking model to work with multiple processes
by using file locks rather than threading locks. A common
configuration is to use a client process per core, so a machine
is likely to have many client processes, and the client processes
should be able to share a common blob cache (see the sketch following
the ClientStorage.py diff below).
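To illustrate the iterator point, here is a minimal sketch (not the
actual zrpc code) of the send pattern: a generator yields one
(method, args) message per chunk, so blob data is read from disk only
when the network layer asks for the next message, and at most one
chunk is ever held in memory. The 59000-byte chunk size matches the
diff below; iter_blob_messages and the message name are illustrative.

    def iter_blob_messages(blobfilename, chunk_size=59000):
        """Yield one protocol message per chunk of a blob file."""
        f = open(blobfilename, 'rb')
        try:
            while 1:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                # The consumer (the network layer) pulls the next
                # message only when it is ready to send it.
                yield ('storeBlobChunk', (chunk,))
        finally:
            f.close()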
Changed:
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/ClientStub.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/TransactionBuffer.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -21,6 +21,7 @@
import cPickle
import os
import socket
+import sys
import tempfile
import threading
import time
@@ -35,6 +36,7 @@
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
+import ZODB.lock_file
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
@@ -329,10 +331,6 @@
else:
self.fshelper = None
- # Initialize locks
- self.blob_status_lock = threading.Lock()
- self.blob_status = {}
-
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
@@ -896,9 +894,13 @@
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
if self.blob_cache_writable:
- self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+ self._storeBlob_shared(
+ oid, serial, data, blobfilename, version, txn)
else:
- self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+ self._server.storeBlob(
+ oid, serial, data, blobfilename, version, txn)
+ if blobfilename is not None:
+ self._tbuf.storeBlob(oid, blobfilename)
return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
@@ -908,121 +910,145 @@
os.mkdir(dir)
fd, target = self.fshelper.blob_mkstemp(oid, serial)
os.close(fd)
- os.rename(filename, target)
+
+ if sys.platform == 'win32':
+
+ # On Windows, we can't rename onto an existing file. That's
+ # OK. We don't care which file we get, as long as it is
+ # unique. We'll just keep trying until the rename succeeds.
+ os.remove(target)
+ i = 0
+ while 1:
+ try:
+ os.rename(filename, target + str(i))
+ except OSError:
+ i += 1
+ else:
+ break
+ target += str(i)
+ else:
+ os.rename(filename, target)
# Now tell the server where we put it
- self._server.storeBlobShared(oid, serial, data,
- os.path.basename(target), version, id(txn))
+ self._server.storeBlobShared(
+ oid, serial, data,
+ os.path.basename(target), version, id(txn))
- def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
- """Version of storeBlob() that copies the data over the ZEO protocol."""
- blobfile = open(blobfilename, "rb")
- while True:
- chunk = blobfile.read(1<<16)
- # even if the blobfile is completely empty, we need to call
- # storeBlob at least once in order to be able to call
- # storeBlobEnd successfully.
- self._server.storeBlob(oid, serial, chunk, version, id(txn))
- if not chunk:
- self._server.storeBlobEnd(oid, serial, data, version, id(txn))
- break
- blobfile.close()
- os.unlink(blobfilename)
+ def _have_blob(self, blob_filename, oid, serial):
+ if os.path.exists(blob_filename):
+ log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
+ utils.tid_repr(serial)), level=BLATHER)
+ return True
+ return False
- def _do_load_blob(self, oid, serial, version):
- """Do the actual loading from the RPC server."""
+ def recieveBlobStart(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
- if self._server is None:
- raise ClientDisconnected()
+ assert not os.path.exists(blob_filename)
+ assert os.path.exists(blob_filename+'.lock')
+ blob_filename += '.dl'
+ assert not os.path.exists(blob_filename)
+ f = open(blob_filename, 'wb')
+ f.close()
- targetpath = self.fshelper.getPathForOID(oid)
- if not os.path.exists(targetpath):
- os.makedirs(targetpath, 0700)
+ def recieveBlobChunk(self, oid, serial, chunk):
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
+ assert os.path.exists(blob_filename)
+ f = open(blob_filename, 'ab')
+ f.write(chunk)
+ f.close()
+
+ def recieveBlobStop(self, oid, serial):
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ os.rename(blob_filename+'.dl', blob_filename)
+
+ def loadBlob(self, oid, serial):
- # We write to a temporary file first, so we do not accidentally
- # allow half-baked copies of this blob be loaded
- tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
- tempfile = os.fdopen(tempfd, 'wb')
-
- offset = 0
- while True:
- chunk = self._server.loadBlob(oid, serial, version, offset)
- if not chunk:
- break
- offset += len(chunk)
- tempfile.write(chunk)
-
- tempfile.close()
- # XXX will fail on Windows if file is open
- os.rename(tempfilename, blob_filename)
- return blob_filename
-
- def loadBlob(self, oid, serial, version):
- """Loading a blob has to know about loading the same blob
- from another thread as the same time.
-
- 1. Check if the blob is downloaded already
- 2. Check whether it is currently beeing downloaded
- 2a. Wait for other download to finish, return
- 3. If not beeing downloaded, start download
- """
+ # Load a blob. If it isn't present and we have a shared blob
+ # directory, then assume that it doesn't exist on the server
+ # and return None.
if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is "
"configured.")
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
- if os.path.exists(blob_filename):
- log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
- utils.tid_repr(serial)), level=BLATHER)
+ if self._have_blob(blob_filename, oid, serial):
return blob_filename
- # Case 2,3: Blob might still be downloading or not there yet
+ if self.blob_cache_writable:
+ # We're using a server shared cache. If the file isn't
+ # here, it's not anywhere.
+ return None
- # Try to get or create a lock for the downloading of this blob,
- # identified by it's oid and serial
- lock_key = (oid, serial)
-
- # We need to make the check for an existing lock and the possible
- # creation of a new one atomic, so there is another lock:
- self.blob_status_lock.acquire()
+ # First, we'll create the directory for this oid, if it doesn't exist.
+ targetpath = self.fshelper.getPathForOID(oid)
+ if not os.path.exists(targetpath):
+ try:
+ os.makedirs(targetpath, 0700)
+ except OSError:
+ # We might have lost a race. If so, the directory
+ # must exist now
+ assert os.path.exists(targetpath)
+
+ # OK, it's not here, and we (or someone) need to get it. We
+ # want to avoid getting it multiple times, even across
+ # separate client processes on the same machine. We'll use
+ # file locking.
+
+ lockfilename = blob_filename+'.lock'
try:
- if not self.blob_status.has_key(oid):
- self.blob_status[lock_key] = self.getBlobLock()
- lock = self.blob_status[lock_key]
- finally:
- self.blob_status_lock.release()
+ lock = ZODB.lock_file.LockFile(lockfilename)
+ except ZODB.lock_file.LockError:
- # We acquire the lock to either start downloading, or wait
- # for another download to finish
- lock.acquire()
+ # Someone is already downloading the Blob. Wait for the
+ # lock to be freed. How long should we be willing to wait?
+ # TODO: maybe find some way to assess download progress.
+
+ while 1:
+ time.sleep(0.1)
+ try:
+ lock = ZODB.lock_file.LockFile(lockfilename)
+ except ZODB.lock_file.LockError:
+ pass
+ else:
+ # We have the lock. We should be able to get the file now.
+ lock.close()
+ try:
+ os.remove(lockfilename)
+ except OSError:
+ pass
+ break
+
+ if self._have_blob(blob_filename, oid, serial):
+ return blob_filename
+
+ return None
+
try:
- # If there was another download that is finished by now,
- # we just take the result.
- if os.path.exists(blob_filename):
- log2("Found blob %s/%s in cache after it was downloaded "
- "from another thread." % (utils.oid_repr(oid),
- utils.tid_repr(serial)), level=BLATHER)
+ # We got the lock, so it's our job to download it. First,
+ # we'll double check that someone didn't download it while we
+ # were getting the lock:
+
+ if self._have_blob(blob_filename, oid, serial):
return blob_filename
- # Otherwise we download and use that
- return self._do_load_blob(oid, serial, version)
+ # Ask the server to send it to us. When this function
+ # returns, it will have been sent. (The receiving will
+ # have been handled by the asyncore thread.)
+
+ self._server.sendBlob(oid, serial)
+
+ if self._have_blob(blob_filename, oid, serial):
+ return blob_filename
+
+ return None
+
finally:
- # When done we remove the download lock ...
- lock.release()
-
- # And the status information isn't needed as well,
- # but we have to use the second lock here as well, to avoid
- # making the creation of this status lock non-atomic (see above)
- self.blob_status_lock.acquire()
+ lock.close()
try:
- del self.blob_status[lock_key]
- finally:
- self.blob_status_lock.release()
+ os.remove(lockfilename)
+ except OSError:
+ pass
- def getBlobLock(self):
- # indirection to support unit testing
- return threading.Lock()
-
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
@@ -1144,6 +1170,20 @@
if s != ResolvedSerial:
assert s == tid, (s, tid)
self._cache.store(oid, version, s, None, data)
+
+
+ if self.fshelper is not None:
+ blobs = self._tbuf.blobs
+ while blobs:
+ oid, blobfilename = blobs.pop()
+ targetpath = self.fshelper.getPathForOID(oid)
+ if not os.path.exists(targetpath):
+ os.makedirs(targetpath, 0700)
+ os.rename(blobfilename,
+ self.fshelper.getBlobFilename(oid, tid),
+ )
+
+
self._tbuf.clear()
def undo(self, trans_id, txn):
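A hedged standalone summary of the new loadBlob coordination above.
It assumes, as in the diff, that ZODB.lock_file.LockFile raises
LockError while another process holds the lock; download_once and its
fetch callback are illustrative names. Exactly one process performs
the download; the others poll the lock file and then simply look for
the finished blob.

    import os
    import time
    import ZODB.lock_file

    def download_once(blob_filename, fetch, poll=0.1):
        lockfilename = blob_filename + '.lock'
        try:
            lock = ZODB.lock_file.LockFile(lockfilename)
        except ZODB.lock_file.LockError:
            # Someone else is downloading; wait for their lock.
            while 1:
                time.sleep(poll)
                try:
                    lock = ZODB.lock_file.LockFile(lockfilename)
                except ZODB.lock_file.LockError:
                    continue
                lock.close()
                try:
                    os.remove(lockfilename)
                except OSError:
                    pass
                break
            if os.path.exists(blob_filename):
                return blob_filename
            return None
        try:
            # We hold the lock; fetch() asks the server to send the
            # blob (e.g. self._server.sendBlob(oid, serial)).
            if not os.path.exists(blob_filename):
                fetch()
            if os.path.exists(blob_filename):
                return blob_filename
            return None
        finally:
            lock.close()
            try:
                os.remove(lockfilename)
            except OSError:
                pass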
Modified: ZODB/trunk/src/ZEO/ClientStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStub.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ClientStub.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -60,3 +60,18 @@
def info(self, arg):
self.rpc.callAsync('info', arg)
+
+ def storeBlob(self, oid, serial, blobfilename):
+
+ def store():
+ yield ('recieveBlobStart', (oid, serial))
+ f = open(blobfilename, 'rb')
+ while 1:
+ chunk = f.read(59000)
+ if not chunk:
+ break
+ yield ('recieveBlobChunk', (oid, serial, chunk, ))
+ f.close()
+ yield ('recieveBlobStop', (oid, serial))
+
+ self.rpc.callAsyncIterator(store())
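The recieveBlob* callbacks that consume these messages (shown in the
ClientStorage.py diff above) stage the download in a '.dl' file and
only rename it into place at the end, so a half-downloaded blob is
never visible under its final name. A minimal sketch of that staging
pattern, with receive_blob as an illustrative name:

    import os

    def receive_blob(blob_filename, chunks):
        tmp = blob_filename + '.dl'        # recieveBlobStart
        f = open(tmp, 'wb')
        try:
            for chunk in chunks:           # recieveBlobChunk
                f.write(chunk)
        finally:
            f.close()
        os.rename(tmp, blob_filename)      # recieveBlobStop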
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -13,6 +13,7 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
+import os
import time
##
@@ -219,12 +220,30 @@
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
- def storeBlobEnd(self, oid, serial, data, version, id):
- self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
- def storeBlob(self, oid, serial, chunk, version, id):
- self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+ # Store a blob to the server. We don't want to read all of
+ # the data into memory, so we use a message iterator. This
+ # allows us to read the blob data as needed.
+ if blobfilename is None:
+ self.rpc.callAsync('storeEmptyBlob',
+ oid, serial, data, version, id(txn))
+ return
+
+ def store():
+ yield ('storeBlobStart', ())
+ f = open(blobfilename, 'rb')
+ while 1:
+ chunk = f.read(59000)
+ if not chunk:
+ break
+ yield ('storeBlobChunk', (chunk, ))
+ f.close()
+ yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
+
+ self.rpc.callAsyncIterator(store())
+
def storeBlobShared(self, oid, serial, data, filename, version, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
version, id)
@@ -271,8 +290,8 @@
def load(self, oid, version):
return self.rpc.call('load', oid, version)
- def loadBlob(self, oid, serial, version, offset):
- return self.rpc.call('loadBlob', oid, serial, version, offset)
+ def sendBlob(self, oid, serial):
+ return self.rpc.call('sendBlob', oid, serial)
def getTid(self, oid):
return self.rpc.call('getTid', oid)
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -25,6 +25,7 @@
import logging
import os
import sys
+import tempfile
import threading
import time
import warnings
@@ -103,9 +104,8 @@
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
- self.blob_transfer = {}
+ self.blob_tempfile = None
self.blob_log = []
- self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -525,25 +525,22 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
+ def storeBlobStart(self):
+ assert self.blob_tempfile is None
+ self.blob_tempfile = tempfile.mkstemp(
+ dir=self.storage.temporaryDirectory())
+
+ def storeBlobChunk(self, chunk):
+ os.write(self.blob_tempfile[0], chunk)
+
def storeBlobEnd(self, oid, serial, data, version, id):
- key = (oid, id)
- if key not in self.blob_transfer:
- raise Exception, "Can't finish a non-started Blob"
- tempname, tempfile = self.blob_transfer.pop(key)
- tempfile.close()
+ fd, tempname = self.blob_tempfile
+ self.blob_tempfile = None
+ os.close(fd)
self.blob_log.append((oid, serial, data, tempname, version))
- def storeBlob(self, oid, serial, chunk, version, id):
- # XXX check that underlying storage supports blobs
- key = (oid, id)
- if key not in self.blob_transfer:
- tempname = mktemp()
- tempfile = open(tempname, "wb")
- # XXX Force close and remove them when Storage closes
- self.blob_transfer[key] = (tempname, tempfile)
- else:
- tempname, tempfile = self.blob_transfer[key]
- tempfile.write(chunk)
+ def storeEmptyBlob(self, oid, serial, data, version, id):
+ self.blob_log.append((oid, serial, data, None, version))
def storeBlobShared(self, oid, serial, data, filename, version, id):
# Reconstruct the full path from the filename in the OID directory
@@ -551,17 +548,8 @@
filename)
self.blob_log.append((oid, serial, data, filename, version))
- def loadBlob(self, oid, serial, version, offset):
- key = (oid, serial)
- if not key in self.blob_loads:
- self.blob_loads[key] = \
- open(self.storage.loadBlob(oid, serial, version))
- blobdata = self.blob_loads[key]
- blobdata.seek(offset)
- chunk = blobdata.read(4096)
- if not chunk:
- del self.blob_loads[key]
- return chunk
+ def sendBlob(self, oid, serial):
+ self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
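On the server side, storeBlobStart/storeBlobChunk/storeBlobEnd above
spool incoming chunks straight to a temporary file, so blob data is
never accumulated in memory. A hedged sketch of that pattern;
temporaryDirectory() is the storage method used in the diff, and the
BlobSpool name is illustrative:

    import os
    import tempfile

    class BlobSpool:
        def __init__(self, tempdir):
            # storeBlobStart: open a spool file in the storage's
            # temporary directory (same filesystem as the blobs).
            self.fd, self.tempname = tempfile.mkstemp(dir=tempdir)

        def write_chunk(self, chunk):
            # storeBlobChunk: append without buffering in memory.
            os.write(self.fd, chunk)

        def finish(self):
            # storeBlobEnd: close and hand the file to the commit.
            os.close(self.fd)
            return self.tempname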
Modified: ZODB/trunk/src/ZEO/TransactionBuffer.py
===================================================================
--- ZODB/trunk/src/ZEO/TransactionBuffer.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/TransactionBuffer.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -59,12 +59,14 @@
self.closed = 0
self.count = 0
self.size = 0
+ self.blobs = []
# It's safe to use a fast pickler because the only objects
# stored are builtin types -- strings or None.
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
def close(self):
+ self.clear()
self.lock.acquire()
try:
self.closed = 1
@@ -82,6 +84,9 @@
finally:
self.lock.release()
+ def storeBlob(self, oid, blobfilename):
+ self.blobs.append((oid, blobfilename))
+
def _store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
if self.closed:
@@ -113,6 +118,10 @@
self.file.seek(0)
self.count = 0
self.size = 0
+ while self.blobs:
+ oid, blobfilename = self.blobs.pop()
+ if os.path.exists(blobfilename):
+ os.remove(blobfilename)
finally:
self.lock.release()
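The (oid, blobfilename) pairs recorded by storeBlob above are
consumed at commit time by the new code in ClientStorage.tpc_finish:
each spooled file is renamed into the blob cache under its final
oid/tid name. A sketch of that step, using the fshelper names from
the diff:

    import os

    def move_committed_blobs(blobs, fshelper, tid):
        while blobs:
            oid, blobfilename = blobs.pop()
            targetpath = fshelper.getPathForOID(oid)
            if not os.path.exists(targetpath):
                os.makedirs(targetpath, 0700)
            os.rename(blobfilename, fshelper.getBlobFilename(oid, tid))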
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2007-05-18 18:02:19 UTC (rev 75843)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2007-05-18 18:02:21 UTC (rev 75844)
@@ -22,6 +22,7 @@
import signal
import socket
import tempfile
+import threading
import time
import unittest
import shutil
@@ -520,93 +521,101 @@
self._storage.tpc_abort(t)
raise
- filename = self._storage.loadBlob(oid, serial, version)
+ filename = self._storage.loadBlob(oid, serial)
self.assertEquals(somedata, open(filename, 'rb').read())
-
class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
- self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
+ self.blobdir = tempfile.mkdtemp() # blob directory on ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
- def checkLoadBlobLocks(self):
+ def checkStoreAndLoadBlob(self):
+ from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob
+ from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
- version = ''
- somedata = 'a' * 10
-
+ somedata_path = os.path.join(self.blob_cache_dir, 'somedata')
+ somedata = open(somedata_path, 'w+b')
+ for i in range(1000000):
+ somedata.write("%s\n" % i)
+ somedata.seek(0)
+
blob = Blob()
bd_fh = blob.open('w')
- bd_fh.write(somedata)
+ ZODB.utils.cp(somedata, bd_fh)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
+ self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
- serial = handle_serials(oid, r1, r2)
+ revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
+ # The uncommitted data file should have been removed
+ self.assert_(not os.path.exists(tfname))
- class Dummy:
- def __init__(self):
- self.acquired = 0
- self.released = 0
- def acquire(self):
- self.acquired += 1
- def release(self):
- self.released += 1
-
- class statusdict(dict):
- def __init__(self):
- self.added = []
- self.removed = []
+ def check_data(path):
+ self.assert_(os.path.exists(path))
+ f = open(path, 'rb')
+ somedata.seek(0)
+ d1 = d2 = 1
+ while d1 or d2:
+ d1 = f.read(8096)
+ d2 = somedata.read(8096)
+ self.assertEqual(d1, d2)
- def __setitem__(self, k, v):
- self.added.append(k)
- super(statusdict, self).__setitem__(k, v)
+
+ # The file should have been copied to the server:
+ filename = os.path.join(self.blobdir, oid_repr(oid),
+ tid_repr(revid) + BLOB_SUFFIX)
+ check_data(filename)
- def __delitem__(self, k):
- self.removed.append(k)
- super(statusdict, self).__delitem__(k)
+ # It should also be in the cache:
+ filename = os.path.join(self.blob_cache_dir, oid_repr(oid),
+ tid_repr(revid) + BLOB_SUFFIX)
+ check_data(filename)
- # ensure that we do locking properly
- filename = self._storage.fshelper.getBlobFilename(oid, serial)
- thestatuslock = self._storage.blob_status_lock = Dummy()
- thebloblock = Dummy()
+ # If we remove it from the cache and call loadBlob, it should
+ # come back. We can do this in many threads. We'll instrument
+ # the method that is used to request data from the server to
+ # verify that it is only called once.
- def getBlobLock():
- return thebloblock
+ sendBlob_org = ZEO.ServerStub.StorageServer.sendBlob
+ calls = []
+ def sendBlob(self, oid, serial):
+ calls.append((oid, serial))
+ sendBlob_org(self, oid, serial)
- # override getBlobLock to test that locking is performed
- self._storage.getBlobLock = getBlobLock
- thestatusdict = self._storage.blob_status = statusdict()
-
- filename = self._storage.loadBlob(oid, serial, version)
-
- self.assertEqual(thestatuslock.acquired, 2)
- self.assertEqual(thestatuslock.released, 2)
+ os.remove(filename)
+ returns = []
+ threads = [
+ threading.Thread(
+ target=lambda :
+ returns.append(self._storage.loadBlob(oid, revid))
+ )
+ for i in range(10)
+ ]
+ [thread.start() for thread in threads]
+ [thread.join() for thread in threads]
+ [self.assertEqual(r, filename) for r in returns]
+ check_data(filename)
- self.assertEqual(thebloblock.acquired, 1)
- self.assertEqual(thebloblock.released, 1)
- self.assertEqual(thestatusdict.added, [(oid, serial)])
- self.assertEqual(thestatusdict.removed, [(oid, serial)])
-
-
class BlobWritableCacheTests(GenericTests, CommonBlobTests):
def setUp(self):