[Zodb-checkins] SVN: ZODB/branches/3.8/src/ZEO/ Combined the ZEO
client cache and file classes as a first step in a
Jim Fulton
jim at zope.com
Sun May 11 11:15:14 EDT 2008
Log message for revision 86650:
Combined the ZEO client cache and file classes as a first step in a
refactoring to simplify the data structures to fix a serious memory
bug: the cache uses waaaay the heck too much.
Changed:
U ZODB/branches/3.8/src/ZEO/cache.py
U ZODB/branches/3.8/src/ZEO/tests/test_cache.py
-=-
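For orientation before the diff: the commit folds the FileCache class into
ClientCache, removing a layer of delegation (and a duplicated set of
bookkeeping attributes) per cache instance. A toy sketch of the shape of the
change -- the class names below are illustrative stand-ins, not ZEO code:

    class OldFileCache(object):             # before: helper with a back-reference
        def __init__(self, parent):
            self.parent = parent            # the owning ClientCache
            self.key2entry = {}
        def access(self, key):
            return self.key2entry.get(key)

    class OldClientCache(object):
        def __init__(self):
            self.fc = OldFileCache(self)    # every operation hops through .fc
        def load(self, key):
            return self.fc.access(key)

    class NewClientCache(object):           # after: one object, direct access
        def __init__(self):
            self.key2entry = {}
        def load(self, key):
            return self.key2entry.get(key)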
Modified: ZODB/branches/3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/cache.py 2008-05-11 09:48:43 UTC (rev 86649)
+++ ZODB/branches/3.8/src/ZEO/cache.py 2008-05-11 15:15:12 UTC (rev 86650)
@@ -65,6 +65,58 @@
# full verification
# <p>
+
+##
+# FileCache stores a cache in a single on-disk file.
+#
+# On-disk cache structure.
+#
+# The file begins with a 12-byte header. The first four bytes are the
+# file's magic number - ZEC3 - indicating zeo cache version 3. The
+# next eight bytes are the last transaction id.
+
+magic = "ZEC3"
+ZEC3_HEADER_SIZE = 12
+
+# After the header, the file contains a contiguous sequence of blocks. All
+# blocks begin with a one-byte status indicator:
+#
+# 'a'
+# Allocated. The block holds an object; the next 4 bytes are >I
+# format total block size.
+#
+# 'f'
+# Free. The block is free; the next 4 bytes are >I format total
+# block size.
+#
+# '1', '2', '3', '4'
+# The block is free, and consists of 1, 2, 3 or 4 bytes total.
+#
+# "Total" includes the status byte, and size bytes. There are no
+# empty (size 0) blocks.
+
+
+# Allocated blocks have more structure:
+#
+# 1 byte allocation status ('a').
+# 4 bytes block size, >I format.
+# 16 bytes oid + tid, string.
+# size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
+# class Object for details).
+
+OBJECT_HEADER_SIZE = 1 + 4 + 16
+
+# The cache's currentofs goes around the file, circularly, forever.
+# It's always the starting offset of some block.
+#
+# When a new object is added to the cache, it's stored beginning at
+# currentofs, and currentofs moves just beyond it. As many contiguous
+# blocks as are needed to make enough room for the new object are
+# evicted, starting at currentofs. Exception: if currentofs is close enough
+# to the end of the file that the new object can't fit in one
+# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
+
+
class ClientCache(object):
"""A simple in-memory cache."""
@@ -78,9 +130,16 @@
# ClientStorage is the only user of ClientCache, and it always passes an
# explicit size of its own choosing.
def __init__(self, path=None, size=200*1024**2):
+
+ # - `path`: filepath for the cache file, or None (in which case
+ # a temp file will be created)
self.path = path
- self.size = size
+ # - `maxsize`: total size of the cache file, in bytes; this is
+ # ignored if path names an existing file; perhaps we should attempt
+ # to change the cache size in that case
+ self.maxsize = size
+
# The cache stores objects in a dict mapping (oid, tid) pairs
# to Object() records (see below). The tid is the transaction
# id that wrote the object. An object record includes data,
@@ -103,14 +162,426 @@
# is not modified in a version.
self.version = {}
- # A FileCache instance does all the low-level work of storing
- # and retrieving objects to/from the cache file.
- self.fc = FileCache(size, self.path, self)
+ # tid for the most recent transaction we know about. This is also
+ # stored near the start of the file.
+ self.tid = None
- self._setup_trace(self.path)
+ # There's one Entry instance, kept in memory, for each currently
+ # allocated block in the file, and there's one allocated block in the
+ # file per serialized Object. filemap retrieves the Entry given the
+ # starting offset of a block, and key2entry retrieves the Entry given
+ # an object revision's key (an (oid, start_tid) pair). From an
+ # Entry, we can get the Object's key and file offset.
+ # Map offset in file to pair (data record size, Entry).
+ # Entry is None iff the block starting at offset is free.
+ # filemap always contains a complete account of what's in the
+ # file -- study method _verify_filemap for executable checking
+ # of the relevant invariants. An offset is at the start of a
+ # block iff it's a key in filemap. The data record size is
+ # stored in the file too, so we could just seek to the offset
+ # and read it up; keeping it in memory is an optimization.
+ self.filemap = {}
+
+ # Map key to Entry. After
+ # obj = key2entry[key]
+ # then
+ # obj.key == key
+ # is true. An object is currently stored on disk iff its key is in
+ # key2entry.
+ self.key2entry = {}
+
+ # Always the offset into the file of the start of a block.
+ # New and relocated objects are always written starting at
+ # currentofs.
+ self.currentofs = ZEC3_HEADER_SIZE
+
+ # self.f is the open file object.
+ # When we're reusing an existing file, self.f is left None
+ # here -- the scan() method must be called then to open the file
+ # (and it sets self.f).
+
+ if path:
+ self._lock_file = ZODB.lock_file.LockFile(path + '.lock')
+
+ if path and os.path.exists(path):
+ # Reuse an existing file. scan() will open & read it.
+ self.f = None
+ logger.info("reusing persistent cache file %r", path)
+ else:
+ if path:
+ self.f = open(path, 'wb+')
+ logger.info("created persistent cache file %r", path)
+ else:
+ self.f = tempfile.TemporaryFile()
+ logger.info("created temporary cache file %r", self.f.name)
+ # Make sure the OS really saves enough bytes for the file.
+ self.f.seek(self.maxsize - 1)
+ self.f.write('x')
+ self.f.truncate()
+ # Start with one magic header block
+ self.f.seek(0)
+ self.f.write(magic)
+ self.f.write(z64)
+ # and one free block.
+ self.f.write('f' + struct.pack(">I", self.maxsize -
+ ZEC3_HEADER_SIZE))
+ sync(self.f)
+ self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
+ None)
+
+ # Statistics: _n_adds, _n_added_bytes,
+ # _n_evicts, _n_evicted_bytes,
+ # _n_accesses
+ self.clearStats()
+
+ self._setup_trace(path)
+
+
+ ##
+ # Scan the current contents of the cache file, calling `install`
+ # for each object found in the cache. This method should only
+ # be called once to initialize the cache from disk.
+ def scan(self, install):
+ if self.f is not None: # we're not (re)using a pre-existing file
+ return
+ fsize = os.path.getsize(self.path)
+ if fsize != self.maxsize:
+ logger.warning("existing cache file %r has size %d; "
+ "requested size %d ignored", self.path,
+ fsize, self.maxsize)
+ self.maxsize = fsize
+ self.f = open(self.path, 'rb+')
+ _magic = self.f.read(4)
+ if _magic != magic:
+ raise ValueError("unexpected magic number: %r" % _magic)
+ self.tid = self.f.read(8)
+ if len(self.tid) != 8:
+ raise ValueError("cache file too small -- no tid at start")
+
+ # Populate .filemap and .key2entry to reflect what's currently in the
+ # file, and tell our parent about it too (via the `install` callback).
+ # Remember the location of the largest free block. That seems a
+ # decent place to start currentofs.
+ max_free_size = 0
+ ofs = max_free_offset = ZEC3_HEADER_SIZE
+ while ofs < fsize:
+ self.f.seek(ofs)
+ ent = None
+ status = self.f.read(1)
+ if status == 'a':
+ size, rawkey = struct.unpack(">I16s", self.f.read(20))
+ key = rawkey[:8], rawkey[8:]
+ assert key not in self.key2entry
+ self.key2entry[key] = ent = Entry(key, ofs)
+ install(self.f, ent)
+ elif status == 'f':
+ size, = struct.unpack(">I", self.f.read(4))
+ elif status in '1234':
+ size = int(status)
+ else:
+ raise ValueError("unknown status byte value %s in client "
+ "cache file" % 0, hex(ord(status)))
+
+ self.filemap[ofs] = size, ent
+ if ent is None and size > max_free_size:
+ max_free_size, max_free_offset = size, ofs
+
+ ofs += size
+
+ if ofs != fsize:
+ raise ValueError("final offset %s != file size %s in client "
+ "cache file" % (ofs, fsize))
+ if __debug__:
+ self._verify_filemap()
+ self.currentofs = max_free_offset
+
+ def clearStats(self):
+ self._n_adds = self._n_added_bytes = 0
+ self._n_evicts = self._n_evicted_bytes = 0
+ self._n_accesses = 0
+
+ def getStats(self):
+ return (self._n_adds, self._n_added_bytes,
+ self._n_evicts, self._n_evicted_bytes,
+ self._n_accesses
+ )
+
+ ##
+ # The number of objects currently in the cache.
+ def __len__(self):
+ return len(self.key2entry)
+
+ ##
+ # Iterate over the objects in the cache, producing an Entry for each.
+ def __iter__(self):
+ return self.key2entry.itervalues()
+
+ ##
+ # Test whether an (oid, tid) pair is in the cache.
+ def __contains__(self, key):
+ return key in self.key2entry
+
+ ##
+ # Close the underlying file. No methods accessing the cache should be
+ # used after this.
+ def close(self):
+ if hasattr(self,'_lock_file'):
+ self._lock_file.close()
+ if self.f:
+ sync(self.f)
+ self.f.close()
+ self.f = None
+
+ ##
+ # Evict objects as necessary to free up at least nbytes bytes,
+ # starting at currentofs. If currentofs is closer than nbytes to
+ # the end of the file, currentofs is reset to ZEC3_HEADER_SIZE first.
+ # The number of bytes actually freed may be (and probably will be)
+ # greater than nbytes, and is _makeroom's return value. The file is not
+ # altered by _makeroom. filemap and key2entry are updated to reflect the
+ # evictions, and it's the caller's responsibility both to fiddle
+ # the file, and to update filemap, to account for all the space
+ # freed (starting at currentofs when _makeroom returns, and
+ # spanning the number of bytes returned by _makeroom).
+ def _makeroom(self, nbytes):
+ assert 0 < nbytes <= self.maxsize - ZEC3_HEADER_SIZE
+ if self.currentofs + nbytes > self.maxsize:
+ self.currentofs = ZEC3_HEADER_SIZE
+ ofs = self.currentofs
+ while nbytes > 0:
+ size, e = self.filemap.pop(ofs)
+ if e is not None:
+ del self.key2entry[e.key]
+ self._evictobj(e, size)
+ ofs += size
+ nbytes -= size
+ return ofs - self.currentofs
+
+ ##
+ # Write Object obj, with data, to file starting at currentofs.
+ # nfreebytes are already available for overwriting, and it's
+ # guaranteed that's enough. obj.offset is changed to reflect the
+ # new data record position, and filemap and key2entry are updated to
+ # match.
+ def _writeobj(self, obj, nfreebytes):
+ size = OBJECT_HEADER_SIZE + obj.size
+ assert size <= nfreebytes
+ excess = nfreebytes - size
+ # If there's any excess (which is likely), we need to record a
+ # free block following the end of the data record. That isn't
+ # expensive -- it's all a contiguous write.
+ if excess == 0:
+ extra = ''
+ elif excess < 5:
+ extra = "01234"[excess]
+ else:
+ extra = 'f' + struct.pack(">I", excess)
+
+ self.f.seek(self.currentofs)
+
+ # Before writing data, we'll write a free block for the space freed.
+ # We'll come back with a last atomic write to rewrite the start of the
+ # allocated-block header.
+ self.f.write('f'+struct.pack(">I", nfreebytes))
+
+ # Now write the rest of the allocation block header and object data.
+ self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
+ obj.serialize(self.f)
+ self.f.write(extra)
+
+ # Now, we'll go back and rewrite the beginning of the
+ # allocated block header.
+ self.f.seek(self.currentofs)
+ self.f.write('a'+struct.pack(">I", size))
+
+ # Update index
+ e = Entry(obj.key, self.currentofs)
+ self.key2entry[obj.key] = e
+ self.filemap[self.currentofs] = size, e
+ self.currentofs += size
+ if excess:
+ # We need to record the free block in filemap, but there's
+ # no need to advance currentofs beyond it. Instead it
+ # gives some breathing room for the next object to get
+ # written.
+ self.filemap[self.currentofs] = excess, None
+
+ ##
+ # Add Object object to the cache. This may evict existing objects, to
+ # make room (and almost certainly will, in steady state once the cache
+ # is first full). The object must not already be in the cache. If the
+ # object is too large for the cache, False is returned, otherwise True.
+ def add(self, object):
+ size = OBJECT_HEADER_SIZE + object.size
+ # A number of cache simulation experiments all concluded that the
+ # 2nd-level ZEO cache got a much higher hit rate if "very large"
+ # objects simply weren't cached. For now, we ignore the request
+ # only if the entire cache file is too small to hold the object.
+ if size > self.maxsize - ZEC3_HEADER_SIZE:
+ return False
+
+ assert object.key not in self.key2entry
+ assert len(object.key[0]) == 8
+ assert len(object.key[1]) == 8
+
+ self._n_adds += 1
+ self._n_added_bytes += size
+
+ available = self._makeroom(size)
+ self._writeobj(object, available)
+ return True
+
+ ##
+ # Evict the object represented by Entry `e` from the cache, freeing
+ # `size` bytes in the file for reuse. `size` is used only for summary
+ # statistics. This does not alter the file, or self.filemap or
+ # self.key2entry (those are the caller's responsibilities). It does
+ # invoke _evicted(Object) on our parent.
+ def _evictobj(self, e, size):
+ self._n_evicts += 1
+ self._n_evicted_bytes += size
+ # Load the object header into memory so we know how to
+ # update the parent's in-memory data structures.
+ self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+ o = Object.fromFile(self.f, e.key, skip_data=True)
+ self._evicted(o)
+
+ ##
+ # Return Object for key, or None if not in cache.
+ def access(self, key):
+ self._n_accesses += 1
+ e = self.key2entry.get(key)
+ if e is None:
+ return None
+ offset = e.offset
+ size, e2 = self.filemap[offset]
+ assert e is e2
+
+ self.f.seek(offset + OBJECT_HEADER_SIZE)
+ return Object.fromFile(self.f, key)
+
+ ##
+ # Remove Object for key from cache, if present.
+ def remove(self, key):
+ # If an object is being explicitly removed, we need to load
+ # its header into memory and write a free block marker to the
+ # disk where the object was stored. We need to load the
+ # header to update the in-memory data structures held by
+ # ClientCache.
+
+ # We could instead just keep the header in memory at all times.
+
+ e = self.key2entry.pop(key, None)
+ if e is None:
+ return
+ offset = e.offset
+ size, e2 = self.filemap[offset]
+ assert e is e2
+ self.filemap[offset] = size, None
+ self.f.seek(offset + OBJECT_HEADER_SIZE)
+ o = Object.fromFile(self.f, key, skip_data=True)
+ assert size >= 5 # only free blocks are tiny
+ # Because `size` >= 5, we can change an allocated block to a free
+ # block just by overwriting the 'a' status byte with 'f' -- the
+ # size field stays the same.
+ self.f.seek(offset)
+ self.f.write('f')
+ self.f.flush()
+ self._evicted(o)
+
+ ##
+ # Update on-disk representation of Object obj.
+ #
+ # This method should be called when the object header is modified.
+ # obj must be in the cache. The only real use for this is during
+ # invalidation, to set the end_tid field on a revision that was current
+ # (and so had an end_tid of None, but no longer does).
+ def update(self, obj):
+ e = self.key2entry[obj.key]
+ self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+ obj.serialize_header(self.f)
+
+ ##
+ # Update our idea of the most recent tid. This is stored in the
+ # instance, and also written out near the start of the cache file. The
+ # new tid must be strictly greater than our current idea of the most
+ # recent tid.
+ def setLastTid(self, tid):
+ if self.tid is not None and tid <= self.tid:
+ raise ValueError("new last tid (%s) must be greater than "
+ "previous one (%s)" % (u64(tid),
+ u64(self.tid)))
+ assert isinstance(tid, str) and len(tid) == 8
+ self.tid = tid
+ self.f.seek(len(magic))
+ self.f.write(tid)
+ self.f.flush()
+
+
+ ##
+ # This debug method marches over the entire cache file, verifying that
+ # the current contents match the info in self.filemap and self.key2entry.
+ def _verify_filemap(self, display=False):
+ a = ZEC3_HEADER_SIZE
+ f = self.f
+ while a < self.maxsize:
+ f.seek(a)
+ status = f.read(1)
+ if status in 'af':
+ size, = struct.unpack(">I", f.read(4))
+ else:
+ size = int(status)
+ if display:
+ if a == self.currentofs:
+ print '*****',
+ print "%c%d" % (status, size),
+ size2, obj = self.filemap[a]
+ assert size == size2
+ assert (obj is not None) == (status == 'a')
+ if obj is not None:
+ assert obj.offset == a
+ assert self.key2entry[obj.key] is obj
+ a += size
+ if display:
+ print
+ assert a == self.maxsize
+
+
+
def open(self):
- self.fc.scan(self.install)
+ self.scan(self.install)
##
# Callback for FileCache.scan(), when a pre-existing file cache is
@@ -136,31 +607,20 @@
else:
self.noncurrent[oid] = [this_span]
- def close(self):
- self.fc.close()
- if self._tracefile:
- sync(self._tracefile)
- self._tracefile.close()
- self._tracefile = None
-
##
# Set the last transaction seen by the cache.
# @param tid a transaction id
# @exception ValueError attempt to set a new tid less than the current tid
- def setLastTid(self, tid):
- self.fc.settid(tid)
-
##
# Return the last transaction seen by the cache.
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
-
def getLastTid(self):
- if self.fc.tid == z64:
+ if self.tid == z64:
return None
else:
- return self.fc.tid
+ return self.tid
##
# Return the current data record for oid and version.
@@ -187,7 +647,7 @@
if tid is None:
self._trace(0x20, oid, version)
return None
- o = self.fc.access((oid, tid))
+ o = self.access((oid, tid))
if o is None:
self._trace(0x20, oid, version)
return None
@@ -222,7 +682,7 @@
if tid > hi: # we don't have any data in the right range
self._trace(0x24, oid, "", tid)
return None
- o = self.fc.access((oid, lo))
+ o = self.access((oid, lo))
self._trace(0x26, oid, "", tid)
return o.data, o.start_tid, o.end_tid
@@ -260,7 +720,7 @@
# the requested version, doesn't find it, then asks the server
# for that data. The server returns the non-version data,
# which may already be in the cache.
- if (oid, start_tid) in self.fc:
+ if (oid, start_tid) in self:
return
o = Object((oid, start_tid), version, data, start_tid, end_tid)
if version:
@@ -270,7 +730,7 @@
if self.version[oid] != (version, start_tid):
raise ValueError("data already exists for version %r"
% self.version[oid][0])
- if not self.fc.add(o):
+ if not self.add(o):
return # too large
self.version[oid] = version, start_tid
self._trace(0x50, oid, version, start_tid, dlen=len(data))
@@ -283,7 +743,7 @@
"already have current data for oid")
else:
return
- if not self.fc.add(o):
+ if not self.add(o):
return # too large
self.current[oid] = start_tid
self._trace(0x52, oid, version, start_tid, dlen=len(data))
@@ -292,7 +752,7 @@
p = start_tid, end_tid
if p in L:
return # duplicate store
- if not self.fc.add(o):
+ if not self.add(o):
return # too large
bisect.insort_left(L, p)
self._trace(0x54, oid, version, start_tid, end_tid,
@@ -311,7 +771,7 @@
for old_tid, dummy in noncurrent_list[:]:
# 0x1E = invalidate (hit, discarding current or non-current)
self._trace(0x1E, oid, version, tid)
- self.fc.remove((oid, old_tid))
+ self.remove((oid, old_tid))
# fc.remove() calling back to _evicted() should have removed
# the list from noncurrent when the last non-current revision
# was removed.
@@ -331,8 +791,8 @@
# or None to forget all cached info about oid (version, current
# revision, and non-current revisions)
def invalidate(self, oid, version, tid):
- if tid > self.fc.tid and tid is not None:
- self.fc.settid(tid)
+ if tid is not None and tid > self.tid:
+ self.setLastTid(tid)
remove_all_knowledge_of_oid = tid is None
@@ -342,7 +802,7 @@
self._trace(0x1A, oid, version, tid)
dllversion, dlltid = self.version[oid]
assert not version or version == dllversion, (version, dllversion)
- self.fc.remove((oid, dlltid))
+ self.remove((oid, dlltid))
assert oid not in self.version # .remove() got rid of it
# And continue: we must also remove any non-version data from
# the cache. Or, at least, I have such a poor understanding of
@@ -365,7 +825,7 @@
if remove_all_knowledge_of_oid:
# 0x1E = invalidate (hit, discarding current or non-current)
self._trace(0x1E, oid, version, tid)
- self.fc.remove((oid, cur_tid))
+ self.remove((oid, cur_tid))
assert cur_tid not in self.current # .remove() got rid of it
return
@@ -377,7 +837,7 @@
# Update the end_tid half of oid's validity range on disk.
# TODO: Want to fetch object without marking it as accessed.
- o = self.fc.access((oid, cur_tid))
+ o = self.access((oid, cur_tid))
assert o is not None
assert o.end_tid is None # i.e., o was current
if o is None:
@@ -385,20 +845,11 @@
# should be removed; waiting on time to prove it can't happen.
return
o.end_tid = tid
- self.fc.update(o) # record the new end_tid on disk
+ self.update(o) # record the new end_tid on disk
# Add to oid's list of non-current data.
L = self.noncurrent.setdefault(oid, [])
bisect.insort_left(L, (cur_tid, tid))
- ##
- # Return the number of object revisions in the cache.
- #
- # Or maybe better to just return len(self.cache)? Needs clearer use case.
- def __len__(self):
- n = len(self.current) + len(self.version)
- if self.noncurrent:
- n += sum(map(len, self.noncurrent))
- return n
##
# Generates (oid, serial, version) triples for all objects in the
@@ -406,10 +857,10 @@
def contents(self):
# May need to materialize list instead of iterating;
# depends on whether the caller may change the cache.
- for o in self.fc:
+ for o in self:
oid, tid = o.key
if oid in self.version:
- obj = self.fc.access(o.key)
+ obj = self.access(o.key)
yield oid, tid, obj.version
else:
yield oid, tid, ""
@@ -422,7 +873,7 @@
for oid, tid, version in L:
print oid_repr(oid), oid_repr(tid), repr(version)
print "dll contents"
- L = list(self.fc)
+ L = list(self)
L.sort(lambda x, y: cmp(x.key, y.key))
for x in L:
end_tid = x.end_tid or z64
@@ -659,463 +1110,10 @@
self.offset = offset
-
-##
-# FileCache stores a cache in a single on-disk file.
-#
-# On-disk cache structure.
-#
-# The file begins with a 12-byte header. The first four bytes are the
-# file's magic number - ZEC3 - indicating zeo cache version 3. The
-# next eight bytes are the last transaction id.
-
-magic = "ZEC3"
-ZEC3_HEADER_SIZE = 12
-
-# After the header, the file contains a contiguous sequence of blocks. All
-# blocks begin with a one-byte status indicator:
-#
-# 'a'
-# Allocated. The block holds an object; the next 4 bytes are >I
-# format total block size.
-#
-# 'f'
-# Free. The block is free; the next 4 bytes are >I format total
-# block size.
-#
-# '1', '2', '3', '4'
-# The block is free, and consists of 1, 2, 3 or 4 bytes total.
-#
-# "Total" includes the status byte, and size bytes. There are no
-# empty (size 0) blocks.
-
-
-# Allocated blocks have more structure:
-#
-# 1 byte allocation status ('a').
-# 4 bytes block size, >I format.
-# 16 bytes oid + tid, string.
-# size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
-# class Object for details).
-
-OBJECT_HEADER_SIZE = 1 + 4 + 16
-
-# The cache's currentofs goes around the file, circularly, forever.
-# It's always the starting offset of some block.
-#
-# When a new object is added to the cache, it's stored beginning at
-# currentofs, and currentofs moves just beyond it. As many contiguous
-# blocks needed to make enough room for the new object are evicted,
-# starting at currentofs. Exception: if currentofs is close enough
-# to the end of the file that the new object can't fit in one
-# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
-
-# Do all possible to ensure that the bytes we wrote to file f are really on
-# disk.
def sync(f):
f.flush()
- if hasattr(os, 'fsync'):
- os.fsync(f.fileno())
-class FileCache(object):
-
- def __init__(self, maxsize, fpath, parent):
- # - `maxsize`: total size of the cache file, in bytes; this is
- # ignored path names an existing file; perhaps we should attempt
- # to change the cache size in that case
- # - `fpath`: filepath for the cache file, or None (in which case
- # a temp file will be created)
- # - `parent`: the ClientCache instance; its `_evicted()` method
- # is called whenever we need to evict an object to make room in
- # the file
- self.maxsize = maxsize
- self.parent = parent
-
- # tid for the most recent transaction we know about. This is also
- # stored near the start of the file.
- self.tid = None
-
- # There's one Entry instance, kept in memory, for each currently
- # allocated block in the file, and there's one allocated block in the
- # file per serialized Object. filemap retrieves the Entry given the
- # starting offset of a block, and key2entry retrieves the Entry given
- # an object revision's key (an (oid, start_tid) pair). From an
- # Entry, we can get the Object's key and file offset.
-
- # Map offset in file to pair (data record size, Entry).
- # Entry is None iff the block starting at offset is free.
- # filemap always contains a complete account of what's in the
- # file -- study method _verify_filemap for executable checking
- # of the relevant invariants. An offset is at the start of a
- # block iff it's a key in filemap. The data record size is
- # stored in the file too, so we could just seek to the offset
- # and read it up; keeping it in memory is an optimization.
- self.filemap = {}
-
- # Map key to Entry. After
- # obj = key2entry[key]
- # then
- # obj.key == key
- # is true. An object is currently stored on disk iff its key is in
- # key2entry.
- self.key2entry = {}
-
- # Always the offset into the file of the start of a block.
- # New and relocated objects are always written starting at
- # currentofs.
- self.currentofs = ZEC3_HEADER_SIZE
-
- # self.f is the open file object.
- # When we're not reusing an existing file, self.f is left None
- # here -- the scan() method must be called then to open the file
- # (and it sets self.f).
-
- self.fpath = fpath
-
- if fpath:
- self._lock_file = ZODB.lock_file.LockFile(fpath + '.lock')
-
- if fpath and os.path.exists(fpath):
- # Reuse an existing file. scan() will open & read it.
- self.f = None
- logger.info("reusing persistent cache file %r", fpath)
- else:
- if fpath:
- self.f = open(fpath, 'wb+')
- logger.info("created persistent cache file %r", fpath)
- else:
- self.f = tempfile.TemporaryFile()
- logger.info("created temporary cache file %r", self.f.name)
- # Make sure the OS really saves enough bytes for the file.
- self.f.seek(self.maxsize - 1)
- self.f.write('x')
- self.f.truncate()
- # Start with one magic header block
- self.f.seek(0)
- self.f.write(magic)
- self.f.write(z64)
- # and one free block.
- self.f.write('f' + struct.pack(">I", self.maxsize -
- ZEC3_HEADER_SIZE))
- self.sync()
- self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
- None)
-
- # Statistics: _n_adds, _n_added_bytes,
- # _n_evicts, _n_evicted_bytes,
- # _n_accesses
- self.clearStats()
-
- ##
- # Scan the current contents of the cache file, calling `install`
- # for each object found in the cache. This method should only
- # be called once to initialize the cache from disk.
- def scan(self, install):
- if self.f is not None: # we're not (re)using a pre-existing file
- return
- fsize = os.path.getsize(self.fpath)
- if fsize != self.maxsize:
- logger.warning("existing cache file %r has size %d; "
- "requested size %d ignored", self.fpath,
- fsize, self.maxsize)
- self.maxsize = fsize
- self.f = open(self.fpath, 'rb+')
- _magic = self.f.read(4)
- if _magic != magic:
- raise ValueError("unexpected magic number: %r" % _magic)
- self.tid = self.f.read(8)
- if len(self.tid) != 8:
- raise ValueError("cache file too small -- no tid at start")
-
- # Populate .filemap and .key2entry to reflect what's currently in the
- # file, and tell our parent about it too (via the `install` callback).
- # Remember the location of the largest free block. That seems a
- # decent place to start currentofs.
- max_free_size = 0
- ofs = max_free_offset = ZEC3_HEADER_SIZE
- while ofs < fsize:
- self.f.seek(ofs)
- ent = None
- status = self.f.read(1)
- if status == 'a':
- size, rawkey = struct.unpack(">I16s", self.f.read(20))
- key = rawkey[:8], rawkey[8:]
- assert key not in self.key2entry
- self.key2entry[key] = ent = Entry(key, ofs)
- install(self.f, ent)
- elif status == 'f':
- size, = struct.unpack(">I", self.f.read(4))
- elif status in '1234':
- size = int(status)
- else:
- raise ValueError("unknown status byte value %s in client "
- "cache file" % 0, hex(ord(status)))
-
- self.filemap[ofs] = size, ent
- if ent is None and size > max_free_size:
- max_free_size, max_free_offset = size, ofs
-
- ofs += size
-
- if ofs != fsize:
- raise ValueError("final offset %s != file size %s in client "
- "cache file" % (ofs, fsize))
- if __debug__:
- self._verify_filemap()
- self.currentofs = max_free_offset
-
- def clearStats(self):
- self._n_adds = self._n_added_bytes = 0
- self._n_evicts = self._n_evicted_bytes = 0
- self._n_accesses = 0
-
- def getStats(self):
- return (self._n_adds, self._n_added_bytes,
- self._n_evicts, self._n_evicted_bytes,
- self._n_accesses
- )
-
- ##
- # The number of objects currently in the cache.
- def __len__(self):
- return len(self.key2entry)
-
- ##
- # Iterate over the objects in the cache, producing an Entry for each.
- def __iter__(self):
- return self.key2entry.itervalues()
-
- ##
- # Test whether an (oid, tid) pair is in the cache.
- def __contains__(self, key):
- return key in self.key2entry
-
- ##
- # Do all possible to ensure all bytes written to the file so far are
- # actually on disk.
- def sync(self):
- sync(self.f)
-
- ##
- # Close the underlying file. No methods accessing the cache should be
- # used after this.
- def close(self):
- if hasattr(self,'_lock_file'):
- self._lock_file.close()
- if self.f:
- self.sync()
- self.f.close()
- self.f = None
-
- ##
- # Evict objects as necessary to free up at least nbytes bytes,
- # starting at currentofs. If currentofs is closer than nbytes to
- # the end of the file, currentofs is reset to ZEC3_HEADER_SIZE first.
- # The number of bytes actually freed may be (and probably will be)
- # greater than nbytes, and is _makeroom's return value. The file is not
- # altered by _makeroom. filemap and key2entry are updated to reflect the
- # evictions, and it's the caller's responsibility both to fiddle
- # the file, and to update filemap, to account for all the space
- # freed (starting at currentofs when _makeroom returns, and
- # spanning the number of bytes retured by _makeroom).
- def _makeroom(self, nbytes):
- assert 0 < nbytes <= self.maxsize - ZEC3_HEADER_SIZE
- if self.currentofs + nbytes > self.maxsize:
- self.currentofs = ZEC3_HEADER_SIZE
- ofs = self.currentofs
- while nbytes > 0:
- size, e = self.filemap.pop(ofs)
- if e is not None:
- del self.key2entry[e.key]
- self._evictobj(e, size)
- ofs += size
- nbytes -= size
- return ofs - self.currentofs
-
- ##
- # Write Object obj, with data, to file starting at currentofs.
- # nfreebytes are already available for overwriting, and it's
- # guranteed that's enough. obj.offset is changed to reflect the
- # new data record position, and filemap and key2entry are updated to
- # match.
- def _writeobj(self, obj, nfreebytes):
- size = OBJECT_HEADER_SIZE + obj.size
- assert size <= nfreebytes
- excess = nfreebytes - size
- # If there's any excess (which is likely), we need to record a
- # free block following the end of the data record. That isn't
- # expensive -- it's all a contiguous write.
- if excess == 0:
- extra = ''
- elif excess < 5:
- extra = "01234"[excess]
- else:
- extra = 'f' + struct.pack(">I", excess)
-
- self.f.seek(self.currentofs)
-
- # Before writing data, we'll write a free block for the space freed.
- # We'll come back with a last atomic write to rewrite the start of the
- # allocated-block header.
- self.f.write('f'+struct.pack(">I", nfreebytes))
-
- # Now write the rest of the allocation block header and object data.
- self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
- obj.serialize(self.f)
- self.f.write(extra)
-
- # Now, we'll go back and rewrite the beginning of the
- # allocated block header.
- self.f.seek(self.currentofs)
- self.f.write('a'+struct.pack(">I", size))
-
- # Update index
- e = Entry(obj.key, self.currentofs)
- self.key2entry[obj.key] = e
- self.filemap[self.currentofs] = size, e
- self.currentofs += size
- if excess:
- # We need to record the free block in filemap, but there's
- # no need to advance currentofs beyond it. Instead it
- # gives some breathing room for the next object to get
- # written.
- self.filemap[self.currentofs] = excess, None
-
- ##
- # Add Object object to the cache. This may evict existing objects, to
- # make room (and almost certainly will, in steady state once the cache
- # is first full). The object must not already be in the cache. If the
- # object is too large for the cache, False is returned, otherwise True.
- def add(self, object):
- size = OBJECT_HEADER_SIZE + object.size
- # A number of cache simulation experiments all concluded that the
- # 2nd-level ZEO cache got a much higher hit rate if "very large"
- # objects simply weren't cached. For now, we ignore the request
- # only if the entire cache file is too small to hold the object.
- if size > self.maxsize - ZEC3_HEADER_SIZE:
- return False
-
- assert object.key not in self.key2entry
- assert len(object.key[0]) == 8
- assert len(object.key[1]) == 8
-
- self._n_adds += 1
- self._n_added_bytes += size
-
- available = self._makeroom(size)
- self._writeobj(object, available)
- return True
-
- ##
- # Evict the object represented by Entry `e` from the cache, freeing
- # `size` bytes in the file for reuse. `size` is used only for summary
- # statistics. This does not alter the file, or self.filemap or
- # self.key2entry (those are the caller's responsibilities). It does
- # invoke _evicted(Object) on our parent.
- def _evictobj(self, e, size):
- self._n_evicts += 1
- self._n_evicted_bytes += size
- # Load the object header into memory so we know how to
- # update the parent's in-memory data structures.
- self.f.seek(e.offset + OBJECT_HEADER_SIZE)
- o = Object.fromFile(self.f, e.key, skip_data=True)
- self.parent._evicted(o)
-
- ##
- # Return Object for key, or None if not in cache.
- def access(self, key):
- self._n_accesses += 1
- e = self.key2entry.get(key)
- if e is None:
- return None
- offset = e.offset
- size, e2 = self.filemap[offset]
- assert e is e2
-
- self.f.seek(offset + OBJECT_HEADER_SIZE)
- return Object.fromFile(self.f, key)
-
- ##
- # Remove Object for key from cache, if present.
- def remove(self, key):
- # If an object is being explicitly removed, we need to load
- # its header into memory and write a free block marker to the
- # disk where the object was stored. We need to load the
- # header to update the in-memory data structures held by
- # ClientCache.
-
- # We could instead just keep the header in memory at all times.
-
- e = self.key2entry.pop(key, None)
- if e is None:
- return
- offset = e.offset
- size, e2 = self.filemap[offset]
- assert e is e2
- self.filemap[offset] = size, None
- self.f.seek(offset + OBJECT_HEADER_SIZE)
- o = Object.fromFile(self.f, key, skip_data=True)
- assert size >= 5 # only free blocks are tiny
- # Because `size` >= 5, we can change an allocated block to a free
- # block just by overwriting the 'a' status byte with 'f' -- the
- # size field stays the same.
- self.f.seek(offset)
- self.f.write('f')
- self.f.flush()
- self.parent._evicted(o)
-
- ##
- # Update on-disk representation of Object obj.
- #
- # This method should be called when the object header is modified.
- # obj must be in the cache. The only real use for this is during
- # invalidation, to set the end_tid field on a revision that was current
- # (and so had an end_tid of None, but no longer does).
- def update(self, obj):
- e = self.key2entry[obj.key]
- self.f.seek(e.offset + OBJECT_HEADER_SIZE)
- obj.serialize_header(self.f)
-
- ##
- # Update our idea of the most recent tid. This is stored in the
- # instance, and also written out near the start of the cache file. The
- # new tid must be strictly greater than our current idea of the most
- # recent tid.
- def settid(self, tid):
- if self.tid is not None and tid <= self.tid:
- raise ValueError("new last tid (%s) must be greater than "
- "previous one (%s)" % (u64(tid),
- u64(self.tid)))
- assert isinstance(tid, str) and len(tid) == 8
- self.tid = tid
- self.f.seek(len(magic))
- self.f.write(tid)
- self.f.flush()
-
- ##
- # This debug method marches over the entire cache file, verifying that
- # the current contents match the info in self.filemap and self.key2entry.
- def _verify_filemap(self, display=False):
- a = ZEC3_HEADER_SIZE
- f = self.f
- while a < self.maxsize:
- f.seek(a)
- status = f.read(1)
- if status in 'af':
- size, = struct.unpack(">I", f.read(4))
- else:
- size = int(status)
- if display:
- if a == self.currentofs:
- print '*****',
- print "%c%d" % (status, size),
- size2, obj = self.filemap[a]
- assert size == size2
- assert (obj is not None) == (status == 'a')
- if obj is not None:
- assert obj.offset == a
- assert self.key2entry[obj.key] is obj
- a += size
- if display:
- print
- assert a == self.maxsize
+if hasattr(os, 'fsync'):
+ def sync(f):
+ f.flush()
+ os.fsync(f.fileno())
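Before the test changes, a stand-alone sketch of walking the on-disk
structure documented in the comments above: a 12-byte header ("ZEC3" plus an
8-byte tid), then contiguous blocks whose status byte is 'a' (allocated),
'f' (free), or '1'..'4' (tiny free, size encoded in the byte). dump_zec3 and
its output format are invented for illustration; the layout constants come
straight from the diff.

    import struct

    ZEC3_HEADER_SIZE = 12

    def dump_zec3(path):
        f = open(path, 'rb')
        try:
            if f.read(4) != "ZEC3":
                raise ValueError("not a ZEC3 cache file")
            print "last tid: %r" % f.read(8)
            f.seek(0, 2)                    # find the file size
            fsize = f.tell()
            ofs = ZEC3_HEADER_SIZE
            while ofs < fsize:
                f.seek(ofs)
                status = f.read(1)
                if status == 'a':
                    # allocated: 4-byte total size, then 8-byte oid + 8-byte tid
                    size, rawkey = struct.unpack(">I16s", f.read(20))
                    print "%8d a %7d oid=%r" % (ofs, size, rawkey[:8])
                elif status == 'f':
                    # free block with an explicit 4-byte size
                    size, = struct.unpack(">I", f.read(4))
                    print "%8d f %7d" % (ofs, size)
                elif status in '1234':
                    # tiny free block; the status byte is the total size
                    size = int(status)
                else:
                    raise ValueError("bad status byte %r at %d" % (status, ofs))
                ofs += size                 # sizes are totals, landing on the next block
        finally:
            f.close()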
Modified: ZODB/branches/3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/tests/test_cache.py 2008-05-11 09:48:43 UTC (rev 86649)
+++ ZODB/branches/3.8/src/ZEO/tests/test_cache.py 2008-05-11 15:15:12 UTC (rev 86650)
@@ -109,21 +109,20 @@
def testEviction(self):
# Manually override the current maxsize
- maxsize = self.cache.size = self.cache.fc.maxsize = 3395 # 1245
- self.cache.fc = ZEO.cache.FileCache(3395, None, self.cache)
+ cache = ZEO.cache.ClientCache(None, 3395)
# Trivial test of eviction code. Doesn't test non-current
# eviction.
data = ["z" * i for i in range(100)]
for i in range(50):
n = p64(i)
- self.cache.store(n, "", n, None, data[i])
- self.assertEquals(len(self.cache), i + 1)
+ cache.store(n, "", n, None, data[i])
+ self.assertEquals(len(cache), i + 1)
# The cache now uses 1225 bytes. The next insert
# should delete some objects.
n = p64(50)
- self.cache.store(n, "", n, None, data[51])
- self.assert_(len(self.cache) < 51)
+ cache.store(n, "", n, None, data[51])
+ self.assert_(len(cache) < 51)
# TODO: Need to make sure eviction of non-current data
# and of version data are handled correctly.
@@ -138,9 +137,9 @@
# Copy data from self.cache into path, reaching into the cache
# guts to make the copy.
dst = open(path, "wb+")
- src = self.cache.fc.f
+ src = self.cache.f
src.seek(0)
- dst.write(src.read(self.cache.fc.maxsize))
+ dst.write(src.read(self.cache.maxsize))
dst.close()
copy = ZEO.cache.ClientCache(path)
copy.open()
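The updated test suggests the new entry points; here is a minimal usage
sketch under the same assumptions (an anonymous temporary cache file; the
3395-byte size and sample data are arbitrary). Only calls visible in this
diff are used: the ClientCache constructor, open(), store(), the relocated
__len__, and close().

    import ZEO.cache
    from ZODB.utils import p64

    cache = ZEO.cache.ClientCache(None, 3395)  # path=None -> temporary file
    cache.open()                               # scan() returns at once for a fresh file
    n = p64(1)
    cache.store(n, "", n, None, "x" * 100)     # oid, version, start_tid, end_tid, data
    assert len(cache) == 1                     # entries are tracked on ClientCache itself
    cache.close()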