[Zope3-checkins] CVS: Zope3/src/zodb/zeo - stubs.py:1.4 server.py:1.6 client.py:1.4
Jeremy Hylton
jeremy@zope.com
Wed, 5 Feb 2003 18:28:58 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv16759/src/zodb/zeo
Modified Files:
stubs.py server.py client.py
Log Message:
Merge storage-interface-branch to trunk.
Rename storage API methods; names that had underscores now use camel case:
new_oid => newObjectId
tpc_begin => tpcBegin
tpc_vote => tpcVote
tpc_finish => tpcFinish
tpc_abort => tpcAbort
transactionalUndo => undo
Remove some methods from storage interfaces.
Move storage-related exceptions to zodb.storage.interfaces.
Add __implements__ to storages and use it for introspection in place of
the supportsXXX() methods.
=== Zope3/src/zodb/zeo/stubs.py 1.3 => 1.4 ===
--- Zope3/src/zodb/zeo/stubs.py:1.3 Fri Jan 24 18:20:49 2003
+++ Zope3/src/zodb/zeo/stubs.py Wed Feb 5 18:28:24 2003
@@ -112,12 +112,6 @@
def endZeoVerify(self):
self.rpc.callAsync('endZeoVerify')
- def new_oids(self, n=None):
- if n is None:
- return self.rpc.call('new_oids')
- else:
- return self.rpc.call('new_oids', n)
-
def pack(self, t, wait=None):
if wait is None:
self.rpc.call('pack', t)
@@ -130,17 +124,17 @@
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
- def tpc_begin(self, id, user, descr, ext, tid, status):
- return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
+ def tpcBegin(self, id, user, descr, ext, tid, status):
+ return self.rpc.call('tpcBegin', id, user, descr, ext, tid, status)
- def vote(self, trans_id):
- return self.rpc.call('vote', trans_id)
+ def tpcVote(self, trans_id):
+ return self.rpc.call('tpcVote', trans_id)
- def tpc_finish(self, id):
- return self.rpc.call('tpc_finish', id)
+ def tpcFinish(self, id):
+ return self.rpc.call('tpcFinish', id)
- def tpc_abort(self, id):
- self.rpc.callAsync('tpc_abort', id)
+ def tpcAbort(self, id):
+ self.rpc.callAsync('tpcAbort', id)
def abortVersion(self, src, id):
return self.rpc.call('abortVersion', src, id)
@@ -163,7 +157,13 @@
def modifiedInVersion(self, oid):
return self.rpc.call('modifiedInVersion', oid)
- def new_oid(self, last=None):
+ def newObjectIds(self, n=None):
+ if n is None:
+ return self.rpc.call('newObjectIds')
+ else:
+ return self.rpc.call('newObjectIds', n)
+
+ def newObjectId(self, last=None):
if last is None:
return self.rpc.call('new_oid')
else:
@@ -172,11 +172,8 @@
def store(self, oid, serial, data, version, trans):
return self.rpc.call('store', oid, serial, data, version, trans)
- def transactionalUndo(self, trans_id, trans):
- return self.rpc.call('transactionalUndo', trans_id, trans)
-
- def undo(self, trans_id):
- return self.rpc.call('undo', trans_id)
+ def undo(self, trans_id, trans):
+ return self.rpc.call('undo', trans_id, trans)
def undoLog(self, first, last):
return self.rpc.call('undoLog', first, last)
=== Zope3/src/zodb/zeo/server.py 1.5 => 1.6 ===
--- Zope3/src/zodb/zeo/server.py:1.5 Tue Feb 4 11:05:19 2003
+++ Zope3/src/zodb/zeo/server.py Wed Feb 5 18:28:24 2003
@@ -34,8 +34,10 @@
from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
from zodb.ztransaction import Transaction
-from zodb.interfaces import StorageError, StorageTransactionError
-from zodb.interfaces import TransactionError, ReadOnlyError
+from zodb.interfaces import TransactionError
+from zodb.storage.interfaces import *
+
+from zope.interface.implements import objectImplements
class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised."""
@@ -301,11 +303,10 @@
self.server.register_connection(storage_id, self)
def get_info(self):
- return {'name': self.storage.getName(),
- 'supportsVersions': self.storage.supportsVersions(),
- 'supportsTransactionalUndo':
- self.storage.supportsTransactionalUndo(),
- 'extensionMethods': self.getExtensionMethods(),
+ return {"name": self.storage.getName(),
+ "implements": [iface.__name__
+ for iface in objectImplements(self.storage)],
+ "extensionMethods": self.getExtensionMethods(),
}
def getExtensionMethods(self):
@@ -386,13 +387,13 @@
# Broadcast new size statistics
self.server.invalidate(0, self.storage_id, ())
- def new_oids(self, n=100):
+ def newObjectIds(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if self.read_only:
raise ReadOnlyError()
if n <= 0:
n = 1
- return [self.storage.new_oid() for i in range(n)]
+ return [self.storage.newObjectId() for i in range(n)]
def undo(self, transaction_id):
if self.read_only:
@@ -412,7 +413,7 @@
def undoLog(self, first, last):
return run_in_thread(self.storage.undoLog, first, last)
- def tpc_begin(self, id, user, description, ext, tid, status):
+ def tpcBegin(self, id, user, description, ext, tid, status):
if self.read_only:
raise ReadOnlyError()
if self.transaction is not None:
@@ -434,13 +435,13 @@
t.description = description
t._extension = ext
- self.strategy.tpc_begin(t, tid, status)
+ self.strategy.tpcBegin(t, tid, status)
self.transaction = t
- def tpc_finish(self, id):
+ def tpcFinish(self, id):
if not self.check_tid(id):
return
- invalidated = self.strategy.tpc_finish()
+ invalidated = self.strategy.tpcFinish()
if invalidated:
self.server.invalidate(self, self.storage_id,
invalidated)
@@ -448,11 +449,11 @@
self.strategy = None
self.handle_waiting()
- def tpc_abort(self, id):
+ def tpcAbort(self, id):
if not self.check_tid(id):
return
strategy = self.strategy
- strategy.tpc_abort()
+ strategy.tpcAbort()
self.transaction = None
self.strategy = None
self.handle_waiting()
@@ -469,9 +470,9 @@
self.check_tid(id, exc=StorageTransactionError)
self.strategy.store(oid, serial, data, version)
- def vote(self, id):
+ def tpcVote(self, id):
self.check_tid(id, exc=StorageTransactionError)
- return self.strategy.tpc_vote()
+ return self.strategy.tpcVote()
def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError)
@@ -481,9 +482,9 @@
self.check_tid(id, exc=StorageTransactionError)
return self.strategy.commitVersion(src, dest)
- def transactionalUndo(self, trans_id, id):
+ def undo(self, trans_id, id):
self.check_tid(id, exc=StorageTransactionError)
- return self.strategy.transactionalUndo(trans_id)
+ return self.strategy.undo(trans_id)
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
@@ -575,7 +576,7 @@
# This isn't a proper Zope interface, because I don't want to
# introduce a dependency between ZODB and Zope interfaces.
- def tpc_begin(self, trans, tid, status): pass
+ def tpcBegin(self, trans, tid, status): pass
def store(self, oid, serial, data, version): pass
@@ -584,13 +585,13 @@
def commitVersion(self, src, dest): pass
# the trans_id arg to transactionalUndo is not the current txn's id
- def transactionalUndo(self, trans_id): pass
+ def undo(self, trans_id): pass
- def tpc_vote(self): pass
+ def tpcVote(self): pass
- def tpc_abort(self): pass
+ def tpcAbort(self): pass
- def tpc_finish(self): pass
+ def tpcFinish(self): pass
# What to do if a connection is closed in mid-transaction
def abort(self, zeo_storage): pass
@@ -606,21 +607,21 @@
self.logger = logger
self.log_label = log_label
- def tpc_begin(self, txn, tid, status):
+ def tpcBegin(self, txn, tid, status):
self.txn = txn
- self.storage.tpc_begin(txn, tid, status)
+ self.storage.tpcBegin(txn, tid, status)
- def tpc_vote(self):
+ def tpcVote(self):
# send all the serialnos as a batch
self.client.serialnos(self.serials)
- return self.storage.tpc_vote(self.txn)
+ return self.storage.tpcVote(self.txn)
- def tpc_finish(self):
- self.storage.tpc_finish(self.txn)
+ def tpcFinish(self):
+ self.storage.tpcFinish(self.txn)
return self.invalidated
- def tpc_abort(self):
- self.storage.tpc_abort(self.txn)
+ def tpcAbort(self):
+ self.storage.tpcAbort(self.txn)
def store(self, oid, serial, data, version):
try:
@@ -666,14 +667,14 @@
self.invalidated.extend(inv)
return oids
- def transactionalUndo(self, trans_id):
- oids = self.storage.transactionalUndo(trans_id, self.txn)
+ def undo(self, trans_id):
+ oids = self.storage.undo(trans_id, self.txn)
inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv)
return oids
def abort(self, zeo_storage):
- self.tpc_abort()
+ self.tpcAbort()
zeo_storage.handle_waiting()
class DelayedCommitStrategy:
@@ -689,7 +690,7 @@
self.name = None
self.args = None
- def tpc_begin(self, txn, tid, status):
+ def tpcBegin(self, txn, tid, status):
self.txn = txn
self.tid = tid
self.status = status
@@ -697,19 +698,19 @@
def store(self, oid, serial, data, version):
self.log.store(oid, serial, data, version)
- def tpc_abort(self):
+ def tpcAbort(self):
pass # just forget about this strategy
- def tpc_finish(self):
- # There has to be a tpc_vote() call before tpc_finish() is
+ def tpcFinish(self):
+ # There has to be a tpcVote() call before tpcFinish() is
# called, and tpc_vote() always blocks, so a proper
- # tpc_finish() call will always be sent to the immediate
+ # tpcFinish() call will always be sent to the immediate
# commit strategy object. So, if we get here, it means no
- # call to tpc_vote() was made, which is a bug in the caller.
+ # call to tpcVote() was made, which is a bug in the caller.
raise RuntimeError, "Logic error. This method must not be called."
- def tpc_vote(self):
- self.name = "tpc_vote"
+ def tpcVote(self):
+ self.name = "tpcVote"
self.args = ()
return self.block()
@@ -723,15 +724,15 @@
self.args = src,
return self.block()
- def transactionalUndo(self, trans_id):
- self.name = "transactionalUndo"
+ def undo(self, trans_id):
+ self.name = "undo"
self.args = trans_id,
return self.block()
def restart(self, new_strategy):
# called by the storage when the storage is available
assert isinstance(new_strategy, ImmediateCommitStrategy)
- new_strategy.tpc_begin(self.txn, self.tid, self.status)
+ new_strategy.tpcBegin(self.txn, self.tid, self.status)
loads, loader = self.log.get_loader()
for i in range(loads):
oid, serial, data, version = loader.load()
=== Zope3/src/zodb/zeo/client.py 1.3 => 1.4 ===
--- Zope3/src/zodb/zeo/client.py:1.3 Fri Jan 24 18:20:49 2003
+++ Zope3/src/zodb/zeo/client.py Wed Feb 5 18:28:24 2003
@@ -41,7 +41,7 @@
from zodb.zeo.tbuf import TransactionBuffer
from zodb.zeo.zrpc.client import ConnectionManager
-from zodb import interfaces
+from zodb.storage.interfaces import *
from zodb.timestamp import TimeStamp
try:
@@ -49,7 +49,7 @@
except ImportError:
ResolvedSerial = 'rs'
-class ClientStorageError(interfaces.StorageError):
+class ClientStorageError(StorageError):
"""An error occured in the ZEO Client Storage."""
class UnrecognizedResult(ClientStorageError):
@@ -95,7 +95,7 @@
This is a faithful implementation of the Storage API.
This class is thread-safe; transactions are serialized in
- tpc_begin().
+ tpcBegin().
"""
# Classes we instantiate. A subclass might override.
@@ -105,6 +105,13 @@
ConnectionManagerClass = ConnectionManager
StorageServerStubClass = stubs.StorageServer
+ # The exact storage interfaces depend on the server that the client
+ # connects to. We know that every storage must implement IStorage,
+ # but once connected we may change the instance's __implements__
+ # to reflect features available on the storage.
+
+ __implements__ = IStorage
+
def __init__(self, addr, storage='1', cache_size=20 * MB,
name='', client=None, debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
@@ -210,10 +217,8 @@
# _server_addr is used by sortKey()
self._server_addr = None
- self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
- 'supportsUndo':0, 'supportsVersions': 0,
- 'supportsTransactionalUndo': 0}
-
+ self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
+
self._tbuf = self.TransactionBufferClass()
self._db = None
@@ -239,7 +244,7 @@
# variable should only be modified while holding the
# _oid_lock.
self._oid_lock = threading.Lock()
- self._oids = [] # Object ids retrieved from new_oids()
+ self._oids = [] # Object ids retrieved from newObjectIds()
t = self._ts = get_timestamp()
self._serial = `t`
@@ -325,7 +330,7 @@
try:
stub.register(str(self._storage), self._is_read_only)
return 1
- except interfaces.ReadOnlyError:
+ except ReadOnlyError:
if not self._read_only_fallback:
raise
self.logger.warn(
@@ -347,6 +352,7 @@
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
+ self.update_interfaces()
self.verify_cache(stub)
# XXX The stub should be saved here and set in endVerify() below.
@@ -355,6 +361,16 @@
self._connection = conn
self._server = stub
+ def update_interfaces(self):
+ # Update instance's __implements__ based on the server.
+ L = [IStorage]
+ for name in self._info.get("implements", ()):
+ if name == "IUndoStorage":
+ L.append(IUndoStorage)
+ elif name == "IVersionStorage":
+ L.append(IVersionStorage)
+ self.__implements__ = tuple(L)
+
def set_server_addr(self, addr):
# Normalize server address and convert to string
if isinstance(addr, types.StringType):
@@ -433,18 +449,6 @@
"""
return self._info['extensionMethods']
- def supportsUndo(self):
- """Storage API: return whether we support undo."""
- return self._info['supportsUndo']
-
- def supportsVersions(self):
- """Storage API: return whether we support versions."""
- return self._info['supportsVersions']
-
- def supportsTransactionalUndo(self):
- """Storage API: return whether we support transactional undo."""
- return self._info['supportsTransactionalUndo']
-
def isReadOnly(self):
"""Storage API: return whether we are in read-only mode.
@@ -464,10 +468,9 @@
def _check_trans(self, trans):
"""Internal helper to check a transaction argument for sanity."""
if self._is_read_only:
- raise interfaces.ReadOnlyError()
+ raise ReadOnlyError()
if self._transaction is not trans:
- raise interfaces.StorageTransactionError(self._transaction,
- trans)
+ raise StorageTransactionError(self._transaction, trans)
def abortVersion(self, version, transaction):
"""Storage API: clear any changes made by the given version."""
@@ -550,15 +553,15 @@
return v
return self._server.modifiedInVersion(oid)
- def new_oid(self):
+ def newObjectId(self):
"""Storage API: return a new object identifier."""
if self._is_read_only:
- raise interfaces.ReadOnlyError()
+ raise ReadOnlyError()
# avoid multiple oid requests to server at the same time
self._oid_lock.acquire()
try:
if not self._oids:
- self._oids = self._server.new_oids()
+ self._oids = self._server.newObjectIds()
self._oids.reverse()
return self._oids.pop()
finally:
@@ -603,17 +606,17 @@
self._tbuf.store(oid, version, data)
return self._check_serials()
- def tpc_vote(self, transaction):
+ def tpcVote(self, transaction):
"""Storage API: vote on a transaction."""
if transaction is not self._transaction:
return
- self._server.vote(self._serial)
+ self._server.tpcVote(self._serial)
return self._check_serials()
- def tpc_begin(self, txn, tid=None, status=' '):
+ def tpcBegin(self, txn, tid=None, status=' '):
"""Storage API: begin a transaction."""
if self._is_read_only:
- raise interfaces.ReadOnlyError()
+ raise ReadOnlyError()
self._tpc_cond.acquire()
while self._transaction is not None:
# It is allowable for a client to call two tpc_begins in a
@@ -634,7 +637,7 @@
id = tid
try:
- self._server.tpc_begin(id, txn.user, txn.description,
+ self._server.tpcBegin(id, txn.user, txn.description,
txn._extension, tid, status)
except:
# Client may have disconnected during the tpc_begin().
@@ -659,19 +662,19 @@
def lastTransaction(self):
return self._ltid
- def tpc_abort(self, transaction):
+ def tpcAbort(self, transaction):
"""Storage API: abort a transaction."""
if transaction is not self._transaction:
return
try:
- self._server.tpc_abort(self._serial)
+ self._server.tpcAbort(self._serial)
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
finally:
self.end_transaction()
- def tpc_finish(self, transaction, f=None):
+ def tpcFinish(self, transaction, f=None):
"""Storage API: finish a transaction."""
if transaction is not self._transaction:
return
@@ -679,7 +682,7 @@
if f is not None:
f()
- self._server.tpc_finish(self._serial)
+ self._server.tpcFinish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
@@ -721,7 +724,7 @@
self._cache.update(oid, s, v, p)
self._tbuf.clear()
- def transactionalUndo(self, trans_id, trans):
+ def undo(self, trans_id, trans):
"""Storage API: undo a transaction.
This is executed in a transactional context. It has no effect
@@ -731,19 +734,9 @@
a storage.
"""
self._check_trans(trans)
- oids = self._server.transactionalUndo(trans_id, self._serial)
+ oids = self._server.undo(trans_id, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, '')
- return oids
-
- def undo(self, transaction_id):
- """Storage API: undo a transaction, writing directly to the storage."""
- if self._is_read_only:
- raise interfaces.ReadOnlyError()
- # XXX what are the sync issues here?
- oids = self._server.undo(transaction_id)
- for oid in oids:
- self._cache.invalidate(oid, '')
return oids
def undoInfo(self, first=0, last=-20, specification=None):