[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Implemented checkCurrentSerialInTransaction for ZEO.
Jim Fulton
jim at zope.com
Thu Sep 2 09:55:29 EDT 2010
Log message for revision 116133:
Implemented checkCurrentSerialInTransaction for ZEO.
Also did some storage server implementation cleanup, removing some
duplicated code and folding a small module into StorageServer.py.
Changed:
U ZODB/trunk/src/ZEO/ClientStorage.py
D ZODB/trunk/src/ZEO/CommitLog.py
D ZODB/trunk/src/ZEO/README.txt
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2010-09-02 13:55:29 UTC (rev 116133)
@@ -931,6 +931,11 @@
self._tbuf.store(oid, data)
return self._check_serials()
+ def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+ self._check_trans(transaction)
+ self._server.checkCurrentSerialInTransaction(oid, serial,
+ id(transaction))
+
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
assert not version
Deleted: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py 2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/CommitLog.py 2010-09-02 13:55:29 UTC (rev 116133)
@@ -1,64 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE
-#
-##############################################################################
-"""Log a transaction's commit info during two-phase commit.
-
-A storage server allows multiple clients to commit transactions, but
-must serialize them as the actually execute at the server. The
-concurrent commits are achieved by logging actions up until the
-tpc_vote(). At that point, the entire transaction is committed on the
-real storage.
-
-"""
-
-import cPickle
-import tempfile
-
-
-class CommitLog:
-
- def __init__(self):
- self.file = tempfile.TemporaryFile(suffix=".comit-log")
- self.pickler = cPickle.Pickler(self.file, 1)
- self.pickler.fast = 1
- self.stores = 0
-
- def size(self):
- return self.file.tell()
-
- def delete(self, oid, serial):
- self.pickler.dump(('_delete', (oid, serial)))
- self.stores += 1
-
- def store(self, oid, serial, data):
- self.pickler.dump(('_store', (oid, serial, data)))
- self.stores += 1
-
- def restore(self, oid, serial, data, prev_txn):
- self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
- self.stores += 1
-
- def undo(self, transaction_id):
- self.pickler.dump(('_undo', (transaction_id, )))
- self.stores += 1
-
- def __iter__(self):
- self.file.seek(0)
- unpickler = cPickle.Unpickler(self.file)
- for i in range(self.stores):
- yield unpickler.load()
-
- def close(self):
- if self.file:
- self.file.close()
- self.file = None
Deleted: ZODB/trunk/src/ZEO/README.txt
===================================================================
--- ZODB/trunk/src/ZEO/README.txt 2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/README.txt 2010-09-02 13:55:29 UTC (rev 116133)
@@ -1,44 +0,0 @@
-=======
-ZEO 2.0
-=======
-
-What's ZEO?
------------
-
-ZEO stands for Zope Enterprise Objects. ZEO is an add-on for Zope
-that allows multiple processes to connect to a single ZODB storage.
-Those processes can live on different machines, but don't need to.
-ZEO 2 has many improvements over ZEO 1, and is incompatible with ZEO 1;
-if you upgrade an existing ZEO 1 installation, you must upgrade the
-server and all clients simultaneous. If you received ZEO 2 as part of
-the ZODB 3 distribution, the ZEO 1 sources are provided in a separate
-directory (ZEO1). Some documentation for ZEO is available in the ZODB 3
-package in the Doc subdirectory. ZEO depends on the ZODB software; it
-can be used with the version of ZODB distributed with Zope 2.5.1 or
-later. More information about ZEO can be found in the ZODB Wiki:
-
- http://www.zope.org/Wikis/ZODB
-
-What's here?
-------------
-
-This list of filenames is mostly for ZEO developers::
-
- ClientCache.py client-side cache implementation
- ClientStorage.py client-side storage implementation
- ClientStub.py RPC stubs for callbacks from server to client
- CommitLog.py buffer used during two-phase commit on the server
- Exceptions.py definitions of exceptions
- ICache.py interface definition for the client-side cache
- ServerStub.py RPC stubs for the server
- StorageServer.py server-side storage implementation
- TransactionBuffer.py buffer used for transaction data in the client
- __init__.py near-empty file to make this directory a package
- simul.py command-line tool to simulate cache behavior
- start.py command-line tool to start the storage server
- stats.py command-line tool to process client cache traces
- tests/ unit tests and other test utilities
- util.py utilities used by the server startup tool
- version.txt text file indicating the ZEO version
- zrpc/ subpackage implementing Remote Procedure Call (RPC)
-
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2010-09-02 13:55:29 UTC (rev 116133)
@@ -199,6 +199,9 @@
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, id)
+ def checkCurrentSerialInTransaction(self, oid, serial, id):
+ self.rpc.callAsync('checkCurrentSerialInTransaction', oid, serial, id)
+
def restorea(self, oid, serial, data, prev_txn, id):
self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2010-09-02 13:55:29 UTC (rev 116133)
@@ -22,7 +22,6 @@
from __future__ import with_statement
-from ZEO.CommitLog import CommitLog
from ZEO.Exceptions import AuthError
from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
@@ -477,6 +476,7 @@
if not getattr(self, op)(*args):
break
+
# Blob support
while self.blob_log and not self.store_failed:
oid, oldserial, data, blobfilename = self.blob_log.pop()
@@ -531,6 +531,10 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data)
+ def checkCurrentSerialInTransaction(self, oid, serial, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ self.txnlog.checkread(oid, serial)
+
def restorea(self, oid, serial, data, prev_txn, id):
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
@@ -581,6 +585,20 @@
self._check_tid(tid, exc=StorageTransactionError)
self.txnlog.undo(trans_id)
+ def _op_error(self, oid, err, op):
+ self.store_failed = 1
+ if isinstance(err, ConflictError):
+ self.stats.conflicts += 1
+ self.log("conflict error oid=%s msg=%s" %
+ (oid_repr(oid), str(err)), BLATHER)
+ if not isinstance(err, TransactionError):
+ # Unexpected errors are logged and passed to the client
+ self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
+ logging.ERROR, exc_info=True)
+ err = self._marshal_error(err)
+ # The exception is reported back as newserial for this oid
+ self.serials.append((oid, err))
+
def _delete(self, oid, serial):
err = None
try:
@@ -588,23 +606,24 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
- self.store_failed = 1
- if isinstance(err, ConflictError):
- self.stats.conflicts += 1
- self.log("conflict error oid=%s msg=%s" %
- (oid_repr(oid), str(err)), BLATHER)
- if not isinstance(err, TransactionError):
- # Unexpected errors are logged and passed to the client
- self.log("store error: %s, %s" % sys.exc_info()[:2],
- logging.ERROR, exc_info=True)
- err = self._marshal_error(err)
- # The exception is reported back as newserial for this oid
- self.serials.append((oid, err))
+ self._op_error(oid, err, 'delete')
else:
self.invalidated.append(oid)
return err is None
+ def _checkread(self, oid, serial):
+ err = None
+ try:
+ self.storage.checkCurrentSerialInTransaction(
+ oid, serial, self.transaction)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, err:
+ self._op_error(oid, err, 'checkCurrentSerialInTransaction')
+
+ return err is None
+
def _store(self, oid, serial, data, blobfile=None):
err = None
try:
@@ -617,18 +636,7 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
- self.store_failed = 1
- if isinstance(err, ConflictError):
- self.stats.conflicts += 1
- self.log("conflict error oid=%s msg=%s" %
- (oid_repr(oid), str(err)), BLATHER)
- if not isinstance(err, TransactionError):
- # Unexpected errors are logged and passed to the client
- self.log("store error: %s, %s" % sys.exc_info()[:2],
- logging.ERROR, exc_info=True)
- err = self._marshal_error(err)
- # The exception is reported back as newserial for this oid
- newserial = [(oid, err)]
+ self._op_error(oid, err, 'store')
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)
@@ -636,8 +644,7 @@
if isinstance(newserial, str):
newserial = [(oid, newserial)]
- if newserial:
- for oid, s in newserial:
+ for oid, s in newserial or ():
if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
@@ -656,14 +663,7 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
- self.store_failed = 1
- if not isinstance(err, TransactionError):
- # Unexpected errors are logged and passed to the client
- self.log("store error: %s, %s" % sys.exc_info()[:2],
- logging.ERROR, exc_info=True)
- err = self._marshal_error(err)
- # The exception is reported back as newserial for this oid
- self.serials.append((oid, err))
+ self._op_error(oid, err, 'restore')
return err is None
@@ -674,14 +674,7 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
- self.store_failed = 1
- if not isinstance(err, TransactionError):
- # Unexpected errors are logged and passed to the client
- self.log("store error: %s, %s" % sys.exc_info()[:2],
- logging.ERROR, exc_info=True)
- err = self._marshal_error(err)
- # The exception is reported back as newserial for this oid
- self.serials.append((z64, err))
+ self._op_error(z64, err, 'undo')
else:
self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids)
@@ -1535,3 +1528,44 @@
host, port = addr
return str(host) + ":" + str(port)
+class CommitLog:
+
+ def __init__(self):
+ self.file = tempfile.TemporaryFile(suffix=".comit-log")
+ self.pickler = cPickle.Pickler(self.file, 1)
+ self.pickler.fast = 1
+ self.stores = 0
+
+ def size(self):
+ return self.file.tell()
+
+ def delete(self, oid, serial):
+ self.pickler.dump(('_delete', (oid, serial)))
+ self.stores += 1
+
+ def checkread(self, oid, serial):
+ self.pickler.dump(('_checkread', (oid, serial)))
+ self.stores += 1
+
+ def store(self, oid, serial, data):
+ self.pickler.dump(('_store', (oid, serial, data)))
+ self.stores += 1
+
+ def restore(self, oid, serial, data, prev_txn):
+ self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
+ self.stores += 1
+
+ def undo(self, transaction_id):
+ self.pickler.dump(('_undo', (transaction_id, )))
+ self.stores += 1
+
+ def __iter__(self):
+ self.file.seek(0)
+ unpickler = cPickle.Unpickler(self.file)
+ for i in range(self.stores):
+ yield unpickler.load()
+
+ def close(self):
+ if self.file:
+ self.file.close()
+ self.file = None
More information about the Zodb-checkins
mailing list