[Zodb-checkins] SVN: ZODB/trunk/src/ Improved zeo blob cache clean up to make it a bit more robust and to
Jim Fulton
jim at zope.com
Fri Jun 5 18:54:00 EDT 2009
Log message for revision 100661:
Improved ZEO blob cache cleanup to make it a bit more robust and to
avoid spurious test failures.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test
-=-
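(Context for the diffs below: before this change, a cleanup thread that
could not acquire the blob cache's check lock simply gave up, so the
thread holding the lock could finish without seeing blob data written in
the meantime, leaving the cache over its target size. The new code
retries once after a short sleep and, if it still cannot get the lock,
drops a 'check_size.attempt' marker file so the lock holder re-scans
before exiting. A condensed sketch of that protocol follows; it is not
the committed code, and the helper name acquire_check_lock is invented
for illustration, while the lock and marker file names come from the
diff.)

    import os
    import time
    import zc.lockfile

    def acquire_check_lock(blob_dir):
        """Try twice to become the blob cache checker for blob_dir.

        Returns a zc.lockfile.LockFile on success.  On failure, leaves
        an 'attempt' marker so the current lock holder knows to re-scan
        before it exits, and returns None.
        """
        lock_path = os.path.join(blob_dir, 'check_size.lock')
        attempt_path = os.path.join(blob_dir, 'check_size.attempt')
        try:
            return zc.lockfile.LockFile(lock_path)
        except zc.lockfile.LockError:
            try:
                time.sleep(1)  # the holder may be about to finish
                return zc.lockfile.LockFile(lock_path)
            except zc.lockfile.LockError:
                open(attempt_path, 'w').close()  # mark that we tried
                return None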
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/CHANGES.txt 2009-06-05 22:54:00 UTC (rev 100661)
@@ -14,6 +14,8 @@
 
 - Fixed analyze.py and added test.
 
+- ZEO client blob cache size management is a little bit more robust.
+
 3.9.0b1 (2009-05-04)
 ====================
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2009-06-05 22:54:00 UTC (rev 100661)
@@ -38,6 +38,7 @@
 import stat
 import sys
 import tempfile
+import thread
 import threading
 import time
 import types
@@ -398,6 +399,7 @@
         self._blob_cache_size = blob_cache_size
         self._blob_data_bytes_loaded = 0
         if blob_cache_size is not None:
+            assert blob_cache_size_check < 100
             self._blob_cache_size_check = (
                 blob_cache_size * blob_cache_size_check / 100)
             self._check_blob_size()
@@ -477,7 +479,7 @@
 
         check_blob_size_thread = threading.Thread(
             target=_check_blob_cache_size,
-            args=(self.blob_dir, self._blob_cache_size),
+            args=(self.blob_dir, target),
             )
         check_blob_size_thread.setDaemon(True)
         check_blob_size_thread.start()
@@ -1620,7 +1622,6 @@
 def _check_blob_cache_size(blob_dir, target):
 
     logger = logging.getLogger(__name__+'.check_blob_cache')
-    logger.info("Checking blob cache size")
 
     layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
                   ).read().strip()
@@ -1628,64 +1629,90 @@
         logger.critical("Invalid blob directory layout %s", layout)
         raise ValueError("Invalid blob directory layout", layout)
 
+    attempt_path = os.path.join(blob_dir, 'check_size.attempt')
+
     try:
         check_lock = zc.lockfile.LockFile(
             os.path.join(blob_dir, 'check_size.lock'))
     except zc.lockfile.LockError:
-        # Someone is already cleaning up, so don't bother
-        logger.info("Another thread is checking the blob cache size")
-        return
+        try:
+            time.sleep(1)
+            check_lock = zc.lockfile.LockFile(
+                os.path.join(blob_dir, 'check_size.lock'))
+        except zc.lockfile.LockError:
+            # Someone is already cleaning up, so don't bother
+            logger.debug("%s Another thread is checking the blob cache size.",
+                         thread.get_ident())
+            open(attempt_path, 'w').close() # Mark that we tried
+            return
 
+    logger.debug("%s Checking blob cache size. (target: %s)",
+                 thread.get_ident(), target)
+
     try:
-        size = 0
-        blob_suffix = ZODB.blob.BLOB_SUFFIX
-        files_by_atime = BTrees.IOBTree.BTree()
-        for dirname in os.listdir(blob_dir):
-            if not cache_file_name(dirname):
-                continue
-            base = os.path.join(blob_dir, dirname)
-            if not os.path.isdir(base):
-                continue
-            for file_name in os.listdir(base):
-                if not file_name.endswith(blob_suffix):
-                    continue
-                file_name = os.path.join(base, file_name)
-                if not os.path.isfile(file_name):
-                    continue
-                stat = os.stat(file_name)
-                size += stat.st_size
-                t = int(stat.st_atime)
-                if t not in files_by_atime:
-                    files_by_atime[t] = []
-                files_by_atime[t].append(file_name)
+        while 1:
+            size = 0
+            blob_suffix = ZODB.blob.BLOB_SUFFIX
+            files_by_atime = BTrees.OOBTree.BTree()
+            for dirname in os.listdir(blob_dir):
+                if not cache_file_name(dirname):
+                    continue
+                base = os.path.join(blob_dir, dirname)
+                if not os.path.isdir(base):
+                    continue
+                for file_name in os.listdir(base):
+                    if not file_name.endswith(blob_suffix):
+                        continue
+                    file_path = os.path.join(base, file_name)
+                    if not os.path.isfile(file_path):
+                        continue
+                    stat = os.stat(file_path)
+                    size += stat.st_size
+                    t = stat.st_atime
+                    if t not in files_by_atime:
+                        files_by_atime[t] = []
+                    files_by_atime[t].append(os.path.join(dirname, file_name))
 
-        logger.info("blob cache size: %s", size)
+            logger.debug("%s blob cache size: %s", thread.get_ident(), size)
 
-        while size > target and files_by_atime:
-            for file_name in files_by_atime.pop(files_by_atime.minKey()):
-                lockfilename = os.path.join(os.path.dirname(file_name),
-                                            '.lock')
-                try:
-                    lock = zc.lockfile.LockFile(lockfilename)
-                except zc.lockfile.LockError:
-                    logger.info("Skipping locked %s",
-                                os.path.basename(file_name))
-                    continue # In use, skip
+            if size <= target:
+                if os.path.isfile(attempt_path):
+                    os.remove(attempt_path)
+                    continue
+                logger.debug("%s -->", thread.get_ident())
+                break
 
-                try:
-                    fsize = os.stat(file_name).st_size
-                    try:
-                        ZODB.blob.remove_committed(file_name)
-                    except OSError, v:
-                        pass # probably open on windows
-                    else:
-                        size -= fsize
-                finally:
-                    lock.close()
+            while size > target and files_by_atime:
+                for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                    file_name = os.path.join(blob_dir, file_name)
+                    lockfilename = os.path.join(os.path.dirname(file_name),
+                                                '.lock')
+                    try:
+                        lock = zc.lockfile.LockFile(lockfilename)
+                    except zc.lockfile.LockError:
+                        logger.debug("%s Skipping locked %s",
+                                     thread.get_ident(),
+                                     os.path.basename(file_name))
+                        continue # In use, skip
 
-        logger.info("reduced blob cache size: %s", size)
+                    try:
+                        fsize = os.stat(file_name).st_size
+                        try:
+                            ZODB.blob.remove_committed(file_name)
+                        except OSError, v:
+                            pass # probably open on windows
+                        else:
+                            size -= fsize
+                    finally:
+                        lock.close()
+
+                    if size <= target:
+                        break
+
+            logger.debug("%s reduced blob cache size: %s",
+                         thread.get_ident(), size)
 
     finally:
         check_lock.close()
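(The other substantive change above is in how eviction candidates are
ordered: access times are now kept as floats rather than truncated with
int(), so the atime-to-files index moved from BTrees.IOBTree, which
requires integer keys, to BTrees.OOBTree. A minimal sketch of the
pattern follows, with the per-file locking and
ZODB.blob.remove_committed() details of the real code reduced to
comments; the function names here are invented:)

    import os
    import BTrees.OOBTree

    def files_by_access_time(paths):
        """Index file paths by atime; minKey() then gives the LRU group."""
        by_atime = BTrees.OOBTree.BTree()
        for path in paths:
            atime = os.stat(path).st_atime  # a float, hence OOBTree
            if atime not in by_atime:
                by_atime[atime] = []
            by_atime[atime].append(path)
        return by_atime

    def evict_until(by_atime, size, target):
        """Remove least recently used files until size <= target."""
        while size > target and by_atime:
            for path in by_atime.pop(by_atime.minKey()):
                fsize = os.stat(path).st_size
                # The real code locks each file and uses
                # ZODB.blob.remove_committed() instead of os.remove().
                os.remove(path)
                size -= fsize
                if size <= target:
                    break
        return size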
Modified: ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test 2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test 2009-06-05 22:54:00 UTC (rev 100661)
@@ -33,6 +33,11 @@
 blob_cache_size_check option defaults to 100. We passed 10, to check
 after writing 10% of the target size.
 
+.. We're going to wait for any threads we started to finish, so...
+
+    >>> import threading
+    >>> old_threads = list(threading.enumerate())
+
 We want to check for name collisions in the blob cache dir. We'll try
 to provoke name collisions by reducing the number of cache directory
 subdirectories.
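(As a reading aid, a hedged sketch of how the options discussed above
might be passed to a client. Here addr is a placeholder for a running
ZEO server address and the sizes are arbitrary:)

    import ZEO

    addr = ('localhost', 8100)  # assumed server address
    db = ZEO.DB(addr, blob_dir='blobs',
                blob_cache_size=4000,      # target cache size, in bytes
                blob_cache_size_check=10)  # schedule a check after 10%
                                           # of the target is downloaded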
@@ -66,12 +71,15 @@
     ...                     if os.path.exists(os.path.join(base, f)):
     ...                         raise
     ...     return size
-
-    >>> db.storage._check_blob_size_thread.join()
-    >>> cache_size('blobs') < 5000
-    True
+    >>> def check():
+    ...     return cache_size('blobs') < 5000
+    >>> def onfail():
+    ...     return cache_size('blobs')
+    >>> from ZEO.tests.forker import wait_until
+    >>> wait_until("size is reduced", check, 99, onfail)
 
 If we read all of the blobs, data will be downloaded again, as
 necessary, but the cache size will remain not much bigger than the
 target:
@@ -81,38 +89,27 @@
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
-
-    >>> cache_size('blobs') < 5000
-    True
+    >>> wait_until("size is reduced", check, 99, onfail)
 
     >>> for i in range(1, 101):
     ...     data = conn.root()[i].open().read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
-
     >>> for i in range(1, 101):
     ...     data = conn.root()[i].open('c').read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
-
-    >>> cache_size('blobs') < 5000
-    True
+    >>> wait_until("size is reduced", check, 99, onfail)
 
     >>> for i in range(1, 101):
     ...     data = open(conn.root()[i].committed(), 'rb').read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
-
-    >>> cache_size('blobs') < 5000
-    True
+    >>> wait_until("size is reduced", check, 99, onfail)
 
 Now let's see if we can stress things a bit. We'll create many clients
 and get them to pound on the blobs all at once to see if we can
 provoke problems:
@@ -131,7 +128,6 @@
     ...         data = conn.root()[i].open('c').read()
     ...         if data != chr(i)*100:
     ...             print 'bad data', `chr(i)`, `data`
-    ...         db._storage._check_blob_size_thread.join()
     ...     db.close()
 
     >>> threads = [threading.Thread(target=run) for i in range(10)]
@@ -140,12 +136,18 @@
     >>> for thread in threads:
     ...     thread.start()
     >>> for thread in threads:
-    ...     thread.join()
+    ...     thread.join(99)
+    ...     if thread.isAlive():
+    ...         print "Can't join thread."
 
-    >>> cache_size('blobs') < 5000
-    True
+    >>> wait_until("size is reduced", check, 99, onfail)
 
 .. cleanup
 
+    >>> for thread in threading.enumerate():
+    ...     if thread not in old_threads:
+    ...         thread.join(33)
+
     >>> db.close()
     >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
+
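(The test now polls for the cache size to drop instead of joining a
checker thread that may be replaced while the test runs. For readers
unfamiliar with ZEO.tests.forker.wait_until, here is a rough
re-implementation of the polling pattern, inferred from its usage
above; the real helper may differ in detail:)

    import time

    def wait_until(label, func, timeout=30, onfail=None):
        """Poll func() until it returns true or timeout seconds pass."""
        deadline = time.time() + timeout
        while not func():
            if time.time() > deadline:
                if onfail is None:
                    raise AssertionError('timed out waiting for: ' + label)
                return onfail()
            time.sleep(0.01)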