[Zodb-checkins] SVN: ZODB/branches/ctheune-blobsupport/src/Z - Implemented ZEO support.

Christian Theune ct at gocept.com
Tue Mar 22 18:03:36 EST 2005


Log message for revision 29642:
   - Implemented ZEO support.
  
  

Changed:
  U   ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py
  U   ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py
  U   ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py

-=-
Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/ClientStorage.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -27,6 +27,7 @@
 import types
 import logging
 
+from zope.interface import implements
 from ZEO import ServerStub
 from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
@@ -34,8 +35,11 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
 
+from ZODB.Blobs.BlobStorage import BLOB_SUFFIX, BLOB_DIRTY
 from ZODB import POSException
+from ZODB import utils
 from ZODB.loglevels import BLATHER
+from ZODB.Blobs.interfaces import IBlobStorage
 from persistent.TimeStamp import TimeStamp
 
 logger = logging.getLogger('ZEO.ClientStorage')
@@ -93,6 +97,7 @@
     tpc_begin().
     """
 
+    implements(IBlobStorage)
     # Classes we instantiate.  A subclass might override.
 
     TransactionBufferClass = TransactionBuffer
@@ -106,7 +111,7 @@
                  wait_for_server_on_startup=None, # deprecated alias for wait
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
-                 username='', password='', realm=None):
+                 username='', password='', realm=None, blob_dir="/tmp"):
         """ClientStorage constructor.
 
         This is typically invoked from a custom_zodb.py file.
@@ -303,6 +308,8 @@
         # is executing.
         self._lock = threading.Lock()
 
+        self.blob_dir = blob_dir
+
         # Decide whether to use non-temporary files
         if client is not None:
             dir = var or os.getcwd()
@@ -885,6 +892,85 @@
         self._tbuf.store(oid, version, data)
         return self._check_serials()
 
+    def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+        """Storage API: store an object record together with its blob.
+
+        The record goes through store(); the contents of blobfilename are
+        then streamed to the server in 4096-byte chunks and the transfer
+        is finished with storeBlobEnd().
+        """
+        serials = self.store(oid, serial, data, version, txn)
+        blobfile = open(blobfilename, "rb")
+        try:
+            # Send at least one (possibly empty) chunk: the server starts
+            # a transfer only in storeBlob() and rejects storeBlobEnd() for
+            # a transfer it never saw, which would break empty blobs.
+            chunk = blobfile.read(4096)
+            while True:
+                self._server.storeBlob(oid, serial, chunk, version, id(txn))
+                chunk = blobfile.read(4096)
+                if not chunk:
+                    break
+        finally:
+            blobfile.close()
+        self._server.storeBlobEnd(oid, serial, data, version, id(txn))
+        return serials
+
+    def _getDirtyFilename(self, oid, serial):
+        """Generate an intermediate filename for two-phase commit.
+        """
+        return self._getCleanFilename(oid, serial) + "." + BLOB_DIRTY
+
+    def _getBlobPath(self, oid):
+        return self.blob_dir
+
+    def _getCleanFilename(self, oid, tid):
+        return os.path.join(self._getBlobPath(oid),
+                            "%s-%s%s" % (utils.oid_repr(oid),
+                                         utils.tid_repr(tid),
+                                         BLOB_SUFFIX,)
+                            )
+
+    def loadBlob(self, oid, serial, version):
+        """Storage API: return the local filename of a blob revision.
+
+        Blob files are kept in blob_dir.  A missing file is downloaded
+        from the server in chunks into a temporary ("dirty") file, which
+        is then moved into place with utils.best_rename().
+        """
+        blob_filename = self._getCleanFilename(oid, serial)
+        # XXX Unlocked fast path: if best_rename() had to fall back to
+        # copying, another thread may see a partially written file here.
+        if os.path.exists(blob_filename):
+            return blob_filename
+
+        self._load_lock.acquire()
+        try:
+            # Another thread may have fetched the blob while we waited.
+            if os.path.exists(blob_filename):
+                return blob_filename
+
+            if self._server is None:
+                raise ClientDisconnected()
+
+            tempfilename = self._getDirtyFilename(oid, serial)
+            blobfile = open(tempfilename, "wb")
+            try:
+                offset = 0
+                while True:
+                    chunk = self._server.loadBlob(oid, serial, version,
+                                                  offset)
+                    if not chunk:
+                        break
+                    offset += len(chunk)
+                    blobfile.write(chunk)
+            finally:
+                blobfile.close()
+            utils.best_rename(tempfilename, blob_filename)
+            return blob_filename
+        finally:
+            self._load_lock.release()
+
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:

Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/ServerStub.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -211,6 +211,27 @@
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
 
+    ##
+    # Finish uploading a blob sent with storeBlob() and hand over the
+    # object record it belongs to.
+    # @param oid object id
+    # @param serial serial the client's change is based on
+    # @param data pickle data for the object
+    # @param version version name
+    # @param id id used by client to identify current transaction
+    def storeBlobEnd(self, oid, serial, data, version, id):
+        self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+
+    ##
+    # Send one chunk of a blob's data to the server.
+    # @param oid object id
+    # @param serial serial the client's change is based on
+    # @param chunk next piece of the blob file's contents
+    # @param version version name
+    # @param id id used by client to identify current transaction
+    def storeBlob(self, oid, serial, chunk, version, id):
+        self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+
     ##
     # Start two-phase commit for a transaction
     # @param id id used by client to identify current transaction.  The
@@ -250,6 +271,16 @@
     def load(self, oid, version):
         return self.rpc.call('load', oid, version)
 
+    ##
+    # Fetch one chunk of the blob file of an object revision.
+    # @param oid object id
+    # @param serial serial of the revision whose blob is wanted
+    # @param version version name
+    # @param offset byte offset of the requested chunk
+    # @return string of blob data; empty at the end of the blob
+    def loadBlob(self, oid, serial, version, offset):
+        return self.rpc.call('loadBlob', oid, serial, version, offset)
+
     def getSerial(self, oid):
         return self.rpc.call('getSerial', oid)
 

Modified: ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZEO/StorageServer.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -42,7 +42,7 @@
 from ZODB.POSException import StorageError, StorageTransactionError
 from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr
+from ZODB.utils import u64, oid_repr, mktemp
 from ZODB.loglevels import BLATHER
 
 logger = logging.getLogger('ZEO.StorageServer')
@@ -93,6 +93,13 @@
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
+        # Blob uploads in progress: (oid, txn id) -> (tempname, open file)
+        self.blob_transfer = {}
+        # Finished uploads waiting to be stored, as tuples of
+        # (oid, serial, data, tempname, version)
+        self.blob_log = []
+        # Blob downloads in progress: (oid, serial) -> open file
+        self.blob_loads = {}
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -454,6 +461,55 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
 
+    def storeBlobEnd(self, oid, serial, data, version, id):
+        """Finish the blob upload for oid and queue it for storing.
+
+        Closes the temporary file filled by storeBlob() and records it,
+        together with the object record, in blob_log.
+        """
+        key = (oid, id)
+        if key not in self.blob_transfer:
+            raise Exception, "Can't finish a non-started Blob"
+        tempname, blobfile = self.blob_transfer.pop(key)
+        blobfile.close()
+        self.blob_log.append((oid, serial, data, tempname, version))
+
+    def storeBlob(self, oid, serial, chunk, version, id):
+        """Append a chunk of blob data for oid to a temporary file.
+
+        The first chunk for an (oid, transaction id) pair creates the file.
+        """
+        # XXX check that underlying storage supports blobs
+        key = (oid, id)
+        if key not in self.blob_transfer:
+            tempname = mktemp()
+            blobfile = open(tempname, "wb")
+            # XXX Force close and remove them when Storage closes
+            self.blob_transfer[key] = (tempname, blobfile)
+        else:
+            tempname, blobfile = self.blob_transfer[key]
+
+        blobfile.write(chunk)
+
+    def loadBlob(self, oid, serial, version, offset):
+        """Return up to 4096 bytes of the blob of (oid, serial) at offset.
+
+        The blob file stays open in blob_loads between calls.  An empty
+        string marks the end of the data; the file is closed at that point.
+        """
+        key = (oid, serial)
+        if key not in self.blob_loads:
+            # Blob data is binary: open the file in binary mode.
+            self.blob_loads[key] = \
+                    open(self.storage.loadBlob(oid, serial, version), "rb")
+        blobdata = self.blob_loads[key]
+        blobdata.seek(offset)
+        chunk = blobdata.read(4096)
+        if not chunk:
+            # Done with this blob: close the file instead of leaking it.
+            self.blob_loads.pop(key).close()
+        return chunk
+
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
@@ -596,6 +652,15 @@
             # load oid, serial, data, version
             if not self._store(*loader.load()):
                 break
+
+        # Blob support
+        # NOTE(review): blob_log is only drained here; confirm that an
+        # aborted transaction also clears it and removes its temp files.
+        while self.blob_log:
+            oid, oldserial, data, blobfilename, version = self.blob_log.pop()
+            self.storage.storeBlob(oid, oldserial, data, blobfilename,
+                                   version, self.transaction,)
+
         resp = self._thunk()
         if delay is not None:
             delay.reply(resp)

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -13,13 +13,19 @@
 ##############################################################################
 
 import os
+import shutil
+import base64
 
 from zope.interface import implements
 from zope.proxy import ProxyBase, getProxiedObject
 
 from ZODB import utils
 from ZODB.Blobs.interfaces import IBlobStorage, IBlob
+from ZODB.POSException import POSKeyError
 
+BLOB_SUFFIX = ".blob"
+BLOB_DIRTY = "store"
+
 class BlobStorage(ProxyBase):
     """A storage to support blobs."""
 
@@ -35,7 +41,7 @@
         ProxyBase.__init__(self, storage)
         self.base_directory = base_directory
         self.dirty_oids = []
-        
+     
     def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
         """Stores data that has a BLOB attached."""
         serial = self.store(oid, oldserial, data, version, transaction)
@@ -44,6 +50,10 @@
 
         self._lock_acquire()
         try:
+            targetpath = self._getBlobPath(oid)
+            if not os.path.exists(targetpath):
+                os.makedirs(targetpath, 0700)
+                              
             targetname = self._getCleanFilename(oid, serial)
             try:
                 os.rename(blobfilename, targetname)
@@ -64,21 +74,24 @@
 
     def _getDirtyFilename(self, oid):
         """Generate an intermediate filename for two-phase commit.
-
-        XXX Not used right now due to conceptual flux. Please keep it around
-        anyway. 
         """
-        return self._getCleanFilename(oid, "store")
+        return self._getCleanFilename(oid, BLOB_DIRTY)
 
+    def _getBlobPath(self, oid):
+        return os.path.join(self.base_directory,
+                            utils.oid_repr(oid)
+                            )
+
     def _getCleanFilename(self, oid, tid):
-        return "%s/%s-%s.blob" % \
-                (self.base_directory, 
-                 utils.oid_repr(oid),
-                 utils.tid_repr(tid),
-                 )
+        return os.path.join(self._getBlobPath(oid),
+                            "%s%s" % (utils.tid_repr(tid), 
+                                      BLOB_SUFFIX,)
+                            )
 
     def _finish(self, tid, u, d, e): 
         ProxyBase._finish(self, tid, u, d, e)
+        # XXX Dirty blobs are not moved yet; the list of dirty blobs is
+        # only reset here.
         self.dirty_blobs = []
 
     def _abort(self):
@@ -87,10 +100,144 @@
         # Throw away the stuff we'd had committed
         while self.dirty_blobs:
             oid, serial = self.dirty_blobs.pop()
-            os.unlink(self._getCleanFilename(oid))
-        
+            clean = self._getCleanFilename(oid, serial)
+            # _getDirtyFilename() takes only the oid.
+            dirty = self._getDirtyFilename(oid)
+            for filename in [clean, dirty]:
+                if os.path.exists(filename):
+                    os.unlink(filename)
+
     def loadBlob(self, oid, serial, version):
         """Return the filename where the blob file can be found.
         """
         return self._getCleanFilename(oid, serial)
 
+    def _getNewestBlobSerial(self, oid):
+        """Return the serial of the most recently written blob of oid.
+
+        Blob files are ordered by modification time; files that do not
+        belong to a committed revision (such as the dirty file) are
+        skipped.  Returns None if there is no such file.
+        """
+        blob_path = self._getBlobPath(oid)
+        candidates = []
+        for name in os.listdir(blob_path):
+            path = os.path.join(blob_path, name)
+            serial = self._splitBlobFilename(path)[1]
+            if serial is not None:
+                candidates.append((os.stat(path).st_mtime, serial))
+        if not candidates:
+            return None
+        return max(candidates)[1]
+
+    def pack(self, packtime, referencesf):
+        """Remove all unused oid/tid combinations."""
+        getProxiedObject(self).pack(packtime, referencesf)
+
+        self._lock_acquire()
+        try:
+            # Walk over all existing files and check if they are still needed
+            for filename in os.listdir(self.base_directory):
+                oid = utils.repr_to_oid(filename)
+                file_path = os.path.join(self.base_directory, filename)
+                serial = self._getNewestBlobSerial(oid)
+                if serial is None:
+                    # No committed blob revision on disk: leave it alone.
+                    continue
+
+                try:
+                    self.loadSerial(oid, serial)   # XXX Is that expensive?
+                except POSKeyError:
+                    # The object doesn't exist anymore at all. We can remove
+                    # everything belonging to that oid
+                    shutil.rmtree(file_path)
+                    continue
+
+                # The object still exists.  Keep its newest blob file and
+                # remove every other file last modified before packtime.
+                recent_candidate = \
+                        os.path.split(self._getCleanFilename(oid, serial))[1]
+                for serial_candidate in os.listdir(file_path):
+                    if serial_candidate == recent_candidate:
+                        continue
+                    cfname = os.path.join(file_path, serial_candidate)
+                    if os.stat(cfname).st_mtime < packtime:
+                        os.unlink(cfname)
+        finally:
+            self._lock_release()
+
+    def getSize(self):
+        """Return the size of the database in bytes."""
+        orig_size = getProxiedObject(self).getSize()
+
+        blob_size = 0
+        for oid in os.listdir(self.base_directory):
+            for serial in os.listdir(os.path.join(self.base_directory, oid)):
+                if not serial.endswith(BLOB_SUFFIX):
+                    continue
+                file_path = os.path.join(self.base_directory, oid, serial)
+                blob_size += os.stat(file_path).st_size
+
+        return orig_size + blob_size
+
+    def _splitBlobFilename(self, filename):
+        """Returns OID, TID for a given blob filename.
+
+        If it's not a blob filename, (None, None) is returned.  For the
+        dirty (intermediate) file of an oid, (oid, None) is returned.
+        """
+        if not filename.endswith(BLOB_SUFFIX):
+            return None, None
+        path, filename = os.path.split(filename)
+        oid = os.path.split(path)[1]
+
+        serial = filename[:-len(BLOB_SUFFIX)]
+        oid = utils.repr_to_oid(oid)
+        # _getDirtyFilename() builds the name via tid_repr(BLOB_DIRTY),
+        # which need not equal BLOB_DIRTY; accept both spellings.
+        if serial in (BLOB_DIRTY, utils.tid_repr(BLOB_DIRTY)):
+            serial = None
+        else:
+            serial = utils.repr_to_oid(serial)
+        return oid, serial
+
+    def undo(self, serial_id, transaction):
+        serial, keys = getProxiedObject(self).undo(serial_id, transaction)
+        self._lock_acquire()
+        try:
+            # The old serial_id is given in base64 encoding ...
+            serial_id = base64.decodestring(serial_id+ '\n')
+            for oid in self._getOIDsForSerial(serial_id):
+                result = self.loadBefore(oid, serial_id)
+                if result is None:
+                    # No revision before the undone one: no blob to restore.
+                    continue
+                data, serial_before, serial_after = result
+                # Blob data is binary: copy it in binary mode.
+                orig = open(self._getCleanFilename(oid, serial_before), "rb")
+                try:
+                    new = open(self._getCleanFilename(oid, serial), "wb")
+                    try:
+                        utils.cp(orig, new)
+                    finally:
+                        new.close()
+                finally:
+                    orig.close()
+                # NOTE(review): _abort() drains self.dirty_blobs, not
+                # self.dirty_oids; confirm which list undo should record to.
+                self.dirty_oids.append((oid, serial))
+        finally:
+            self._lock_release()
+        return serial, keys
+
+    def _getOIDsForSerial(self, search_serial):
+        oids = []
+        for oidpath in os.listdir(self.base_directory):
+            for filename in os.listdir(os.path.join(self.base_directory,
+                                     oidpath)):
+                blob_path = os.path.join(self.base_directory, oidpath,
+                                         filename)
+                oid, serial = self._splitBlobFilename(blob_path)
+                if search_serial == serial:
+                    oids.append(oid)
+        return oids

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt	2005-03-22 23:03:36 UTC (rev 29642)
@@ -1,4 +1,12 @@
 
-- Blob instances should clean up temporary files after committing
+- Support database import/export
 
-- Support database import/export
+- Support ZEO
+
+- Support selection of text/binary mode for blobs
+
+    -   implement loadBlob and storeBlob
+
+    -   loadBlob needs to handle the BLOB_CACHE_DIRECTORY
+
+    -   storeBlob needs to hand the actual file data off to the server

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -350,6 +350,10 @@
                 s = self._storage.storeBlob(oid, serial, p,
                                             obj._p_blob_uncommitted,
                                             self._version, transaction)
+                # Ghostify the blob: its state, including the committed
+                # blob filename, is reloaded on next access instead of
+                # being patched up here after the store.
+                obj._p_invalidate()
             else:
                 s = self._storage.store(oid, serial, p, self._version,
                                         transaction)
@@ -371,12 +375,6 @@
 
             self._handle_serial(s, oid)
             
-            if IBlob.providedBy(obj):
-                # We need to update internals of the blobs here
-                obj._p_blob_uncommitted = None
-                obj._p_blob_data = \
-                        self._storage.loadBlob(oid, obj._p_serial, 
-                                               self._version )
 
     def commit_sub(self, t):
         """Commit all changes made in subtransactions and begin 2-phase commit
@@ -687,6 +685,9 @@
 
         # Blob support
         if IBlob.providedBy(obj):
+            # Freshly loaded state: forget any uncommitted blob file and
+            # point _p_blob_data at the storage's file for this serial.
+            obj._p_blob_uncommitted = None
             obj._p_blob_data = \
                     self._storage.loadBlob(oid, serial, self._version)
 

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml	2005-03-22 23:03:36 UTC (rev 29642)
@@ -64,6 +64,13 @@
 
   <sectiontype name="zeoclient" datatype=".ZEOClient"
                implements="ZODB.storage">
+
+    <key name="blob-dir" required="no" default="/tmp">
+      <description>
+        Path name to the directory in which the client keeps the blob
+        files it downloads from the server.
+      </description>
+    </key>
     <multikey name="server" datatype="socket-address" required="yes"/>
     <key name="storage" default="1">
       <description>

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/config.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/config.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -150,6 +150,8 @@
         L = [server.address for server in self.config.server]
         return ClientStorage(
             L,
+            # Directory for the client's blob files ("blob-dir" key).
+            blob_dir=self.config.blob_dir,
             storage=self.config.storage,
             cache_size=self.config.cache_size,
             name=self.config.name,

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -386,7 +386,11 @@
         """XXX"""
         
     def getSize():
-        """XXX"""
+        """Return the size of the database in bytes.
+
+        Storages that wrap another storage may add the size of data they
+        keep outside of it (BlobStorage adds the size of its blob files).
+        """
 
     def history(oid, version, length=1, filter=None):
         """XXX"""

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py	2005-03-22 21:14:33 UTC (rev 29641)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py	2005-03-22 23:03:36 UTC (rev 29642)
@@ -16,7 +16,7 @@
 import time
 import struct
 from struct import pack, unpack
-from binascii import hexlify
+from binascii import hexlify, unhexlify
 import cPickle as pickle
 from cStringIO import StringIO
 import weakref
@@ -88,7 +88,7 @@
 
 U64 = u64
 
-def cp(f1, f2, length):
+def cp(f1, f2, length=None):
     """Copy all data from one file to another.
     
     It copies the data from the current position of the input file (f1)
@@ -101,6 +101,13 @@
     write = f2.write
     n = 8192
 
+    if length is None:
+        # Copy everything from the current position to the end of f1.
+        old_pos = f1.tell()
+        f1.seek(0, 2)
+        length = f1.tell() - old_pos
+        f1.seek(old_pos)
+
     while length > 0:
         if n > length:
             n = length
@@ -133,6 +140,18 @@
     else:
         return repr(oid)
 
+def repr_to_oid(repr):
+    """Convert a hex oid/tid representation back to an 8-byte string.
+
+    An optional "0x" prefix is stripped; the result is left-padded
+    with NUL bytes to 8 bytes.
+    """
+    if repr.startswith("0x"):
+        repr = repr[2:]
+    as_bin = unhexlify(repr)
+    as_bin = "\x00"*(8-len(as_bin)) + as_bin
+    return as_bin
+
 serial_repr = oid_repr
 tid_repr = serial_repr
 
@@ -314,3 +333,33 @@
     handle, filename = mkstemp()
     os.close(handle)
     return filename
+
+def best_rename(sourcename, targetname):
+    """Move the file sourcename to targetname.
+
+    os.rename() is tried first.  If it raises OSError (for example
+    because the two names are on different filesystems), the data
+    is copied in 4096-byte chunks and sourcename is removed
+    afterwards.  That fallback is not atomic: a concurrent reader
+    may see a partially written targetname.
+    """
+    try:
+        os.rename(sourcename, targetname)
+        return
+    except OSError:
+        pass
+    source = open(sourcename, "rb")
+    try:
+        target = open(targetname, "wb")
+        try:
+            while True:
+                chunk = source.read(4096)
+                if not chunk:
+                    break
+                target.write(chunk)
+        finally:
+            target.close()
+    finally:
+        source.close()
+    os.unlink(sourcename)
+



More information about the Zodb-checkins mailing list