[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py checkpoint
Jim Fulton
jim at zope.com
Sun Nov 30 18:26:05 EST 2008
Log message for revision 93453:
checkpoint
Changed:
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
-=-
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-11-30 23:25:07 UTC (rev 93452)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-11-30 23:26:04 UTC (rev 93453)
@@ -30,6 +30,7 @@
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
+import BTrees.IOBTree
import cPickle
import logging
import os
@@ -112,14 +113,16 @@
StorageServerStubClass = ServerStub.stub
def __init__(self, addr, storage='1', cache_size=20 * MB,
- name='', client=None, debug=0, var=None,
+ name='', client=None, var=None,
min_disconnect_poll=1, max_disconnect_poll=30,
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
drop_cache_rather_verify=False,
username='', password='', realm=None,
- blob_dir=None, shared_blob_dir=False):
+ blob_dir=None, shared_blob_dir=False,
+ blob_cache_size=1<<30, blob_cache_size_check=50,
+ ):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
@@ -127,81 +130,106 @@
All arguments except addr should be keyword arguments.
Arguments:
- addr -- The server address(es). This is either a list of
+ addr
+ The server address(es). This is either a list of
addresses or a single address. Each address can be a
(hostname, port) tuple to signify a TCP/IP connection or
a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address. Required.
- storage -- The storage name, defaulting to '1'. The name must
+ storage
+ The storage name, defaulting to '1'. The name must
match one of the storage names supported by the server(s)
specified by the addr argument. The storage name is
displayed in the Zope control panel.
- cache_size -- The disk cache size, defaulting to 20 megabytes.
+ cache_size
+ The disk cache size, defaulting to 20 megabytes.
This is passed to the ClientCache constructor.
- name -- The storage name, defaulting to ''. If this is false,
+ name
+ The storage name, defaulting to ''. If this is false,
str(addr) is used as the storage name.
- client -- A name used to construct persistent cache filenames.
+ client
+ A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
See ClientCache for more info.
- debug -- Ignored. This is present only for backwards
- compatibility with ZEO 1.
-
- var -- When client is not None, this specifies the directory
+ var
+ When client is not None, this specifies the directory
where the persistent cache files are created. It defaults
to None, in whichcase the current directory is used.
- min_disconnect_poll -- The minimum delay in seconds between
+ min_disconnect_poll
+ The minimum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 5 seconds.
- max_disconnect_poll -- The maximum delay in seconds between
+ max_disconnect_poll
+ The maximum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 300 seconds.
- wait_for_server_on_startup -- A backwards compatible alias for
+ wait_for_server_on_startup
+ A backwards compatible alias for
the wait argument.
- wait -- A flag indicating whether to wait until a connection
+ wait
+ A flag indicating whether to wait until a connection
with a server is made, defaulting to true.
- wait_timeout -- Maximum time to wait for a connection before
+ wait_timeout
+ Maximum time to wait for a connection before
giving up. Only meaningful if wait is True.
- read_only -- A flag indicating whether this should be a
+ read_only
+ A flag indicating whether this should be a
read-only storage, defaulting to false (i.e. writing is
allowed by default).
- read_only_fallback -- A flag indicating whether a read-only
+ read_only_fallback
+ A flag indicating whether a read-only
remote storage should be acceptable as a fallback when no
writable storages are available. Defaults to false. At
most one of read_only and read_only_fallback should be
true.
- username -- string with username to be used when authenticating.
+ username
+ string with username to be used when authenticating.
These only need to be provided if you are connecting to an
authenticated server storage.
- password -- string with plaintext password to be used
- when authenticated.
+ password
+ string with plaintext password to be used when authenticating.
- realm -- not documented.
+ realm
+ not documented.
- drop_cache_rather_verify -- a flag indicating that the cache
- should be dropped rather than expensively verified.
+ drop_cache_rather_verify
+ a flag indicating that the cache should be dropped rather
+ than expensively verified.
- blob_dir -- directory path for blob data. 'blob data' is data that
+ blob_dir
+ directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
- shared_blob_dir -- Flag whether the blob_dir is a server-shared
- filesystem that should be used instead of transferring blob data over
- zrpc.
+ shared_blob_dir
+ Flag whether the blob_dir is a server-shared filesystem
+ that should be used instead of transferring blob data over
+ zrpc.
+ blob_cache_size
+ Maximum size of the ZEO blob cache, in bytes. Defaults to 1GB.
+ This option is ignored if shared_blob_dir is true.
+
+ blob_cache_size_check
+ ZEO blob cache check size, as a percentage of blob_cache_size.
+ The blob cache size will be checked each time this many bytes
+ have been loaded into the cache. Defaults to 50% of the blob
+ cache size. This option is ignored if shared_blob_dir is true.
+
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
@@ -342,6 +370,7 @@
# XXX need to check for POSIX-ness here
self.blob_dir = blob_dir
self.shared_blob_dir = shared_blob_dir
+
if blob_dir is not None:
# Avoid doing this import unless we need it, as it
# currently requires pywin32 on Windows.
@@ -360,6 +389,11 @@
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
+ self._blob_cache_size = blob_cache_size
+ self._blob_cache_size_check = (
+ blob_cache_size * blob_cache_size_check / 100)
+ self._check_blob_size()
+
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
@@ -373,6 +407,8 @@
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
+
+
def _wait(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
@@ -414,6 +450,45 @@
if self._tfile is not None:
self._tfile.close()
+ if self._check_blob_size_thread is not None:
+ self._check_blob_size_thread.join()
+
+    _check_blob_size_thread = None
+    def _check_blob_size(self):
+        """Start a daemon thread that trims the blob cache to its size limit.
+
+        Always resets the count of blob bytes loaded since the last check.
+        Nothing else happens when the blob directory is shared or unset.
+        Otherwise a background thread totals the files under blob_dir and,
+        if they exceed _blob_cache_size, removes files in order of their
+        last access/modification time (oldest first) until the total fits.
+        The thread is stored in _check_blob_size_thread.
+        """
+        self._blob_data_bytes_loaded = 0
+        if self.shared_blob_dir or not self.blob_dir:
+            return
+
+        def check():
+            # Remaining headroom in bytes; negative means over the limit.
+            target = self._blob_cache_size
+            # Maps last-use time -> [(path, size), ...]; a plain dict
+            # accepts the float timestamps that os.stat returns.
+            files_by_atime = {}
+            temp_dir = self.temporaryDirectory()
+            for base, dirs, files in os.walk(self.blob_dir):
+                if base == temp_dir:
+                    continue
+                for file_name in files:
+                    file_name = os.path.join(base, file_name)
+                    try:
+                        stat = os.stat(file_name)
+                    except OSError:
+                        # The file vanished while we were walking.
+                        continue
+                    target -= stat.st_size
+                    t = max(stat.st_atime, stat.st_mtime)
+                    files_by_atime.setdefault(t, []).append(
+                        (file_name, stat.st_size))
+
+            for t in sorted(files_by_atime):
+                for file_name, size in files_by_atime[t]:
+                    if target >= 0:
+                        return
+                    try:
+                        os.remove(file_name)
+                    except OSError:
+                        # Best effort: the file may be in use or gone.
+                        pass
+                    else:
+                        # Freed space gives headroom back.
+                        target += size
+
+        check_blob_size_thread = threading.Thread(target=check)
+        check_blob_size_thread.setDaemon(True)
+        check_blob_size_thread.start()
+        self._check_blob_size_thread = check_blob_size_thread
+
def registerDB(self, db):
"""Storage API: register a database for invalidation messages.
@@ -894,9 +969,13 @@
def receiveBlobChunk(self, oid, serial, chunk):
blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
assert os.path.exists(blob_filename)
- f = open(blob_filename, 'ab')
+ f = open(blob_filename, 'r+b')
+ f.seek(0, 2)
f.write(chunk)
f.close()
+ self._blob_data_bytes_loaded += len(chunk)
+ if self._blob_data_bytes_loaded > self._blob_cache_size_check:
+ self._check_blob_size()
def receiveBlobStop(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
@@ -1125,10 +1204,13 @@
blobs = self._tbuf.blobs
while blobs:
oid, blobfilename = blobs.pop()
+ self._blob_cache_size += os.stat(blobfilename).st_size
targetpath = self.fshelper.getPathForOID(oid, create=True)
rename_or_copy_blob(blobfilename,
- self.fshelper.getBlobFilename(oid, tid),
- )
+ self.fshelper.getBlobFilename(oid, tid),
+ )
+ if self._blob_cache_size > self._blob_cache_size_check:
+ self._check_blob_size()
self._tbuf.clear()
More information about the Zodb-checkins
mailing list