[Zodb-checkins] SVN: ZODB/branches/jim-readCurrent/src/ZEO/ Updated checkCurrentSerialInTransaction for ZEO.

Jim Fulton jim at zope.com
Tue Aug 31 15:48:33 EDT 2010

Log message for revision 116068:
  Updated checkCurrentSerialInTransaction for ZEO.
  Also did some storage server implementation cleanup, removing some
  dups and folding a small module into StorageServer.py.

  U   ZODB/branches/jim-readCurrent/src/ZEO/ClientStorage.py
  D   ZODB/branches/jim-readCurrent/src/ZEO/CommitLog.py
  D   ZODB/branches/jim-readCurrent/src/ZEO/README.txt
  U   ZODB/branches/jim-readCurrent/src/ZEO/ServerStub.py
  U   ZODB/branches/jim-readCurrent/src/ZEO/StorageServer.py

Modified: ZODB/branches/jim-readCurrent/src/ZEO/ClientStorage.py
--- ZODB/branches/jim-readCurrent/src/ZEO/ClientStorage.py	2010-08-31 19:48:31 UTC (rev 116067)
+++ ZODB/branches/jim-readCurrent/src/ZEO/ClientStorage.py	2010-08-31 19:48:33 UTC (rev 116068)
@@ -931,6 +931,11 @@
         self._tbuf.store(oid, data)
         return self._check_serials()
+    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+        self._check_trans(transaction)
+        self._server.checkCurrentSerialInTransaction(oid, serial,
+                                                     id(transaction))
     def storeBlob(self, oid, serial, data, blobfilename, version, txn):
         """Storage API: store a blob object."""
         assert not version

Deleted: ZODB/branches/jim-readCurrent/src/ZEO/CommitLog.py
--- ZODB/branches/jim-readCurrent/src/ZEO/CommitLog.py	2010-08-31 19:48:31 UTC (rev 116067)
+++ ZODB/branches/jim-readCurrent/src/ZEO/CommitLog.py	2010-08-31 19:48:33 UTC (rev 116068)
@@ -1,64 +0,0 @@
-# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
-# All Rights Reserved.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-"""Log a transaction's commit info during two-phase commit.
-A storage server allows multiple clients to commit transactions, but
-must serialize them as the actually execute at the server.  The
-concurrent commits are achieved by logging actions up until the
-tpc_vote().  At that point, the entire transaction is committed on the
-real storage.
-import cPickle
-import tempfile
-class CommitLog:
-    def __init__(self):
-        self.file = tempfile.TemporaryFile(suffix=".comit-log")
-        self.pickler = cPickle.Pickler(self.file, 1)
-        self.pickler.fast = 1
-        self.stores = 0
-    def size(self):
-        return self.file.tell()
-    def delete(self, oid, serial):
-        self.pickler.dump(('_delete', (oid, serial)))
-        self.stores += 1
-    def store(self, oid, serial, data):
-        self.pickler.dump(('_store', (oid, serial, data)))
-        self.stores += 1
-    def restore(self, oid, serial, data, prev_txn):
-        self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
-        self.stores += 1
-    def undo(self, transaction_id):
-        self.pickler.dump(('_undo', (transaction_id, )))
-        self.stores += 1
-    def __iter__(self):
-        self.file.seek(0)
-        unpickler = cPickle.Unpickler(self.file)
-        for i in range(self.stores):
-            yield unpickler.load()
-    def close(self):
-        if self.file:
-            self.file.close()
-            self.file = None

Deleted: ZODB/branches/jim-readCurrent/src/ZEO/README.txt
--- ZODB/branches/jim-readCurrent/src/ZEO/README.txt	2010-08-31 19:48:31 UTC (rev 116067)
+++ ZODB/branches/jim-readCurrent/src/ZEO/README.txt	2010-08-31 19:48:33 UTC (rev 116068)
@@ -1,44 +0,0 @@
-ZEO 2.0
-What's ZEO?
-ZEO stands for Zope Enterprise Objects.  ZEO is an add-on for Zope
-that allows multiple processes to connect to a single ZODB storage.
-Those processes can live on different machines, but don't need to.
-ZEO 2 has many improvements over ZEO 1, and is incompatible with ZEO 1;
-if you upgrade an existing ZEO 1 installation, you must upgrade the
-server and all clients simultaneous.  If you received ZEO 2 as part of
-the ZODB 3 distribution, the ZEO 1 sources are provided in a separate
-directory (ZEO1).  Some documentation for ZEO is available in the ZODB 3
-package in the Doc subdirectory.  ZEO depends on the ZODB software; it
-can be used with the version of ZODB distributed with Zope 2.5.1 or
-later.  More information about ZEO can be found in the ZODB Wiki:
-    http://www.zope.org/Wikis/ZODB
-What's here?
-This list of filenames is mostly for ZEO developers::
- ClientCache.py          client-side cache implementation
- ClientStorage.py        client-side storage implementation
- ClientStub.py           RPC stubs for callbacks from server to client
- CommitLog.py            buffer used during two-phase commit on the server
- Exceptions.py           definitions of exceptions
- ICache.py               interface definition for the client-side cache
- ServerStub.py           RPC stubs for the server
- StorageServer.py        server-side storage implementation
- TransactionBuffer.py    buffer used for transaction data in the client
- __init__.py             near-empty file to make this directory a package
- simul.py                command-line tool to simulate cache behavior
- start.py                command-line tool to start the storage server
- stats.py                command-line tool to process client cache traces
- tests/                  unit tests and other test utilities
- util.py                 utilities used by the server startup tool
- version.txt             text file indicating the ZEO version
- zrpc/                   subpackage implementing Remote Procedure Call (RPC)

Modified: ZODB/branches/jim-readCurrent/src/ZEO/ServerStub.py
--- ZODB/branches/jim-readCurrent/src/ZEO/ServerStub.py	2010-08-31 19:48:31 UTC (rev 116067)
+++ ZODB/branches/jim-readCurrent/src/ZEO/ServerStub.py	2010-08-31 19:48:33 UTC (rev 116068)
@@ -199,6 +199,9 @@
     def storea(self, oid, serial, data, id):
         self.rpc.callAsync('storea', oid, serial, data, id)
+    def checkCurrentSerialInTransaction(self, oid, serial, id):
+        self.rpc.callAsync('checkCurrentSerialInTransaction', oid, serial, id)
     def restorea(self, oid, serial, data, prev_txn, id):
         self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)

Modified: ZODB/branches/jim-readCurrent/src/ZEO/StorageServer.py
--- ZODB/branches/jim-readCurrent/src/ZEO/StorageServer.py	2010-08-31 19:48:31 UTC (rev 116067)
+++ ZODB/branches/jim-readCurrent/src/ZEO/StorageServer.py	2010-08-31 19:48:33 UTC (rev 116068)
@@ -22,7 +22,6 @@
 from __future__ import with_statement
-from ZEO.CommitLog import CommitLog
 from ZEO.Exceptions import AuthError
 from ZEO.monitor import StorageStats, StatsServer
 from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
@@ -477,6 +476,7 @@
                     if not getattr(self, op)(*args):
                 # Blob support
                 while self.blob_log and not self.store_failed:
                     oid, oldserial, data, blobfilename = self.blob_log.pop()
@@ -531,6 +531,10 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
+    def checkCurrentSerialInTransaction(self, oid, serial, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.txnlog.checkread(oid, serial)
     def restorea(self, oid, serial, data, prev_txn, id):
         self._check_tid(id, exc=StorageTransactionError)
         self.stats.stores += 1
@@ -581,6 +585,20 @@
         self._check_tid(tid, exc=StorageTransactionError)
+    def _op_error(self, oid, err, op):
+        self.store_failed = 1
+        if isinstance(err, ConflictError):
+            self.stats.conflicts += 1
+            self.log("conflict error oid=%s msg=%s" %
+                     (oid_repr(oid), str(err)), BLATHER)
+        if not isinstance(err, TransactionError):
+            # Unexpected errors are logged and passed to the client
+            self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
+                     logging.ERROR, exc_info=True)
+        err = self._marshal_error(err)
+        # The exception is reported back as newserial for this oid
+        self.serials.append((oid, err))
     def _delete(self, oid, serial):
         err = None
@@ -588,23 +606,24 @@
         except (SystemExit, KeyboardInterrupt):
         except Exception, err:
-            self.store_failed = 1
-            if isinstance(err, ConflictError):
-                self.stats.conflicts += 1
-                self.log("conflict error oid=%s msg=%s" %
-                         (oid_repr(oid), str(err)), BLATHER)
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((oid, err))
+            self._op_error(oid, err, 'delete')
         return err is None
+    def _checkread(self, oid, serial):
+        err = None
+        try:
+            self.storage.checkCurrentSerialInTransaction(
+                oid, serial, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self._op_error(oid, err, 'checkCurrentSerialInTransaction')
+        return err is None
     def _store(self, oid, serial, data, blobfile=None):
         err = None
@@ -617,18 +636,7 @@
         except (SystemExit, KeyboardInterrupt):
         except Exception, err:
-            self.store_failed = 1
-            if isinstance(err, ConflictError):
-                self.stats.conflicts += 1
-                self.log("conflict error oid=%s msg=%s" %
-                         (oid_repr(oid), str(err)), BLATHER)
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            newserial = [(oid, err)]
+            self._op_error(oid, err, 'store')
             if serial != "\0\0\0\0\0\0\0\0":
@@ -636,8 +644,7 @@
             if isinstance(newserial, str):
                 newserial = [(oid, newserial)]
-        if newserial:
-            for oid, s in newserial:
+            for oid, s in newserial or ():
                 if s == ResolvedSerial:
                     self.stats.conflicts_resolved += 1
@@ -656,14 +663,7 @@
         except (SystemExit, KeyboardInterrupt):
         except Exception, err:
-            self.store_failed = 1
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((oid, err))
+            self._op_error(oid, err, 'restore')
         return err is None
@@ -674,14 +674,7 @@
         except (SystemExit, KeyboardInterrupt):
         except Exception, err:
-            self.store_failed = 1
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((z64, err))
+            self._op_error(z64, err, 'undo')
             self.serials.extend((oid, ResolvedSerial) for oid in oids)
@@ -1535,3 +1528,44 @@
         host, port = addr
         return str(host) + ":" + str(port)
+class CommitLog:
+    def __init__(self):
+        self.file = tempfile.TemporaryFile(suffix=".comit-log")
+        self.pickler = cPickle.Pickler(self.file, 1)
+        self.pickler.fast = 1
+        self.stores = 0
+    def size(self):
+        return self.file.tell()
+    def delete(self, oid, serial):
+        self.pickler.dump(('_delete', (oid, serial)))
+        self.stores += 1
+    def checkread(self, oid, serial):
+        self.pickler.dump(('_checkread', (oid, serial)))
+        self.stores += 1
+    def store(self, oid, serial, data):
+        self.pickler.dump(('_store', (oid, serial, data)))
+        self.stores += 1
+    def restore(self, oid, serial, data, prev_txn):
+        self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
+        self.stores += 1
+    def undo(self, transaction_id):
+        self.pickler.dump(('_undo', (transaction_id, )))
+        self.stores += 1
+    def __iter__(self):
+        self.file.seek(0)
+        unpickler = cPickle.Unpickler(self.file)
+        for i in range(self.stores):
+            yield unpickler.load()
+    def close(self):
+        if self.file:
+            self.file.close()
+            self.file = None

More information about the Zodb-checkins mailing list