[Zodb-checkins] SVN: ZODB/branches/3.8/ Major refactoring of cache
code to reduce memory consumption, which
Jim Fulton
jim at zope.com
Tue May 13 13:26:09 EDT 2008
Log message for revision 86709:
Major refactoring of cache code to reduce memory consumption, which
was astronomical. Also greatly simplified the code.
Changed:
U ZODB/branches/3.8/NEWS.txt
U ZODB/branches/3.8/src/ZEO/cache.py
U ZODB/branches/3.8/src/ZEO/tests/test_cache.py
-=-
Modified: ZODB/branches/3.8/NEWS.txt
===================================================================
--- ZODB/branches/3.8/NEWS.txt 2008-05-13 16:57:56 UTC (rev 86708)
+++ ZODB/branches/3.8/NEWS.txt 2008-05-13 17:26:08 UTC (rev 86709)
@@ -1,3 +1,5 @@
+
+
What's new in ZODB 3.8.1
=======================
@@ -17,6 +19,9 @@
- A bug in the cache-opening logic led to cache failure in the
unlikely event that a cache has no free blocks.
+- The cache used an excessive amount of memory, causing applications
+ with large caches to exhaust available memory.
+
- When using ZEO Client Storages, errors occurred when trying to store
objects too big to fit in the ZEO cache file.
Modified: ZODB/branches/3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/cache.py 2008-05-13 16:57:56 UTC (rev 86708)
+++ ZODB/branches/3.8/src/ZEO/cache.py 2008-05-13 17:26:08 UTC (rev 86709)
@@ -22,51 +22,54 @@
FileCache.
"""
+from struct import pack, unpack
+
import bisect
+import BTrees.LLBTree
+import BTrees.LOBTree
import logging
import os
-import struct
import tempfile
import time
+import ZODB.fsIndex
import ZODB.lock_file
-from ZODB.utils import z64, u64
+from ZODB.utils import p64, u64, z64
logger = logging.getLogger("ZEO.cache")
-##
# A disk-based cache for ZEO clients.
-# <p>
+#
# This class provides an interface to a persistent, disk-based cache
# used by ZEO clients to store copies of database records from the
# server.
-# <p>
+#
# The details of the constructor are unspecified at this point.
-# <p>
+#
# Each entry in the cache is valid for a particular range of transaction
# ids. The lower bound is the transaction that wrote the data. The
# upper bound is the next transaction that wrote a revision of the
# object. If the data is current, the upper bound is stored as None;
# the data is considered current until an invalidate() call is made.
-# <p>
+#
# It is an error to call store() twice with the same object without an
# intervening invalidate() to set the upper bound on the first cache
-# entry. <em>Perhaps it will be necessary to have a call the removes
+# entry. Perhaps it will be necessary to have a call that removes
# something from the cache outright, without keeping a non-current
-# entry.</em>
-# <h3>Cache verification</h3>
-# <p>
+# entry.
+
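To make the validity-range rule concrete: a revision written by transaction start_tid and superseded at end_tid answers a loadBefore(oid, before_tid) request exactly when start_tid < before_tid <= end_tid, with a still-current revision treated as having an open upper bound. An illustrative sketch only, not part of cache.py:

    # Illustrative only: the visibility test implied by the comments above.
    def visible_before(start_tid, end_tid, before_tid):
        if end_tid is None:                  # revision is still current
            return start_tid < before_tid
        return start_tid < before_tid <= end_tid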
+# Cache verification
+#
# When the client is connected to the server, it receives
# invalidations every time an object is modified. When the client is
# disconnected and then reconnects, it must perform cache verification to
# make sure its cached data is synchronized with the storage's current state.
-# <p>
+#
# quick verification
# full verification
-# <p>
+#
-##
# FileCache stores a cache in a single on-disk file.
#
# On-disk cache structure.
@@ -100,12 +103,14 @@
#
# 1 byte allocation status ('a').
# 4 bytes block size, >I format.
-# 16 bytes oid + tid, string.
-# size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
-# class Object for details).
+# 8 bytes oid
+# 8 bytes start_tid
+# 8 bytes end_tid
+# 2 bytes version length
+# 4 bytes data size
+# version
+# data
-OBJECT_HEADER_SIZE = 1 + 4 + 16
-
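To make the record layout concrete, a small sketch of packing and unpacking one allocated record with the same ">I8s8s8shI" format the new code uses; the helper names are illustrative, not part of the module:

    from struct import pack, unpack

    # Fixed overhead is 43 bytes: 1 (status) + 4 (size) + 8 (oid)
    # + 8 (start_tid) + 8 (end_tid) + 2 (version length)
    # + 4 (data size) + 8 (redundant trailing oid).
    def pack_record(oid, start_tid, end_tid, version, data):
        size = 43 + len(version) + len(data)
        return ('a' + pack(">I8s8s8shI", size, oid, start_tid, end_tid,
                           len(version), len(data))
                + version + data + oid)

    def unpack_header(raw):
        # raw: the 35 bytes starting at the status byte.
        assert raw[0] == 'a'
        return unpack(">I8s8s8shI", raw[1:])  # size, oid, tids, lver, ldata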
# The cache's currentofs goes around the file, circularly, forever.
# It's always the starting offset of some block.
#
@@ -120,11 +125,6 @@
class ClientCache(object):
"""A simple in-memory cache."""
- ##
- # Do we put the constructor here?
- # @param path path of persistent snapshot of cache state (a file path)
- # @param size size of cache file, in bytes
-
# The default size of 200MB makes a lot more sense than the traditional
# default of 20MB. The default here is misleading, though, since
# ClientStorage is the only user of ClientCache, and it always passes an
@@ -140,57 +140,22 @@
# to change the cache size in that case
self.maxsize = size
- # The cache stores objects in a dict mapping (oid, tid) pairs
- # to Object() records (see below). The tid is the transaction
- # id that wrote the object. An object record includes data,
- # serialno, and end tid. It has auxillary data structures to
- # compute the appropriate tid, given the oid and a transaction id
- # representing an arbitrary point in history.
- #
- # The serialized form of the cache just stores the Object()
- # records. The in-memory form can be reconstructed from these
- # records.
+ # The number of records in the cache.
+ self._len = 0
- # Maps oid to current tid. Used to compute key for objects.
- self.current = {}
+ # {oid -> pos}
+ self.current = ZODB.fsIndex.fsIndex()
- # Maps oid to list of (start_tid, end_tid) pairs in sorted order.
- # Used to find matching key for load of non-current data.
- self.noncurrent = {}
+ # {oid -> {tid->pos}}
+ # Note that caches in the wild seem to have very little non-current
+ # data, so this would seem to have little impact on memory consumption.
+ # I wonder if we even need to store non-current data in the cache.
+ self.noncurrent = BTrees.LOBTree.LOBTree()
- # Map oid to (version, tid) pair. If there is no entry, the object
- # is not modified in a version.
- self.version = {}
-
# tid for the most recent transaction we know about. This is also
# stored near the start of the file.
self.tid = None
- # There's one Entry instance, kept in memory, for each currently
- # allocated block in the file, and there's one allocated block in the
- # file per serialized Object. filemap retrieves the Entry given the
- # starting offset of a block, and key2entry retrieves the Entry given
- # an object revision's key (an (oid, start_tid) pair). From an
- # Entry, we can get the Object's key and file offset.
-
- # Map offset in file to pair (data record size, Entry).
- # Entry is None iff the block starting at offset is free.
- # filemap always contains a complete account of what's in the
- # file -- study method _verify_filemap for executable checking
- # of the relevant invariants. An offset is at the start of a
- # block iff it's a key in filemap. The data record size is
- # stored in the file too, so we could just seek to the offset
- # and read it up; keeping it in memory is an optimization.
- self.filemap = {}
-
- # Map key to Entry. After
- # obj = key2entry[key]
- # then
- # obj.key == key
- # is true. An object is currently stored on disk iff its key is in
- # key2entry.
- self.key2entry = {}
-
# Always the offset into the file of the start of a block.
# New and relocated objects are always written starting at
# currentofs.
@@ -224,11 +189,9 @@
self.f.write(magic)
self.f.write(z64)
# and one free block.
- self.f.write('f' + struct.pack(">I", self.maxsize -
+ self.f.write('f' + pack(">I", self.maxsize -
ZEC3_HEADER_SIZE))
sync(self.f)
- self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
- None)
# Statistics: _n_adds, _n_added_bytes,
# _n_evicts, _n_evicted_bytes,
@@ -237,12 +200,17 @@
self._setup_trace(path)
+ # Backward compatibility. Client code used to have to use the fc
+ # attr to get to the file cache to get cache stats.
+ @property
+ def fc(self):
+ return self
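A brief usage note on the property: it keeps the old spelling working while collapsing the two-object design into one. A hypothetical example (getStats is the statistics accessor that accompanies clearStats below):

    cache = ClientCache('client.zec')  # path is made up for illustration
    assert cache.fc is cache           # old and new spellings coincide
    stats = cache.fc.getStats()        # same method as cache.getStats()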
##
# Scan the current contents of the cache file, building the in-memory
# indexes for each object found in the cache. This method should only
# be called once to initialize the cache from disk.
- def scan(self, install):
+ def open(self):
if self.f is not None: # we're not (re)using a pre-existing file
return
fsize = os.path.getsize(self.path)
@@ -252,10 +220,12 @@
fsize, self.maxsize)
self.maxsize = fsize
self.f = open(self.path, 'rb+')
- _magic = self.f.read(4)
+ read = self.f.read
+ seek = self.f.seek
+ _magic = read(4)
if _magic != magic:
raise ValueError("unexpected magic number: %r" % _magic)
- self.tid = self.f.read(8)
+ self.tid = read(8)
if len(self.tid) != 8:
raise ValueError("cache file too small -- no tid at start")
@@ -263,39 +233,49 @@
# file. Remember the location of the largest free block. That seems a
# decent place to start currentofs.
- max_free_size = 0
+ max_free_size = l = 0
ofs = max_free_offset = ZEC3_HEADER_SIZE
+ current = self.current
while ofs < fsize:
- self.f.seek(ofs)
- ent = None
- status = self.f.read(1)
+ seek(ofs)
+ status = read(1)
if status == 'a':
- size, rawkey = struct.unpack(">I16s", self.f.read(20))
- key = rawkey[:8], rawkey[8:]
- assert key not in self.key2entry
- self.key2entry[key] = ent = Entry(key, ofs)
- install(self.f, ent)
+ size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
+
+ if end_tid == z64:
+ current[oid] = ofs
+ else:
+ assert start_tid < end_tid
+ self._set_noncurrent(oid, start_tid, ofs)
+ l += 1
elif status == 'f':
- size, = struct.unpack(">I", self.f.read(4))
+ size, = unpack(">I", read(4))
elif status in '1234':
size = int(status)
else:
raise ValueError("unknown status byte value %s in client "
"cache file" % 0, hex(ord(status)))
-
- self.filemap[ofs] = size, ent
- if ent is None and size > max_free_size:
- max_free_size, max_free_offset = size, ofs
-
ofs += size
if ofs != fsize:
raise ValueError("final offset %s != file size %s in client "
"cache file" % (ofs, fsize))
- if __debug__:
- self._verify_filemap()
self.currentofs = max_free_offset
+ self._len = l
+ def _set_noncurrent(self, oid, tid, ofs):
+ noncurrent_for_oid = self.noncurrent.get(u64(oid))
+ if noncurrent_for_oid is None:
+ noncurrent_for_oid = BTrees.LLBTree.LLBTree()
+ self.noncurrent[u64(oid)] = noncurrent_for_oid
+ noncurrent_for_oid[u64(tid)] = ofs
+
+ def _del_noncurrent(self, oid, tid):
+ noncurrent_for_oid = self.noncurrent[u64(oid)]
+ del noncurrent_for_oid[u64(tid)]
+ if not noncurrent_for_oid:
+ del self.noncurrent[u64(oid)]
+
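A standalone sketch of the two-level index these helpers maintain; integer-keyed BTrees store keys as packed C longs rather than Python string objects, which is a large part of the memory saving (the offsets below are made up):

    import BTrees.LLBTree
    import BTrees.LOBTree
    from ZODB.utils import p64, u64

    noncurrent = BTrees.LOBTree.LOBTree()   # {u64(oid) -> {u64(tid) -> pos}}

    def set_noncurrent(oid, tid, ofs):      # mirrors _set_noncurrent above
        per_oid = noncurrent.get(u64(oid))
        if per_oid is None:
            per_oid = noncurrent[u64(oid)] = BTrees.LLBTree.LLBTree()
        per_oid[u64(tid)] = ofs

    set_noncurrent(p64(1), p64(5), 1000)
    set_noncurrent(p64(1), p64(9), 2000)
    assert list(noncurrent[1].items()) == [(5, 1000), (9, 2000)]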
def clearStats(self):
self._n_adds = self._n_added_bytes = 0
self._n_evicts = self._n_evicted_bytes = 0
@@ -310,19 +290,9 @@
##
# The number of objects currently in the cache.
def __len__(self):
- return len(self.key2entry)
+ return self._len
##
- # Iterate over the objects in the cache, producing an Entry for each.
- def __iter__(self):
- return self.key2entry.itervalues()
-
- ##
- # Test whether an (oid, tid) pair is in the cache.
- def __contains__(self, key):
- return key in self.key2entry
-
- ##
# Close the underlying file. No methods accessing the cache should be
# used after this.
def close(self):
@@ -349,160 +319,31 @@
if self.currentofs + nbytes > self.maxsize:
self.currentofs = ZEC3_HEADER_SIZE
ofs = self.currentofs
+ seek = self.f.seek
+ read = self.f.read
+ current = self.current
while nbytes > 0:
- size, e = self.filemap.pop(ofs)
- if e is not None:
- del self.key2entry[e.key]
- self._evictobj(e, size)
+ seek(ofs)
+ status = read(1)
+ if status == 'a':
+ size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
+ self._n_evicts += 1
+ self._n_evicted_bytes += size
+ if end_tid == z64:
+ del current[oid]
+ else:
+ self._del_noncurrent(oid, start_tid)
+ self._len -= 1
+ else:
+ if status == 'f':
+ size = unpack(">I", read(4))[0]
+ else:
+ size = int(status)
ofs += size
nbytes -= size
return ofs - self.currentofs
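The loop above leans on the block-size encoding: 'a' and 'f' blocks both keep their total size in the four bytes after the status byte, while 1-4 byte fragments are encoded entirely by a single digit status byte. A sketch of a generic reader for that encoding (not part of the module):

    from struct import unpack

    def block_size(f, status):
        # f is positioned just past the status byte of a block.
        if status in 'af':
            return unpack(">I", f.read(4))[0]  # total size, header included
        elif status in '1234':
            return int(status)                 # tiny self-describing block
        raise ValueError("unknown status byte %s" % hex(ord(status)))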
##
- # Write Object obj, with data, to file starting at currentofs.
- # nfreebytes are already available for overwriting, and it's
- # guranteed that's enough. obj.offset is changed to reflect the
- # new data record position, and filemap and key2entry are updated to
- # match.
- def _writeobj(self, obj, nfreebytes):
- size = OBJECT_HEADER_SIZE + obj.size
- assert size <= nfreebytes
- excess = nfreebytes - size
- # If there's any excess (which is likely), we need to record a
- # free block following the end of the data record. That isn't
- # expensive -- it's all a contiguous write.
- if excess == 0:
- extra = ''
- elif excess < 5:
- extra = "01234"[excess]
- else:
- extra = 'f' + struct.pack(">I", excess)
-
- self.f.seek(self.currentofs)
-
- # Before writing data, we'll write a free block for the space freed.
- # We'll come back with a last atomic write to rewrite the start of the
- # allocated-block header.
- self.f.write('f'+struct.pack(">I", nfreebytes))
-
- # Now write the rest of the allocation block header and object data.
- self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
- obj.serialize(self.f)
- self.f.write(extra)
-
- # Now, we'll go back and rewrite the beginning of the
- # allocated block header.
- self.f.seek(self.currentofs)
- self.f.write('a'+struct.pack(">I", size))
-
- # Update index
- e = Entry(obj.key, self.currentofs)
- self.key2entry[obj.key] = e
- self.filemap[self.currentofs] = size, e
- self.currentofs += size
- if excess:
- # We need to record the free block in filemap, but there's
- # no need to advance currentofs beyond it. Instead it
- # gives some breathing room for the next object to get
- # written.
- self.filemap[self.currentofs] = excess, None
-
- ##
- # Add Object object to the cache. This may evict existing objects, to
- # make room (and almost certainly will, in steady state once the cache
- # is first full). The object must not already be in the cache. If the
- # object is too large for the cache, False is returned, otherwise True.
- def add(self, object):
- size = OBJECT_HEADER_SIZE + object.size
- # A number of cache simulation experiments all concluded that the
- # 2nd-level ZEO cache got a much higher hit rate if "very large"
- # objects simply weren't cached. For now, we ignore the request
- # only if the entire cache file is too small to hold the object.
- if size > self.maxsize - ZEC3_HEADER_SIZE:
- return False
-
- assert object.key not in self.key2entry
- assert len(object.key[0]) == 8
- assert len(object.key[1]) == 8
-
- self._n_adds += 1
- self._n_added_bytes += size
-
- available = self._makeroom(size)
- self._writeobj(object, available)
- return True
-
- ##
- # Evict the object represented by Entry `e` from the cache, freeing
- # `size` bytes in the file for reuse. `size` is used only for summary
- # statistics. This does not alter the file, or self.filemap or
- # self.key2entry (those are the caller's responsibilities). It does
- # invoke _evicted(Object) on our parent.
- def _evictobj(self, e, size):
- self._n_evicts += 1
- self._n_evicted_bytes += size
- # Load the object header into memory so we know how to
- # update the parent's in-memory data structures.
- self.f.seek(e.offset + OBJECT_HEADER_SIZE)
- o = Object.fromFile(self.f, e.key, skip_data=True)
- self._evicted(o)
-
- ##
- # Return Object for key, or None if not in cache.
- def access(self, key):
- self._n_accesses += 1
- e = self.key2entry.get(key)
- if e is None:
- return None
- offset = e.offset
- size, e2 = self.filemap[offset]
- assert e is e2
-
- self.f.seek(offset + OBJECT_HEADER_SIZE)
- return Object.fromFile(self.f, key)
-
- ##
- # Remove Object for key from cache, if present.
- def remove(self, key):
- # If an object is being explicitly removed, we need to load
- # its header into memory and write a free block marker to the
- # disk where the object was stored. We need to load the
- # header to update the in-memory data structures held by
- # ClientCache.
-
- # We could instead just keep the header in memory at all times.
-
- e = self.key2entry.pop(key, None)
- if e is None:
- return
- offset = e.offset
- size, e2 = self.filemap[offset]
- assert e is e2
- self.filemap[offset] = size, None
- self.f.seek(offset + OBJECT_HEADER_SIZE)
- o = Object.fromFile(self.f, key, skip_data=True)
- assert size >= 5 # only free blocks are tiny
- # Because `size` >= 5, we can change an allocated block to a free
- # block just by overwriting the 'a' status byte with 'f' -- the
- # size field stays the same.
- self.f.seek(offset)
- self.f.write('f')
- self.f.flush()
- self._evicted(o)
-
- ##
- # Update on-disk representation of Object obj.
- #
- # This method should be called when the object header is modified.
- # obj must be in the cache. The only real use for this is during
- # invalidation, to set the end_tid field on a revision that was current
- # (and so had an end_tid of None, but no longer does).
- def update(self, obj):
- e = self.key2entry[obj.key]
- self.f.seek(e.offset + OBJECT_HEADER_SIZE)
- obj.serialize_header(self.f)
-
- ##
# Update our idea of the most recent tid. This is stored in the
# instance, and also written out near the start of the cache file. The
# new tid must be strictly greater than our current idea of the most
@@ -518,101 +359,7 @@
self.f.write(tid)
self.f.flush()
-
##
- # This debug method marches over the entire cache file, verifying that
- # the current contents match the info in self.filemap and self.key2entry.
- def _verify_filemap(self, display=False):
- a = ZEC3_HEADER_SIZE
- f = self.f
- while a < self.maxsize:
- f.seek(a)
- status = f.read(1)
- if status in 'af':
- size, = struct.unpack(">I", f.read(4))
- else:
- size = int(status)
- if display:
- if a == self.currentofs:
- print '*****',
- print "%c%d" % (status, size),
- size2, obj = self.filemap[a]
- assert size == size2
- assert (obj is not None) == (status == 'a')
- if obj is not None:
- assert obj.offset == a
- assert self.key2entry[obj.key] is obj
- a += size
- if display:
- print
- assert a == self.maxsize
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- def open(self):
- self.scan(self.install)
-
- ##
- # Callback for FileCache.scan(), when a pre-existing file cache is
- # used. For each object in the file, `install()` is invoked. `f`
- # is the file object, positioned at the start of the serialized Object.
- # `ent` is an Entry giving the object's key ((oid, start_tid) pair).
- def install(self, f, ent):
- # Called by cache storage layer to insert object.
- o = Object.fromFile(f, ent.key, skip_data=True)
- if o is None:
- return
- oid = o.key[0]
- if o.version:
- self.version[oid] = o.version, o.start_tid
- elif o.end_tid is None:
- self.current[oid] = o.start_tid
- else:
- assert o.start_tid < o.end_tid
- this_span = o.start_tid, o.end_tid
- span_list = self.noncurrent.get(oid)
- if span_list:
- bisect.insort_left(span_list, this_span)
- else:
- self.noncurrent[oid] = [this_span]
-
- ##
- # Set the last transaction seen by the cache.
- # @param tid a transaction id
- # @exception ValueError attempt to set a new tid less than the current tid
-
- ##
# Return the last transaction seen by the cache.
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
@@ -631,29 +378,30 @@
# @defreturn 3-tuple: (string, string, string)
def load(self, oid, version=""):
- tid = None
- if version:
- p = self.version.get(oid)
- if p is None:
- self._trace(0x20, oid, version)
- return None
- elif p[0] == version:
- tid = p[1]
- # Otherwise, we know the cache has version data but not
- # for the requested version. Thus, we know it is safe
- # to return the non-version data from the cache.
- if tid is None:
- tid = self.current.get(oid)
- if tid is None:
+ ofs = self.current.get(oid)
+ if ofs is None:
self._trace(0x20, oid, version)
return None
- o = self.access((oid, tid))
- if o is None:
- self._trace(0x20, oid, version)
- return None
- self._trace(0x22, oid, version, o.start_tid, o.end_tid, len(o.data))
- return o.data, tid, o.version
+ self.f.seek(ofs)
+ read = self.f.read
+ assert read(1) == 'a'
+ size, saved_oid, tid, end_tid, lver, ldata = unpack(
+ ">I8s8s8shI", read(34))
+ assert saved_oid == oid
+ if lver or version:
+ if lver != len(version) or read(lver) != version:
+ self._trace(0x20, oid, version)
+ return None
+
+ data = read(ldata)
+ assert len(data) == ldata
+ assert read(8) == oid
+ self._n_accesses += 1
+ self._trace(0x22, oid, version, tid, end_tid, ldata)
+ return data, tid, version
+
##
# Return a non-current revision of oid that was current before tid.
# @param oid object id
@@ -661,31 +409,39 @@
# @return data record, serial number, start tid, and end tid
# @defreturn 4-tuple: (string, string, string, string)
- def loadBefore(self, oid, tid):
- L = self.noncurrent.get(oid)
- if L is None:
- self._trace(0x24, oid, "", tid)
+ def loadBefore(self, oid, before_tid):
+ noncurrent_for_oid = self.noncurrent.get(u64(oid))
+ if noncurrent_for_oid is None:
+ self._trace(0x24, oid, "", before_tid)
return None
- # A pair with None as the second element is less than any pair with
- # the same first tid. Dubious: this relies on that None is less
- # than any comparable non-None object in recent Pythons.
- i = bisect.bisect_left(L, (tid, None))
- # Now L[i-1] < (tid, None) < L[i], and the start_tid for everything in
- # L[:i] is < tid, and the start_tid for everything in L[i:] is >= tid.
- # Therefore the largest start_tid < tid must be at L[i-1]. If i is 0,
- # there is no start_tid < tid: we don't have any data old enougn.
- if i == 0:
- self._trace(0x24, oid, "", tid)
- return
- lo, hi = L[i-1]
- assert lo < tid
- if tid > hi: # we don't have any data in the right range
- self._trace(0x24, oid, "", tid)
+
+ items = noncurrent_for_oid.items(None, u64(before_tid)-1)
+ if not items:
+ self._trace(0x24, oid, "", before_tid)
return None
- o = self.access((oid, lo))
- self._trace(0x26, oid, "", tid)
- return o.data, o.start_tid, o.end_tid
+ tid, ofs = items[-1]
+ self.f.seek(ofs)
+ read = self.f.read
+ assert read(1) == 'a'
+ size, saved_oid, saved_tid, end_tid, lver, ldata = unpack(
+ ">I8s8s8shI", read(34))
+ assert saved_oid == oid
+ assert saved_tid == p64(tid)
+ assert lver == 0
+ assert end_tid != z64
+ data = read(ldata)
+ assert len(data) == ldata
+ assert read(8) == oid
+
+ if end_tid < before_tid:
+ self._trace(0x24, oid, "", before_tid)
+ return None
+
+ self._n_accesses += 1
+ self._trace(0x26, oid, "", saved_tid)
+ return data, saved_tid, end_tid
+
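The items(None, u64(before_tid)-1) call above is a BTree range search: it returns the (start_tid, offset) pairs with start_tid <= before_tid - 1 in ascending order, so the last item holds the largest start_tid strictly below before_tid. A standalone sketch of the pattern (values are made up):

    import BTrees.LLBTree

    per_oid = BTrees.LLBTree.LLBTree()          # {start_tid -> offset}
    per_oid.update({5: 100, 9: 200, 12: 300})

    before_tid = 10
    items = per_oid.items(None, before_tid - 1) # keys <= 9, ascending
    assert items[-1] == (9, 200)                # newest revision before 10
    # The caller must still check the record's end_tid >= before_tid;
    # otherwise there is a gap in the cached history and the answer is None.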
##
# Return the version an object is modified in, or None for an
# object that is not modified in a version.
@@ -693,12 +449,20 @@
# @return name of version in which the object is modified
# @defreturn string or None
+ # XXX This approach is wrong, but who cares
def modifiedInVersion(self, oid):
- p = self.version.get(oid)
- if p is None:
+ ofs = self.current.get(oid)
+ if ofs is None:
return None
- version, tid = p
- return version
+ self.f.seek(ofs)
+ read = self.f.read
+ assert read(1) == 'a'
+ size, saved_oid, saved_tid, end_tid, lver, ldata = unpack(
+ ">I8s8s8shI", read(34))
+ assert saved_oid == oid
+ if lver:
+ return read(lver)
+ return None
##
# Store a new data record in the cache.
@@ -720,64 +484,91 @@
# the requested version, doesn't find it, then asks the server
# for that data. The server returns the non-version data,
# which may already be in the cache.
- if (oid, start_tid) in self:
+ seek = self.f.seek
+ if end_tid is None:
+ ofs = self.current.get(oid)
+ if ofs:
+ seek(ofs)
+ read = self.f.read
+ assert read(1) == 'a'
+ size, saved_oid, saved_tid, end_tid = unpack(
+ ">I8s8s8s", read(28))
+ assert saved_oid == oid
+ assert end_tid == z64
+ if saved_tid == start_tid:
+ return
+ raise ValueError("already have current data for oid")
+ else:
+ noncurrent_for_oid = self.noncurrent.get(u64(oid))
+ if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
+ return
+
+ if version:
+ raise ValueError("cache only stores current version data")
+
+ size = 43 + len(version) + len(data)
+
+ # A number of cache simulation experiments all concluded that the
+ # 2nd-level ZEO cache got a much higher hit rate if "very large"
+ # objects simply weren't cached. For now, we ignore the request
+ # only if the entire cache file is too small to hold the object.
+ if size > self.maxsize - ZEC3_HEADER_SIZE:
return
- o = Object((oid, start_tid), version, data, start_tid, end_tid)
+
+ self._n_adds += 1
+ self._n_added_bytes += size
+ self._len += 1
+
+ nfreebytes = self._makeroom(size)
+ assert size <= nfreebytes
+ excess = nfreebytes - size
+ # If there's any excess (which is likely), we need to record a
+ # free block following the end of the data record. That isn't
+ # expensive -- it's all a contiguous write.
+ if excess == 0:
+ extra = ''
+ elif excess < 5:
+ extra = "01234"[excess]
+ else:
+ extra = 'f' + pack(">I", excess)
+
+ ofs = self.currentofs
+ seek(ofs)
+ write = self.f.write
+
+ # Before writing data, we'll write a free block for the space freed.
+ # We'll come back with a last atomic write to rewrite the start of the
+ # allocated-block header.
+ write('f'+pack(">I", nfreebytes))
+
+ # Now write the rest of the allocation block header and object data.
+ write(pack(">8s8s8shi",
+ oid, start_tid, end_tid or z64, len(version), len(data),
+ ))
if version:
- if end_tid is not None:
- raise ValueError("cache only stores current version data")
- if oid in self.version:
- if self.version[oid] != (version, start_tid):
- raise ValueError("data already exists for version %r"
- % self.version[oid][0])
- if not self.add(o):
- return # too large
- self.version[oid] = version, start_tid
- self._trace(0x50, oid, version, start_tid, dlen=len(data))
+ write(version)
+ write(data)
+ write(oid)
+ write(extra)
+
+ # Now, we'll go back and rewrite the beginning of the
+ # allocated block header.
+ seek(ofs)
+ write('a'+pack(">I", size))
+
+ if end_tid:
+ self._set_noncurrent(oid, start_tid, ofs)
+ self._trace(0x54, oid, version, start_tid, end_tid, dlen=len(data))
else:
- if end_tid is None:
- _cur_start = self.current.get(oid)
- if _cur_start:
- if _cur_start != start_tid:
- raise ValueError(
- "already have current data for oid")
- else:
- return
- if not self.add(o):
- return # too large
- self.current[oid] = start_tid
- self._trace(0x52, oid, version, start_tid, dlen=len(data))
+ self.current[oid] = ofs
+ if version:
+ self._trace(0x50, oid, version, start_tid, dlen=len(data))
else:
- L = self.noncurrent.setdefault(oid, [])
- p = start_tid, end_tid
- if p in L:
- return # duplicate store
- if not self.add(o):
- return # too large
- bisect.insort_left(L, p)
- self._trace(0x54, oid, version, start_tid, end_tid,
- dlen=len(data))
+ self._trace(0x52, oid, version, start_tid, dlen=len(data))
+
+ self.currentofs += size
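One detail worth noting in the write sequence above: it is deliberately two-phased. The region is first written under a free ('f') header, the record body follows, and only then is the five-byte prefix rewritten to an allocated ('a') header, so a crash mid-write leaves a parsable free block rather than a torn record. A simplified sketch of the pattern (the helper is hypothetical):

    from struct import pack

    def write_record(f, ofs, nfreebytes, size, body):
        # nfreebytes: whole free region consumed; size: the allocated
        # block (5-byte prefix + record); body: the record plus any
        # trailing marker covering the nfreebytes - size leftover.
        f.seek(ofs)
        f.write('f' + pack(">I", nfreebytes))  # crash here: still free
        f.write(body)                          # record + leftover marker
        f.seek(ofs)
        f.write('a' + pack(">I", size))        # flip to allocated last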
##
- # Remove all knowledge of noncurrent revisions of oid, both in
- # self.noncurrent and in our FileCache. `version` and `tid` are used
- # only for trace records.
- def _remove_noncurrent_revisions(self, oid, version, tid):
- noncurrent_list = self.noncurrent.get(oid)
- if noncurrent_list:
- # Note: must iterate over a copy of noncurrent_list. The
- # FileCache remove() calls our _evicted() method, and that
- # mutates the list.
- for old_tid, dummy in noncurrent_list[:]:
- # 0x1E = invalidate (hit, discarding current or non-current)
- self._trace(0x1E, oid, version, tid)
- self.remove((oid, old_tid))
- # fc.remove() calling back to _evicted() should have removed
- # the list from noncurrent when the last non-current revision
- # was removed.
- assert oid not in self.noncurrent
-
- ##
# If `tid` is None, or we have data for `oid` in a (non-empty) version,
# forget all knowledge of `oid`. (`tid` can be None only for
# invalidations generated by startup cache verification.) If `tid`
@@ -794,76 +585,56 @@
if tid > self.tid and tid is not None:
self.setLastTid(tid)
- remove_all_knowledge_of_oid = tid is None
-
- if oid in self.version:
- # Forget we know about the version data.
- # 0x1A = invalidate (hit, version)
- self._trace(0x1A, oid, version, tid)
- dllversion, dlltid = self.version[oid]
- assert not version or version == dllversion, (version, dllversion)
- self.remove((oid, dlltid))
- assert oid not in self.version # .remove() got rid of it
- # And continue: we must also remove any non-version data from
- # the cache. Or, at least, I have such a poor understanding of
- # versions that anything less drastic would probably be wrong.
- remove_all_knowledge_of_oid = True
-
- if remove_all_knowledge_of_oid:
- self._remove_noncurrent_revisions(oid, version, tid)
-
- # Only current, non-version data remains to be handled.
-
- cur_tid = self.current.get(oid)
- if not cur_tid:
+ ofs = self.current.get(oid)
+ if ofs is None:
# 0x10 == invalidate (miss)
self._trace(0x10, oid, version, tid)
return
- # We had current data for oid, but no longer.
+ self.f.seek(ofs)
+ read = self.f.read
+ assert read(1) == 'a'
+ size, saved_oid, saved_tid, end_tid, lver = unpack(
+ ">I8s8s8sh", read(30))
+ assert saved_oid == oid
+ assert end_tid == z64
+ del self.current[oid]
+ if tid is None or lver:
+ self.f.seek(ofs)
+ self.f.write('f'+pack(">I", size))
+ if lver:
+ # 0x1A = invalidate (hit, version)
+ self._trace(0x1A, oid, version, tid)
+ else:
+ # 0x1E = invalidate (hit, discarding current or non-current)
+ self._trace(0x1E, oid, version, tid)
+ self._len -= 1
+ else:
+ self.f.seek(ofs+21)
+ self.f.write(tid)
+ self._set_noncurrent(oid, saved_tid, ofs)
+ # 0x1C = invalidate (hit, saving non-current)
+ self._trace(0x1C, oid, version, tid)
- if remove_all_knowledge_of_oid:
- # 0x1E = invalidate (hit, discarding current or non-current)
- self._trace(0x1E, oid, version, tid)
- self.remove((oid, cur_tid))
- assert cur_tid not in self.current # .remove() got rid of it
- return
-
- # Add the data we have to the list of non-current data for oid.
- assert tid is not None and cur_tid <= tid
- # 0x1C = invalidate (hit, saving non-current)
- self._trace(0x1C, oid, version, tid)
- del self.current[oid] # because we no longer have current data
-
- # Update the end_tid half of oid's validity range on disk.
- # TODO: Want to fetch object without marking it as accessed.
- o = self.access((oid, cur_tid))
- assert o is not None
- assert o.end_tid is None # i.e., o was current
- if o is None:
- # TODO: Since we asserted o is not None above, this block
- # should be removed; waiting on time to prove it can't happen.
- return
- o.end_tid = tid
- self.update(o) # record the new end_tid on disk
- # Add to oid's list of non-current data.
- L = self.noncurrent.setdefault(oid, [])
- bisect.insort_left(L, (cur_tid, tid))
-
-
##
# Generates (oid, serial, version) triples for all objects in the
# cache. This generator is used by cache verification.
def contents(self):
# May need to materialize list instead of iterating;
# depends on whether the caller may change the cache.
- for o in self:
- oid, tid = o.key
- if oid in self.version:
- obj = self.access(o.key)
- yield oid, tid, obj.version
+ seek = self.f.seek
+ read = self.f.read
+ for oid, ofs in self.current.iteritems():
+ seek(ofs)
+ assert read(1) == 'a'
+ size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
+ assert saved_oid == oid
+ assert end_tid == z64
+ if lver:
+ version = read(lver)
else:
- yield oid, tid, ""
+ version = ''
+ yield oid, tid, version
def dump(self):
from ZODB.utils import oid_repr
@@ -880,29 +651,6 @@
print oid_repr(x.key[0]), oid_repr(x.key[1]), oid_repr(end_tid)
print
- def _evicted(self, o):
- # Called by the FileCache to signal that Object o has been evicted.
- oid, tid = o.key
- if o.end_tid is None:
- if o.version:
- del self.version[oid]
- else:
- del self.current[oid]
- else:
- # Although we use bisect to keep the list sorted,
- # we never expect the list to be very long. So the
- # brute force approach should normally be fine.
- L = self.noncurrent[oid]
- element = (o.start_tid, o.end_tid)
- if len(L) == 1:
- # We don't want to leave an empty list in the dict: if
- # the oid is never referenced again, it would consume RAM
- # forever more for no purpose.
- assert L[0] == element
- del self.noncurrent[oid]
- else:
- L.remove(element)
-
# If `path` isn't None (== we're using a persistent cache file), and
# envar ZEO_CACHE_TRACE is set to a non-empty value, try to open
# path+'.trace' as a trace file, and store the file object in
@@ -910,206 +658,46 @@
# tracing by setting self._trace to a dummy function, and set
# self._tracefile to None.
def _setup_trace(self, path):
- self._tracefile = None
+ _tracefile = None
if path and os.environ.get("ZEO_CACHE_TRACE"):
tfn = path + ".trace"
try:
- self._tracefile = open(tfn, "ab")
- self._trace(0x00)
+ _tracefile = open(tfn, "ab")
except IOError, msg:
- self._tracefile = None
logger.warning("cannot write tracefile %r (%s)", tfn, msg)
else:
logger.info("opened tracefile %r", tfn)
- if self._tracefile is None:
- def notrace(*args, **kws):
- pass
- self._trace = notrace
+ if _tracefile is None:
+ self._trace = lambda *a, **k: None
+ return
- def _trace(self,
- code, oid="", version="", tid=z64, end_tid=z64, dlen=0,
- # The next two are just speed hacks.
- time_time=time.time, struct_pack=struct.pack):
- # The code argument is two hex digits; bits 0 and 7 must be zero.
- # The first hex digit shows the operation, the second the outcome.
- # This method has been carefully tuned to be as fast as possible.
- # Note: when tracing is disabled, this method is hidden by a dummy.
- if version:
- code |= 0x80
- encoded = (dlen + 255) & 0x7fffff00 | code
- if tid is None:
- tid = z64
- if end_tid is None:
- end_tid = z64
- try:
- self._tracefile.write(
- struct_pack(">iiH8s8s",
- time_time(),
- encoded,
- len(oid),
- tid, end_tid) + oid)
- except:
- print `tid`, `end_tid`
- raise
+ now = time.time
+ def _trace(code, oid="", version="", tid=z64, end_tid=z64, dlen=0):
+ # The code argument is two hex digits; bits 0 and 7 must be zero.
+ # The first hex digit shows the operation, the second the outcome.
+ # This method has been carefully tuned to be as fast as possible.
+ # Note: when tracing is disabled, this method is hidden by a dummy.
+ if version:
+ code |= 0x80
+ encoded = (dlen + 255) & 0x7fffff00 | code
+ if tid is None:
+ tid = z64
+ if end_tid is None:
+ end_tid = z64
+ try:
+ _tracefile.write(
+ pack(">iiH8s8s",
+ now(), encoded, len(oid), tid, end_tid) + oid,
+ )
+ except:
+ print `tid`, `end_tid`
+ raise
+
+ self._trace = _trace
+ _trace(0x00)
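The encoded field packs the data length and the event code into a single int: (dlen + 255) & 0x7fffff00 rounds the length up to a multiple of 256 and keeps the high bits, and the low byte carries the code (with 0x80 set when a version was involved). A sketch of the inverse:

    def decode_trace(encoded):
        code = encoded & 0xff          # operation code (0x80 = version flag)
        dlen = encoded & 0x7fffff00    # length, rounded up to 256 multiple
        return code, dlen

    assert decode_trace((1000 + 255) & 0x7fffff00 | 0x22) == (0x22, 1024)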
-##
-# An Object stores the cached data for a single object.
-# <p>
-# The cached data includes the actual object data, the key, and three
-# data fields that describe the validity period and version of the
-# object. The key contains the oid and a redundant start_tid. The
-# actual size of an object is variable, depending on the size of the
-# data and whether it is in a version.
-# <p>
-# The serialized format does not include the key, because it is stored
-# in the header used by the cache file's storage format.
-# <p>
-# Instances of Object are generally short-lived -- they're really a way to
-# package data on the way to or from the disk file.
-class Object(object):
- __slots__ = (# pair (object id, txn id) -- something usable as a dict key;
- # the second part of the pair is equal to start_tid
- "key",
-
- # string, tid of txn that wrote the data
- "start_tid",
-
- # string, tid of txn that wrote next revision, or None
- # if the data is current; if not None, end_tid is strictly
- # greater than start_tid
- "end_tid",
-
- # string, name of version
- "version",
-
- # string, the actual data record for the object
- "data",
-
- # total size of serialized object; this includes the
- # data, version, and all overhead (header) bytes.
- "size",
- )
-
- # A serialized Object on disk looks like:
- #
- # offset # bytes value
- # ------ ------- -----
- # 0 8 end_tid; string
- # 8 2 len(version); 2-byte signed int
- # 10 4 len(data); 4-byte signed int
- # 14 len(version) version; string
- # 14+len(version) len(data) the object pickle; string
- # 14+len(version)+
- # len(data) 8 oid; string
-
- # The serialization format uses an end tid of "\0"*8 (z64), the least
- # 8-byte string, to represent None. It isn't possible for an end_tid
- # to be 0, because it must always be strictly greater than the start_tid.
-
- fmt = ">8shi" # end_tid, len(self.version), len(self.data)
- FIXED_HEADER_SIZE = struct.calcsize(fmt)
- assert FIXED_HEADER_SIZE == 14
- TOTAL_FIXED_SIZE = FIXED_HEADER_SIZE + 8 # +8 for the oid at the end
-
- def __init__(self, key, version, data, start_tid, end_tid):
- self.key = key
- self.version = version
- self.data = data
- self.start_tid = start_tid
- self.end_tid = end_tid
- # The size of the serialized object on disk, including the
- # 14-byte header, the lengths of data and version, and a
- # copy of the 8-byte oid.
- if data is not None:
- self.size = self.TOTAL_FIXED_SIZE + len(data) + len(version)
-
- ##
- # Return the fixed-sized serialization header as a string: pack end_tid,
- # and the lengths of the .version and .data members.
- def get_header(self):
- return struct.pack(self.fmt,
- self.end_tid or z64,
- len(self.version),
- len(self.data))
-
- ##
- # Write the serialized representation of self to file f, at its current
- # position.
- def serialize(self, f):
- f.writelines([self.get_header(),
- self.version,
- self.data,
- self.key[0]])
-
- ##
- # Write the fixed-size header for self, to file f at its current position.
- # The only real use for this is when the current revision of an object
- # in cache is invalidated. Then the end_tid field gets set to the tid
- # of the transaction that caused the invalidation.
- def serialize_header(self, f):
- f.write(self.get_header())
-
- ##
- # fromFile is a class constructor, unserializing an Object from the
- # current position in file f. Exclusive access to f for the duration
- # is assumed. The key is a (oid, start_tid) pair, and the oid must
- # match the serialized oid. If `skip_data` is true, .data is left
- # None in the Object returned, but all the other fields are populated.
- # Else (`skip_data` is false, the default), all fields including .data
- # are populated. .data can be big, so it's prudent to skip it when it
- # isn't needed.
- def fromFile(cls, f, key, skip_data=False):
- s = f.read(cls.FIXED_HEADER_SIZE)
- if not s:
- return None
- oid, start_tid = key
-
- end_tid, vlen, dlen = struct.unpack(cls.fmt, s)
- if end_tid == z64:
- end_tid = None
-
- version = f.read(vlen)
- if vlen != len(version):
- raise ValueError("corrupted record, version")
-
- if skip_data:
- data = None
- f.seek(dlen, 1)
- else:
- data = f.read(dlen)
- if dlen != len(data):
- raise ValueError("corrupted record, data")
-
- s = f.read(8)
- if s != oid:
- raise ValueError("corrupted record, oid")
-
- return cls((oid, start_tid), version, data, start_tid, end_tid)
-
- fromFile = classmethod(fromFile)
-
-
-# Entry just associates a key with a file offset. It's used by FileCache.
-class Entry(object):
- __slots__ = (# object key -- something usable as a dict key.
- 'key',
-
- # Offset from start of file to the object's data
- # record; this includes all overhead bytes (status
- # byte, size bytes, etc). The size of the data
- # record is stored in the file near the start of the
- # record, but for efficiency we also keep size in a
- # dict (filemap; see later).
- 'offset',
- )
-
- def __init__(self, key=None, offset=None):
- self.key = key
- self.offset = offset
-
-
def sync(f):
f.flush()
Modified: ZODB/branches/3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/tests/test_cache.py 2008-05-13 16:57:56 UTC (rev 86708)
+++ ZODB/branches/3.8/src/ZEO/tests/test_cache.py 2008-05-13 17:26:08 UTC (rev 86709)
@@ -20,7 +20,7 @@
from zope.testing import doctest
import ZEO.cache
-from ZODB.utils import p64
+from ZODB.utils import p64, u64
n1 = p64(1)
n2 = p64(2)
@@ -45,9 +45,9 @@
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
- self.cache.invalidate(None, "", n1)
+ self.cache.invalidate(n1, "", n1)
self.assertEqual(self.cache.getLastTid(), n2)
- self.cache.invalidate(None, "", n3)
+ self.cache.invalidate(n1, "", n3)
self.assertEqual(self.cache.getLastTid(), n3)
self.assertRaises(ValueError, self.cache.setLastTid, n2)
@@ -99,9 +99,9 @@
self.assertEqual(self.cache.loadBefore(n2, n4), None)
def testException(self):
+ # Not allowed to save non-current version data
self.assertRaises(ValueError,
- self.cache.store,
- n1, "version", n2, n3, "data")
+ self.cache.store, n1, "version", n2, n3, "data")
self.cache.store(n1, "", n2, None, "data")
self.assertRaises(ValueError,
self.cache.store,
@@ -149,9 +149,10 @@
eq = self.assertEqual
eq(copy.getLastTid(), self.cache.getLastTid())
eq(len(copy), len(self.cache))
- eq(copy.version, self.cache.version)
- eq(copy.current, self.cache.current)
- eq(copy.noncurrent, self.cache.noncurrent)
+ eq(dict(copy.current), dict(self.cache.current))
+ eq(dict([(k, dict(v)) for (k, v) in copy.noncurrent.items()]),
+ dict([(k, dict(v)) for (k, v) in self.cache.noncurrent.items()]),
+ )
def testCurrentObjectLargerThanCache(self):
if self.cache.path:
@@ -181,7 +182,7 @@
self.assertEquals(None, self.cache.load(n1))
# If an object cannot be stored in the cache, it must not be
# recorded as non-current.
- self.assert_((n2, n3) not in cache.noncurrent[n1])
+ self.assert_(1 not in cache.noncurrent)
__test__ = dict(
kill_does_not_cause_cache_corruption =
@@ -233,9 +234,7 @@
>>> import ZEO.cache, ZODB.utils
>>> cache = ZEO.cache.ClientCache('cache', 1000)
- >>> data = 'X' * (1000 - ZEO.cache.ZEC3_HEADER_SIZE
- ... - ZEO.cache.OBJECT_HEADER_SIZE
- ... - ZEO.cache.Object.TOTAL_FIXED_SIZE)
+ >>> data = 'X' * (1000 - ZEO.cache.ZEC3_HEADER_SIZE - 43)
>>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
>>> cache.close()
>>> cache = ZEO.cache.ClientCache('cache', 1000)