[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/ Changed
the storeBlob implementation to use iterators to avoid loading blobs into memory
Jim Fulton
jim at zope.com
Tue May 15 18:43:48 EDT 2007
Log message for revision 75784:
Changed the storeBlob implementation to use iterators to avoid loading
blobs into memory.
Changed:
U ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
U ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
U ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py
-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py 2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py 2007-05-15 22:43:47 UTC (rev 75784)
@@ -896,9 +896,11 @@
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
if self.blob_cache_writable:
- self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
+ self._storeBlob_shared(
+ oid, serial, data, blobfilename, version, txn)
else:
- self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
+ self._server.storeBlob(
+ oid, serial, data, blobfilename, version, txn)
return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
@@ -910,24 +912,10 @@
os.close(fd)
os.rename(filename, target)
# Now tell the server where we put it
- self._server.storeBlobShared(oid, serial, data,
- os.path.basename(target), version, id(txn))
+ self._server.storeBlobShared(
+ oid, serial, data,
+ os.path.basename(target), version, id(txn))
- def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
- """Version of storeBlob() that copies the data over the ZEO protocol."""
- blobfile = open(blobfilename, "rb")
- while True:
- chunk = blobfile.read(1<<16)
- # even if the blobfile is completely empty, we need to call
- # storeBlob at least once in order to be able to call
- # storeBlobEnd successfully.
- self._server.storeBlob(oid, serial, chunk, version, id(txn))
- if not chunk:
- self._server.storeBlobEnd(oid, serial, data, version, id(txn))
- break
- blobfile.close()
- os.unlink(blobfilename)
-
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
blob_filename = self.fshelper.getBlobFilename(oid, serial)
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py 2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ServerStub.py 2007-05-15 22:43:47 UTC (rev 75784)
@@ -13,6 +13,7 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
+import os
import time
##
@@ -219,12 +220,31 @@
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
- def storeBlobEnd(self, oid, serial, data, version, id):
- self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
- def storeBlob(self, oid, serial, chunk, version, id):
- self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+ # Store a blob to the server. We don't want to read all of
+ # the data into memory, so we use a message iterator. This
+ # allows us to read the blob data as needed.
+ if blobfilename is None:
+ self.rpc.callAsync('storeEmptyBlob',
+ oid, serial, data, version, id(txn))
+ return
+
+ def store():
+ yield ('storeBlobStart', ())
+ f = open(blobfilename, 'rb')
+ while 1:
+ chunk = f.read(59000)
+ if not chunk:
+ break
+ yield ('storeBlobChunk', (chunk, ))
+ f.close()
+ os.remove(blobfilename)
+ yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
+
+ self.rpc.callAsyncIterator(store())
+
def storeBlobShared(self, oid, serial, data, filename, version, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
version, id)
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py 2007-05-15 22:29:32 UTC (rev 75783)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/StorageServer.py 2007-05-15 22:43:47 UTC (rev 75784)
@@ -25,6 +25,7 @@
import logging
import os
import sys
+import tempfile
import threading
import time
import warnings
@@ -103,7 +104,7 @@
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
- self.blob_transfer = {}
+ self.blob_tempfile = None
self.blob_log = []
self.blob_loads = {}
# The authentication protocol may define extra methods.
@@ -525,25 +526,23 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
+
+ def storeBlobStart(self):
+ assert self.blob_tempfile is None
+ self.blob_tempfile = tempfile.mkstemp(
+ dir=self.storage.temporaryDirectory())
+
+ def storeBlobChunk(self, chunk):
+ os.write(self.blob_tempfile[0], chunk)
+
def storeBlobEnd(self, oid, serial, data, version, id):
- key = (oid, id)
- if key not in self.blob_transfer:
- raise Exception, "Can't finish a non-started Blob"
- tempname, tempfile = self.blob_transfer.pop(key)
- tempfile.close()
+ fd, tempname = self.blob_tempfile
+ self.blob_tempfile = None
+ os.close(fd)
self.blob_log.append((oid, serial, data, tempname, version))
- def storeBlob(self, oid, serial, chunk, version, id):
- # XXX check that underlying storage supports blobs
- key = (oid, id)
- if key not in self.blob_transfer:
- tempname = mktemp()
- tempfile = open(tempname, "wb")
- # XXX Force close and remove them when Storage closes
- self.blob_transfer[key] = (tempname, tempfile)
- else:
- tempname, tempfile = self.blob_transfer[key]
- tempfile.write(chunk)
+ def storeEmptyBlob(self, oid, serial, data, version, id):
+ self.blob_log.append((oid, serial, data, None, version))
def storeBlobShared(self, oid, serial, data, filename, version, id):
# Reconstruct the full path from the filename in the OID directory
More information about the Zodb-checkins
mailing list