[Zodb-checkins] SVN: ZODB/branches/ctheune-blobsupport/src/Z -
Implemented ZEO support.
Christian Theune
ct at gocept.com
Tue Mar 22 18:03:36 EST 2005
Log message for revision 29642:
- Implemented ZEO support.
Changed:
U ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py
U ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py
U ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
U ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
U ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
-=-
Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -27,6 +27,7 @@
import types
import logging
+from zope.interface import implements
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
@@ -34,8 +35,11 @@
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
+from ZODB.Blobs.BlobStorage import BLOB_SUFFIX, BLOB_DIRTY
from ZODB import POSException
+from ZODB import utils
from ZODB.loglevels import BLATHER
+from ZODB.Blobs.interfaces import IBlobStorage
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
@@ -93,6 +97,7 @@
tpc_begin().
"""
+ implements(IBlobStorage)
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
@@ -106,7 +111,7 @@
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
- username='', password='', realm=None):
+ username='', password='', realm=None, blob_dir="/tmp"):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
@@ -303,6 +308,8 @@
# is executing.
self._lock = threading.Lock()
+ self.blob_dir = blob_dir
+
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
@@ -885,6 +892,58 @@
self._tbuf.store(oid, version, data)
return self._check_serials()
+ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+ serials = self.store(oid, serial, data, version, txn)
+ blobfile = open(blobfilename, "rb")
+ while True:
+ chunk = blobfile.read(4096)
+ if not chunk:
+ self._server.storeBlobEnd(oid, serial, data, version, id(txn))
+ break
+ self._server.storeBlob(oid, serial, chunk, version, id(txn))
+ return serials
+
+ def _getDirtyFilename(self, oid, serial):
+ """Generate an intermediate filename for two-phase commit.
+ """
+ return self._getCleanFilename(oid, serial) + "." + BLOB_DIRTY
+
+ def _getBlobPath(self, oid):
+ return self.blob_dir
+
+ def _getCleanFilename(self, oid, tid):
+ return os.path.join(self._getBlobPath(oid),
+ "%s-%s%s" % (utils.oid_repr(oid),
+ utils.tid_repr(tid),
+ BLOB_SUFFIX,)
+ )
+ def loadBlob(self, oid, serial, version):
+ blob_filename = self._getCleanFilename(oid, serial)
+ if os.path.exists(blob_filename): # XXX see race condition below
+ return blob_filename
+
+ self._load_lock.acquire()
+ try:
+ if self._server is None:
+ raise ClientDisconnected()
+
+ tempfilename = self._getDirtyFilename(oid, serial)
+ tempfile = open(tempfilename, "wb")
+
+ offset = 0
+ while True:
+ chunk = self._server.loadBlob(oid, serial, version, offset)
+ if not chunk:
+ break
+ offset += len(chunk)
+ tempfile.write(chunk)
+
+ tempfile.close()
+ utils.best_rename(tempfilename, blob_filename)
+ return blob_filename
+ finally:
+ self._load_lock.release()
+
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -211,6 +211,12 @@
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
+ def storeBlobEnd(self, oid, serial, data, version, id):
+ self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+
+ def storeBlob(self, oid, serial, chunk, version, id):
+ self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
@@ -250,6 +256,9 @@
def load(self, oid, version):
return self.rpc.call('load', oid, version)
+ def loadBlob(self, oid, serial, version, offset):
+ return self.rpc.call('loadBlob', oid, serial, version, offset)
+
def getSerial(self, oid):
return self.rpc.call('getSerial', oid)
Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -42,7 +42,7 @@
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr
+from ZODB.utils import u64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer')
@@ -93,6 +93,9 @@
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
+ self.blob_transfer = {}
+ self.blob_log = []
+ self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -454,6 +457,49 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
+ def storeBlobEnd(self, oid, serial, data, version, id):
+ key = (oid, id)
+ if key not in self.blob_transfer:
+ raise Exception, "Can't finish a non-started Blob"
+ tempname, tempfile = self.blob_transfer.pop(key)
+ tempfile.close()
+ self.blob_log.append((oid, serial, data, tempname, version))
+
+ def storeBlob(self, oid, serial, chunk, version, id):
+ # XXX check that underlying storage supports blobs
+ key = (oid, id)
+ if key not in self.blob_transfer:
+ tempname = mktemp()
+ tempfile = open(tempname, "wb")
+ self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
+ else:
+ tempname, tempfile = self.blob_transfer[key]
+
+ tempfile.write(chunk)
+
+ def loadBlob(self, oid, serial, version, offset):
+ key = (oid, serial)
+ if not key in self.blob_loads:
+ self.blob_loads[key] = \
+ open(self.storage.loadBlob(oid, serial, version))
+ blobdata = self.blob_loads[key]
+ blobdata.seek(offset)
+ chunk = blobdata.read(4096)
+ if not chunk:
+ del self.blob_loads[key]
+ return chunk
+
+
+
+
+
+
+
+
+
+
+
+
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
@@ -596,6 +642,13 @@
# load oid, serial, data, version
if not self._store(*loader.load()):
break
+
+ # Blob support
+ while self.blob_log:
+ oid, oldserial, data, blobfilename, version = self.blob_log.pop()
+ self.storage.storeBlob(oid, oldserial, data, blobfilename,
+ version, self.transaction,)
+
resp = self._thunk()
if delay is not None:
delay.reply(resp)
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -13,13 +13,19 @@
##############################################################################
import os
+import shutil
+import base64
from zope.interface import implements
from zope.proxy import ProxyBase, getProxiedObject
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
+from ZODB.POSException import POSKeyError
+BLOB_SUFFIX = ".blob"
+BLOB_DIRTY = "store"
+
class BlobStorage(ProxyBase):
"""A storage to support blobs."""
@@ -35,7 +41,7 @@
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
self.dirty_oids = []
-
+
def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
@@ -44,6 +50,10 @@
self._lock_acquire()
try:
+ targetpath = self._getBlobPath(oid)
+ if not os.path.exists(targetpath):
+ os.makedirs(targetpath, 0700)
+
targetname = self._getCleanFilename(oid, serial)
try:
os.rename(blobfilename, targetname)
@@ -64,21 +74,24 @@
def _getDirtyFilename(self, oid):
"""Generate an intermediate filename for two-phase commit.
-
- XXX Not used right now due to conceptual flux. Please keep it around
- anyway.
"""
- return self._getCleanFilename(oid, "store")
+ return self._getCleanFilename(oid, BLOB_DIRTY)
+ def _getBlobPath(self, oid):
+ return os.path.join(self.base_directory,
+ utils.oid_repr(oid)
+ )
+
def _getCleanFilename(self, oid, tid):
- return "%s/%s-%s.blob" % \
- (self.base_directory,
- utils.oid_repr(oid),
- utils.tid_repr(tid),
- )
+ return os.path.join(self._getBlobPath(oid),
+ "%s%s" % (utils.tid_repr(tid),
+ BLOB_SUFFIX,)
+ )
def _finish(self, tid, u, d, e):
ProxyBase._finish(self, tid, u, d, e)
+ # Move dirty blobs if they are "really" dirty
+
self.dirty_blobs = []
def _abort(self):
@@ -87,10 +100,118 @@
# Throw away the stuff we'd had committed
while self.dirty_blobs:
oid, serial = self.dirty_blobs.pop()
- os.unlink(self._getCleanFilename(oid))
-
+ clean = self._getCleanFilename(oid, serial)
+ dirty = self._getDirtyFilename(oid, serial)
+ for filename in [clean, dirty]:
+ if os.exists(filename):
+ os.unlink(filename)
+
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
return self._getCleanFilename(oid, serial)
+ def _getNewestBlobSerial(self, oid):
+ blob_path = self._getBlobPath(oid)
+ serials = os.listdir(blob_path)
+ serials = [ os.path.join(blob_path, serial) for serial in serials ]
+ serials.sort(lambda x,y: cmp(os.stat(x).st_mtime,
+ os.stat(y).st_mtime)
+ )
+ return self._splitBlobFilename(serials[-1])[1]
+
+ def pack(self, packtime, referencesf):
+ """Remove all unused oid/tid combinations."""
+ getProxiedObject(self).pack(packtime, referencesf)
+
+ self._lock_acquire()
+ try:
+ # Walk over all existing files and check if they are still needed
+ for filename in os.listdir(self.base_directory):
+ oid = utils.repr_to_oid(filename)
+ serial = self._getNewestBlobSerial(oid)
+ file_path = os.path.join(self.base_directory, filename)
+
+ try:
+ self.loadSerial(oid, serial) # XXX Is that expensive?
+ except POSKeyError:
+ # The object doesn't exist anymore at all. We can remove
+ # everything belonging to that oid
+ shutil.rmtree(file_path)
+ else:
+ # The object still exists. We can remove everything but the
+ # most recent revision that is older than the pack time.
+ serials = os.listdir(file_path)
+ recent_candidate = \
+ os.path.split(self._getCleanFilename(oid, serial))[1]
+ serials.remove(recent_candidate)
+ for serial_candidate in serials:
+ cfname = os.path.join(file_path, serial_candidate)
+ mtime = os.stat(cfname).st_mtime
+ if mtime < packtime:
+ os.unlink(cfname)
+ finally:
+ self._lock_release()
+
+ def getSize(self):
+ """Return the size of the database in bytes."""
+ orig_size = getProxiedObject(self).getSize()
+
+ blob_size = 0
+ for oid in os.listdir(self.base_directory):
+ for serial in os.listdir(os.path.join(self.base_directory, oid)):
+ if not serial.endswith(BLOB_SUFFIX):
+ continue
+ file_path = os.path.join(self.base_directory, oid, serial)
+ blob_size += os.stat(file_path).st_size
+
+ return orig_size + blob_size
+
+ def _splitBlobFilename(self, filename):
+ """Returns OID, TID for a given blob filename.
+
+ If it's not a blob filename, (None, None) is returned.
+ """
+ if not filename.endswith(BLOB_SUFFIX):
+ return None, None
+ path, filename = os.path.split(filename)
+ oid = os.path.split(path)[1]
+
+ serial = filename[:-len(BLOB_SUFFIX)]
+ oid = utils.repr_to_oid(oid)
+ if serial != BLOB_DIRTY:
+ serial = utils.repr_to_oid(serial)
+ else:
+ serial = None
+ return oid, serial
+
+ def undo(self, serial_id, transaction):
+ serial, keys = getProxiedObject(self).undo(serial_id, transaction)
+ self._lock_acquire()
+ try:
+ # The old serial_id is given in base64 encoding ...
+ serial_id = base64.decodestring(serial_id+ '\n')
+ for oid in self._getOIDsForSerial(serial_id):
+ data, serial_before, serial_after = \
+ self.loadBefore(oid, serial_id)
+ orig = file(self._getCleanFilename(oid, serial_before), "r")
+ new = file(self._getCleanFilename(oid, serial), "w")
+ utils.cp(orig, new)
+ orig.close()
+ new.close()
+ self.dirty_oids.append((oid, serial))
+ finally:
+ self._lock_release()
+ return serial, keys
+
+ def _getOIDsForSerial(self, search_serial):
+ oids = []
+ for oidpath in os.listdir(self.base_directory):
+ for filename in os.listdir(os.path.join(self.base_directory,
+ oidpath)):
+ blob_path = os.path.join(self.base_directory, oidpath,
+ filename)
+ oid, serial = self._splitBlobFilename(blob_path)
+ if search_serial == serial:
+ oids.append(oid)
+ return oids
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt 2005-03-22 23:03:36 UTC (rev 29642)
@@ -1,4 +1,12 @@
-- Blob instances should clean up temporary files after committing
+- Support database import/export
-- Support database import/export
+- Support ZEO
+
+- Support selection of text/binary mode for blobs
+
+ - implement loadBlob and storeBlob
+
+ - loadBlob needs to handle the BLOB_CACHE_DIRECTORY
+
+ - storeBlob needs to hand the actual file data off to the server
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -350,6 +350,7 @@
s = self._storage.storeBlob(oid, serial, p,
obj._p_blob_uncommitted,
self._version, transaction)
+ obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
@@ -371,12 +372,6 @@
self._handle_serial(s, oid)
- if IBlob.providedBy(obj):
- # We need to update internals of the blobs here
- obj._p_blob_uncommitted = None
- obj._p_blob_data = \
- self._storage.loadBlob(oid, obj._p_serial,
- self._version )
def commit_sub(self, t):
"""Commit all changes made in subtransactions and begin 2-phase commit
@@ -687,6 +682,7 @@
# Blob support
if IBlob.providedBy(obj):
+ obj._p_blob_uncommitted = None
obj._p_blob_data = \
self._storage.loadBlob(oid, serial, self._version)
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml 2005-03-22 23:03:36 UTC (rev 29642)
@@ -64,6 +64,12 @@
<sectiontype name="zeoclient" datatype=".ZEOClient"
implements="ZODB.storage">
+
+ <key name="blob-dir" required="no" default="/tmp">
+ <description>
+ Path name to the blob storage directory.
+ </description>
+ </key>
<multikey name="server" datatype="socket-address" required="yes"/>
<key name="storage" default="1">
<description>
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/config.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/config.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -150,6 +150,7 @@
L = [server.address for server in self.config.server]
return ClientStorage(
L,
+ blob_dir=self.config.blob_dir,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -386,7 +386,7 @@
"""XXX"""
def getSize():
- """XXX"""
+ """Return the size of the database in bytes."""
def history(oid, version, length=1, filter=None):
"""XXX"""
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py 2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py 2005-03-22 23:03:36 UTC (rev 29642)
@@ -16,7 +16,7 @@
import time
import struct
from struct import pack, unpack
-from binascii import hexlify
+from binascii import hexlify, unhexlify
import cPickle as pickle
from cStringIO import StringIO
import weakref
@@ -88,7 +88,7 @@
U64 = u64
-def cp(f1, f2, length):
+def cp(f1, f2, length=None):
"""Copy all data from one file to another.
It copies the data from the current position of the input file (f1)
@@ -101,6 +101,12 @@
write = f2.write
n = 8192
+ if length is None:
+ old_pos = f1.tell()
+ f1.seek(0,2)
+ length = f1.tell()
+ f1.seek(old_pos)
+
while length > 0:
if n > length:
n = length
@@ -133,6 +139,13 @@
else:
return repr(oid)
+def repr_to_oid(repr):
+ if repr.startswith("0x"):
+ repr = repr[2:]
+ as_bin = unhexlify(repr)
+ as_bin = "\x00"*(8-len(as_bin)) + as_bin
+ return as_bin
+
serial_repr = oid_repr
tid_repr = serial_repr
@@ -314,3 +327,20 @@
handle, filename = mkstemp()
os.close(handle)
return filename
+
+def best_rename(sourcename, targetname):
+ try:
+ os.rename(sourcename, targetname)
+ except OSError:
+ # XXX Copying is not atomic: this races with callers that test for targetname without holding a lock
+ source = open(sourcename, "rb")
+ target = open(targetname, "wb")
+ while True:
+ chunk = source.read(4096)
+ if not chunk:
+ break
+ target.write(chunk)
+ source.close()
+ target.close()
+ os.unlink(sourcename)
+
More information about the Zodb-checkins
mailing list