[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py checkpoint

Jim Fulton jim at zope.com
Sun Nov 30 18:26:05 EST 2008


Log message for revision 93453:
  checkpoint

Changed:
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py

-=-
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-11-30 23:25:07 UTC (rev 93452)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-11-30 23:26:04 UTC (rev 93453)
@@ -30,6 +30,7 @@
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
+import BTrees.IOBTree
 import cPickle
 import logging
 import os
@@ -112,14 +113,16 @@
     StorageServerStubClass = ServerStub.stub

     def __init__(self, addr, storage='1', cache_size=20 * MB,
-                 name='', client=None, debug=0, var=None,
+                 name='', client=None, var=None,
                  min_disconnect_poll=1, max_disconnect_poll=30,
                  wait_for_server_on_startup=None, # deprecated alias for wait
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
                  drop_cache_rather_verify=False,
                  username='', password='', realm=None,
-                 blob_dir=None, shared_blob_dir=False):
+                 blob_dir=None, shared_blob_dir=False,
+                 blob_cache_size=1 << 30, blob_cache_size_check=50,
+                 ):
         """ClientStorage constructor.

         This is typically invoked from a custom_zodb.py file.
@@ -127,81 +130,106 @@
         All arguments except addr should be keyword arguments.
         Arguments:

-        addr -- The server address(es).  This is either a list of
+        addr
+            The server address(es).  This is either a list of
             addresses or a single address.  Each address can be a
             (hostname, port) tuple to signify a TCP/IP connection or
             a pathname string to signify a Unix domain socket
             connection.  A hostname may be a DNS name or a dotted IP
             address.  Required.

-        storage -- The storage name, defaulting to '1'.  The name must
+        storage
+            The storage name, defaulting to '1'.  The name must
             match one of the storage names supported by the server(s)
             specified by the addr argument.  The storage name is
             displayed in the Zope control panel.

-        cache_size -- The disk cache size, defaulting to 20 megabytes.
+        cache_size
+            The disk cache size, defaulting to 20 megabytes.
             This is passed to the ClientCache constructor.

-        name -- The storage name, defaulting to ''.  If this is false,
+        name
+            The storage name, defaulting to ''.  If this is false,
             str(addr) is used as the storage name.

-        client -- A name used to construct persistent cache filenames.
+        client
+            A name used to construct persistent cache filenames.
             Defaults to None, in which case the cache is not persistent.
             See ClientCache for more info.

-        debug -- Ignored.  This is present only for backwards
-            compatibility with ZEO 1.
-
-        var -- When client is not None, this specifies the directory
+        var
+            When client is not None, this specifies the directory
             where the persistent cache files are created.  It defaults
-            to None, in whichcase the current directory is used.
+            to None, in which case the current directory is used.

-        min_disconnect_poll -- The minimum delay in seconds between
+        min_disconnect_poll
+            The minimum delay between
             attempts to connect to the server, in seconds.  Defaults
-            to 5 seconds.
+            to 1 second.

-        max_disconnect_poll -- The maximum delay in seconds between
+        max_disconnect_poll
+            The maximum delay between
             attempts to connect to the server, in seconds.  Defaults
-            to 300 seconds.
+            to 30 seconds.

-        wait_for_server_on_startup -- A backwards compatible alias for
+        wait_for_server_on_startup
+            A backwards compatible alias for
             the wait argument.

-        wait -- A flag indicating whether to wait until a connection
+        wait
+            A flag indicating whether to wait until a connection
             with a server is made, defaulting to true.

-        wait_timeout -- Maximum time to wait for a connection before
+        wait_timeout
+            Maximum time to wait for a connection before
             giving up.  Only meaningful if wait is True.

-        read_only -- A flag indicating whether this should be a
+        read_only
+            A flag indicating whether this should be a
             read-only storage, defaulting to false (i.e. writing is
             allowed by default).

-        read_only_fallback -- A flag indicating whether a read-only
+        read_only_fallback
+            A flag indicating whether a read-only
             remote storage should be acceptable as a fallback when no
             writable storages are available.  Defaults to false.  At
             most one of read_only and read_only_fallback should be
             true.

-        username -- string with username to be used when authenticating.
+        username
+            string with username to be used when authenticating.
             These only need to be provided if you are connecting to an
             authenticated server storage.

-        password -- string with plaintext password to be used
-            when authenticated.
+        password
+            string with plaintext password to be used when authenticated.

-        realm -- not documented.
+        realm
+            not documented.

-        drop_cache_rather_verify -- a flag indicating that the cache
-            should be dropped rather than expensively verified.
+        drop_cache_rather_verify
+            a flag indicating that the cache should be dropped rather
+            than expensively verified.

-        blob_dir -- directory path for blob data.  'blob data' is data that
+        blob_dir
+            directory path for blob data.  'blob data' is data that
             is retrieved via the loadBlob API.

-        shared_blob_dir -- Flag whether the blob_dir is a server-shared
-        filesystem that should be used instead of transferring blob data over
-        zrpc.
+        shared_blob_dir
+            Flag whether the blob_dir is a server-shared filesystem
+            that should be used instead of transferring blob data over
+            zrpc.

+        blob_cache_size
+            Maximum size of the ZEO blob cache, in bytes.  Defaults to
+            1GB.  This option is ignored if shared_blob_dir is true.
+
+        blob_cache_size_check
+            Percentage of blob_cache_size that must be loaded into the
+            blob cache before its size is checked again.  Defaults to
+            50, i.e. 50% of blob_cache_size.  This option is ignored if
+            shared_blob_dir is true.
+
         Note that the authentication protocol is defined by the server
         and is detected by the ClientStorage upon connecting (see
         testConnection() and doAuth() for details).
@@ -342,6 +370,7 @@
         # XXX need to check for POSIX-ness here
         self.blob_dir = blob_dir
         self.shared_blob_dir = shared_blob_dir
+
         if blob_dir is not None:
             # Avoid doing this import unless we need it, as it
             # currently requires pywin32 on Windows.
@@ -360,6 +389,11 @@

         self._cache = self.ClientCacheClass(cache_path, size=cache_size)

+        self._blob_cache_size = blob_cache_size
+        self._blob_cache_size_check = (  # bytes loaded between checks
+            blob_cache_size * blob_cache_size_check // 100)
+        self._check_blob_size()
+
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
                                                     tmax=max_disconnect_poll)
@@ -373,6 +407,8 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()

+
+
     def _wait(self, timeout=None):
         if timeout is not None:
             deadline = time.time() + timeout
@@ -414,6 +450,45 @@
         if self._tfile is not None:
             self._tfile.close()

+        if self._check_blob_size_thread is not None:
+            self._check_blob_size_thread.join()
+
+    _check_blob_size_thread = None
+    def _check_blob_size(self):
+        self._blob_data_bytes_loaded = 0
+        if self.shared_blob_dir or not self.blob_dir:
+            return
+
+        def check():  # evict oldest blob files until under the size limit
+            target = self._blob_cache_size  # remaining byte budget
+            files_by_atime = BTrees.IOBTree.BTree()
+            for base, dirs, files in os.walk(self.blob_dir):
+                if base == self.temporaryDirectory():
+                    continue
+                for file_name in files:
+                    file_name = os.path.join(base, file_name)
+                    stat = os.stat(file_name)
+                    target -= stat.st_size
+                    t = int(max(stat.st_atime, stat.st_mtime))  # int keys
+                    if t not in files_by_atime:
+                        files_by_atime[t] = []
+                    files_by_atime[t].append(file_name)
+
+            while target <= 0 and files_by_atime:  # over budget: evict oldest
+                for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                    try:
+                        size = os.stat(file_name).st_size
+                        os.remove(file_name)
+                    except OSError:
+                        pass
+                    else:
+                        target -= size
+
+        check_blob_size_thread = threading.Thread(target=check)
+        check_blob_size_thread.setDaemon(True)
+        check_blob_size_thread.start()
+        self._check_blob_size_thread = check_blob_size_thread
+
     def registerDB(self, db):
         """Storage API: register a database for invalidation messages.

@@ -894,9 +969,13 @@
     def receiveBlobChunk(self, oid, serial, chunk):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
         assert os.path.exists(blob_filename)
-        f = open(blob_filename, 'ab')
+        f = open(blob_filename, 'r+b')
+        f.seek(0, 2)  # move to end of file so the chunk is appended
         f.write(chunk)
         f.close()
+        self._blob_data_bytes_loaded += len(chunk)  # reset by _check_blob_size
+        if self._blob_data_bytes_loaded > self._blob_cache_size_check:
+            self._check_blob_size()

     def receiveBlobStop(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
@@ -1125,10 +1204,13 @@
             blobs = self._tbuf.blobs
             while blobs:
                 oid, blobfilename = blobs.pop()
+                self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
                 targetpath = self.fshelper.getPathForOID(oid, create=True)
                 rename_or_copy_blob(blobfilename,
-                          self.fshelper.getBlobFilename(oid, tid),
-                          )
+                                    self.fshelper.getBlobFilename(oid, tid),
+                                    )
+                if self._blob_data_bytes_loaded > self._blob_cache_size_check:
+                    self._check_blob_size()

         self._tbuf.clear()




More information about the Zodb-checkins mailing list