[Zodb-checkins] CVS: Packages/ZEO - TransactionBuffer.py:1.1.2.1 ClientStorage.py:1.26.4.6
jeremy@digicool.com
jeremy@digicool.com
Mon, 2 Apr 2001 19:16:14 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv29215
Modified Files:
Tag: ZEO-ZRPC-Dev
ClientStorage.py
Added Files:
Tag: ZEO-ZRPC-Dev
TransactionBuffer.py
Log Message:
Replace explicit management of TemporaryFile with TransactionBuffer
--- Added File TransactionBuffer.py in package Packages/ZEO ---
"""A TransactionBuffer store transaction updates until commit or abort.
A transaction may generate enough data that it is not practical to
always hold pending updates in memory. Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""
# XXX Figure out what a sensible storage format is
# XXX A faster implementation might store trans data in memory until
# it reaches a certain size.
import tempfile
import marshal
class TransactionBuffer:
    """File-backed buffer of (oid, version, data) records.

    Records are appended with store(), read back in the order they were
    written with begin_iterate() followed by repeated next() calls, and
    discarded with clear().

    XXX unchecked constraints:
        1. can't call store() after begin_iterate()
        2. must call clear() after iteration finishes
    """

    def __init__(self):
        # Temporary file that holds the marshalled records.
        self.file = tempfile.TemporaryFile()
        # Number of records written and not yet handed out by next().
        self.count = 0
        # Running size estimate reported by get_size().
        self.size = 0

    def store(self, oid, version, data):
        """Store oid, version, data for later retrieval.

        The three values are marshalled as one tuple and appended to the
        temporary file; the record count and the size estimate are
        updated to match.
        """
        marshal.dump((oid, version, data), self.file)
        self.count = self.count + 1
        # Estimate per-record cache size: the payload plus a fixed
        # per-record allowance.
        self.size = self.size + len(data) + (27 + 12)
        if version:
            # Records that carry a version string are charged for it too.
            self.size = self.size + len(version) + 4

    def clear(self):
        """Mark the buffer as empty and discard the stored records."""
        self.file.seek(0)
        # Rewinding alone would leave the old records in the file, so it
        # would stay as large as the biggest transaction ever buffered.
        # Truncating at the rewound position actually drops them.
        self.file.truncate()
        self.count = 0
        self.size = 0

    def begin_iterate(self):
        """Move the file pointer in advance of iteration.

        Pending writes are flushed and the file is rewound so that the
        first next() call reads the first stored record.
        """
        self.file.flush()
        self.file.seek(0)

    def next(self):
        """Return next tuple of data or None if EOF.

        Returns the (oid, version, data) tuple of the next record, or
        None once every stored record has been returned or the file is
        exhausted.
        """
        if not self.count:
            return None
        try:
            oid_ver_data = marshal.load(self.file)
        except EOFError:
            return None
        self.count = self.count - 1
        return oid_ver_data

    def get_size(self):
        """Return size of data stored in buffer (just a hint)."""
        return self.size
--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py 2001/03/30 21:16:02 1.26.4.5
+++ ClientStorage.py 2001/04/02 23:16:12 1.26.4.6
@@ -90,13 +90,14 @@
import tempfile, ExtensionClass, thread
import zrpc2
import ServerStub
+from TransactionBuffer import TransactionBuffer
import cPickle
from struct import pack, unpack
from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
+from zLOG import LOG, PROBLEM, INFO, BLATHER
import sys
from types import TupleType
@@ -110,7 +111,7 @@
class ClientDisconnected(ClientStorageError):
"""The database storage is disconnected from the storage."""
-class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
+class ClientStorage(BaseStorage.BaseStorage):
__super_init = BaseStorage.BaseStorage.__init__
_server = None
@@ -133,7 +134,7 @@
name = name or str(addr)
- self._tfile = tempfile.TemporaryFile()
+ self._tbuf = TransactionBuffer()
self._oids = []
# XXX It's confusing to have _serial, _serials, and _seriald.
self._serials = []
@@ -270,13 +271,12 @@
def load(self, oid, version, _stuff=None):
self._lock_acquire()
try:
- cache = self._cache
- p = cache.load(oid, version)
+ p = self._cache.load(oid, version)
if p:
return p
p, s, v, pv, sv = self._server.zeoLoad(oid)
- cache.checkSize(0)
- cache.store(oid, p, s, v, pv, sv)
+ self._cache.checkSize(0)
+ self._cache.store(oid, p, s, v, pv, sv)
if not v or not version or version != v:
if s:
return p, s
@@ -338,11 +338,7 @@
# sendMessage and then return it if _serials was None.
# But sendMessage always returned None.
self._server.storea(oid, serial, data, version, self._serial)
-
- write = self._tfile.write
- write(oid+pack(">HI", len(version), len(data))+version)
- write(data)
-
+ self._tbuf.store(oid, version, data)
return self._check_serials()
finally:
self._lock_release()
@@ -370,7 +366,7 @@
return
self._server.tpc_abort(self._serial)
self._transaction = None
- self._tfile.seek(0)
+ self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
self._commit_lock_release()
@@ -412,7 +408,8 @@
# We have *BOTH* the local and distributed commit
# lock, now we can actually get ready to get started.
self._serial = id
- self._tfile.seek(0)
+## # _tbuf should always be in the clear state
+## self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
@@ -436,37 +433,25 @@
seriald=self._seriald
r = self._check_serials()
+
+ self._cache.checkSize(self._tbuf.get_size())
- tfile=self._tfile
- seek=tfile.seek
- read=tfile.read
- cache=self._cache
- update=cache.update
- size=tfile.tell()
- cache.checkSize(size)
- seek(0)
- i=0
- while i < size:
- oid=read(8)
+ self._tbuf.begin_iterate()
+ while 1:
try:
- s=seriald[oid]
- except KeyError:
- print "failed to find oid: %s" % repr(oid)
- raise
- h=read(6)
- vlen, dlen = unpack(">HI", h)
- if vlen: v=read(vlen)
- else: v=''
- p=read(dlen)
- if len(p) != dlen:
+ t = self._tbuf.next()
+ except ValueError, msg:
raise ClientStorageError, (
- "Unexpected end of file in client storage "
- "temporary file."
- )
- update(oid, s, v, p)
- i=i+14+vlen+dlen
-
- seek(0)
+ "Unexpected error reading temporary file in "
+ "client storage: %s" % msg)
+ if t is None:
+ break
+ oid, v, p = t
+ LOG("tbuf", BLATHER, "oid=%s v=%s len(p)=%d" % (
+ repr(oid), repr(v), len(p)))
+ s = seriald[oid]
+ self._cache.update(oid, s, v, p)
+ self._tbuf.clear()
self._transaction=None
self._commit_lock_release()
@@ -539,13 +524,13 @@
if self._pickler is None:
return
self._pickler.dump((0,0))
- self._pickler.dump = None
+## self._pickler.dump = None
self._tfile.seek(0)
- load = cPickle.Unpickler(self._tfile).load
+ unpick = cPickle.Unpickler(self._tfile)
self._tfile = None
while 1:
- oid, version = load()
+ oid, version = unpick.load()
if not oid:
break
self._cache.invalidate(oid, version=version)