[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob-cache/src/Z checkpoint
Jim Fulton
jim at zope.com
Tue Dec 2 17:51:20 EST 2008
Log message for revision 93556:
checkpoint
Changed:
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
U ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test
U ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py
-=-
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py 2008-12-02 22:51:20 UTC (rev 93556)
@@ -26,7 +26,6 @@
from ZEO import ServerStub
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.zrpc.client import ConnectionManager
-from ZODB.blob import rename_or_copy_blob
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
@@ -44,6 +43,7 @@
import weakref
import zc.lockfile
import ZEO.interfaces
+import ZODB
import ZODB.BaseStorage
import ZODB.interfaces
import zope.event
@@ -370,8 +370,14 @@
# Avoid doing this import unless we need it, as it
# currently requires pywin32 on Windows.
import ZODB.blob
- self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
- self.fshelper.create()
+ if shared_blob_dir:
+ self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
+ else:
+ if 'zeocache' not in ZODB.blob.LAYOUTS:
+ ZODB.blob.LAYOUTS['zeocache'] = BlobCacheLayout()
+ self.fshelper = ZODB.blob.FilesystemHelper(
+ blob_dir, layout_name='zeocache')
+ self.fshelper.create()
self.fshelper.checkSecure()
else:
self.fshelper = None
@@ -462,7 +468,7 @@
def _check_blob_size_method(self):
try:
- lock = zc.lockfile.LockFile(
+ check_lock = zc.lockfile.LockFile(
os.path.join(self.blob_dir, 'cache.lock'))
except zc.lockfile.LockError:
# Someone is already cleaning up, so don't bother
@@ -470,6 +476,7 @@
try:
target = self._blob_cache_size
+ size = 0
tmp = self.temporaryDirectory()
blob_suffix = ZODB.blob.BLOB_SUFFIX
files_by_atime = BTrees.IOBTree.BTree()
@@ -482,23 +489,33 @@
continue
file_name = os.path.join(base, file_name)
stat = os.stat(file_name)
- target -= stat.st_size
- t = max(stat.st_atime, stat.st_mtime)
+ size += stat.st_size
+ t = stat.st_atime
if t not in files_by_atime:
files_by_atime[t] = []
files_by_atime[t].append(file_name)
- while target <= 0 and files_by_atime:
+ while size > target and files_by_atime:
for file_name in files_by_atime.pop(files_by_atime.minKey()):
- size = os.stat(file_name).st_size
+ lockfilename = os.path.join(os.path.dirname(file_name),
+ '.lock')
try:
- os.remove(file_name)
- except OSError:
- raise
- else:
- target -= size
+ lock = zc.lockfile.LockFile(lockfilename)
+ except zc.lockfile.LockError:
+ continue # In use, skip
+
+ try:
+ size = os.stat(file_name).st_size
+ try:
+ os.remove(file_name)
+ except OSError:
+ raise
+ else:
+ size -= size
+ finally:
+ lock.close()
finally:
- lock.close()
+ check_lock.close()
def registerDB(self, db):
@@ -953,10 +970,10 @@
# use a slightly different file name. We keep the old one
# until we're done to avoid conflicts. Then remove the old name.
target += 'w'
- rename_or_copy_blob(filename, target)
+ ZODB.blob.rename_or_copy_blob(filename, target)
os.remove(target[:-1])
else:
- rename_or_copy_blob(filename, target)
+ ZODB.blob.rename_or_copy_blob(filename, target)
# Now tell the server where we put it
self._server.storeBlobShared(
@@ -965,7 +982,8 @@
def receiveBlobStart(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
assert not os.path.exists(blob_filename)
- assert os.path.exists(blob_filename+'.lock')
+ lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+ assert os.path.exists(lockfilename)
blob_filename += '.dl'
assert not os.path.exists(blob_filename)
f = open(blob_filename, 'wb')
@@ -999,6 +1017,8 @@
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if os.path.exists(blob_filename):
+ if not self.shared_blob_dir:
+ _accessed(blob_filename)
return blob_filename
if self.shared_blob_dir:
@@ -1006,57 +1026,30 @@
# here, it's not anywhere.
raise POSException.POSKeyError("No blob file", oid, serial)
- self._blob_download_name =
-
-
# First, we'll create the directory for this oid, if it doesn't exist.
-# self.fshelper.createPathForOID(oid)
+ self.fshelper.createPathForOID(oid)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
# getting it multiple times even across separate client
# processes on the same machine. We'll use file locking.
- lockfilename = os.path.join(
- self.blob_dir, (oid+serial).encode(hex)+'.lock')
+ lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+ while 1:
+ try:
+ lock = zc.lockfile.LockFile(lockfilename)
+ except zc.lockfile.LockError:
+ time.sleep(0.01)
+ else:
+ break
-
- lockfilename = blob_filename+'.lock'
try:
- lock = zc.lockfile.LockFile(lockfilename)
- except zc.lockfile.LockError:
-
- # Someone is already downloading the Blob. Wait for the
- # lock to be freed. How long should we be willing to wait?
- # TODO: maybe find some way to assess download progress.
-
- while 1:
- time.sleep(0.1)
- try:
- lock = zc.lockfile.LockFile(lockfilename)
- except zc.lockfile.LockError:
- pass
- else:
- # We have the lock. We should be able to get the file now.
- lock.close()
- try:
- os.remove(lockfilename)
- except OSError:
- pass
- break
-
- if os.path.exists(blob_filename):
- return blob_filename
-
- return None
-
- try:
# We got the lock, so it's our job to download it. First,
# we'll double check that someone didn't download it while we
# were getting the lock:
if os.path.exists(blob_filename):
- return blob_filename
+ return _accessed(blob_filename)
# Ask the server to send it to us. When this function
# returns, it will have been sent. (The receiving will
@@ -1065,17 +1058,41 @@
self._server.sendBlob(oid, serial)
if os.path.exists(blob_filename):
- return blob_filename
+ return _accessed(blob_filename)
raise POSException.POSKeyError("No blob file", oid, serial)
finally:
lock.close()
+
+ def openCommittedBlobFile(self, oid, serial, blob):
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+ while 1:
try:
- os.remove(lockfilename)
- except OSError:
- pass
+ lock = zc.lockfile.LockFile(lockfilename)
+ except zc.lockfile.LockError:
+ time.sleep(.01)
+ else:
+ break
+ try:
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ if not os.path.exists(blob_filename):
+ if self.shared_blob_dir:
+ # We're using a server shared cache. If the file isn't
+ # here, it's not anywhere.
+ raise POSException.POSKeyError("No blob file", oid, serial)
+ self._server.sendBlob(oid, serial)
+ if not os.path.exists(blob_filename):
+ raise POSException.POSKeyError("No blob file", oid, serial)
+
+ _accessed(blob_filename)
+ return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+ finally:
+ lock.close()
+
+
def temporaryDirectory(self):
return self.fshelper.temp_dir
@@ -1218,9 +1235,10 @@
oid, blobfilename = blobs.pop()
self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
targetpath = self.fshelper.getPathForOID(oid, create=True)
- rename_or_copy_blob(blobfilename,
- self.fshelper.getBlobFilename(oid, tid),
- )
+ ZODB.blob.rename_or_copy_blob(
+ blobfilename,
+ self.fshelper.getBlobFilename(oid, tid),
+ )
if self._blob_data_bytes_loaded > self._blob_cache_size_check:
self._check_blob_size()
@@ -1579,6 +1597,7 @@
raise ZODB.interfaces.StorageStopIteration()
return ZODB.BaseStorage.DataRecord(*item)
+
class ClientStorage308Adapter:
def __init__(self, client):
@@ -1592,3 +1611,22 @@
def __getattr__(self, name):
return getattr(self.client, name)
+
+
+class BlobCacheLayout(object):
+
+ size = 997
+
+ def oid_to_path(self, oid):
+ return str(utils.u64(oid) % self.size)
+
+ def getBlobFilePath(self, oid, tid):
+ base, rem = divmod(utils.u64(oid), self.size)
+ return os.path.join(
+ str(rem),
+ "%s.%s%s" % (base, tid.encode('hex'), ZODB.blob.BLOB_SUFFIX)
+ )
+
+def _accessed(filename):
+ os.utime(filename, (time.time(), os.stat(filename).st_mtime))
+ return filename
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test 2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test 2008-12-02 22:51:20 UTC (rev 93556)
@@ -34,14 +34,22 @@
blob_cache_size_check option defaults to 100. We passed 10, to check
after writing 10% of the target size.
+We want to check for name collisions in the blob cache dir. We'll try
+to provoke name collisions by reducing the number of cache directory
+subdirectories.
+
+ >>> import ZEO.ClientStorage
+ >>> orig_blob_cache_layout_size = ZEO.ClientStorage.BlobCacheLayout.size
+ >>> ZEO.ClientStorage.BlobCacheLayout.size = 11
+
Now, let's write some data:
- >>> import ZODB.blob, transaction
+ >>> import ZODB.blob, transaction, time
>>> conn = db.open()
>>> for i in range(1, 101):
... conn.root()[i] = ZODB.blob.Blob()
... conn.root()[i].open('w').write(chr(i)*100)
- ... transaction.commit()
+ >>> transaction.commit()
We've committed 10000 bytes of data, but our target size is 4000. We
expect to have not much more than the target size in the cache blob
@@ -66,8 +74,9 @@
target:
>>> for i in range(1, 101):
- ... if conn.root()[i].open().read() != chr(i)*100:
- ... print 'bad data', i
+ ... data = conn.root()[i].open().read()
+ ... if data != chr(i)*100:
+ ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
@@ -75,16 +84,55 @@
True
>>> for i in range(1, 101):
- ... if conn.root()[i].open().read() != chr(i)*100:
- ... print 'bad data', i
+ ... data = conn.root()[i].open().read()
+ ... if data != chr(i)*100:
+ ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 5000
True
+ >>> for i in range(1, 101):
+ ... data = open(conn.root()[i].committed(), 'rb').read()
+ ... if data != chr(i)*100:
+ ... print 'bad data', `chr(i)`, `data`
+ >>> db.storage._check_blob_size_thread.join()
+ >>> cache_size('blobs') < 5000
+ True
+
+Now let see if we can stress things a bit. We'll create many clients
+and get them to pound on the blobs all at once to see if we can
+provoke problems:
+
+ >>> import threading, random
+ >>> def run():
+ ... db = ZEO.DB(addr, blob_dir='blobs',
+ ... blob_cache_size=4000, blob_cache_size_check=10)
+ ... conn = db.open()
+ ... for i in range(300):
+ ... time.sleep(0)
+ ... i = random.randint(1, 100)
+ ... data = conn.root()[i].open().read()
+ ... if data != chr(i)*100:
+ ... print 'bad data', `chr(i)`, `data`
+ ... db._storage._check_blob_size_thread.join()
+ ... db.close()
+
+ >>> threads = [threading.Thread(target=run) for i in range(10)]
+ >>> for thread in threads:
+ ... thread.setDaemon(True)
+ >>> for thread in threads:
+ ... thread.start()
+ >>> for thread in threads:
+ ... thread.join()
+
+ >>> cache_size('blobs') < 5000
+ True
+
.. cleanup
>>> db.close()
+ >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py 2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py 2008-12-02 22:51:20 UTC (rev 93556)
@@ -126,10 +126,29 @@
self.readers = []
if mode == 'r':
- if self._current_filename() is None:
- self._create_uncommitted_file()
+ result = None
+ to_open = self._p_blob_uncommitted
+ if not to_open:
+ to_open = self._p_blob_committed
+ if to_open:
+ storage = self._p_jar._storage
+ if hasattr(storage, 'openCommittedBlobFile'):
+ result = storage.openCommittedBlobFile(
+ self._p_oid, self._p_serial, self)
+ else:
+ # We do this to make sure we have the file and
+ # to let the storage know we're accessing the file.
+ # It might be nice to add a more explicit api for this.
+ n = storage.loadBlob(self._p_oid, self._p_serial)
+ assert to_open == n, (to_open, n)
+ result = BlobFile(to_open, mode, self)
+ else:
+ self._create_uncommitted_file()
+ to_open = self._p_blob_uncommitted
+ assert to_open
- result = BlobFile(self._current_filename(), mode, self)
+ if result is None:
+ result = BlobFile(to_open, mode, self)
def destroyed(ref, readers=self.readers):
try:
@@ -178,8 +197,16 @@
self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
):
raise BlobError('Uncommitted changes')
- return self._p_blob_committed
+ result = self._p_blob_committed
+
+ # We do this to make sure we have the file and to let the
+ # storage know we're accessing the file.
+ n = self._p_jar._storage.loadBlob(self._p_oid, self._p_serial)
+ assert result == n, (result, n)
+
+ return result
+
def consumeFile(self, filename):
"""Will replace the current data of the blob with the file given under
filename.
@@ -231,11 +258,6 @@
# utility methods
- def _current_filename(self):
- # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
- # Connection._setstate
- return self._p_blob_uncommitted or self._p_blob_committed
-
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
"Uncommitted file already exists.")
@@ -388,13 +410,13 @@
'committed' blob file related to that oid and tid.
"""
- oid_path = self.getPathForOID(oid)
# TIDs are numbers and sometimes passed around as integers. For our
# computations we rely on the 64-bit packed string representation
if isinstance(tid, int):
tid = utils.p64(tid)
- filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
- return os.path.join(oid_path, filename)
+ return os.path.join(self.base_dir,
+ self.layout.getBlobFilePath(oid, tid),
+ )
def blob_mkstemp(self, oid, tid):
"""Given an oid and a tid, return a temporary file descriptor
@@ -513,10 +535,18 @@
oid = ''.join(binascii.unhexlify(byte[2:]) for byte in path)
return oid
+ def getBlobFilePath(self, oid, tid):
+ """Given an oid and a tid, return the full filename of the
+ 'committed' blob file related to that oid and tid.
+
+ """
+ oid_path = self.oid_to_path(oid)
+ filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
+ return os.path.join(oid_path, filename)
+
LAYOUTS['bushy'] = BushyLayout()
-
-class LawnLayout(object):
+class LawnLayout(BushyLayout):
"""A shallow directory layout for blob directories.
Creates a single level of directories (one for each oid).
More information about the Zodb-checkins
mailing list