[Zodb-checkins] CVS: Packages/ZEO - StorageServer.py:1.21.4.7
jeremy@digicool.com
jeremy@digicool.com
Wed, 25 Apr 2001 16:43:53 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv28767
Modified Files:
Tag: ZEO-ZRPC-Dev
StorageServer.py
Log Message:
Fixes to support multiple simultaneous clients of a single storage.
Add _check_tid() helper that checks whether the current call is using
the current transaction id. If it is not, log a complaint and, if an
exception class was specified, raise it; otherwise return false.
Use _check_tid() helper for all interesting methods except
tpc_begin().
Use _restart_delayed_transaction() to do correct bookkeeping about the
current transaction when one is restarted from the delayed queue. This
new comment explains some of the details:
# When multiple clients are using a single storage, there are several
# different _transaction attributes to keep track of. Each
# StorageProxy object has a single _transaction that refers to its
# current transaction. The storage (self.__storage) has another
# _transaction that is used for the *real* transaction.
# The real trick comes with the __waiting queue for a storage.
# When a StorageProxy pulls a new transaction from the queue, it
# must inform the new transaction's proxy. (The two proxies may
# be the same.) The new transaction's proxy sets its _transaction
# and continues from there.
Use handle_waiting() to check whether a queued transaction exists and, if so, restart it.
Remove old async tpc_begin() defs.
Add __repr__() and _log() methods that provide per-instance labels for
debugging purposes.
--- Updated File StorageServer.py in package Packages/ZEO --
--- StorageServer.py 2001/04/20 19:14:08 1.21.4.6
+++ StorageServer.py 2001/04/25 20:43:52 1.21.4.7
@@ -59,8 +59,8 @@
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
+#
#
-#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
@@ -86,12 +86,16 @@
This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.
+
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
"""
import cPickle
import os
import sys
import threading
+import types
import ClientStub
import zrpc2
@@ -110,8 +114,9 @@
pickler.fast = 1 # Don't use the memo
dump = pickler.dump
-def log(message, level=zeolog.INFO, label="zeoserver:%s" % os.getpid()):
- zeolog.LOG(label, level, message)
+def log(message, level=zeolog.INFO, label="ZEO Server:%s" % os.getpid(),
+ error=None):
+ zeolog.LOG(label, level, message, error=error)
class StorageServerError(StorageError):
pass
@@ -165,6 +170,59 @@
self.__invalidated = []
self._transaction = None
+ def __repr__(self):
+ tid = self._transaction and repr(self._transaction.id)
+ if self.__storage:
+ stid = self.__storage._transaction and \
+ repr(self.__storage._transaction.id)
+ else:
+ stid = None
+ return "<StorageProxy %X trans=%s s_trans=%s>" % (id(self), tid,
+ stid)
+
+ def _log(self, msg, level=zeolog.INFO, error=None, pid=os.getpid()):
+ zeolog.LOG("ZEO Server %s %X" % (pid, id(self)),
+ level, msg, error=error)
+
+ def setup_delegation(self):
+ """Delegate several methods to the storage"""
+ self.undoInfo = self.__storage.undoInfo
+ self.undoLog = self.__storage.undoLog
+ self.versionEmpty = self.__storage.versionEmpty
+ self.versions = self.__storage.versions
+ self.history = self.__storage.history
+ self.load = self.__storage.load
+ self.loadSerial = self.__storage.loadSerial
+
+ def _check_tid(self, tid, exc=None):
+ caller = sys._getframe().f_back.f_code.co_name
+ if self._transaction is None:
+ self._log("no current transaction: %s()" % caller,
+ zeolog.PROBLEM)
+ if exc is not None:
+ raise exc(None, tid)
+ else:
+ return 0
+ if self._transaction.id != tid:
+ self._log("%s(%s) invalid; current transaction = %s" % \
+ (caller, repr(tid), repr(self._transaction.id)),
+ zeolog.PROBLEM)
+ if exc is not None:
+ raise exc(self._transaction.id, tid)
+ else:
+ return 0
+ return 1
+
+ def _restart_delayed_transaction(self, delay, tinfo):
+ self._transaction = t = Transaction()
+ t.id = tinfo[0]
+ t.user = tinfo[1]
+ t.description = tinfo[2]
+ self.__storage.tpc_begin(t)
+ self.__invalidated = []
+ assert self._transaction.id == self.__storage._transaction.id
+ delay.reply(None)
+
def register(self, storage_id):
"""Select the storage that this client will use
@@ -172,25 +230,15 @@
"""
storage = self.server.storages.get(storage_id)
if storage is None:
- log("unknown storage_id: %s" % storage_id)
+ self._log("unknown storage_id: %s" % storage_id)
self.get_caller.close()
self.__storage_id = storage_id
self.__storage = storage
self.setup_delegation()
self.server.register(storage_id, self)
- log("registered storage %s: %s" % (storage_id, storage))
+ self._log("registered storage %s: %s" % (storage_id, storage))
- def setup_delegation(self):
- """Delegate several methods to the storage"""
- self.undoInfo = self.__storage.undoInfo
- self.undoLog = self.__storage.undoLog
- self.versionEmpty = self.__storage.versionEmpty
- self.versions = self.__storage.versions
- self.history = self.__storage.history
- self.load = self.__storage.load
- self.loadSerial = self.__storage.loadSerial
-
def get_info(self):
return {
'length': len(self.__storage),
@@ -247,9 +295,9 @@
try:
self.__storage.pack(t, referencesf)
except:
- log('ZEO Server', zeolog.ERROR,
- 'Pack failed for %s' % self.__storage_id,
- error=sys.exc_info())
+ self._log('ZEO Server', zeolog.ERROR,
+ 'Pack failed for %s' % self.__storage_id,
+ error=sys.exc_info())
if wait:
raise
else:
@@ -259,19 +307,15 @@
self.get_size_info())
def abortVersion(self, src, id):
- t = self._transaction
- if t is None or id != t.id:
- raise StorageTransactionError(self._transaction, id)
- oids = self.__storage.abortVersion(src, t)
+ self._check_tid(id, exc=StorageTransactionError)
+ oids = self.__storage.abortVersion(src, self._transaction)
for oid in oids:
self.__invalidated.append((oid, src))
return oids
def commitVersion(self, src, dest, id):
- t = self._transaction
- if t is None or id != t.id:
- raise StorageTransactionError(self, id)
- oids = self.__storage.commitVersion(src, dest, t)
+ self._check_tid(id, exc=StorageTransactionError)
+ oids = self.__storage.commitVersion(src, dest, self._transaction)
for oid in oids:
self.__invalidated.append((oid, dest))
if dest:
@@ -279,26 +323,25 @@
return oids
def storea(self, oid, serial, data, version, id):
- t = self._transaction
+ self._check_tid(id, exc=StorageTransactionError)
try:
# XXX does this stmt need to be in the try/except?
- if t is None or id != t.id:
- raise StorageTransactionError(self, id)
- newserial = self.__storage.store(oid, serial, data, version, t)
+ newserial = self.__storage.store(oid, serial, data, version,
+ self._transaction)
except TransactionError, v:
# This is a normal transaction error such as a conflict error
# or a version lock or conflict error. It doesn't need to be
# logged.
- log("transaction error: %s" % repr(v))
+ self._log("transaction error: %s" % repr(v))
newserial = v
except:
# all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling.
# IOW, Anything that ends up here is evil enough to be logged.
error = sys.exc_info()
- log('store error: %s: %s' % (error[0], error[1]).
- zeolog.ERROR)
+ self._log('store error: %s: %s' % (error[0], error[1]),
+ zeolog.ERROR, error=error)
newserial = sys.exc_info()[1]
else:
if serial != '\0\0\0\0\0\0\0\0':
@@ -307,8 +350,8 @@
try:
nil = dump(newserial, 1)
except:
- log("couldn't pickle newserial: %s" % repr(newserial),
- zeolog.ERROR)
+ self._log("couldn't pickle newserial: %s" % repr(newserial),
+ zeolog.ERROR)
dump('', 1) # clear pickler
r = StorageServerError("Couldn't pickle exception %s" % \
`newserial`)
@@ -316,11 +359,9 @@
self.client.serialno((oid, newserial))
- def vote(self, id):
- t = self._transaction
- if t is None or id != t.id:
- raise StorageTransactionError(self._transaction, id)
- return self.__storage.tpc_vote(t)
+ def vote(self, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ return self.__storage.tpc_vote(self._transaction)
def undo(self, transaction_id):
oids = self.__storage.undo(transaction_id)
@@ -330,38 +371,36 @@
return oids
return ()
- def tpc_abort(self, id):
- t = self._transaction
- if t is None or id != t.id:
- return
- r = self.__storage.tpc_abort(t)
-
- waiting = self.__storage.__waiting
- while waiting:
- f, args = waiting.pop(0)
- if apply(f, args):
- break
-
- self._transaction = None
- self.__invalidated = []
-
def unlock(self):
-# if self.__closed:
-# return
+## if self.__closed:
+## return
self.client.unlock()
+ # When multiple clients are using a single storage, there are several
+ # different _transaction attributes to keep track of. Each
+ # StorageProxy object has a single _transaction that refers to its
+ # current transaction. The storage (self.__storage) has another
+ # _transaction that is used for the *real* transaction.
+
+ # The real trick comes with the __waiting queue for a storage.
+ # When a StorageProxy pulls a new transaction from the queue, it
+ # must inform the new transaction's proxy. (The two proxies may
+ # be the same.) The new transaction's proxy sets its _transaction
+ # and continues from there.
+
def tpc_begin(self, id, user, description, ext):
- t = self._transaction
- if t is not None:
- if id == t.id:
+ if self._transaction is not None:
+ if self._transaction.id == id:
+ self._log("duplicate tpc_begin(%s)" % repr(id))
return
else:
- raise StorageServerError("Multiple simultaneous tpc_begin "
- "requests from the same client.")
+ raise StorageTransactionError("Multiple simultaneous tpc_begin"
+ " requests from one client.")
if self.__storage._transaction is not None:
- self.__storage.__waiting.append((self.unlock, ()))
- return 1 # Return a flag indicating a lock condition.
-
+ d = zrpc2.Delay()
+ self.__storage.__waiting.append((d, self, (id, user, description)))
+ return d
+
self._transaction = t = Transaction()
t.id = id
t.user = user
@@ -369,62 +408,52 @@
self.__storage.tpc_begin(t)
self.__invalidated = []
- def tpc_begin_sync(self, id, user, description, ext):
-# if self.__closed:
-# return
- t = self._transaction
- if t is not None and id == t.id:
+ def tpc_finish(self, id, user, description, ext):
+ if not self._check_tid(id):
return
- if self.__storage._transaction is None:
- return self.try_again_sync(id, user, description, ext)
- else:
- d = Delay()
- self.__storage.__waiting.append((self.try_again_sync,
- (id, user, description, ext, d)))
- return d
-
- def try_again_sync(self, id, user, description, ext, delay=None):
- if self.__storage._transaction is None:
- self._transaction = t = Transaction()
- t.id = id
- t.user = user
- t.description = description
- self.__storage.tpc_begin(t)
- self.__invalidated = []
- if delay is not None:
- delay.reply(None)
- else:
- return None
- return 1
- def tpc_finish(self, id, user, description, ext):
+ # XXX Why do we do this for the begin and the end?
t = self._transaction
- if id != t.id:
- return
t.user = user
t.description = description
t.ext = ext
- r = self.__storage.tpc_finish(t)
-
- while self.__storage.__waiting:
- f, args = self.__storage.__waiting.pop(0)
- if apply(f,args):
- break
+ r = self.__storage.tpc_finish(self._transaction)
+ assert self.__storage._transaction is None
- self._transaction = None
if self.__invalidated:
self.server.invalidate(self, self.__storage_id,
self.__invalidated,
self.get_size_info())
+
+ if not self.handle_waiting():
+ self._transaction = None
+ self.__invalidated = []
+ assert self._transaction is None
+
+ def tpc_abort(self, id):
+ if not self._check_tid(id):
+ return
+ r = self.__storage.tpc_abort(self._transaction)
+ assert self.__storage._transaction is None
+
+ if not self.handle_waiting():
+ self._transaction = None
self.__invalidated = []
+ assert self._transaction is None
+ def handle_waiting(self):
+ if self.__storage.__waiting:
+ d, proxy, tinfo = self.__storage.__waiting.pop(0)
+ proxy._restart_delayed_transaction(d, tinfo)
+ if self is proxy:
+ return 1
+
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n < 0:
n = 1
return map(lambda x, self=self: self.__storage.new_oid(), range(n))
-
def fixup_storage(storage):
# backwards compatibility hack