[Zodb-checkins] CVS: ZODB3/ZEO - cache.py:1.2 stats.py:1.22
StorageServer.py:1.104 ServerStub.py:1.18
ClientStorage.py:1.113 ICache.py:NONE ClientCache.py:NONE
Jeremy Hylton
jeremy at zope.com
Wed Dec 24 11:02:43 EST 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27465/ZEO
Modified Files:
stats.py StorageServer.py ServerStub.py ClientStorage.py
Added Files:
cache.py
Removed Files:
ICache.py ClientCache.py
Log Message:
Merge MVCC branch to the HEAD.
=== ZODB3/ZEO/cache.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:42 2003
+++ ZODB3/ZEO/cache.py Wed Dec 24 11:02:03 2003
@@ -0,0 +1,877 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Disk-based client cache for ZEO.
+
+ClientCache exposes an API used by the ZEO client storage. FileCache
+stores objects on disk using a 2-tuple of oid and tid as key.
+
+The upper cache's API is similar to a storage API with methods like
+load(), store(), and invalidate(). It manages in-memory data
+structures that allow it to map this richer API onto the simple
+key-based API of the lower-level cache.
+"""
+
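+# A usage sketch (illustration only; the oid and tid values are made
+# up and must be 8-byte strings in real use):
+#
+#     cache = ClientCache("/tmp/test.zec")
+#     cache.open()
+#     cache.store(oid, "", start_tid, None, data)
+#     cache.load(oid)                   # -> (data, start_tid, "")
+#     cache.invalidate(oid, "", next_tid)
+#     cache.loadBefore(oid, next_tid)   # -> (data, start_tid, next_tid)
+#     cache.close()
+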
+import bisect
+import logging
+import os
+import struct
+import tempfile
+import time
+
+from sets import Set
+
+from ZODB.utils import z64, u64
+
+##
+# A disk-based cache for ZEO clients.
+# <p>
+# This class provides an interface to a persistent, disk-based cache
+# used by ZEO clients to store copies of database records from the
+# server.
+# <p>
+# The details of the constructor are unspecified at this point.
+# <p>
+# Each entry in the cache is valid for a particular range of transaction
+# ids. The lower bound is the transaction that wrote the data. The
+# upper bound is the next transaction that wrote a revision of the
+# object. If the data is current, the upper bound is stored as None;
+# the data is considered current until an invalidate() call is made.
+# <p>
+# It is an error to call store() twice with the same object without an
+# intervening invalidate() to set the upper bound on the first cache
+# entry. <em>Perhaps it will be necessary to have a call that removes
+# something from the cache outright, without keeping a non-current
+# entry.</em>
+# <h3>Cache verification</h3>
+# <p>
+# When the client is connected to the server, it receives
+# invalidations every time an object is modified. When the client is
+# disconnected, it must perform cache verification to make sure its
+# cached data is synchronized with the storage's current state.
+# <p>
+# quick verification
+# full verification
+# <p>
+
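+# To make the validity rules concrete, here is a hypothetical helper
+# (not part of this module) that tests whether a cached revision
+# satisfies a loadBefore(oid, tid) request:
+#
+#     def current_before(start_tid, end_tid, tid):
+#         # The revision written by start_tid answers a load "before
+#         # tid" if it was written earlier and not invalidated until
+#         # tid or later; end_tid is None while the data is current.
+#         return start_tid < tid and (end_tid is None or tid <= end_tid)
+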
+class ClientCache:
+ """A simple in-memory cache."""
+
+ ##
+ # Do we put the constructor here?
+ # @param path path of persistent snapshot of cache state
+ # @param size maximum size of object data, in bytes
+
+ def __init__(self, path=None, size=None, trace=True):
+ self.path = path
+ self.size = size
+ self.log = logging.getLogger("zeo.cache")
+
+ if trace and path:
+ self._setup_trace()
+ else:
+ self._trace = self._notrace
+
+ # Last transaction seen by the cache, either via setLastTid()
+ # or by invalidate().
+ self.tid = None
+
+ # The cache stores objects in a dict mapping (oid, tid) pairs
+ # to Object() records (see below). The tid is the transaction
+ # id that wrote the object. An object record includes data,
+ # serialno, and end tid. It has auxiliary data structures to
+ # compute the appropriate tid, given the oid and a transaction id
+ # representing an arbitrary point in history.
+ #
+ # The serialized form of the cache just stores the Object()
+ # records. The in-memory form can be reconstructed from these
+ # records.
+
+ # Maps oid to current tid. Used to compute the key for loads of
+ # current data.
+ self.current = {}
+ # Maps oid to list of (start_tid, end_tid) pairs in sorted order.
+ # Used to find matching key for load of non-current data.
+ self.noncurrent = {}
+ # Map oid to version, tid pair. If there is no entry, the object
+ # is not modified in a version.
+ self.version = {}
+
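+ # For example (illustration only): after store(oid, "", t1, None,
+ # data) and a later invalidate(oid, "", t2), these maps hold
+ # current == {} and noncurrent == {oid: [(t1, t2)]}.
+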
+ # The lower-level FileCache stores the object data and makes
+ # decisions about which objects to keep and which to evict.
+ self.fc = FileCache(size or 10**6, self.path, self)
+
+ def open(self):
+ self.fc.scan(self.install)
+
+ def install(self, f, ent):
+ # Called by cache storage layer to insert object
+ o = Object.fromFile(f, ent.key, header_only=True)
+ if o is None:
+ return
+ oid = o.key[0]
+ if o.version:
+ self.version[oid] = o.version, o.start_tid
+ elif o.end_tid is None:
+ self.current[oid] = o.start_tid
+ else:
+ L = self.noncurrent.setdefault(oid, [])
+ bisect.insort_left(L, (o.start_tid, o.end_tid))
+
+ def close(self):
+ self.fc.close()
+
+ ##
+ # Set the last transaction seen by the cache.
+ # @param tid a transaction id
+ # @exception ValueError attempt to set a new tid less than the current tid
+
+ def setLastTid(self, tid):
+ self.fc.settid(tid)
+
+ ##
+ # Return the last transaction seen by the cache.
+ # @return a transaction id
+ # @defreturn string
+
+ def getLastTid(self):
+ if self.fc.tid == z64:
+ return None
+ else:
+ return self.fc.tid
+
+ ##
+ # Return the current data record for oid and version.
+ # @param oid object id
+ # @param version a version string
+ # @return data record, tid, and version, or None if the object is
+ # not in the cache
+ # @defreturn 3-tuple: (string, string, string)
+
+ def load(self, oid, version=""):
+ tid = None
+ if version:
+ p = self.version.get(oid)
+ if p is None:
+ return None
+ elif p[0] == version:
+ tid = p[1]
+ # Otherwise, we know the cache has version data but not
+ # for the requested version. Thus, we know it is safe
+ # to return the non-version data from the cache.
+ if tid is None:
+ tid = self.current.get(oid)
+ if tid is None:
+ self._trace(0x20, oid, version)
+ return None
+ o = self.fc.access((oid, tid))
+ if o is None:
+ return None
+ self._trace(0x22, oid, version, o.start_tid, o.end_tid, len(o.data))
+ return o.data, tid, o.version
+
+ ##
+ # Return a non-current revision of oid that was current before tid.
+ # @param oid object id
+ # @param tid id of transaction that wrote next revision of oid
+ # @return data record, start tid, and end tid
+ # @defreturn 3-tuple: (string, string, string)
+
+ def loadBefore(self, oid, tid):
+ L = self.noncurrent.get(oid)
+ if L is None:
+ self._trace(0x24, oid, tid)
+ return None
+ # A pair with None as the second element will always be less
+ # than any pair with the same first tid.
+ i = bisect.bisect_left(L, (tid, None))
+ # The least element left of tid was written before tid. If
+ # there is no element, the cache doesn't have old enough data.
+ if i == 0:
+ self._trace(0x24, oid, tid)
+ return
+ lo, hi = L[i-1]
+ # XXX lo should always be less than tid
+ if not lo < tid <= hi:
+ self._trace(0x24, oid, tid)
+ return None
+ o = self.fc.access((oid, lo))
+ self._trace(0x26, oid, tid)
+ return o.data, o.start_tid, o.end_tid
+
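+ # To illustrate the bisect search above (not part of the API),
+ # using small ints in place of 8-byte tids: with noncurrent
+ # pairs [(2, 4), (4, 7)], a loadBefore with tid 5 finds (4, 7):
+ #
+ # >>> import bisect
+ # >>> L = [(2, 4), (4, 7)]
+ # >>> i = bisect.bisect_left(L, (5, None)) # i == 2
+ # >>> L[i-1] # (4, 7), and 4 < 5 <= 7
+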
+ ##
+ # Return the version an object is modified in or None for an
+ # object that is not modified in a version.
+ # @param oid object id
+ # @return name of version in which the object is modified
+ # @defreturn string or None
+
+ def modifiedInVersion(self, oid):
+ p = self.version.get(oid)
+ if p is None:
+ return None
+ version, tid = p
+ return version
+
+ ##
+ # Store a new data record in the cache.
+ # @param oid object id
+ # @param version name of version that oid was modified in. The cache
+ # only stores current version data, so end_tid should
+ # be None.
+ # @param start_tid the id of the transaction that wrote this revision
+ # @param end_tid the id of the transaction that created the next
+ # revision of oid. If end_tid is None, the data is
+ # current.
+ # @param data the actual data
+ # @exception ValueError tried to store non-current version data
+
+ def store(self, oid, version, start_tid, end_tid, data):
+ # It's hard for the client to avoid storing the same object
+ # more than once. One case is when the client requests
+ # version data that doesn't exist. It checks the cache for
+ # the requested version, doesn't find it, then asks the server
+ # for that data. The server returns the non-version data,
+ # which may already be in the cache.
+ if (oid, start_tid) in self.fc:
+ return
+ o = Object((oid, start_tid), version, data, start_tid, end_tid)
+ if version:
+ if end_tid is not None:
+ raise ValueError("cache only stores current version data")
+ if oid in self.version:
+ if self.version[oid] != (version, start_tid):
+ raise ValueError("data already exists for version %r"
+ % self.version[oid][0])
+ self.version[oid] = version, start_tid
+ self._trace(0x50, oid, version, start_tid, dlen=len(data))
+ else:
+ if end_tid is None:
+ _cur_start = self.current.get(oid)
+ if _cur_start:
+ if _cur_start != start_tid:
+ raise ValueError(
+ "already have current data for oid")
+ else:
+ return
+ self.current[oid] = start_tid
+ self._trace(0x52, oid, version, start_tid, dlen=len(data))
+ else:
+ L = self.noncurrent.setdefault(oid, [])
+ p = start_tid, end_tid
+ if p in L:
+ return # duplicate store
+ bisect.insort_left(L, (start_tid, end_tid))
+ self._trace(0x54, oid, version, start_tid, end_tid,
+ dlen=len(data))
+ self.fc.add(o)
+
+ ##
+ # Mark the current data for oid as non-current. If there is no
+ # current data for oid, do nothing.
+ # @param oid object id
+ # @param version name of version to invalidate.
+ # @param tid the id of the transaction that wrote a new revision of oid
+
+ def invalidate(self, oid, version, tid):
+ if tid > self.fc.tid:
+ self.fc.settid(tid)
+ if oid in self.version:
+ self._trace(0x1A, oid, version, tid)
+ dllversion, dlltid = self.version[oid]
+ assert not version or version == dllversion, (version, dllversion)
+ # remove() will call unlink() to delete from self.version
+ self.fc.remove((oid, dlltid))
+ # And continue on, we must also remove any non-version data
+ # from the cache. This is a bit of a failure of the current
+ # cache consistency approach as the new tid of the version
+ # data gets confused with the old tid of the non-version data.
+ # I could sort this out, but it seems simpler to punt and
+ # have the cache invalidate too much for versions.
+
+ if oid not in self.current:
+ self._trace(0x10, oid, version, tid)
+ return
+ cur_tid = self.current.pop(oid)
+ # XXX Want to fetch object without marking it as accessed
+ o = self.fc.access((oid, cur_tid))
+ if o is None:
+ # XXX is this possible?
+ return None
+ o.end_tid = tid
+ self.fc.update(o)
+ self._trace(0x1C, oid, version, tid)
+ L = self.noncurrent.setdefault(oid, [])
+ bisect.insort_left(L, (cur_tid, tid))
+
+ ##
+ # Return the number of object revisions in the cache.
+
+ # XXX just return len(self.cache)?
+
+ def __len__(self):
+ n = len(self.current) + len(self.version)
+ if self.noncurrent:
+ n += sum(map(len, self.noncurrent.itervalues()))
+ return n
+
+ ##
+ # Generate (oid, tid, version) triples for all objects in the
+ # cache. This generator is used by cache verification.
+
+ def contents(self):
+ # XXX May need to materialize list instead of iterating,
+ # depends on whether the caller may change the cache.
+ for o in self.fc:
+ oid, tid = o.key
+ if oid in self.version:
+ obj = self.fc.access(o.key)
+ yield oid, tid, obj.version
+ else:
+ yield oid, tid, ""
+
+ def dump(self):
+ from ZODB.utils import oid_repr
+ print "cache size", len(self)
+ L = list(self.contents())
+ L.sort()
+ for oid, tid, version in L:
+ print oid_repr(oid), oid_repr(tid), repr(version)
+ print "dll contents"
+ L = list(self.fc)
+ L.sort(lambda x,y:cmp(x.key, y.key))
+ for x in L:
+ end_tid = x.end_tid or z64
+ print oid_repr(x.key[0]), oid_repr(x.key[1]), oid_repr(end_tid)
+ print
+
+ def _evicted(self, o):
+ # Called by Object o to signal its eviction
+ oid, tid = o.key
+ if o.end_tid is None:
+ if o.version:
+ del self.version[oid]
+ else:
+ del self.current[oid]
+ else:
+ # XXX Although we use bisect to keep the list sorted,
+ # we never expect the list to be very long. So the
+ # brute force approach should normally be fine.
+ L = self.noncurrent[oid]
+ L.remove((o.start_tid, o.end_tid))
+
+ def _setup_trace(self):
+ tfn = self.path + ".trace"
+ self.tracefile = None
+ try:
+ self.tracefile = open(tfn, "ab")
+ self._trace(0x00)
+ except IOError, msg:
+ self.tracefile = None
+ self.log.warning("Could not write to trace file %s: %s",
+ tfn, msg)
+
+ def _notrace(self, *arg, **kwargs):
+ pass
+
+ def _trace(self,
+ code, oid="", version="", tid="", end_tid=z64, dlen=0,
+ # The next two are just speed hacks.
+ time_time=time.time, struct_pack=struct.pack):
+ # The code argument is two hex digits; bits 0 and 7 must be zero.
+ # The first hex digit shows the operation, the second the outcome.
+ # The second hex digit distinguishes outcomes; the code table in
+ # stats.py names each one (e.g. 0x22 and 0x26 are load hits).
+ # This method has been carefully tuned to be as fast as possible.
+ # Note: when tracing is disabled, this method is hidden by a dummy.
+ if version:
+ code |= 0x80
+ encoded = (dlen + 255) & 0x7fffff00 | code
+ if tid is None:
+ tid = z64
+ if end_tid is None:
+ end_tid = z64
+ try:
+ self.tracefile.write(
+ struct_pack(">iiH8s8s",
+ time_time(),
+ encoded,
+ len(oid),
+ tid, end_tid) + oid)
+ except:
+ print `tid`, `end_tid`
+ raise
+
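+ # A sketch (illustration only) of decoding one trace record,
+ # mirroring the pack format above; read_trace_record is a
+ # hypothetical helper, not part of this module:
+ #
+ # def read_trace_record(f):
+ # header = f.read(26) # 4+4+2+8+8 bytes
+ # if len(header) < 26:
+ # return None
+ # ts, encoded, oidlen, tid, end_tid = struct.unpack(">iiH8s8s", header)
+ # code = encoded & 0xff # low byte is the event code
+ # dlen = encoded & 0x7fffff00 # data length, rounded up to 256
+ # return ts, code, dlen, f.read(oidlen), tid, end_tid
+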
+##
+# An Object stores the cached data for a single object.
+# <p>
+# The cached data includes the actual object data, the key, and three
+# data fields that describe the validity period and version of the
+# object. The key contains the oid and a redundant start_tid. The
+# actual size of an object is variable, depending on the size of the
+# data and whether it is in a version.
+# <p>
+# The serialized format does not include the key, because it is stored
+# in the header used by the cache's storage format.
+
+class Object(object):
+ __slots__ = (# pair, object id, txn id -- something usable as a dict key
+ # the second part of the pair is equal to start_tid below
+ "key",
+
+ "start_tid", # string, id of txn that wrote the data
+ "end_tid", # string, id of txn that wrote next revision
+ # or None
+ "version", # string, name of version
+ "data", # string, the actual data record for the object
+
+ "size", # total size of serialized object
+ )
+
+ def __init__(self, key, version, data, start_tid, end_tid):
+ self.key = key
+ self.version = version
+ self.data = data
+ self.start_tid = start_tid
+ self.end_tid = end_tid
+ # The size of the serialized object on disk, including the
+ # 14-byte header, the data and version strings, and a
+ # copy of the 8-byte oid.
+ if data is not None:
+ self.size = 22 + len(data) + len(version)
+
+ # The serialization format uses an end tid of "\0" * 8, the least
+ # 8-byte string, to represent None. It isn't possible for an
+ # end_tid to be 0, because it must always be strictly greater
+ # than the start_tid.
+
+ fmt = ">8shi"
+
+ def serialize(self, f):
+ # Write standard form of Object to file, f.
+ self.serialize_header(f)
+ f.write(self.data)
+ f.write(struct.pack(">8s", self.key[0]))
+
+ def serialize_header(self, f):
+ s = struct.pack(self.fmt, self.end_tid or "\0" * 8,
+ len(self.version), len(self.data))
+ f.write(s)
+ f.write(self.version)
+
+ def fromFile(cls, f, key, header_only=False):
+ s = f.read(struct.calcsize(cls.fmt))
+ if not s:
+ return None
+ oid, start_tid = key
+ end_tid, vlen, dlen = struct.unpack(cls.fmt, s)
+ if end_tid == z64:
+ end_tid = None
+ version = f.read(vlen)
+ if vlen != len(version):
+ raise ValueError("corrupted record, version")
+ if header_only:
+ data = None
+ else:
+ data = f.read(dlen)
+ if dlen != len(data):
+ raise ValueError("corrupted record, data")
+ s = f.read(8)
+ if s != oid:
+ raise ValueError("corrupted record, oid")
+ return cls((oid, start_tid), version, data, start_tid, end_tid)
+
+ fromFile = classmethod(fromFile)
+
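+# Illustration only: an Object round-trips through its serialized
+# form. The oid/tid values below are made-up 8-byte strings:
+#
+# from StringIO import StringIO
+# key = ("\0" * 7 + "\1", "\0" * 7 + "\2")
+# obj = Object(key, "", "some data", key[1], None)
+# f = StringIO()
+# obj.serialize(f)
+# f.seek(0)
+# copy = Object.fromFile(f, key)
+# assert copy.data == "some data" and copy.end_tid is None
+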
+def sync(f):
+ f.flush()
+ if hasattr(os, 'fsync'):
+ os.fsync(f.fileno())
+
+class Entry(object):
+ __slots__ = (# object key -- something usable as a dict key.
+ 'key',
+
+ # Offset from start of file to the object's data
+ # record; this includes all overhead bytes (status
+ # byte, size bytes, etc). The size of the data
+ # record is stored in the file near the start of the
+ # record, but for efficiency we also keep size in a
+ # dict (filemap; see later).
+ 'offset',
+ )
+
+ def __init__(self, key=None, offset=None):
+ self.key = key
+ self.offset = offset
+
+
+magic = "ZEC3"
+
+OBJECT_HEADER_SIZE = 1 + 4 + 16
+
+##
+# FileCache stores a cache in a single on-disk file.
+#
+# On-disk cache structure
+#
+# The file begins with a 12-byte header. The first four bytes are the
+# file's magic number - ZEC3 - indicating zeo cache version 3. The
+# next eight bytes are the last transaction id.
+#
+# The file is a contiguous sequence of blocks. All blocks begin with
+# a one-byte status indicator:
+#
+# 'a'
+# Allocated. The block holds an object; the next 4 bytes are >I
+# format total block size.
+#
+# 'f'
+# Free. The block is free; the next 4 bytes are >I format total
+# block size.
+#
+# '1', '2', '3', '4'
+# The block is free, and consists of 1, 2, 3 or 4 bytes total.
+#
+# 'Z'
+# File header. The file starts with a magic number, currently
+# 'ZEC3' and an 8-byte transaction id.
+#
+# "Total" includes the status byte, and size bytes. There are no
+# empty (size 0) blocks.
+
+
+# XXX This needs a lot more hair.
+# The structure of an allocated block is more complicated:
+#
+# 1 byte allocation status ('a').
+# 4 bytes block size, >I format.
+# 16 bytes oid + tid, string.
+# size-OBJECT_HEADER_SIZE bytes, the object pickle.
+
+# The cache's currentofs goes around the file, circularly, forever.
+# It's always the starting offset of some block.
+#
+# When a new object is added to the cache, it's stored beginning at
+# currentofs, and currentofs moves just beyond it. As many contiguous
+# blocks needed to make enough room for the new object are evicted,
+# starting at currentofs. Exception: if currentofs is close enough
+# to the end of the file that the new object can't fit in one
+# contiguous chunk, currentofs is reset to 0 first.
+
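+# A worked size example (illustrative numbers, not from the source):
+# an object with a 100-byte pickle and no version serializes to
+# 22 + 100 = 122 bytes (14-byte header, data, 8-byte oid copy), so its
+# block occupies OBJECT_HEADER_SIZE + 122 = 143 bytes. If _makeroom
+# frees 150 bytes, _writeobj records the 7 excess bytes as a free
+# block ('f' plus a 4-byte size) rather than advancing currentofs
+# past them.
+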
+# Do everything possible to ensure that the bytes we wrote are really
+# on disk.
+
+class FileCache(object):
+
+ def __init__(self, maxsize, fpath, parent, reuse=True):
+ # Maximum total of object sizes we keep in cache.
+ self.maxsize = maxsize
+ # Current total of object sizes in cache.
+ self.currentsize = 0
+ self.parent = parent
+ self.tid = None
+
+ # Map offset in file to pair (data record size, Entry).
+ # Entry is None iff the block starting at offset is free.
+ # filemap always contains a complete account of what's in the
+ # file -- study method _verify_filemap for executable checking
+ # of the relevant invariants. An offset is at the start of a
+ # block iff it's a key in filemap.
+ self.filemap = {}
+
+ # Map key to Entry. There's one entry for each object in the
+ # cache file. After
+ # obj = key2entry[key]
+ # then
+ # obj.key == key
+ # is true.
+ self.key2entry = {}
+
+ # Always the offset into the file of the start of a block.
+ # New and relocated objects are always written starting at
+ # currentofs.
+ self.currentofs = 12
+
+ self.fpath = fpath
+ if not reuse or not fpath or not os.path.exists(fpath):
+ self.new = True
+ if fpath:
+ self.f = file(fpath, 'wb+')
+ else:
+ self.f = tempfile.TemporaryFile()
+ # Make sure the OS really saves enough bytes for the file.
+ self.f.seek(self.maxsize - 1)
+ self.f.write('x')
+ self.f.truncate()
+ # Start with one magic header block
+ self.f.seek(0)
+ self.f.write(magic)
+ self.f.write(z64)
+ # and one free block.
+ self.f.write('f' + struct.pack(">I", self.maxsize - 12))
+ self.sync()
+ self.filemap[12] = self.maxsize - 12, None
+ else:
+ self.new = False
+ self.f = None
+
+ # Statistics: _n_adds, _n_added_bytes, _n_evicts, _n_evicted_bytes,
+ # _n_removes, _n_removed_bytes, _n_accesses
+ self.clearStats()
+
+ # Scan the current contents of the cache file, calling install
+ # for each object found in the cache. This method should only
+ # be called once to initialize the cache from disk.
+
+ def scan(self, install):
+ if self.new:
+ return
+ fsize = os.path.getsize(self.fpath)
+ self.f = file(self.fpath, 'rb+')
+ _magic = self.f.read(4)
+ if _magic != magic:
+ raise ValueError("unexpected magic number: %r" % _magic)
+ self.tid = self.f.read(8)
+ # Remember the largest free block. That seems a
+ # decent place to start currentofs.
+ max_free_size = max_free_offset = 0
+ ofs = 12
+ while ofs < fsize:
+ self.f.seek(ofs)
+ ent = None
+ status = self.f.read(1)
+ if status == 'a':
+ size, rawkey = struct.unpack(">I16s", self.f.read(20))
+ key = rawkey[:8], rawkey[8:]
+ assert key not in self.key2entry
+ self.key2entry[key] = ent = Entry(key, ofs)
+ install(self.f, ent)
+ elif status == 'f':
+ size, = struct.unpack(">I", self.f.read(4))
+ elif status in '1234':
+ size = int(status)
+ else:
+ assert 0, status
+
+ self.filemap[ofs] = size, ent
+ if ent is None and size > max_free_size:
+ max_free_size, max_free_offset = size, ofs
+
+ ofs += size
+
+ assert ofs == fsize
+ if __debug__:
+ self._verify_filemap()
+ self.currentofs = max_free_offset
+
+ def clearStats(self):
+ self._n_adds = self._n_added_bytes = 0
+ self._n_evicts = self._n_evicted_bytes = 0
+ self._n_removes = self._n_removed_bytes = 0
+ self._n_accesses = 0
+
+ def getStats(self):
+ return (self._n_adds, self._n_added_bytes,
+ self._n_evicts, self._n_evicted_bytes,
+ self._n_removes, self._n_removed_bytes,
+ self._n_accesses
+ )
+
+ def __len__(self):
+ return len(self.key2entry)
+
+ def __iter__(self):
+ return self.key2entry.itervalues()
+
+ def __contains__(self, key):
+ return key in self.key2entry
+
+ def sync(self):
+ sync(self.f)
+
+ def close(self):
+ if self.f:
+ self.sync()
+ self.f.close()
+ self.f = None
+
+ # Evict objects as necessary to free up at least nbytes bytes,
+ # starting at currentofs. If currentofs is closer than nbytes to
+ # the end of the file, currentofs is reset to 0. The number of
+ # bytes actually freed may be (and probably will be) greater than
+ # nbytes, and is _makeroom's return value. The file is not
+ # altered by _makeroom. filemap is updated to reflect the
+ # evictions, and it's the caller's responsibility both to fiddle
+ # the file, and to update filemap, to account for all the space
+ # freed (starting at currentofs when _makeroom returns, and
+ # spanning the number of bytes returned by _makeroom).
+
+ def _makeroom(self, nbytes):
+ assert 0 < nbytes <= self.maxsize
+ if self.currentofs + nbytes > self.maxsize:
+ self.currentofs = 12
+ ofs = self.currentofs
+ while nbytes > 0:
+ size, e = self.filemap.pop(ofs)
+ if e is not None:
+ self._evictobj(e, size)
+ ofs += size
+ nbytes -= size
+ return ofs - self.currentofs
+
+ # Write Object obj, with data, to file starting at currentofs.
+ # nfreebytes are already available for overwriting, and it's
+ # guaranteed that's enough. A new Entry records the data record's
+ # position, and filemap is updated to match.
+
+ def _writeobj(self, obj, nfreebytes):
+ size = OBJECT_HEADER_SIZE + obj.size
+ assert size <= nfreebytes
+ excess = nfreebytes - size
+ # If there's any excess (which is likely), we need to record a
+ # free block following the end of the data record. That isn't
+ # expensive -- it's all a contiguous write.
+ if excess == 0:
+ extra = ''
+ elif excess < 5:
+ extra = "01234"[excess]
+ else:
+ extra = 'f' + struct.pack(">I", excess)
+
+ self.f.seek(self.currentofs)
+ self.f.writelines(('a',
+ struct.pack(">I8s8s", size,
+ obj.key[0], obj.key[1])))
+ obj.serialize(self.f)
+ self.f.write(extra)
+ e = Entry(obj.key, self.currentofs)
+ self.key2entry[obj.key] = e
+ self.filemap[self.currentofs] = size, e
+ self.currentofs += size
+ if excess:
+ # We need to record the free block in filemap, but there's
+ # no need to advance currentofs beyond it. Instead it
+ # gives some breathing room for the next object to get
+ # written.
+ self.filemap[self.currentofs] = excess, None
+
+ def add(self, object):
+ size = OBJECT_HEADER_SIZE + object.size
+ if size > self.maxsize:
+ return
+
+ assert object.key not in self.key2entry
+ assert len(object.key[0]) == 8
+ assert len(object.key[1]) == 8
+
+ self._n_adds += 1
+ self._n_added_bytes += size
+
+ available = self._makeroom(size)
+ self._writeobj(object, available)
+
+ def _verify_filemap(self, display=False):
+ a = 12
+ f = self.f
+ while a < self.maxsize:
+ f.seek(a)
+ status = f.read(1)
+ if status in 'af':
+ size, = struct.unpack(">I", f.read(4))
+ else:
+ size = int(status)
+ if display:
+ if a == self.currentofs:
+ print '*****',
+ print "%c%d" % (status, size),
+ size2, obj = self.filemap[a]
+ assert size == size2
+ assert (obj is not None) == (status == 'a')
+ if obj is not None:
+ assert obj.offset == a
+ assert self.key2entry[obj.key] is obj
+ a += size
+ if display:
+ print
+ assert a == self.maxsize
+
+ def _evictobj(self, e, size):
+ self._n_evicts += 1
+ self._n_evicted_bytes += size
+ # Load the object header into memory so we know how to
+ # update the parent's in-memory data structures.
+ self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+ o = Object.fromFile(self.f, e.key, header_only=True)
+ self.parent._evicted(o)
+
+ ##
+ # Return object for key or None if not in cache.
+
+ def access(self, key):
+ self._n_accesses += 1
+ e = self.key2entry.get(key)
+ if e is None:
+ return None
+ offset = e.offset
+ size, e2 = self.filemap[offset]
+ assert e is e2
+
+ self.f.seek(offset + OBJECT_HEADER_SIZE)
+ return Object.fromFile(self.f, key)
+
+ ##
+ # Remove object for key from cache, if present.
+
+ def remove(self, key):
+ # If an object is being explicitly removed, we need to load
+ # its header into memory and write a free block marker to the
+ # disk where the object was stored. We need to load the
+ # header to update the in-memory data structures held by
+ # ClientCache.
+
+ # XXX Or we could just keep the header in memory at all times.
+
+ e = self.key2entry.get(key)
+ if e is None:
+ return
+ offset = e.offset
+ size, e2 = self.filemap[offset]
+ self.f.seek(offset + OBJECT_HEADER_SIZE)
+ o = Object.fromFile(self.f, key, header_only=True)
+ self.f.seek(offset + OBJECT_HEADER_SIZE)
+ self.f.write('f')
+ self.f.flush()
+ self.parent._evicted(o)
+ self.filemap[offset] = size, None
+
+ ##
+ # Update on-disk representation of obj.
+ #
+ # This method should be called when the object header is modified.
+
+ def update(self, obj):
+
+ e = self.key2entry[obj.key]
+ self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+ obj.serialize_header(self.f)
+
+ def settid(self, tid):
+ if self.tid is not None:
+ if tid < self.tid:
+ raise ValueError(
+ "new last tid must be greater that previous one")
+ self.tid = tid
+ self.f.seek(4)
+ self.f.write(tid)
+ self.f.flush()
=== ZODB3/ZEO/stats.py 1.21 => 1.22 ===
--- ZODB3/ZEO/stats.py:1.21 Tue Jun 10 13:08:10 2003
+++ ZODB3/ZEO/stats.py Wed Dec 24 11:02:03 2003
@@ -128,15 +128,21 @@
# Read file, gathering statistics, and printing each record if verbose
rt0 = time.time()
+ # bycode -- map code to count of occurrences
bycode = {}
+ # records -- number of records
records = 0
+ # versions -- number of records with versions
versions = 0
t0 = te = None
+ # datarecords -- number of records with dlen set
datarecords = 0
datasize = 0L
- file0 = file1 = 0
+ # oids -- maps oid to number of times it was loaded
oids = {}
+ # bysize -- maps data size to number of loads
bysize = {}
+ # bysizew -- maps data size to number of writes
bysizew = {}
total_loads = 0
byinterval = {}
@@ -157,12 +163,12 @@
if not quiet:
print "Skipping 8 bytes at offset", offset-8
continue
- r = f_read(10)
+ r = f_read(18)
if len(r) < 18:
break
offset += 18
records += 1
- oidlen, serial = struct_unpack(">H8s", r)
+ oidlen, start_tid, end_tid = struct_unpack(">H8s8s", r)
oid = f_read(oidlen)
if len(oid) != oidlen:
break
@@ -187,11 +193,6 @@
if code & 0x80:
version = 'V'
versions += 1
- current = code & 1
- if current:
- file1 += 1
- else:
- file0 += 1
code = code & 0x7e
bycode[code] = bycode.get(code, 0) + 1
byinterval[code] = byinterval.get(code, 0) + 1
@@ -199,22 +200,23 @@
if code & 0x70 == 0x20: # All loads
bysize[dlen] = d = bysize.get(dlen) or {}
d[oid] = d.get(oid, 0) + 1
- elif code == 0x3A: # Update
+ elif code & 0x70 == 0x50: # All stores
bysizew[dlen] = d = bysizew.get(dlen) or {}
d[oid] = d.get(oid, 0) + 1
if verbose:
- print "%s %d %02x %s %016x %1s %s" % (
+ print "%s %d %02x %s %016x %016x %1s %s" % (
time.ctime(ts)[4:-5],
current,
code,
oid_repr(oid),
- U64(serial),
+ U64(start_tid),
+ U64(end_tid),
version,
dlen and str(dlen) or "")
if code & 0x70 == 0x20:
oids[oid] = oids.get(oid, 0) + 1
total_loads += 1
- if code in (0x00, 0x70):
+ if code == 0x00:
if not quiet:
dumpbyinterval(byinterval, h0, he)
byinterval = {}
@@ -222,10 +224,7 @@
h0 = he = ts
if not quiet:
print time.ctime(ts)[4:-5],
- if code == 0x00:
- print '='*20, "Restart", '='*20
- else:
- print '-'*20, "Flip->%d" % current, '-'*20
+ print '='*20, "Restart", '='*20
except KeyboardInterrupt:
print "\nInterrupted. Stats so far:\n"
@@ -248,8 +247,6 @@
print "First time: %s" % time.ctime(t0)
print "Last time: %s" % time.ctime(te)
print "Duration: %s seconds" % addcommas(te-t0)
- print "File stats: %s in file 0; %s in file 1" % (
- addcommas(file0), addcommas(file1))
print "Data recs: %s (%.1f%%), average size %.1f KB" % (
addcommas(datarecords),
100.0 * datarecords / records,
@@ -314,7 +311,7 @@
if code & 0x70 == 0x20:
n = byinterval[code]
loads += n
- if code in (0x2A, 0x2C, 0x2E):
+ if code in (0x22, 0x26):
hits += n
if not loads:
return
@@ -333,7 +330,7 @@
if code & 0x70 == 0x20:
n = bycode[code]
loads += n
- if code in (0x2A, 0x2C, 0x2E):
+ if code in (0x22, 0x26):
hits += n
if loads:
return 100.0 * hits / loads
@@ -376,31 +373,18 @@
0x00: "_setup_trace (initialization)",
0x10: "invalidate (miss)",
- 0x1A: "invalidate (hit, version, writing 'n')",
- 0x1C: "invalidate (hit, writing 'i')",
+ 0x1A: "invalidate (hit, version)",
+ 0x1C: "invalidate (hit, saving non-current)",
0x20: "load (miss)",
- 0x22: "load (miss, version, status 'n')",
- 0x24: "load (miss, deleting index entry)",
- 0x26: "load (miss, no non-version data)",
- 0x28: "load (miss, version mismatch, no non-version data)",
- 0x2A: "load (hit, returning non-version data)",
- 0x2C: "load (hit, version mismatch, returning non-version data)",
- 0x2E: "load (hit, returning version data)",
-
- 0x3A: "update",
-
- 0x40: "modifiedInVersion (miss)",
- 0x4A: "modifiedInVersion (hit, return None, status 'n')",
- 0x4C: "modifiedInVersion (hit, return '')",
- 0x4E: "modifiedInVersion (hit, return version)",
+ 0x22: "load (hit)",
+ 0x24: "load (non-current, miss)",
+ 0x26: "load (non-current, hit)",
+
+ 0x50: "store (version)",
+ 0x52: "store (current, non-version)",
+ 0x54: "store (non-current)",
- 0x5A: "store (non-version data present)",
- 0x5C: "store (only version data present)",
-
- 0x6A: "_copytocurrent",
-
- 0x70: "checkSize (cache flip)",
}
if __name__ == "__main__":
=== ZODB3/ZEO/StorageServer.py 1.103 => 1.104 ===
--- ZODB3/ZEO/StorageServer.py:1.103 Mon Nov 24 16:27:51 2003
+++ ZODB3/ZEO/StorageServer.py Wed Dec 24 11:02:03 2003
@@ -235,6 +235,14 @@
def getExtensionMethods(self):
return self._extensions
+ def loadEx(self, oid, version):
+ self.stats.loads += 1
+ return self.storage.loadEx(oid, version)
+
+ def loadBefore(self, oid, tid):
+ self.stats.loads += 1
+ return self.storage.loadBefore(oid, tid)
+
def zeoLoad(self, oid):
self.stats.loads += 1
v = self.storage.modifiedInVersion(oid)
@@ -260,12 +268,26 @@
% (len(invlist), u64(invtid)))
return invtid, invlist
+ def verify(self, oid, version, tid):
+ try:
+ t = self.storage.getTid(oid)
+ except KeyError:
+ self.client.invalidateVerify((oid, ""))
+ else:
+ if tid != t:
+ # This will invalidate non-version data when the
+ # client only has invalid version data. Since this is
+ # an uncommon case, we avoid the cost of checking
+ # whether the serial number matches the current
+ # non-version data.
+ self.client.invalidateVerify((oid, version))
+
def zeoVerify(self, oid, s, sv):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
try:
- os = self.storage.getSerial(oid)
+ os = self.storage.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ''))
# XXX It's not clear what we should do now. The KeyError
@@ -344,7 +366,7 @@
def undoLog(self, first, last):
return run_in_thread(self.storage.undoLog, first, last)
- def tpc_begin(self, id, user, description, ext, tid, status):
+ def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
if self.read_only:
raise ReadOnlyError()
if self.transaction is not None:
@@ -521,25 +543,25 @@
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
- oids = self.storage.abortVersion(src, self.transaction)
+ tid, oids = self.storage.abortVersion(src, self.transaction)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
- return oids
+ return tid, oids
def _commitVersion(self, src, dest):
- oids = self.storage.commitVersion(src, dest, self.transaction)
+ tid, oids = self.storage.commitVersion(src, dest, self.transaction)
inv = [(oid, dest) for oid in oids]
self.invalidated.extend(inv)
if dest:
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
- return oids
+ return tid, oids
def _transactionalUndo(self, trans_id):
- oids = self.storage.transactionalUndo(trans_id, self.transaction)
+ tid, oids = self.storage.transactionalUndo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv)
- return oids
+ return tid, oids
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
@@ -853,6 +875,9 @@
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
+
+ # XXX this is wrong! must check against tid or we invalidate
+ # too much.
oids = {}
for tid, L in self.invq:
=== ZODB3/ZEO/ServerStub.py 1.17 => 1.18 ===
--- ZODB3/ZEO/ServerStub.py:1.17 Thu Oct 2 14:17:22 2003
+++ ZODB3/ZEO/ServerStub.py Wed Dec 24 11:02:03 2003
@@ -13,6 +13,18 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
+##
+# ZEO storage server.
+# <p>
+# Remote method calls can be synchronous or asynchronous. If the call
+# is synchronous, the client thread blocks until the call returns. A
+# single client can only have one synchronous request outstanding. If
+# several threads share a single client, threads other than the caller
+# will block only if they attempt to make another synchronous call.
+# An asynchronous call does not cause the client thread to block. An
+# exception raised by an asynchronous method is logged on the server,
+# but is not returned to the client.
+
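+# For example (hypothetical usage; assumes an already-connected rpc
+# object from ZEO.zrpc):
+#
+#     stub = StorageServer(rpc)
+#     info = stub.get_info()       # synchronous: blocks for the reply
+#     stub.zeoVerify(oid, s, sv)   # asynchronous: returns immediately
+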
class StorageServer:
"""An RPC stub class for the interface exported by ClientStorage.
@@ -43,46 +55,173 @@
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
+ ##
+ # Register current connection with a storage and a mode.
+ # In effect, it is like an open call.
+ # @param storage_name a string naming the storage. This argument
+ # is primarily for backwards compatibility with servers
+ # that supported multiple storages.
+ # @param read_only boolean
+ # @exception ValueError unknown storage_name or already registered
+ # @exception ReadOnlyError storage is read-only and a read-write
+# connection was requested
+
def register(self, storage_name, read_only):
self.rpc.call('register', storage_name, read_only)
+ ##
+ # Return dictionary of meta-data about the storage.
+ # @defreturn dict
+
def get_info(self):
return self.rpc.call('get_info')
+ ##
+ # Check whether the server requires authentication. Returns
+ # the name of the protocol.
+ # @defreturn string
+
def getAuthProtocol(self):
return self.rpc.call('getAuthProtocol')
+ ##
+ # Return the id of the last committed transaction
+ # @defreturn string
+
def lastTransaction(self):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('lastTransaction')
+ ##
+ # Return invalidations for all transactions after tid.
+ # @param tid transaction id
+ # @defreturn 2-tuple, (tid, list)
+ # @return tuple containing the last committed transaction
+ # and a list of oids that were invalidated. Returns
+ # None and an empty list if the server does not have
+ # the list of oids available.
+
def getInvalidations(self, tid):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('getInvalidations', tid)
+ ##
+ # Check whether serial numbers s and sv are current for oid.
+ # If one or both of the serial numbers are not current, the
+ # server will make an asynchronous invalidateVerify() call.
+ # @param oid object id
+ # @param s serial number of non-version data
+ # @param sv serial number of version data or None
+ # @defreturn async
+
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
+ ##
+ # Check whether current serial number is valid for oid and version.
+ # If the serial number is not current, the server will make an
+ # asynchronous invalidateVerify() call.
+ # @param oid object id
+ # @param version name of version for oid
+ # @param serial client's current serial number
+ # @defreturn async
+
+ def verify(self, oid, version, serial):
+ self.rpc.callAsync('verify', oid, version, serial)
+
+ ##
+ # Signal to the server that cache verification is done.
+ # @defreturn async
+
def endZeoVerify(self):
self.rpc.callAsync('endZeoVerify')
+ ##
+ # Generate a new set of oids.
+ # @param n number of new oids to return
+ # @defreturn list
+ # @return list of oids
+
def new_oids(self, n=None):
if n is None:
return self.rpc.call('new_oids')
else:
return self.rpc.call('new_oids', n)
+ ##
+ # Pack the storage.
+ # @param t pack time
+ # @param wait optional, boolean. If true, the call will not
+ # return until the pack is complete.
+
def pack(self, t, wait=None):
if wait is None:
self.rpc.call('pack', t)
else:
self.rpc.call('pack', t, wait)
+ ##
+ # Return current data for oid. Version data is returned if
+ # present.
+ # @param oid object id
+ # @defreturn 5-tuple
+ # @return 5-tuple, current non-version data, serial number,
+ # version name, version data, version data serial number
+ # @exception KeyError if oid is not found
+
def zeoLoad(self, oid):
return self.rpc.call('zeoLoad', oid)
+ ##
+ # Return current data for oid along with the tid of the transaction
+ # that wrote the data.
+ # @param oid object id
+ # @param version string, name of version
+ # @defreturn 3-tuple
+ # @return data, transaction id, version,
+ # where version is the name of the version the data came
+ # from or "" for non-version data
+ # @exception KeyError if oid is not found
+
+ def loadEx(self, oid, version):
+ return self.rpc.call("loadEx", oid, version)
+
+ ##
+ # Return non-current data along with transaction ids that identify
+ # the lifetime of the specific revision.
+ # @param oid object id
+ # @param tid a transaction id that provides an upper bound on
+ # the lifetime of the revision. That is, loadBefore
+ # returns the revision that was current before tid committed.
+ # @defreturn 3-tuple
+ # @return data, start transaction id, end transaction id
+
+ def loadBefore(self, oid, tid):
+ return self.rpc.call("loadBefore", oid, tid)
+
+ ##
+ # Store a new revision of oid.
+ # @param oid object id
+ # @param serial serial number that this transaction read
+ # @param data new data record for oid
+ # @param version name of version or ""
+ # @param id id of current transaction
+ # @defreturn async
+
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+ ##
+ # Start two-phase commit for a transaction
+ # @param id id used by client to identify current transaction. The
+ # only purpose of this argument is to distinguish among multiple
+ # threads using a single ClientStorage.
+ # @param user name of user committing transaction (can be "")
+ # @param description string containing transaction metadata (can be "")
+ # @param ext dictionary of extended metadata (?)
+ # @param tid optional explicit tid to pass to underlying storage
+ # @param status optional status character, e.g. "p" for pack
+ # @defreturn async
def tpc_begin(self, id, user, descr, ext, tid, status):
return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
=== ZODB3/ZEO/ClientStorage.py 1.112 => 1.113 ===
--- ZODB3/ZEO/ClientStorage.py:1.112 Fri Nov 28 11:44:47 2003
+++ ZODB3/ZEO/ClientStorage.py Wed Dec 24 11:02:03 2003
@@ -26,7 +26,8 @@
import time
import types
-from ZEO import ClientCache, ServerStub
+from ZEO import ServerStub
+from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
ClientDisconnected, AuthError
@@ -91,7 +92,7 @@
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
- ClientCacheClass = ClientCache.ClientCache
+ ClientCacheClass = ClientCache
ConnectionManagerClass = ConnectionManager
StorageServerStubClass = ServerStub.StorageServer
@@ -252,10 +253,17 @@
self._tbuf = self.TransactionBufferClass()
self._db = None
+ self._ltid = None # the last committed transaction
# _serials: stores (oid, serialno) as returned by server
# _seriald: _check_serials() moves from _serials to _seriald,
# which maps oid to serialno
+
+ # XXX If serial number matches transaction id, then there is
+ # no need to have all this extra infrastructure for handling
+ # serial numbers. The vote call can just return the tid.
+ # If there is a conflict error, we can't have a special method
+ # called just to propagate the error.
self._serials = []
self._seriald = {}
@@ -292,13 +300,15 @@
# is executing.
self._lock = threading.Lock()
- t = self._ts = get_timestamp()
- self._serial = `t`
- self._oid = '\0\0\0\0\0\0\0\0'
-
# Decide whether to use non-temporary files
- self._cache = self.ClientCacheClass(storage, cache_size,
- client=client, var=var)
+ if client is not None:
+ dir = var or os.getcwd()
+ cache_path = os.path.join(dir, "%s-%s.zec" % (client, storage))
+ else:
+ cache_path = None
+ self._cache = self.ClientCacheClass(cache_path)
+ # XXX When should it be opened?
+ self._cache.open()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
@@ -312,9 +322,6 @@
# doesn't succeed, call connect() to start a thread.
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
- # If the connect hasn't occurred, run with cached data.
- if not self._ready.isSet():
- self._cache.open()
def _wait(self, timeout=None):
if timeout is not None:
@@ -555,7 +562,6 @@
if ltid == last_inval_tid:
log2(INFO, "No verification necessary "
"(last_inval_tid up-to-date)")
- self._cache.open()
self._server = server
self._ready.set()
return "no verification"
@@ -569,7 +575,6 @@
pair = server.getInvalidations(last_inval_tid)
if pair is not None:
log2(INFO, "Recovering %d invalidations" % len(pair[1]))
- self._cache.open()
self.invalidateTransaction(*pair)
self._server = server
self._ready.set()
@@ -581,7 +586,9 @@
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
- self._cache.verify(server.zeoVerify)
+ # XXX should batch these operations for efficiency
+ for oid, tid, version in self._cache.contents():
+ server.verify(oid, version, tid)
self._pending_server = server
server.endZeoVerify()
return "full verification"
@@ -600,8 +607,7 @@
This is called by ConnectionManager when the connection is
closed or when certain problems with the connection occur.
"""
- log2(PROBLEM, "Disconnected from storage: %s"
- % repr(self._server_addr))
+ log2(INFO, "Disconnected from storage: %s" % repr(self._server_addr))
self._connection = None
self._ready.clear()
self._server = disconnected_stub
@@ -671,10 +677,10 @@
raise POSException.StorageTransactionError(self._transaction,
trans)
- def abortVersion(self, version, transaction):
+ def abortVersion(self, version, txn):
"""Storage API: clear any changes made by the given version."""
- self._check_trans(transaction)
- oids = self._server.abortVersion(version, self._serial)
+ self._check_trans(txn)
+ tid, oids = self._server.abortVersion(version, id(txn))
# When a version aborts, invalidate the version and
# non-version data. The non-version data should still be
# valid, but older versions of ZODB will change the
@@ -686,28 +692,31 @@
# we could just invalidate the version data.
for oid in oids:
self._tbuf.invalidate(oid, '')
- return oids
+ return tid, oids
- def commitVersion(self, source, destination, transaction):
+ def commitVersion(self, source, destination, txn):
"""Storage API: commit the source version in the destination."""
- self._check_trans(transaction)
- oids = self._server.commitVersion(source, destination, self._serial)
+ self._check_trans(txn)
+ tid, oids = self._server.commitVersion(source, destination, id(txn))
if destination:
# just invalidate our version data
for oid in oids:
self._tbuf.invalidate(oid, source)
else:
- # destination is '', so invalidate version and non-version
+ # destination is "", so invalidate version and non-version
for oid in oids:
- self._tbuf.invalidate(oid, destination)
- return oids
+ self._tbuf.invalidate(oid, "")
+ return tid, oids
- def history(self, oid, version, length=1):
+ def history(self, oid, version, length=1, filter=None):
"""Storage API: return a sequence of HistoryEntry objects.
This does not support the optional filter argument defined by
the Storage API.
"""
+ if filter is not None:
+ log2(WARNING, "filter argument to history() ignored")
+ # XXX should I run filter on the results?
return self._server.history(oid, version, length)
def getSerial(self, oid):
@@ -725,11 +734,14 @@
specified by the given object id and version, if they exist;
otherwise a KeyError is raised.
"""
+ return self.loadEx(oid, version)[:2]
+
+ def loadEx(self, oid, version):
self._lock.acquire() # for atomic processing of invalidations
try:
- pair = self._cache.load(oid, version)
- if pair:
- return pair
+ t = self._cache.load(oid, version)
+ if t:
+ return t
finally:
self._lock.release()
@@ -745,25 +757,55 @@
finally:
self._lock.release()
- p, s, v, pv, sv = self._server.zeoLoad(oid)
+ data, tid, ver = self._server.loadEx(oid, version)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
- self._cache.checkSize(0)
- self._cache.store(oid, p, s, v, pv, sv)
+ self._cache.store(oid, ver, tid, None, data)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
- if v and version and v == version:
- return pv, sv
- else:
- if s:
- return p, s
- raise KeyError, oid # no non-version data for this
+ return data, tid, ver
+
+ def loadBefore(self, oid, tid):
+ self._lock.acquire()
+ try:
+ t = self._cache.loadBefore(oid, tid)
+ if t is not None:
+ return t
+ finally:
+ self._lock.release()
+
+ t = self._server.loadBefore(oid, tid)
+ if t is None:
+ return None
+ data, start, end = t
+ if end is None:
+ # This method should not be used to get current data. It
+ # doesn't use the _load_lock, so it is possible to overlap
+ # this load with an invalidation for the same object.
+
+ # XXX If we call again, we're guaranteed to get the
+ # post-invalidation data. But if the data is still
+ # current, we'll still get end == None.
+
+ # Maybe the best thing to do is to re-run the test while
+ # holding the load lock in that case. That hurts performance, but
+ # I don't think real application code will ever care about
+ # it.
+
+ return data, start, end
+ self._lock.acquire()
+ try:
+ self._cache.store(oid, "", start, end, data)
+ finally:
+ self._lock.release()
+
+ return data, start, end
def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
@@ -815,6 +857,8 @@
def _check_serials(self):
"""Internal helper to move data from _serials to _seriald."""
+ # XXX serials are always going to be the same, the only
+ # question is whether an exception has been raised.
if self._serials:
l = len(self._serials)
r = self._serials[:l]
@@ -825,18 +869,18 @@
self._seriald[oid] = s
return r
- def store(self, oid, serial, data, version, transaction):
+ def store(self, oid, serial, data, version, txn):
"""Storage API: store data for an object."""
- self._check_trans(transaction)
- self._server.storea(oid, serial, data, version, self._serial)
+ self._check_trans(txn)
+ self._server.storea(oid, serial, data, version, id(txn))
self._tbuf.store(oid, version, data)
return self._check_serials()
- def tpc_vote(self, transaction):
+ def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
- if transaction is not self._transaction:
+ if txn is not self._transaction:
return
- self._server.vote(self._serial)
+ self._server.vote(id(txn))
return self._check_serials()
def tpc_begin(self, txn, tid=None, status=' '):
@@ -856,15 +900,8 @@
self._transaction = txn
self._tpc_cond.release()
- if tid is None:
- self._ts = get_timestamp(self._ts)
- id = `self._ts`
- else:
- self._ts = TimeStamp(tid)
- id = tid
-
try:
- self._server.tpc_begin(id, txn.user, txn.description,
+ self._server.tpc_begin(id(txn), txn.user, txn.description,
txn._extension, tid, status)
except:
# Client may have disconnected during the tpc_begin().
@@ -872,7 +909,6 @@
self.end_transaction()
raise
- self._serial = id
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
@@ -881,18 +917,17 @@
"""Internal helper to end a transaction."""
# the right way to set self._transaction to None
# calls notify() on _tpc_cond in case there are waiting threads
- self._ltid = self._serial
self._tpc_cond.acquire()
self._transaction = None
self._tpc_cond.notify()
self._tpc_cond.release()
def lastTransaction(self):
- return self._ltid
+ return self._cache.getLastTid()
- def tpc_abort(self, transaction):
+ def tpc_abort(self, txn):
"""Storage API: abort a transaction."""
- if transaction is not self._transaction:
+ if txn is not self._transaction:
return
try:
# XXX Are there any transactions that should prevent an
@@ -900,7 +935,7 @@
# all, yet you want to be sure that other abort logic is
# executed regardless.
try:
- self._server.tpc_abort(self._serial)
+ self._server.tpc_abort(id(txn))
except ClientDisconnected:
log2(BLATHER, 'ClientDisconnected in tpc_abort() ignored')
finally:
@@ -909,9 +944,9 @@
del self._serials[:]
self.end_transaction()
- def tpc_finish(self, transaction, f=None):
+ def tpc_finish(self, txn, f=None):
"""Storage API: finish a transaction."""
- if transaction is not self._transaction:
+ if txn is not self._transaction:
return
self._load_lock.acquire()
try:
@@ -919,15 +954,16 @@
raise ClientDisconnected(
'Calling tpc_finish() on a disconnected transaction')
- tid = self._server.tpc_finish(self._serial)
+ tid = self._server.tpc_finish(id(txn))
self._lock.acquire() # for atomic processing of invalidations
try:
- self._update_cache()
+ self._update_cache(tid)
if f is not None:
- f()
+ f(tid)
finally:
self._lock.release()
+ # XXX Shouldn't this cache call be made while holding the lock?
self._cache.setLastTid(tid)
r = self._check_serials()
@@ -936,7 +972,7 @@
self._load_lock.release()
self.end_transaction()
- def _update_cache(self):
+ def _update_cache(self, tid):
"""Internal helper to handle objects modified by a transaction.
This iterates over the objects in the transaction buffer and
@@ -949,7 +985,6 @@
if self._cache is None:
return
- self._cache.checkSize(self._tbuf.get_size())
try:
self._tbuf.begin_iterate()
except ValueError, msg:
@@ -965,18 +1000,17 @@
"client storage: %s" % msg)
if t is None:
break
- oid, v, p = t
- if p is None: # an invalidation
- s = None
- else:
+ oid, version, data = t
+ self._cache.invalidate(oid, version, tid)
+ # If data is None, we just invalidate.
+ if data is not None:
s = self._seriald[oid]
- if s == ResolvedSerial or s is None:
- self._cache.invalidate(oid, v)
- else:
- self._cache.update(oid, s, v, p)
+ if s != ResolvedSerial:
+ assert s == tid, (s, tid)
+ self._cache.store(oid, version, s, None, data)
self._tbuf.clear()
- def transactionalUndo(self, trans_id, trans):
+ def transactionalUndo(self, trans_id, txn):
"""Storage API: undo a transaction.
This is executed in a transactional context. It has no effect
@@ -985,24 +1019,11 @@
Zope uses this to implement undo unless it is not supported by
a storage.
"""
- self._check_trans(trans)
- oids = self._server.transactionalUndo(trans_id, self._serial)
+ self._check_trans(txn)
+ tid, oids = self._server.transactionalUndo(trans_id, id(txn))
for oid in oids:
self._tbuf.invalidate(oid, '')
- return oids
-
- def undo(self, transaction_id):
- """Storage API: undo a transaction, writing directly to the storage."""
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- oids = self._server.undo(transaction_id)
- self._lock.acquire()
- try:
- for oid in oids:
- self._cache.invalidate(oid, '')
- finally:
- self._lock.release()
- return oids
+ return tid, oids
def undoInfo(self, first=0, last=-20, specification=None):
"""Storage API: return undo information."""
@@ -1059,15 +1080,15 @@
try:
# versions maps version names to dictionary of invalidations
versions = {}
- for oid, version in invs:
+ for oid, version, tid in invs:
if oid == self._load_oid:
self._load_status = 0
- self._cache.invalidate(oid, version=version)
- versions.setdefault(version, {})[oid] = 1
+ self._cache.invalidate(oid, version, tid)
+ versions.setdefault((version, tid), {})[oid] = tid
if self._db is not None:
- for v, d in versions.items():
- self._db.invalidate(d, version=v)
+ for (version, tid), d in versions.items():
+ self._db.invalidate(tid, d, version=version)
finally:
self._lock.release()
@@ -1099,7 +1120,8 @@
for t in args:
self._pickler.dump(t)
return
- self._process_invalidations(args)
+ self._process_invalidations([(oid, version, tid)
+ for oid, version in args])
# The following are for compatibility with protocol version 2.0.0
@@ -1110,36 +1132,10 @@
end = endVerify
Invalidate = invalidateTrans
-try:
- StopIteration
-except NameError:
- class StopIteration(Exception):
- pass
-
-class InvalidationLogIterator:
- """Helper class for reading invalidations in endVerify."""
-
- def __init__(self, fileobj):
- self._unpickler = cPickle.Unpickler(fileobj)
- self.getitem_i = 0
-
- def __iter__(self):
- return self
-
- def next(self):
- oid, version = self._unpickler.load()
+def InvalidationLogIterator(fileobj):
+ unpickler = cPickle.Unpickler(fileobj)
+ while 1:
+ oid, version = unpickler.load()
if oid is None:
- raise StopIteration
- return oid, version
-
- # The __getitem__() method is needed to support iteration
- # in Python 2.1.
-
- def __getitem__(self, i):
- assert i == self.getitem_i
- try:
- obj = self.next()
- except StopIteration:
- raise IndexError, i
- self.getitem_i += 1
- return obj
+ break
+ yield oid, version, None
=== Removed File ZODB3/ZEO/ICache.py ===
=== Removed File ZODB3/ZEO/ClientCache.py ===