[Zodb-checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.110.2.3
cache.py:1.1.2.3
Jeremy Hylton
cvs-admin at zope.org
Mon Nov 10 17:43:25 EST 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv9752/ZEO
Modified Files:
Tag: ZODB3-mvcc-2-branch
ClientStorage.py cache.py
Log Message:
Revise ClientCache implementation and integrate with one of Tim's in-memory caches.
The in-memory cache isn't the expected long-term solution, but it shows how to integrate a lower-level storage scheme with the necessary application-level interface.
Add some simple tests of eviction, which the previous cache did not implement.
Add code on the way towards serialization.
=== ZODB3/ZEO/ClientStorage.py 1.110.2.2 => 1.110.2.3 ===
--- ZODB3/ZEO/ClientStorage.py:1.110.2.2 Wed Nov 5 23:41:54 2003
+++ ZODB3/ZEO/ClientStorage.py Mon Nov 10 17:42:54 2003
@@ -27,7 +27,7 @@
import types
from ZEO import ServerStub
-from ZEO.cache import Cache
+from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
ClientDisconnected, AuthError
@@ -92,7 +92,7 @@
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
- ClientCacheClass = Cache
+ ClientCacheClass = ClientCache
ConnectionManagerClass = ConnectionManager
StorageServerStubClass = ServerStub.StorageServer
=== ZODB3/ZEO/cache.py 1.1.2.2 => 1.1.2.3 ===
--- ZODB3/ZEO/cache.py:1.1.2.2 Wed Nov 5 23:35:21 2003
+++ ZODB3/ZEO/cache.py Mon Nov 10 17:42:54 2003
@@ -13,6 +13,11 @@
##############################################################################
"""Example client cache that stores multiple revisions of an object."""
+import bisect
+import struct
+
+from sets import Set
+
##
# A disk-based cache for ZEO clients.
# <p>
@@ -39,24 +44,88 @@
# invalidations every time an object is modified. When the client is
# disconnected, it must perform cache verification to make sure its
# cached data is synchronized with the storage's current state.
-
+# <p>
# quick verification
# full verification
+# <p>
+# XXX serialization
+# cache is normally an in-memory data structure?
+# pro: faster, simpler
+# cons: might use too much memory on small machine
+# perhaps, just use smaller cache
+# periodically write snapshot of dll to disk
+# as invalidations come in, write them to a log
+# on close, write new snapshot
+# XXX unlink old snapshot first?
-class Cache:
+class ClientCache:
"""A simple in-memory cache."""
- def __init__(self):
+ ##
+ # Do we put the constructor here?
+ # @param path path of persistent snapshot of cache state
+ # @param size maximum size of object data, in bytes
+
+ def __init__(self, path=None, size=None):
+ self.path = path
+ self.size = size
+
+ # Last transaction seen by the cache, either via setLastTid()
+ # or by invalidate().
self.tid = None
+
+ # The cache stores objects in a dict mapping (oid, tid) pairs
+ # to Object() records (see below). The tid is the transaction
+ # id that wrote the object. An object record includes data,
+ # serialno, and end tid. It has auxiliary data structures to
+ # compute the appropriate tid, given the oid and a transaction id
+ # representing an arbitrary point in history.
+ #
+ # The serialized form of the cache just stores the Object()
+ # records. The in-memory form can be reconstructed from these
+ # records.
+
+ # Maps oid to current tid. Used to compute the key for current objects.
self.current = {}
- self.current_tid = {}
- self.version = {}
+ # Maps oid to list of (start_tid, end_tid) pairs in sorted order.
+ # Used to find matching key for load of non-current data.
self.noncurrent = {}
+ # XXX I think oid, version can map to oid, tid without confusion.
+ # A transaction can write version data or non-version data,
+ # but not both.
+ self.version = {}
- # XXX perhaps we need an open, too
+ # A double-linked list is used to manage the cache. It makes
+ # decisions about which objects to keep and which to evict.
+ self.dll = Cache(size or 10**6)
+
+ def open(self):
+ self._read()
+
+ def _read(self):
+ f = open(self.path, "rb")
+ while 1:
+ o = Object.fromFile(f, self)
+ if o is None:
+ break
+ if o.version:
+ self.version[o.oid] = o.start_tid
+ elif o.end_tid is None:
+ self.current[o.oid] = o.start_tid
+ else:
+ # XXX recreate the internal cache data structures
+ print "non-current cache data"
def close(self):
- pass
+ self._write()
+
+ def _write(self):
+ # XXX The cache needs a minimal header.
+ # magic cookie, format version no, configured size of cache
+ f = open(self.path, "wb")
+ for o in self.dll:
+ o.write(f)
+ f.close()
##
# Set the last transaction seen by the cache.
@@ -79,20 +148,19 @@
return self.tid
##
- # Return the current data record for oid and version
+ # Return the current data record for oid and version.
# @param oid object id
# @param version a version string
- # @return data record and serial number
+ # @return data record and serial number or None if the object is not
+ # in the cache
# @defreturn 2-tuple: (string, string)
def load(self, oid, version=""):
- if version:
- t = self.version.get(oid)
- if t is not None:
- stored_version, data, serial, tid = t
- if version == stored_version:
- return data, serial
- return self.current.get(oid)
+ tid = version and self.version.get(oid) or self.current.get(oid)
+ if tid is None:
+ return None
+ o = self.dll.access((oid, tid))
+ return o.data, o.serialno
##
# Return a non-current revision of oid that was current before tid.
@@ -102,10 +170,22 @@
# @defreturn 4-tuple: (string, string, string, string)
def loadNonCurrent(self, oid, tid):
- for lo, hi, data, serial in self.noncurrent.get(oid, []):
- if lo < tid <= hi:
- return data, serial, lo, hi
- return None
+ L = self.noncurrent.get(oid)
+ if L is None:
+ return None
+ # A pair with None as the second element will always be less
+ # than any pair with the same first tid.
+ i = bisect.bisect_left(L, (tid, None))
+ # The least element left of tid was written before tid. If
+ # there is no element, the cache doesn't have old enough data.
+ if i == 0:
+ return
+ lo, hi = L[i-1]
+ # XXX lo should always be less than tid
+ if not lo < tid <= hi:
+ return None
+ o = self.dll.access((oid, lo))
+ return o.data, o.serialno, o.start_tid, o.end_tid
##
# Return the version an object is modified in or None for an
@@ -115,10 +195,11 @@
# @defreturn string or None
def modifiedInVersion(self, oid):
- t = self.version.get(oid)
- if t is None:
+ tid = self.version.get(oid)
+ if tid is None:
return None
- return t[0]
+ o = self.dll.access((oid, tid))
+ return o.version
##
# Store a new data record in the cache.
@@ -134,21 +215,21 @@
# @exception ValueError tried to store non-current version data
def store(self, oid, version, serial, start_tid, end_tid, data):
+ o = Object((oid, start_tid), version, serial, data, start_tid, end_tid,
+ self)
if version:
if end_tid is not None:
raise ValueError("cache only stores current version data")
- self.version[oid] = version, data, serial, start_tid
- return
- # XXX If there was a previous revision, we need to invalidate it.
- if end_tid is None:
- if oid in self.current:
- raise ValueError("already have current data for oid")
- self.current[oid] = data, serial
- self.current_tid[oid] = start_tid
+ self.version[oid] = start_tid
else:
- # XXX could use bisect and keep the list sorted
- L = self.noncurrent.setdefault(oid, [])
- L.append((start_tid, end_tid, data, serial))
+ if end_tid is None:
+ if oid in self.current:
+ raise ValueError("already have current data for oid")
+ self.current[oid] = start_tid
+ else:
+ L = self.noncurrent.setdefault(oid, [])
+ bisect.insort_left(L, (start_tid, end_tid))
+ self.dll.add(o)
##
# Mark the current data for oid as non-current. If there is no
@@ -163,13 +244,15 @@
if version:
if oid in self.version:
del self.version[oid]
- data = self.current.get(oid)
- if data is None:
return
- del self.current[oid]
- start_tid = self.current_tid.pop(oid)
+ if oid not in self.current:
+ return
+ cur_tid = self.current.pop(oid)
+ # XXX Want to fetch object without marking it as accessed
+ o = self.dll.access((oid, cur_tid))
+ o.end_tid = tid
L = self.noncurrent.setdefault(oid, [])
- L.append((start_tid, tid, data[0], data[1]))
+ bisect.insort_left(L, (cur_tid, tid))
##
# An iterator yielding the current contents of the cache.
@@ -177,7 +260,300 @@
# @return oid, version, serial number triples
def contents(self):
+ # XXX
for oid, (data, serial) in self.current.items():
yield oid, "", serial
for oid, (version, data, serial, start_tid) in self.version.items():
yield oid, version, serial
+
+ ##
+ # Return the number of object revisions in the cache.
+
+ def __len__(self):
+ n = len(self.current) + len(self.version)
+ if self.noncurrent:
+ n += reduce(int.__add__, map(len, self.noncurrent))
+ return n
+
+ def _evicted(self, o):
+ # Called by Object o to signal its eviction
+ oid, tid = o.key
+ if o.end_tid is None:
+ if o.version:
+ del self.version[oid]
+ else:
+ del self.current[oid]
+ else:
+ # XXX haven't handled non-current
+ L = self.noncurrent[oid]
+ i = bisect.bisect_left((o.start_end, o.end_tid))
+ print L, i
+
+##
+# An object that's part of a headed, circular, doubly-linked list.
+# Because it's doubly linked, an object can be removed from the list
+# in constant time.
+# <p>
+# The cache keeps such a list of all Objects (see later), and uses a
+# travelling pointer to decay the worth of objects over time.
+
+class DLLNode(object):
+ # previous and next objects in circular list
+ __slots__ = '_prev', '_next'
+
+ def __init__(self):
+ self._prev = self._next = None
+
+ # Insert self immediately before other in list.
+ def insert_before(self, other):
+ prev = other._prev
+ self._prev = prev
+ self._next = other
+ prev._next = other._prev = self
+
+ # Insert self immediately after other in list.
+ def insert_after(self, other):
+ self.insert_before(other._next)
+
+ # Remove self from the list.
+ def unlink(self):
+ prev, next = self._prev, self._next
+ prev._next = next
+ next._prev = prev
+ self._prev = self._next = None
+##
+# The head of a doubly-linked list.
+
+class DLLHead(DLLNode):
+ def __init__(self):
+ self._prev = self._next = self
+
+ # In Boolean context, a DLLHead is true iff the list isn't empty.
+ def __nonzero__(self):
+ return self._next is not self
+
+##
+# A node for a data object stored in the cache.
+
+# XXX Objects probably keep a pointer to the ClientCache so they
+# can remove themselves from auxiliary data structures.
+
+class Object(DLLNode):
+ __slots__ = (# object id, txn id -- something usable as a dict key
+ "key",
+
+ # memory size -- an integer
+ "msize",
+
+ # one-byte int giving the usefulness of the object --
+ # the larger the value, the more reluctant we are
+ # to evict the object
+ "worth",
+
+ "start_tid", # string, id of txn that wrote the data
+ "end_tid", # string, id of txn that wrote next revision
+ # or None
+ "version", # string, name of version
+ "serialno", # string, serial number assigned by txn
+ "data", # string, the actual data record for the object
+
+ "cache", # ClientCache instance containing Object
+ )
+
+ def __init__(self, key, version, serialno, data, start_tid, end_tid,
+ cache):
+ self.key = key
+ self.msize = len(data)
+ self.worth = None
+ self.version = version
+ self.serialno = serialno
+ self.data = data
+ self.start_tid = start_tid
+ self.end_tid = end_tid
+ self.cache = cache
+
+ def unlink(self):
+ DLLNode.unlink(self)
+ self.cache._evicted(self)
+
+ def serialize(self, f):
+ # Write standard form of Object to file, f.
+ s = struct.pack(">QQQQhi", self.key[0], self.serialno,
+ self.start_tid, self.end_tid, len(self.version),
+ len(self.data))
+ f.write(s)
+ f.write(self.version)
+ f.write(self.data)
+ f.write(struct.pack(">Q", s))
+
+ def fromFile(cls, f, cache):
+ fmt = ">QQQQhi"
+ s = f.read(struct.calcsize(fmt))
+ if not s:
+ return None
+ oid, serialno, start_tid, end_tid, vlen, dlen = struct.unpack(fmt, s)
+ version = f.read(vlen)
+ if vlen != len(version):
+ raise ValueError("corrupted record, version")
+ data = f.read(dlen)
+ if dlen != len(data):
+ raise ValueError("corrupted record, data")
+ s = f.read(8)
+ if struct.pack(">Q", s) != oid:
+ raise ValueError("corrupted record, oid")
+ return cls((oid, start_tid), version, serialno, data,
+ start_tid, end_tid, cache)
+
+ fromFile = classmethod(fromFile)
+
+# Serialized format is:
+# 8-byte oid
+# 8-byte serialno
+# 8-byte start_tid
+# 8-byte end_tid
+# 2-byte version length
+# 4-byte data length
+# version
+# data
+# 8-byte oid
+# struct format is >QQQQhi
+
+##
+# XXX
+
+class Cache(object):
+ def __init__(self, maxsize):
+ # Maximum total of object sizes we keep in cache.
+ self.maxsize = maxsize
+ # Current total of object sizes in cache.
+ self.currentsize = 0
+
+ # A worth byte maps to a set of all Objects with that worth.
+ # This is cheap to keep updated, and makes finding low-worth
+ # objects for eviction trivial (just march over the worthsets
+ # list, in order).
+ self.worthsets = [Set() for dummy in range(256)]
+
+ # We keep a circular list of all objects in cache. currentobj
+ # walks around it forever. Each time _tick() is called, the
+ # worth of currentobj is decreased, basically by shifting
+ # right 1, and currentobj moves on to the next object. When
+ # an object is first inserted, it enters the list right before
+ # currentobj. When an object is accessed, its worth is
+ # increased by or'ing in 0x80. This scheme comes from the
+ # Thor system, and is an inexpensive way to account for both
+ # recency and frequency of access: recency is reflected in
+ # the leftmost bit set, and frequency by how many bits are
+ # set.
+ #
+ # Note: because evictions are interleaved with ticks,
+ # unlinking an object is tricky, lest we evict currentobj. The
+ # class _unlink method takes care of this properly.
+ self.listhead = DLLHead()
+ self.currentobj = self.listhead
+
+ # Map an object.key to its Object.
+ self.key2object = {}
+
+ # Statistics: _n_adds, _n_added_bytes,
+ # _n_evicts, _n_evicted_bytes
+ # _n_accesses
+ self.clearStats()
+
+ def clearStats(self):
+ self._n_adds = self._n_added_bytes = 0
+ self._n_evicts = self._n_evicted_bytes = 0
+ self._n_accesses = 0
+
+ def getStats(self):
+ return (self._n_adds, self._n_added_bytes,
+ self._n_evicts, self._n_evicted_bytes,
+ self._n_accesses,
+ )
+
+ # Support iteration over objects in the cache without
+ # marking the objects as accessed.
+
+ def __iter__(self):
+ return self.key2object.itervalues()
+
+ # Unlink object from the circular list, taking care not to lose
+ # track of the current object. Always call this instead of
+ # invoking obj.unlink() directly.
+ def _unlink(self, obj):
+ assert obj is not self.listhead
+ if obj is self.currentobj:
+ self.currentobj = obj._next
+ obj.unlink()
+
+ # Change obj.worth to newworth, maintaining invariants.
+ def _change_worth(self, obj, newworth):
+ if obj.worth != newworth:
+ self.worthsets[obj.worth].remove(obj)
+ obj.worth = newworth
+ self.worthsets[newworth].add(obj)
+
+ def add(self, object):
+ self._n_adds += 1
+ self._n_added_bytes += object.msize
+
+ assert object.key not in self.key2object
+ self.key2object[object.key] = object
+
+ newsize = self.currentsize + object.msize
+ if newsize > self.maxsize:
+ self._evictbytes(newsize - self.maxsize)
+ newsize = self.currentsize + object.msize
+ self.currentsize = newsize
+ object.insert_before(self.currentobj)
+
+ # Give new object an initial worth roughly equal to the log
+ # (base 2) of its size. The intuition is that larger objects
+ # are more expensive to fetch over the network, so are worth
+ # more (at least at first).
+ worth = 0
+ targetsize = 1
+ while object.msize > targetsize:
+ worth += 1
+ targetsize <<= 1
+ object.worth = worth
+ self.worthsets[worth].add(object)
+
+ # Decrease the worth of the current object, and advance the
+ # current object.
+ def _tick(self):
+ c = self.currentobj
+ if c is self.listhead:
+ c = c._next
+ if c is self.listhead: # list is empty
+ return
+ self._change_worth(c, (c.worth + 1) >> 1)
+ self.currentobj = c._next
+
+ def access(self, oid):
+ self._n_accesses += 1
+ self._tick()
+ obj = self.key2object.get(oid)
+ if obj is None:
+ return False # XXX None?
+ self._change_worth(obj, obj.worth | 0x80)
+ return obj
+
+ # Evict objects of least worth first, until at least nbytes bytes
+ # have been freed.
+ def _evictbytes(self, nbytes):
+ for s in self.worthsets:
+ while s:
+ if nbytes <= 0:
+ return
+ obj = s.pop()
+ nbytes -= obj.msize
+ self._evictobj(obj)
+
+ def _evictobj(self, obj):
+ self._n_evicts += 1
+ self._n_evicted_bytes += obj.msize
+ self.currentsize -= obj.msize
+ self.worthsets[obj.worth].discard(obj)
+ del self.key2object[obj.key]
+ self._unlink(obj)
More information about the Zodb-checkins
mailing list