[Zodb-checkins] CVS: Packages/ZEO - TransactionBuffer.py:1.1.2.1 ClientStorage.py:1.26.4.6

jeremy@digicool.com jeremy@digicool.com
Mon, 2 Apr 2001 19:16:14 -0400 (EDT)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv29215

Modified Files:
      Tag: ZEO-ZRPC-Dev
	ClientStorage.py 
Added Files:
      Tag: ZEO-ZRPC-Dev
	TransactionBuffer.py 
Log Message:
Replace explicit management of TemporaryFile with TransactionBuffer



--- Added File TransactionBuffer.py in package Packages/ZEO ---
"""A TransactionBuffer store transaction updates until commit or abort.

A transaction may generate enough data that it is not practical to
always hold pending updates in memory.  Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""

# XXX Figure out what a sensible storage format is

# XXX A faster implementation might store trans data in memory until
# it reaches a certain size.

import tempfile
import marshal

class TransactionBuffer:
    """Buffer the (oid, version, data) records of one transaction.

    Records are marshalled to an anonymous temporary file, so a large
    transaction does not have to be held in memory.  Intended life cycle:

        store() ... store()     # while the transaction is in progress
        begin_iterate()
        next() ... next()       # until it returns None
        clear()                 # buffer is empty again, ready for reuse

    XXX unchecked constraints:
    1. can't call store() after begin_iterate()
    2. must call clear() after iteration finishes
    """

    def __init__(self):
        self.file = tempfile.TemporaryFile()
        # Number of records stored; during iteration, number not yet read.
        self.count = 0
        # Rough estimate of the cache space the buffered records need.
        self.size = 0

    def store(self, oid, version, data):
        """Store oid, version, data for later retrieval."""
        marshal.dump((oid, version, data), self.file)
        self.count = self.count + 1
        # Estimate per-record cache size: the data plus a fixed per-record
        # allowance (27 + 12), plus the version string and 4 more bytes
        # when a version is present.
        self.size = self.size + len(data) + (27 + 12)
        if version:
            self.size = self.size + len(version) + 4

    def clear(self):
        """Mark the buffer as empty and discard its contents.

        The file is truncated (not just rewound) so the disk space used by
        a large transaction is released rather than kept for the lifetime
        of the buffer.
        """
        self.file.seek(0)
        self.file.truncate()
        self.count = 0
        self.size = 0

    def begin_iterate(self):
        """Move the file pointer to the start in advance of iteration."""
        self.file.flush()
        self.file.seek(0)

    def next(self):
        """Return the next (oid, version, data) tuple, or None when done.

        None is returned once every stored record has been read.  If the
        file ends before that point, the buffer has lost data, and
        ValueError is raised instead of silently dropping the remaining
        updates.  Other errors raised by marshal.load propagate unchanged.
        """
        if not self.count:
            return None
        try:
            oid_ver_data = marshal.load(self.file)
        except EOFError:
            raise ValueError("transaction buffer ended with %d record(s) "
                             "still expected" % self.count)
        self.count = self.count - 1
        return oid_ver_data

    def get_size(self):
        """Return size of data stored in buffer (just a hint)."""
        return self.size

--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py	2001/03/30 21:16:02	1.26.4.5
+++ ClientStorage.py	2001/04/02 23:16:12	1.26.4.6
@@ -90,13 +90,14 @@
 import tempfile, ExtensionClass, thread
 import zrpc2
 import ServerStub
+from TransactionBuffer import TransactionBuffer
 
 import cPickle
 
 from struct import pack, unpack
 from ZODB import POSException, BaseStorage
 from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
+from zLOG import LOG, PROBLEM, INFO, BLATHER
 
 import sys
 from types import TupleType
@@ -110,7 +111,7 @@
 class ClientDisconnected(ClientStorageError):
     """The database storage is disconnected from the storage."""
 
-class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
+class ClientStorage(BaseStorage.BaseStorage):
     __super_init = BaseStorage.BaseStorage.__init__
 
     _server = None
@@ -133,7 +134,7 @@
 
         name = name or str(addr)
 
-        self._tfile = tempfile.TemporaryFile()
+        self._tbuf = TransactionBuffer()
         self._oids = []
         # XXX It's confusing to have _serial, _serials, and _seriald. 
         self._serials = []
@@ -270,13 +271,12 @@
     def load(self, oid, version, _stuff=None):
         self._lock_acquire()
         try:
-            cache = self._cache
-            p = cache.load(oid, version)
+            p = self._cache.load(oid, version)
             if p:
                 return p
             p, s, v, pv, sv = self._server.zeoLoad(oid)
-            cache.checkSize(0)
-            cache.store(oid, p, s, v, pv, sv)
+            self._cache.checkSize(0)
+            self._cache.store(oid, p, s, v, pv, sv)
             if not v or not version or version != v:
                 if s:
                     return p, s
@@ -338,11 +338,7 @@
             # sendMessage and then return it if _serials was None.
             # But sendMessage always returned None.
             self._server.storea(oid, serial, data, version, self._serial) 
-            
-            write = self._tfile.write
-            write(oid+pack(">HI", len(version), len(data))+version)
-            write(data)
-
+            self._tbuf.store(oid, version, data)
             return self._check_serials()
         finally:
             self._lock_release()
@@ -370,7 +366,7 @@
                 return
             self._server.tpc_abort(self._serial)
             self._transaction = None
-            self._tfile.seek(0)
+            self._tbuf.clear()
             self._seriald.clear()
             del self._serials[:]
             self._commit_lock_release()
@@ -412,7 +408,8 @@
             # We have *BOTH* the local and distributed commit
             # lock, now we can actually get ready to get started.
             self._serial = id
-            self._tfile.seek(0)
+##            # _tbuf should always be in the clear state
+##            self._tfile.seek(0)
             self._seriald.clear()
             del self._serials[:]
 
@@ -436,37 +433,25 @@
 
             seriald=self._seriald
             r = self._check_serials()
+
+            self._cache.checkSize(self._tbuf.get_size())
 
-            tfile=self._tfile
-            seek=tfile.seek
-            read=tfile.read
-            cache=self._cache
-            update=cache.update
-            size=tfile.tell()
-            cache.checkSize(size)
-            seek(0)
-            i=0
-            while i < size:
-                oid=read(8)
+            self._tbuf.begin_iterate()
+            while 1:
                 try:
-                    s=seriald[oid]
-                except KeyError:
-                    print "failed to find oid: %s" % repr(oid)
-                    raise
-                h=read(6)
-                vlen, dlen = unpack(">HI", h)
-                if vlen: v=read(vlen)
-                else: v=''
-                p=read(dlen)
-                if len(p) != dlen:
+                    t = self._tbuf.next()
+                except ValueError, msg:
                     raise ClientStorageError, (
-                        "Unexpected end of file in client storage "
-                        "temporary file."
-                        )
-                update(oid, s, v, p)
-                i=i+14+vlen+dlen
-
-            seek(0)
+                        "Unexpected error reading temporary file in "
+                        "client storage: %s" % msg)
+                if t is None:
+                    break
+                oid, v, p = t
+                LOG("tbuf", BLATHER, "oid=%s v=%s len(p)=%d" % (
+                    repr(oid), repr(v), len(p)))
+                s = seriald[oid]
+                self._cache.update(oid, s, v, p)
+            self._tbuf.clear()
 
             self._transaction=None
             self._commit_lock_release()
@@ -539,13 +524,13 @@
         if self._pickler is None:
             return
         self._pickler.dump((0,0))
-        self._pickler.dump = None
+##        self._pickler.dump = None
         self._tfile.seek(0)
-        load = cPickle.Unpickler(self._tfile).load
+        unpick = cPickle.Unpickler(self._tfile)
         self._tfile = None
 
         while 1:
-            oid, version = load()
+            oid, version = unpick.load()
             if not oid:
                 break
             self._cache.invalidate(oid, version=version)