[Zodb-checkins] CVS: ZODB4/src/zodb/zeo - cache.py:1.4.24.1
Jeremy Hylton
jeremy at zope.com
Wed Jun 18 17:54:40 EDT 2003
Update of /cvs-repository/ZODB4/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv28292
Modified Files:
Tag: ZODB3-2-merge
cache.py
Log Message:
Backport the new zeo cache code with some small improvements.
=== ZODB4/src/zodb/zeo/cache.py 1.4 => 1.4.24.1 ===
--- ZODB4/src/zodb/zeo/cache.py:1.4 Thu Mar 13 16:32:30 2003
+++ ZODB4/src/zodb/zeo/cache.py Wed Jun 18 16:54:39 2003
@@ -13,10 +13,11 @@
##############################################################################
# XXX TO DO
-# use two indices rather than the sign bit of the index??????
-# add a shared routine to read + verify a record???
-# redesign header to include vdlen???
-# rewrite the cache using a different algorithm???
+# Add a shared routine to read + verify a record. Have that routine
+# return a record object rather than a string.
+# Use two indices rather than the sign bit of the index??????
+# Redesign header to include vdlen???
+# Rewrite the cache using a different algorithm???
"""Implement a client cache
@@ -44,7 +45,9 @@
offset in record: name -- description
- 0: oid -- 8-byte object id
+ 0: oidlen -- 2-byte unsigned object id length
+
+ 2: reserved (6 bytes)
8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
('n' means only the non-version data in the record is valid)
@@ -57,23 +60,25 @@
19: serial -- 8-byte non-version serial (timestamp)
- 27: data -- non-version data
+ 27: oid -- object id
+
+ 27+oidlen: data -- non-version data
- 27+dlen: version -- Version string (if vlen > 0)
+ 27+oidlen+dlen: version -- Version string (if vlen > 0)
- 27+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
+ 27+oidlen+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
- 31+dlen+vlen: vdata -- version data (if vlen > 0)
+ 31+oidlen+dlen+vlen: vdata -- version data (if vlen > 0)
- 31+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
+ 31+oidlen+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
(if vlen > 0)
- 27+dlen (if vlen == 0) **or**
- 39+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
- redundancy and backward traversal)
+ 27+oidlen+dlen (if vlen == 0) **or**
+ 39+oidlen+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
+ redundancy and backward traversal)
- 31+dlen (if vlen == 0) **or**
- 43+dlen+vlen+vdlen: -- total record length (equal to tlen)
+ 31+oidlen+dlen (if vlen == 0) **or**
+ 43+oidlen+dlen+vlen+vdlen: -- total record length (equal to tlen)
There is a cache size limit.
@@ -105,7 +110,6 @@
file 0 and file 1.
"""
-import logging
import os
import time
import logging
@@ -114,9 +118,9 @@
from thread import allocate_lock
from zodb.utils import u64
-from zodb.interfaces import ZERO
+from zodb.interfaces import ZERO, _fmt_oid
-magic = 'ZEC1'
+magic = 'ZEC2'
headersize = 12
MB = 1024**2
@@ -158,15 +162,13 @@
if os.path.exists(p[i]):
fi = open(p[i],'r+b')
if fi.read(4) == magic: # Minimal sanity
- fi.seek(0, 2)
- if fi.tell() > headersize:
- # Read serial at offset 19 of first record
- fi.seek(headersize + 19)
- s[i] = fi.read(8)
+ # Read the ltid for this file. If it never
+ # saw a transaction commit, it will get tossed,
+ # even if it has valid data.
+ s[i] = fi.read(8)
# If we found a non-zero serial, then use the file
if s[i] != ZERO:
f[i] = fi
- fi = None
# Whoever has the larger serial is the current
if s[1] > s[0]:
@@ -186,11 +188,16 @@
self._p = p = [None, None]
f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
+ self._current = current
- self.log("%s: storage=%r, size=%r; file[%r]=%r",
- self.__class__.__name__, storage, size, current, p[current])
+ if self._ltid:
+ ts = "; last txn=%x" % u64(self._ltid)
+ else:
+ ts = ""
+ self.log("%s: storage=%r, size=%r; file[%r]=%r%s" %
+ (self.__class__.__name__, storage, size, current, p[current],
+ ts))
- self._current = current
self._setup_trace()
def open(self):
@@ -224,6 +231,18 @@
except OSError:
pass
+ def _read_header(self, f, pos):
+ # Read record header from f at pos, returning header and oid.
+ f.seek(pos)
+ h = f.read(27)
+ if len(h) != 27:
+ self.log("_read_header: short record at %s in %s", pos, f.name)
+ return None, None
+ oidlen = unpack(">H", h[:2])[0]
+ oid = f.read(oidlen)
+ return h, oid
+
+
def getLastTid(self):
"""Get the last transaction id stored by setLastTid().
@@ -243,7 +262,7 @@
f = self._f[self._current]
f.seek(4)
tid = f.read(8)
- if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+ if len(tid) < 8 or tid == ZERO:
return None
else:
return tid
@@ -255,7 +274,7 @@
cache file; otherwise it's an instance variable.
"""
if self._client is None:
- if tid == '\0\0\0\0\0\0\0\0':
+ if tid == ZERO:
tid = None
self._ltid = tid
else:
@@ -267,7 +286,7 @@
def _setLastTid(self, tid):
if tid is None:
- tid = '\0\0\0\0\0\0\0\0'
+ tid = ZERO
else:
tid = str(tid)
assert len(tid) == 8
@@ -292,18 +311,14 @@
return None
f = self._f[p < 0]
ap = abs(p)
- f.seek(ap)
- h = f.read(27)
- if len(h) != 27:
- self.log("invalidate: short record for oid %16x "
- "at position %d in cache file %d",
- U64(oid), ap, p < 0)
+ h, rec_oid = self._read_header(f, ap)
+ if h is None:
del self._index[oid]
return None
- if h[:8] != oid:
- self.log("invalidate: oid mismatch: expected %16x read %16x "
+ if rec_oid != oid:
+ self.log("invalidate: oid mismatch: expected %s read %s "
"at position %d in cache file %d",
- U64(oid), U64(h[:8]), ap, p < 0)
+ _fmt_oid(oid), _fmt_oid(rec_oid), ap, p < 0)
del self._index[oid]
return None
f.seek(ap+8) # Switch from reading to writing
@@ -329,16 +344,18 @@
ap = abs(p)
seek = f.seek
read = f.read
- seek(ap)
- h = read(27)
- if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+ h, rec_oid = self._read_header(f, ap)
+ if h is None:
+ del self._index[oid]
+ return None
+ if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
- self.log("load: bad record for oid %16x "
+ self.log("load: bad record for oid %s "
"at position %d in cache file %d",
- U64(oid), ap, p < 0)
+ _fmt_oid(oid), ap, p < 0)
del self._index[oid]
return None
@@ -357,7 +374,7 @@
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
- self._copytocurrent(ap, tlen, dlen, vlen, h, data)
+ self._copytocurrent(ap, tlen, dlen, vlen, h, oid, data)
return data, h[19:]
else:
self._trace(0x26, oid, version)
@@ -369,12 +386,12 @@
v = vheader[:-4]
if version != v:
if dlen:
- seek(ap+27)
+ seek(ap+27+len(oid))
data = read(dlen)
self._trace(0x2C, oid, version, h[19:], dlen)
if (p < 0) != self._current:
self._copytocurrent(ap, tlen, dlen, vlen, h,
- data, vheader)
+ oid, data, vheader)
return data, h[19:]
else:
self._trace(0x28, oid, version)
@@ -386,12 +403,12 @@
self._trace(0x2E, oid, version, vserial, vdlen)
if (p < 0) != self._current:
self._copytocurrent(ap, tlen, dlen, vlen, h,
- None, vheader, vdata, vserial)
+ oid, None, vheader, vdata, vserial)
return vdata, vserial
finally:
self._release()
- def _copytocurrent(self, pos, tlen, dlen, vlen, header,
+ def _copytocurrent(self, pos, tlen, dlen, vlen, header, oid,
data=None, vheader=None, vdata=None, vserial=None):
"""Copy a cache hit from the non-current file to the current file.
@@ -402,29 +419,31 @@
if self._pos + tlen > self._limit:
return # Don't let this cause a cache flip
assert len(header) == 27
+ oidlen = len(oid)
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
- tlen = 31 + dlen
+ tlen = 31 + oidlen + dlen
vlen = 0
- # (oid:8, status:1, tlen:4, vlen:2, dlen:4, serial:8)
+ # (oidlen:2, reserved:6, status:1, tlen:4,
+ # vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
else:
assert header[8] == 'v'
f = self._f[not self._current]
if data is None:
- f.seek(pos+27)
+ f.seek(pos + 27 + len(oid))
data = f.read(dlen)
if len(data) != dlen:
return
- l = [header, data]
+ l = [header, oid, data]
if vlen:
assert vheader is not None
l.append(vheader)
assert (vdata is None) == (vserial is None)
if vdata is None:
vdlen = unpack(">I", vheader[-4:])[0]
- f.seek(pos+27+dlen+vlen+4)
+ f.seek(pos + 27 + len(oid) + dlen + vlen + 4)
vdata = f.read(vdlen)
if len(vdata) != vdlen:
return
@@ -440,13 +459,12 @@
g.seek(self._pos)
g.writelines(l)
assert g.tell() == self._pos + tlen
- oid = header[:8]
if self._current:
self._index[oid] = - self._pos
else:
self._index[oid] = self._pos
self._pos += tlen
- self._trace(0x6A, header[:8], vlen and vheader[:-4] or '',
+ self._trace(0x6A, oid, vlen and vheader[:-4] or '',
vlen and vserial or header[-8:], dlen)
def update(self, oid, serial, version, data, refs):
@@ -462,9 +480,11 @@
ap = abs(p)
seek = f.seek
read = f.read
- seek(ap)
- h = read(27)
- if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+ h, rec_oid = self._read_header(f, ap)
+ if h is None:
+ del self._index[oid]
+ return None
+ if len(h)==27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
return self._store(oid, '', '', version, data, serial)
@@ -500,16 +520,19 @@
ap = abs(p)
seek = f.seek
read = f.read
- seek(ap)
- h = read(27)
- if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+ h, rec_oid = self._read_header(f, ap)
+ if h is None:
+ del self._index[oid]
+ return None
+
+ if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
- self.log("modifiedInVersion: bad record for oid %16x "
+ self.log("modifiedInVersion: bad record for oid %s "
"at position %d in cache file %d",
- U64(oid), ap, p < 0)
+ _fmt_oid(oid), ap, p < 0)
del self._index[oid]
return None
@@ -581,7 +604,7 @@
if not s:
p = ''
s = ZERO
- tlen = 31 + len(p)
+ tlen = 31 + len(oid) + len(p)
if version:
tlen = tlen + len(version) + 12 + len(pv)
vlen = len(version)
@@ -590,7 +613,11 @@
stlen = pack(">I", tlen)
# accumulate various data to write into a list
- l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
+ assert len(oid) < 2**16
+ assert vlen < 2**16
+ assert tlen < 2L**32
+ l = [pack(">H6x", len(oid)), 'v', stlen,
+ pack(">HI", vlen, len(p)), s, oid]
if p:
l.append(p)
if version:
@@ -643,11 +670,11 @@
if version:
code |= 0x80
self._tracefile.write(
- struct_pack(">ii8s8s",
+ struct_pack(">iiH8s",
time_time(),
(dlen+255) & 0x7fffff00 | code | self._current,
- oid,
- serial))
+ len(oid),
+ serial) + oid)
def read_index(self, serial, fileindex):
index = self._index
@@ -658,9 +685,8 @@
count = 0
while 1:
- f.seek(pos)
- h = read(27)
- if len(h) != 27:
+ h, oid = self._read_header(f, pos)
+ if h is None:
# An empty read is expected, anything else is suspect
if h:
self.rilog("truncated header", pos, fileindex)
@@ -674,8 +700,6 @@
self.rilog("invalid header data", pos, fileindex)
break
- oid = h[:8]
-
if h[8] == 'v' and vlen:
seek(dlen+vlen, 1)
vdlen = read(4)
@@ -683,7 +707,7 @@
self.rilog("truncated record", pos, fileindex)
break
vdlen = unpack(">i", vdlen)[0]
- if vlen+dlen+43+vdlen != tlen:
+ if vlen + dlen + 43 + len(oid) + vdlen != tlen:
self.rilog("inconsistent lengths", pos, fileindex)
break
seek(vdlen, 1)
@@ -693,7 +717,7 @@
break
else:
if h[8] in 'vn' and vlen == 0:
- if dlen+31 != tlen:
+ if dlen + len(oid) + 31 != tlen:
self.rilog("inconsistent nv lengths", pos, fileindex)
seek(dlen, 1)
if read(4) != h[9:13]:
More information about the Zodb-checkins mailing list