[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob-cache/ checkpoint
Jim Fulton
jim at zope.com
Mon Dec 1 19:57:02 EST 2008
Log message for revision 93525:
checkpoint
Changed:
U ZODB/branches/jim-zeo-blob-cache/buildout.cfg
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py
-=-
Modified: ZODB/branches/jim-zeo-blob-cache/buildout.cfg
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/buildout.cfg 2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/buildout.cfg 2008-12-02 00:57:01 UTC (rev 93525)
@@ -15,4 +15,5 @@
[scripts]
recipe = zc.recipe.egg
eggs = ZODB3
+ lockfile
interpreter = py
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-12-02 00:57:01 UTC (rev 93525)
@@ -30,7 +30,7 @@
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
-import BTrees.IOBTree()
+import BTrees.IOBTree
import cPickle
import logging
import os
@@ -121,7 +121,7 @@
drop_cache_rather_verify=False,
username='', password='', realm=None,
blob_dir=None, shared_blob_dir=False,
- blob_cache_size=1<<30, blob_cache_size_check=50,
+ blob_cache_size=1<<62, blob_cache_size_check=100,
):
"""ClientStorage constructor.
@@ -247,11 +247,6 @@
read_only_fallback and "fallback" or "normal",
storage,
)
-
- if debug:
- logger.warning(
- "%s ClientStorage(): debug argument is no longer used",
- self.__name__)
self._drop_cache_rather_verify = drop_cache_rather_verify
@@ -459,36 +454,53 @@
if self.shared_blob_dir or not self.blob_dir:
return
- def check():
- target = self._blob_cache_size
- files_by_atime = BTree.IOBTree.BTree()
- for base, dirs, files in os.walk(self.blob_dir):
- if base = self.temporaryDirectory():
- continue
- for file_name in files:
- file_name = os.path.join(base, file_name)
- stat = os.stat(file_name).st_size
- target -= stat.st_size
- t = max(stat.st_atime, stat.st_mtime)
- if t not in files_by_atime:
- files_by_atime[t] = []
- files_by_atime[t] = file_name
-
- while target <= 0 and files_by_atime:
- for file_name in files_by_atime.pop(files_by_atime.minKey()):
- size = os.stat(file_name).st_size
- try:
- os.remove(file_name)
- except OSError:
- pass
- else:
- target -= size
-
- check_blob_size_thread = threading.Thread(target=check)
+ check_blob_size_thread = threading.Thread(
+ target=self._check_blob_size_method)
check_blob_size_thread.setDaemon(True)
check_blob_size_thread.start()
self._check_blob_size_thread = check_blob_size_thread
+ def _check_blob_size_method(self):
+ try:
+ lock = zc.lockfile.LockFile(
+ os.path.join(self.blob_dir, 'cache.lock'))
+ except zc.lockfile.LockError:
+ # Someone is already cleaning up, so don't bother
+ return
+
+ try:
+ target = self._blob_cache_size
+ tmp = self.temporaryDirectory()
+ blob_suffix = ZODB.blob.BLOB_SUFFIX
+ files_by_atime = BTrees.IOBTree.BTree()
+ for base, dirs, files in os.walk(self.blob_dir):
+ if base == tmp:
+ del dirs[:]
+ continue
+ for file_name in files:
+ if not file_name.endswith(blob_suffix):
+ continue
+ file_name = os.path.join(base, file_name)
+ stat = os.stat(file_name)
+ target -= stat.st_size
+ t = max(stat.st_atime, stat.st_mtime)
+ if t not in files_by_atime:
+ files_by_atime[t] = []
+ files_by_atime[t].append(file_name)
+
+ while target <= 0 and files_by_atime:
+ for file_name in files_by_atime.pop(files_by_atime.minKey()):
+ size = os.stat(file_name).st_size
+ try:
+ os.remove(file_name)
+ except OSError:
+ raise
+ else:
+ target -= size
+ finally:
+ lock.close()
+
+
def registerDB(self, db):
"""Storage API: register a database for invalidation messages.
@@ -950,13 +962,6 @@
self._server.storeBlobShared(
oid, serial, data, os.path.basename(target), id(txn))
- def _have_blob(self, blob_filename, oid, serial):
- if os.path.exists(blob_filename):
- logger.debug("%s Found blob %r/%r in cache.",
- self.__name__, oid, serial)
- return True
- return False
-
def receiveBlobStart(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
assert not os.path.exists(blob_filename)
@@ -993,7 +998,7 @@
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
- if self._have_blob(blob_filename, oid, serial):
+ if os.path.exists(blob_filename):
return blob_filename
if self.shared_blob_dir:
@@ -1001,14 +1006,21 @@
# here, it's not anywhere.
raise POSException.POSKeyError("No blob file", oid, serial)
+ self._blob_download_name =
+
+
# First, we'll create the directory for this oid, if it doesn't exist.
- self.fshelper.createPathForOID(oid)
+# self.fshelper.createPathForOID(oid)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
# getting it multiple times even across separate client
# processes on the same machine. We'll use file locking.
+ lockfilename = os.path.join(
+ self.blob_dir, (oid+serial).encode(hex)+'.lock')
+
+
lockfilename = blob_filename+'.lock'
try:
lock = zc.lockfile.LockFile(lockfilename)
@@ -1033,7 +1045,7 @@
pass
break
- if self._have_blob(blob_filename, oid, serial):
+ if os.path.exists(blob_filename):
return blob_filename
return None
@@ -1043,7 +1055,7 @@
# we'll double check that someone didn't download it while we
# were getting the lock:
- if self._have_blob(blob_filename, oid, serial):
+ if os.path.exists(blob_filename):
return blob_filename
# Ask the server to send it to us. When this function
@@ -1052,7 +1064,7 @@
self._server.sendBlob(oid, serial)
- if self._have_blob(blob_filename, oid, serial):
+ if os.path.exists(blob_filename):
return blob_filename
raise POSException.POSKeyError("No blob file", oid, serial)
@@ -1204,12 +1216,12 @@
blobs = self._tbuf.blobs
while blobs:
oid, blobfilename = blobs.pop()
- self._blob_cache_size += os.stat(blobfilename).st_size
+ self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
targetpath = self.fshelper.getPathForOID(oid, create=True)
rename_or_copy_blob(blobfilename,
self.fshelper.getBlobFilename(oid, tid),
)
- if self._blob_cache_size > self._blob_cache_size_check:
+ if self._blob_data_bytes_loaded > self._blob_cache_size_check:
self._check_blob_size()
self._tbuf.clear()
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py 2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py 2008-12-02 00:57:01 UTC (rev 93525)
@@ -285,7 +285,7 @@
servers = {}
def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
- addr=None, path='Data.fs', protocol=None):
+ addr=None, path='Data.fs', protocol=None, blob_dir=None):
"""Start a ZEO server.
Return the server and admin addresses.
@@ -298,7 +298,7 @@
elif addr is not None:
raise TypeError("Can't specify port and addr")
addr, adminaddr, pid, config_path = start_zeo_server(
- storage_conf, zeo_conf, port, keep, path, protocol)
+ storage_conf, zeo_conf, port, keep, path, protocol, blob_dir)
os.remove(config_path)
servers[adminaddr] = pid
return addr, adminaddr
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py 2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py 2008-12-02 00:57:01 UTC (rev 93525)
@@ -1168,7 +1168,7 @@
doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt',
- 'protocols.test',
+ 'protocols.test', 'zeo_blob_cache.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
More information about the Zodb-checkins
mailing list