[Zodb-checkins] SVN: ZODB/branches/jim-dev/src/Z checkpoint

Jim Fulton jim at zope.com
Wed Dec 3 18:05:21 EST 2008


Log message for revision 93594:
  checkpoint

Changed:
  U   ZODB/branches/jim-dev/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-dev/src/ZEO/tests/forker.py
  U   ZODB/branches/jim-dev/src/ZEO/tests/testZEO.py
  A   ZODB/branches/jim-dev/src/ZEO/tests/zeo_blob_cache.test
  U   ZODB/branches/jim-dev/src/ZODB/Connection.py
  U   ZODB/branches/jim-dev/src/ZODB/DemoStorage.py
  U   ZODB/branches/jim-dev/src/ZODB/blob.py
  U   ZODB/branches/jim-dev/src/ZODB/interfaces.py
  U   ZODB/branches/jim-dev/src/ZODB/tests/testConfig.py

-=-
Modified: ZODB/branches/jim-dev/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/ClientStorage.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZEO/ClientStorage.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -26,10 +26,10 @@
 from ZEO import ServerStub
 from ZEO.TransactionBuffer import TransactionBuffer
 from ZEO.zrpc.client import ConnectionManager
-from ZODB.blob import rename_or_copy_blob
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
+import BTrees.IOBTree
 import cPickle
 import logging
 import os
@@ -43,6 +43,7 @@
 import weakref
 import zc.lockfile
 import ZEO.interfaces
+import ZODB
 import ZODB.BaseStorage
 import ZODB.interfaces
 import zope.event
@@ -112,14 +113,16 @@
     StorageServerStubClass = ServerStub.stub
 
     def __init__(self, addr, storage='1', cache_size=20 * MB,
-                 name='', client=None, debug=0, var=None,
+                 name='', client=None, var=None,
                  min_disconnect_poll=1, max_disconnect_poll=30,
                  wait_for_server_on_startup=None, # deprecated alias for wait
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
                  drop_cache_rather_verify=False,
                  username='', password='', realm=None,
-                 blob_dir=None, shared_blob_dir=False):
+                 blob_dir=None, shared_blob_dir=False,
+                 blob_cache_size=1<<62, blob_cache_size_check=100,
+                 ):
         """ClientStorage constructor.
 
         This is typically invoked from a custom_zodb.py file.
@@ -127,81 +130,106 @@
         All arguments except addr should be keyword arguments.
         Arguments:
 
-        addr -- The server address(es).  This is either a list of
+        addr
+            The server address(es).  This is either a list of
             addresses or a single address.  Each address can be a
             (hostname, port) tuple to signify a TCP/IP connection or
             a pathname string to signify a Unix domain socket
             connection.  A hostname may be a DNS name or a dotted IP
             address.  Required.
 
-        storage -- The storage name, defaulting to '1'.  The name must
+        storage
+            The storage name, defaulting to '1'.  The name must
             match one of the storage names supported by the server(s)
             specified by the addr argument.  The storage name is
             displayed in the Zope control panel.
 
-        cache_size -- The disk cache size, defaulting to 20 megabytes.
+        cache_size
+            The disk cache size, defaulting to 20 megabytes.
             This is passed to the ClientCache constructor.
 
-        name -- The storage name, defaulting to ''.  If this is false,
+        name
+            The storage name, defaulting to ''.  If this is false,
             str(addr) is used as the storage name.
 
-        client -- A name used to construct persistent cache filenames.
+        client
+            A name used to construct persistent cache filenames.
             Defaults to None, in which case the cache is not persistent.
             See ClientCache for more info.
 
-        debug -- Ignored.  This is present only for backwards
-            compatibility with ZEO 1.
-
-        var -- When client is not None, this specifies the directory
+        var
+            When client is not None, this specifies the directory
             where the persistent cache files are created.  It defaults
             to None, in which case the current directory is used.
 
-        min_disconnect_poll -- The minimum delay in seconds between
+        min_disconnect_poll
+            The minimum delay in seconds between
             attempts to connect to the server.  Defaults
             to 1 second.
 
-        max_disconnect_poll -- The maximum delay in seconds between
+        max_disconnect_poll
+            The maximum delay in seconds between
             attempts to connect to the server.  Defaults
             to 30 seconds.
 
-        wait_for_server_on_startup -- A backwards compatible alias for
+        wait_for_server_on_startup
+            A backwards compatible alias for
             the wait argument.
 
-        wait -- A flag indicating whether to wait until a connection
+        wait
+            A flag indicating whether to wait until a connection
             with a server is made, defaulting to true.
 
-        wait_timeout -- Maximum time to wait for a connection before
+        wait_timeout
+            Maximum time to wait for a connection before
             giving up.  Only meaningful if wait is True.
 
-        read_only -- A flag indicating whether this should be a
+        read_only
+            A flag indicating whether this should be a
             read-only storage, defaulting to false (i.e. writing is
             allowed by default).
 
-        read_only_fallback -- A flag indicating whether a read-only
+        read_only_fallback
+            A flag indicating whether a read-only
             remote storage should be acceptable as a fallback when no
             writable storages are available.  Defaults to false.  At
             most one of read_only and read_only_fallback should be
             true.
 
-        username -- string with username to be used when authenticating.
+        username
+            string with username to be used when authenticating.
             These only need to be provided if you are connecting to an
             authenticated server storage.
 
-        password -- string with plaintext password to be used
-            when authenticated.
+        password
+            string with plaintext password to be used when authenticating.
 
-        realm -- not documented.
+        realm
+            not documented.
 
-        drop_cache_rather_verify -- a flag indicating that the cache
-            should be dropped rather than expensively verified.
+        drop_cache_rather_verify
+            a flag indicating that the cache should be dropped rather
+            than expensively verified.
 
-        blob_dir -- directory path for blob data.  'blob data' is data that
+        blob_dir
+            directory path for blob data.  'blob data' is data that
             is retrieved via the loadBlob API.
 
-        shared_blob_dir -- Flag whether the blob_dir is a server-shared
-        filesystem that should be used instead of transferring blob data over
-        zrpc.
+        shared_blob_dir
+            Flag whether the blob_dir is a server-shared filesystem
+            that should be used instead of transferring blob data over
+            zrpc.
 
+        blob_cache_size
+            Maximum size of the ZEO blob cache, in bytes.  Defaults
+            to a very large value (effectively no limit).
+            This option is ignored if shared_blob_dir is true.
+
+        blob_cache_size_check
+            Blob cache check size as a percent of blob_cache_size.
+            The blob cache size will be checked when this percentage
+            of the target size has been written or downloaded into
+            the cache.  Defaults to 100 (the full blob cache size).
+            This option is ignored if shared_blob_dir is true.
+
         Note that the authentication protocol is defined by the server
         and is detected by the ClientStorage upon connecting (see
         testConnection() and doAuth() for details).
@@ -219,11 +247,6 @@
             read_only_fallback and "fallback" or "normal",
             storage,
             )
-        
-        if debug:
-            logger.warning(
-                "%s ClientStorage(): debug argument is no longer used",
-                self.__name__)
 
         self._drop_cache_rather_verify = drop_cache_rather_verify
 
@@ -342,12 +365,19 @@
         # XXX need to check for POSIX-ness here
         self.blob_dir = blob_dir
         self.shared_blob_dir = shared_blob_dir
+        
         if blob_dir is not None:
             # Avoid doing this import unless we need it, as it
             # currently requires pywin32 on Windows.
             import ZODB.blob
-            self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
-            self.fshelper.create()
+            if shared_blob_dir:
+                self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
+            else:
+                if 'zeocache' not in ZODB.blob.LAYOUTS:
+                    ZODB.blob.LAYOUTS['zeocache'] = BlobCacheLayout()
+                self.fshelper = ZODB.blob.FilesystemHelper(
+                    blob_dir, layout_name='zeocache')
+                self.fshelper.create()
             self.fshelper.checkSecure()
         else:
             self.fshelper = None
@@ -360,6 +390,11 @@
 
         self._cache = self.ClientCacheClass(cache_path, size=cache_size)
 
+        self._blob_cache_size = blob_cache_size
+        self._blob_cache_size_check = (
+            blob_cache_size * blob_cache_size_check / 100)
+        self._check_blob_size()
+
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
                                                     tmax=max_disconnect_poll)
@@ -373,6 +408,8 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
 
+        
+
     def _wait(self, timeout=None):
         if timeout is not None:
             deadline = time.time() + timeout
@@ -414,6 +451,73 @@
         if self._tfile is not None:
             self._tfile.close()
 
+        if self._check_blob_size_thread is not None:
+            self._check_blob_size_thread.join()
+
+    _check_blob_size_thread = None
+    def _check_blob_size(self):
+        self._blob_data_bytes_loaded = 0
+        if self.shared_blob_dir or not self.blob_dir:
+            return
+
+        check_blob_size_thread = threading.Thread(
+            target=self._check_blob_size_method)
+        check_blob_size_thread.setDaemon(True)
+        check_blob_size_thread.start()
+        self._check_blob_size_thread = check_blob_size_thread
+
+    def _check_blob_size_method(self):
+        try:
+            check_lock = zc.lockfile.LockFile(
+                os.path.join(self.blob_dir, 'cache.lock'))
+        except zc.lockfile.LockError:
+            # Someone is already cleaning up, so don't bother
+            return
+
+        try:
+           target = self._blob_cache_size
+           size = 0
+           tmp = self.temporaryDirectory()
+           blob_suffix = ZODB.blob.BLOB_SUFFIX
+           files_by_atime = BTrees.IOBTree.BTree()
+           for base, dirs, files in os.walk(self.blob_dir):
+               if base == tmp:
+                   del dirs[:]
+                   continue
+               for file_name in files:
+                   if not file_name.endswith(blob_suffix):
+                       continue
+                   file_name = os.path.join(base, file_name)
+                   stat = os.stat(file_name)
+                   size += stat.st_size
+                   t = int(stat.st_atime)  # IOBTree keys must be ints
+                   if t not in files_by_atime:
+                       files_by_atime[t] = []
+                   files_by_atime[t].append(file_name)
+
+           while size > target and files_by_atime:
+               for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                   lockfilename = os.path.join(os.path.dirname(file_name),
+                                               '.lock')
+                   try:
+                       lock = zc.lockfile.LockFile(lockfilename)
+                   except zc.lockfile.LockError:
+                       continue  # In use, skip
+
+                   try:
+                       # Use a separate name so the running total
+                       # computed above isn't clobbered.
+                       fsize = os.stat(file_name).st_size
+                       try:
+                           os.remove(file_name)
+                       except OSError:
+                           pass  # probably still in use; skip it
+                       else:
+                           size -= fsize
+                   finally:
+                       lock.close()
+        finally:
+            check_lock.close()
+
+
     def registerDB(self, db):
         """Storage API: register a database for invalidation messages.
 
@@ -866,26 +970,20 @@
             # use a slightly different file name. We keep the old one
             # until we're done to avoid conflicts. Then remove the old name.
             target += 'w'
-            rename_or_copy_blob(filename, target)
+            ZODB.blob.rename_or_copy_blob(filename, target)
             os.remove(target[:-1])
         else:
-            rename_or_copy_blob(filename, target)
+            ZODB.blob.rename_or_copy_blob(filename, target)
 
         # Now tell the server where we put it
         self._server.storeBlobShared(
             oid, serial, data, os.path.basename(target), id(txn))
 
-    def _have_blob(self, blob_filename, oid, serial):
-        if os.path.exists(blob_filename):
-            logger.debug("%s Found blob %r/%r in cache.",
-                         self.__name__, oid, serial)
-            return True
-        return False
-
     def receiveBlobStart(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         assert not os.path.exists(blob_filename)
-        assert os.path.exists(blob_filename+'.lock')
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        assert os.path.exists(lockfilename)
         blob_filename += '.dl'
         assert not os.path.exists(blob_filename)
         f = open(blob_filename, 'wb')
@@ -894,9 +992,13 @@
     def receiveBlobChunk(self, oid, serial, chunk):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
         assert os.path.exists(blob_filename)
-        f = open(blob_filename, 'ab')
+        f = open(blob_filename, 'r+b')
+        f.seek(0, 2)
         f.write(chunk)
         f.close()
+        self._blob_data_bytes_loaded += len(chunk)
+        if self._blob_data_bytes_loaded > self._blob_cache_size_check:
+            self._check_blob_size()
 
     def receiveBlobStop(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
@@ -913,14 +1015,25 @@
                                            "configured.")
 
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
-        # Case 1: Blob is available already, just use it
-        if self._have_blob(blob_filename, oid, serial):
+        if self.shared_blob_dir:
+            if os.path.exists(blob_filename):
+                return blob_filename
+            else:
+                # We're using a server shared cache.  If the file isn't
+                # here, it's not anywhere.
+                raise POSException.POSKeyError("No blob file", oid, serial)
+
+        
+        if os.path.exists(blob_filename):
+            try:
+                _accessed(blob_filename)
+            except OSError:
+                # It might have been deleted while we were calling _accessed.
+                # We don't have the file lock.
+                if os.path.exists(blob_filename):
+                    raise
             return blob_filename
 
-        if self.shared_blob_dir:
-            # We're using a server shared cache.  If the file isn't
-            # here, it's not anywhere.
-            raise POSException.POSKeyError("No blob file", oid, serial)
 
         # First, we'll create the directory for this oid, if it doesn't exist. 
         self.fshelper.createPathForOID(oid)
@@ -930,42 +1043,22 @@
         # getting it multiple times even across separate client
         # processes on the same machine. We'll use file locking.
 
-        lockfilename = blob_filename+'.lock'
-        try:
-            lock = zc.lockfile.LockFile(lockfilename)
-        except zc.lockfile.LockError:
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        while 1:
+            try:
+                lock = zc.lockfile.LockFile(lockfilename)
+            except zc.lockfile.LockError:
+                time.sleep(0.01)
+            else:
+                break
 
-            # Someone is already downloading the Blob. Wait for the
-            # lock to be freed.  How long should we be willing to wait?
-            # TODO: maybe find some way to assess download progress.
-
-            while 1:
-                time.sleep(0.1)
-                try:
-                    lock = zc.lockfile.LockFile(lockfilename)
-                except zc.lockfile.LockError:
-                    pass
-                else:
-                    # We have the lock. We should be able to get the file now.
-                    lock.close()
-                    try:
-                        os.remove(lockfilename)
-                    except OSError:
-                        pass
-                    break
-
-            if self._have_blob(blob_filename, oid, serial):
-                return blob_filename
-
-            return None
-
         try:
             # We got the lock, so it's our job to download it.  First,
             # we'll double check that someone didn't download it while we
             # were getting the lock:
 
-            if self._have_blob(blob_filename, oid, serial):
-                return blob_filename
+            if os.path.exists(blob_filename):
+                return _accessed(blob_filename)
 
             # Ask the server to send it to us.  When this function
             # returns, it will have been sent. (The receiving will
@@ -973,18 +1066,55 @@
 
             self._server.sendBlob(oid, serial)
 
-            if self._have_blob(blob_filename, oid, serial):
-                return blob_filename
+            if os.path.exists(blob_filename):
+                return _accessed(blob_filename)
 
             raise POSException.POSKeyError("No blob file", oid, serial)
 
         finally:
             lock.close()
+
+    def openCommittedBlobFile(self, oid, serial, blob=None):
+        blob_filename = self.loadBlob(oid, serial)
+        try:
+            if blob is None:
+                return open(blob_filename, 'rb')
+            else:
+                return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+        except IOError:
+            # The file got removed while we were opening.
+            # Fall through and try again with the protection of the lock.
+            pass
+        
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        while 1:
             try:
-                os.remove(lockfilename)
-            except OSError:
-                pass
+                lock = zc.lockfile.LockFile(lockfilename)
+            except zc.lockfile.LockError:
+                time.sleep(.01)
+            else:
+                break
 
+        try:
+            blob_filename = self.fshelper.getBlobFilename(oid, serial)
+            if not os.path.exists(blob_filename):
+                if self.shared_blob_dir:
+                    # We're using a server shared cache.  If the file isn't
+                    # here, it's not anywhere.
+                    raise POSException.POSKeyError("No blob file", oid, serial)
+                self._server.sendBlob(oid, serial)
+                if not os.path.exists(blob_filename):
+                    raise POSException.POSKeyError("No blob file", oid, serial)
+
+            _accessed(blob_filename)
+            if blob is None:
+                return open(blob_filename, 'rb')
+            else:
+                return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+        finally:
+            lock.close()
+        
+
     def temporaryDirectory(self):
         return self.fshelper.temp_dir
 
@@ -1125,10 +1255,14 @@
             blobs = self._tbuf.blobs
             while blobs:
                 oid, blobfilename = blobs.pop()
+                self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
                 targetpath = self.fshelper.getPathForOID(oid, create=True)
-                rename_or_copy_blob(blobfilename,
-                          self.fshelper.getBlobFilename(oid, tid),
-                          )
+                ZODB.blob.rename_or_copy_blob(
+                    blobfilename,
+                    self.fshelper.getBlobFilename(oid, tid),
+                    )
+                if self._blob_data_bytes_loaded > self._blob_cache_size_check:
+                    self._check_blob_size()
 
         self._tbuf.clear()
 
@@ -1485,6 +1619,7 @@
             raise ZODB.interfaces.StorageStopIteration()
         return ZODB.BaseStorage.DataRecord(*item)
 
+
 class ClientStorage308Adapter:
 
     def __init__(self, client):
@@ -1498,3 +1633,22 @@
 
     def __getattr__(self, name):
         return getattr(self.client, name)
+
+
+class BlobCacheLayout(object):
+
+    size = 997
+
+    def oid_to_path(self, oid):
+        return str(utils.u64(oid) % self.size)
+
+    def getBlobFilePath(self, oid, tid):
+        base, rem = divmod(utils.u64(oid), self.size)
+        return os.path.join(
+            str(rem),
+            "%s.%s%s" % (base, tid.encode('hex'), ZODB.blob.BLOB_SUFFIX)
+            )
+
+def _accessed(filename):
+    os.utime(filename, (time.time(), os.stat(filename).st_mtime))
+    return filename
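
For reference, the cleanup strategy implemented by _check_blob_size_method
above is: walk the blob cache directory, index blob files by access time,
and delete the least recently used files until the total size drops back
under the target.  A minimal standalone sketch of the same idea (the
helper name is hypothetical, and the real method additionally coordinates
with other processes via zc.lockfile):

    import os

    def evict_blobs(blob_dir, target_bytes, suffix='.blob'):
        # Gather (atime, path, size) for every cached blob file.
        entries = []
        total = 0
        for base, dirs, files in os.walk(blob_dir):
            for name in files:
                if not name.endswith(suffix):
                    continue
                path = os.path.join(base, name)
                st = os.stat(path)
                entries.append((st.st_atime, path, st.st_size))
                total += st.st_size

        # Evict least-recently-accessed files first until we are
        # back under the target.
        entries.sort()
        for atime, path, fsize in entries:
            if total <= target_bytes:
                break
            try:
                os.remove(path)
            except OSError:
                continue  # probably still in use; skip it
            total -= fsize
        return total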

Modified: ZODB/branches/jim-dev/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/forker.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZEO/tests/forker.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -285,7 +285,7 @@
     servers = {}
 
     def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
-                     addr=None, path='Data.fs', protocol=None):
+                     addr=None, path='Data.fs', protocol=None, blob_dir=None):
         """Start a ZEO server.
 
         Return the server and admin addresses.
@@ -298,7 +298,7 @@
         elif addr is not None:
             raise TypeError("Can't specify port and addr")
         addr, adminaddr, pid, config_path = start_zeo_server(
-            storage_conf, zeo_conf, port, keep, path, protocol)
+            storage_conf, zeo_conf, port, keep, path, protocol, blob_dir)
         os.remove(config_path)
         servers[adminaddr] = pid
         return addr, adminaddr

Modified: ZODB/branches/jim-dev/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/testZEO.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZEO/tests/testZEO.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -737,7 +737,11 @@
         check_data(filename)
 
         # ... and on the server
-        server_filename = filename.replace(self.blob_cache_dir, self.blobdir)
+        server_filename = os.path.join(
+            self.blobdir,
+            ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
+            )
+        
         self.assert_(server_filename.startswith(self.blobdir))
         check_data(server_filename)
 
@@ -1168,7 +1172,7 @@
         doctest.DocFileSuite(
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt',
-            'protocols.test',
+            'protocols.test', 'zeo_blob_cache.test',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )

Copied: ZODB/branches/jim-dev/src/ZEO/tests/zeo_blob_cache.test (from rev 93586, ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test)
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/zeo_blob_cache.test	                        (rev 0)
+++ ZODB/branches/jim-dev/src/ZEO/tests/zeo_blob_cache.test	2008-12-03 23:05:21 UTC (rev 93594)
@@ -0,0 +1,149 @@
+ZEO caching of blob data
+========================
+
+ZEO supports 2 modes for providing clients access to blob data:
+
+shared
+    Blob data are shared via a network file system.  The client shares
+    a common blob directory with the server.
+
+non-shared
+    Blob data are loaded from the storage server and cached locally.
+    A maximum size for the blob data can be set and data are removed
+    when the size is exceeded.
+
+In this test, we'll demonstrate that blob data are removed from a ZEO
+cache when the amount of data stored exceeds a given limit.
+
+Let's start by setting up some data:
+
+    >>> addr, _ = start_server(blob_dir='server-blobs')
+
+We'll also create a client.
+
+    >>> import ZEO
+    >>> db = ZEO.DB(addr, blob_dir='blobs',
+    ...             blob_cache_size=4000, blob_cache_size_check=10)
+
+Here, we passed a blob_cache_size parameter, which specifies a target
+blob cache size.  This is not a hard limit, but rather a target.  It
+defaults to a very large value. We also passed a blob_cache_size_check
+option. The blob_cache_size_check option specifies, as a percent of
+the target, the number of bytes that can be written or downloaded
+from the server before the cache size is checked. The
+blob_cache_size_check option defaults to 100. We passed 10, to check
+after writing 10% of the target size.
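+
+(For example, with blob_cache_size=4000 and blob_cache_size_check=10,
+a size check is triggered after roughly 400 bytes of blob data have
+been written or downloaded.)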
+
+We want to check for name collisions in the blob cache dir. We'll try
+to provoke name collisions by reducing the number of cache directory
+subdirectories.
+
+    >>> import ZEO.ClientStorage
+    >>> orig_blob_cache_layout_size = ZEO.ClientStorage.BlobCacheLayout.size
+    >>> ZEO.ClientStorage.BlobCacheLayout.size = 11
+
+Now, let's write some data:
+
+    >>> import ZODB.blob, transaction, time
+    >>> conn = db.open()
+    >>> for i in range(1, 101):
+    ...     conn.root()[i] = ZODB.blob.Blob()
+    ...     conn.root()[i].open('w').write(chr(i)*100)
+    >>> transaction.commit()
+
+We've committed 10000 bytes of data, but our target size is 4000.  We
+expect to have not much more than the target size in the cache blob
+directory.
+
+    >>> import os
+    >>> def cache_size(d):
+    ...     size = 0
+    ...     for base, dirs, files in os.walk(d):
+    ...         for f in files:
+    ...             if f.endswith('.blob'):
+    ...                 size += os.stat(os.path.join(base, f)).st_size
+    ...     return size
+    
+    >>> db.storage._check_blob_size_thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
+If we read all of the blobs, data will be downloaded again, as
+necessary, but the cache size will remain not much bigger than the
+target:
+
+    >>> for i in range(1, 101):
+    ...     data = conn.root()[i].open().read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
+
+    >>> db.storage._check_blob_size_thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
+    >>> for i in range(1, 101):
+    ...     data = conn.root()[i].open().read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
+
+    >>> db.storage._check_blob_size_thread.join()
+
+    >>> for i in range(1, 101):
+    ...     data = conn.root()[i].open('c').read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
+
+    >>> db.storage._check_blob_size_thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
+    >>> for i in range(1, 101):
+    ...     data = open(conn.root()[i].committed(), 'rb').read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
+
+    >>> db.storage._check_blob_size_thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
+Now let's see if we can stress things a bit.  We'll create many clients
+and get them to pound on the blobs all at once to see if we can
+provoke problems:
+
+    >>> import threading, random
+    >>> def run():
+    ...     db = ZEO.DB(addr, blob_dir='blobs',
+    ...                 blob_cache_size=4000, blob_cache_size_check=10)
+    ...     conn = db.open()
+    ...     for i in range(300):
+    ...         time.sleep(0)
+    ...         i = random.randint(1, 100)
+    ...         data = conn.root()[i].open().read()
+    ...         if data != chr(i)*100:
+    ...             print 'bad data', `chr(i)`, `data`
+    ...         i = random.randint(1, 100)
+    ...         data = conn.root()[i].open('c').read()
+    ...         if data != chr(i)*100:
+    ...             print 'bad data', `chr(i)`, `data`
+    ...     db._storage._check_blob_size_thread.join()
+    ...     db.close()
+
+    >>> threads = [threading.Thread(target=run) for i in range(10)]
+    >>> for thread in threads:
+    ...     thread.setDaemon(True)
+    >>> for thread in threads:
+    ...     thread.start()
+    >>> for thread in threads:
+    ...     thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
+.. cleanup
+
+    >>> db.close()
+    >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
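
A note on the layout trick above: shrinking BlobCacheLayout.size provokes
collisions because the layout buckets oids by oid modulo size, so with
only 11 buckets the 100 oids in this test are forced to share cache
subdirectories.  A quick sketch of the path arithmetic, using a plain
integer and a hex string in place of the packed oid and tid the real
class receives:

    import os

    def cache_path(oid_int, tid_hex, size=11, suffix='.blob'):
        # Mirrors BlobCacheLayout.getBlobFilePath: bucket by
        # oid % size; name the file by oid // size plus the tid.
        base, rem = divmod(oid_int, size)
        return os.path.join(str(rem), "%s.%s%s" % (base, tid_hex, suffix))

    print(cache_path(1, 'aa'))   # 1/0.aa.blob
    print(cache_path(12, 'aa'))  # 1/1.aa.blob -- same directory as oid 1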

Modified: ZODB/branches/jim-dev/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/Connection.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZODB/Connection.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -38,6 +38,7 @@
 
 import transaction
 
+import ZODB
 from ZODB.blob import SAVEPOINT_SUFFIX
 from ZODB.ConflictResolution import ResolvedSerial
 from ZODB.ExportImport import ExportImport
@@ -1271,6 +1272,13 @@
             return self._storage.loadBlob(oid, serial)
         return filename
 
+    def openCommittedBlobFile(self, oid, serial, blob=None):
+        blob_filename = self.loadBlob(oid, serial)
+        if blob is None:
+            return open(blob_filename, 'rb')
+        else:
+            return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+
     def _getBlobPath(self):
         return os.path.join(self.temporaryDirectory(), 'savepoints')
 

Modified: ZODB/branches/jim-dev/src/ZODB/DemoStorage.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/DemoStorage.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZODB/DemoStorage.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -172,8 +172,22 @@
             if self._blobify():
                 return self.loadBlob(oid, serial)
             raise
-                
 
+    def openCommittedBlobFile(self, oid, serial, blob=None):
+        try:
+            return self.changes.openCommittedBlobFile(oid, serial, blob)
+        except ZODB.POSException.POSKeyError:
+            try:
+                return self.base.openCommittedBlobFile(oid, serial, blob)
+            except AttributeError:
+                if not ZODB.interfaces.IBlobStorage.providedBy(self.base):
+                    raise ZODB.POSException.POSKeyError(oid, serial)
+                raise
+        except AttributeError:
+            if self._blobify():
+                return self.openCommittedBlobFile(oid, serial, blob)
+            raise
+
     def loadSerial(self, oid, serial):
         try:
             return self.changes.loadSerial(oid, serial)

Modified: ZODB/branches/jim-dev/src/ZODB/blob.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/blob.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZODB/blob.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -120,7 +120,15 @@
             raise ValueError("invalid mode", mode)
 
         if mode == 'c':
-            return open(self.committed(), 'rb')
+            if (self._p_blob_uncommitted
+                or
+                not self._p_blob_committed
+                or
+                self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
+                ):
+                raise BlobError('Uncommitted changes')            
+            return self._p_jar._storage.openCommittedBlobFile(
+                self._p_oid, self._p_serial)
 
         if self.writers:
             raise BlobError("Already opened for writing.")
@@ -129,10 +137,20 @@
             self.readers = []
 
         if mode == 'r':
-            if self._current_filename() is None:
-                self._create_uncommitted_file()
+            result = None
+            to_open = self._p_blob_uncommitted
+            if not to_open:
+                to_open = self._p_blob_committed
+                if to_open:
+                    result = self._p_jar._storage.openCommittedBlobFile(
+                        self._p_oid, self._p_serial, self)
+                else:
+                    self._create_uncommitted_file()
+                    to_open = self._p_blob_uncommitted
+                    assert to_open
 
-            result = BlobFile(self._current_filename(), mode, self)
+            if result is None:
+                result = BlobFile(to_open, mode, self)
 
             def destroyed(ref, readers=self.readers):
                 try:
@@ -181,8 +199,16 @@
             self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
             ):
             raise BlobError('Uncommitted changes')
-        return self._p_blob_committed
 
+        result = self._p_blob_committed
+        
+        # We do this to make sure we have the file and to let the
+        # storage know we're accessing the file.
+        n = self._p_jar._storage.loadBlob(self._p_oid, self._p_serial)
+        assert result == n, (result, n)
+
+        return result
+
     def consumeFile(self, filename):
         """Will replace the current data of the blob with the file given under
         filename.
@@ -234,11 +260,6 @@
 
     # utility methods
 
-    def _current_filename(self):
-        # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
-        # Connection._setstate
-        return self._p_blob_uncommitted or self._p_blob_committed
-
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
             "Uncommitted file already exists.")
@@ -391,13 +412,15 @@
         'committed' blob file related to that oid and tid.
 
         """
-        oid_path = self.getPathForOID(oid)
         # TIDs are numbers and sometimes passed around as integers. For our
         # computations we rely on the 64-bit packed string representation
+        if isinstance(oid, int):
+            oid = utils.p64(oid)
         if isinstance(tid, int):
             tid = utils.p64(tid)
-        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
-        return os.path.join(oid_path, filename)
+        return os.path.join(self.base_dir,
+                            self.layout.getBlobFilePath(oid, tid),
+                            )
 
     def blob_mkstemp(self, oid, tid):
         """Given an oid and a tid, return a temporary file descriptor
@@ -516,10 +539,18 @@
         oid = ''.join(binascii.unhexlify(byte[2:]) for byte in path)
         return oid
 
+    def getBlobFilePath(self, oid, tid):
+        """Given an oid and a tid, return the full filename of the
+        'committed' blob file related to that oid and tid.
+
+        """
+        oid_path = self.oid_to_path(oid)
+        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
+        return os.path.join(oid_path, filename)
+
 LAYOUTS['bushy'] = BushyLayout()
 
-
-class LawnLayout(object):
+class LawnLayout(BushyLayout):
     """A shallow directory layout for blob directories.
 
     Creates a single level of directories (one for each oid).
@@ -673,6 +704,14 @@
         return filename
 
     @non_overridable
+    def openCommittedBlobFile(self, oid, serial, blob=None):
+        blob_filename = self.loadBlob(oid, serial)
+        if blob is None:
+            return open(blob_filename, 'rb')
+        else:
+            return BlobFile(blob_filename, 'r', blob)
+
+    @non_overridable
     def _packUndoing(self, packtime, referencesf):
         # Walk over all existing revisions of all blob files and check
         # if they are still needed by attempting to load the revision
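
With getBlobFilePath moved onto the layout objects, each layout now owns
the complete oid/tid-to-path mapping.  In the bushy layout every byte of
the oid becomes one directory level, with the tid as the file name.  A
simplified Python 2 sketch of the equivalent computation (the real code
goes through utils.tid_repr, which also trims leading zeros from the
file name):

    import binascii, os

    def bushy_blob_path(oid, tid, suffix='.blob'):
        # One '0xNN' directory per oid byte, then a file named
        # after the tid (no leading-zero trimming here).
        dirs = ['0x' + binascii.hexlify(byte) for byte in oid]
        filename = '0x' + binascii.hexlify(tid) + suffix
        return os.path.join(os.path.join(*dirs), filename)

    # 8-byte packed oid and tid strings, as used throughout ZODB:
    print(bushy_blob_path('\x00' * 7 + '\x01', '\x00' * 7 + '\x02'))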

Modified: ZODB/branches/jim-dev/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/interfaces.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZODB/interfaces.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -1034,6 +1034,18 @@
         Raises POSKeyError if the blobfile cannot be found.
         """
 
+    def openCommittedBlobFile(oid, serial, blob=None):
+        """Return a file for committed data for the given object id and serial
+
+        If a blob is provided, then a BlobFile object is returned,
+        otherwise, an ordinary file is returned.  In either case, the
+        file is opened for binary reading.
+
+        This method is used to allow storages that cache blob data to
+        make sure that data are available at least long enough for the
+        file to be opened.
+        """
+
     def temporaryDirectory():
         """Return a directory that should be used for uncommitted blob data.
 

Modified: ZODB/branches/jim-dev/src/ZODB/tests/testConfig.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/tests/testConfig.py	2008-12-03 20:37:39 UTC (rev 93593)
+++ ZODB/branches/jim-dev/src/ZODB/tests/testConfig.py	2008-12-03 23:05:21 UTC (rev 93594)
@@ -12,6 +12,7 @@
 #
 ##############################################################################
 
+import os
 import transaction
 import unittest
 import ZEO.ClientStorage
@@ -115,15 +116,16 @@
         cfg = """
         <zodb>
           <zeoclient>
-            blob-dir /tmp
+            blob-dir blobs
             server localhost:56897
             wait false
           </zeoclient>
         </zodb>
         """
         config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
-        self.assertEqual(config.database.config.storage.config.blob_dir,
-                         '/tmp')
+        self.assertEqual(
+            os.path.abspath(config.database.config.storage.config.blob_dir),
+            os.path.abspath('blobs'))
         self.assertRaises(ClientDisconnected, self._test, cfg)
 
 


