[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.63
Guido van Rossum
guido@python.org
Tue, 17 Sep 2002 14:08:43 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29773
Modified Files:
StorageServer.py
Log Message:
Simple-minded cleanup pass.
- Remove unused imports.
- Remove \ continuation lines.
- Remove leading underscores from all names of methods and instance
  variables of ZEOStorage. The underscores were not used consistently,
  and I don't see any other reason to maintain the existing names.
=== ZODB3/ZEO/StorageServer.py 1.62 => 1.63 ===
--- ZODB3/ZEO/StorageServer.py:1.62 Mon Sep 16 23:51:47 2002
+++ ZODB3/ZEO/StorageServer.py Tue Sep 17 14:08:43 2002
@@ -32,11 +32,10 @@
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
import zLOG
-from ZODB.POSException import StorageError, StorageTransactionError, \
- TransactionError, ReadOnlyError
+from ZODB.POSException import StorageError, StorageTransactionError
+from ZODB.POSException import TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
-from ZODB.TmpStore import TmpStore
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
@@ -121,62 +120,63 @@
def __init__(self, server):
self.server = server
self.client = None
- self._conn = None # the connection associated with client
- self.__storage = None
- self.__storage_id = "uninitialized"
- self._transaction = None
+ self.conn = None # the connection associated with client
+ self.storage = None
+ self.storage_id = "uninitialized"
+ self.transaction = None
def notifyConnected(self, conn):
- self._conn = conn
+ self.conn = conn
self.client = ClientStub.ClientStorage(conn)
def notifyDisconnected(self):
# When this storage closes, we must ensure that it aborts
# any pending transaction. Not sure if this is the cleanest way.
- if self._transaction is not None:
- self._log("disconnected during transaction %s" % self._transaction)
- self.tpc_abort(self._transaction.id)
+ if self.transaction is not None:
+ self.log("disconnected during transaction %s" % self.transaction)
+ self.tpc_abort(self.transaction.id)
else:
- self._log("disconnected")
+ self.log("disconnected")
def __repr__(self):
- tid = self._transaction and repr(self._transaction.id)
- if self.__storage:
- stid = self.__storage._transaction and \
- repr(self.__storage._transaction.id)
+ tid = self.transaction and repr(self.transaction.id)
+ if self.storage:
+ stid = (self.storage._transaction and
+ repr(self.storage._transaction.id))
else:
stid = None
name = self.__class__.__name__
return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)
- def _log(self, msg, level=zLOG.INFO, error=None):
- name = getattr(self.__storage, '__name__', None)
+ def log(self, msg, level=zLOG.INFO, error=None):
+ name = getattr(self.storage, '__name__', None)
if name is None:
- name = str(self.__storage)
+ name = str(self.storage)
zLOG.LOG("%s:%s" % (_label, name), level, msg, error=error)
def setup_delegation(self):
"""Delegate several methods to the storage"""
- self.versionEmpty = self.__storage.versionEmpty
- self.versions = self.__storage.versions
- self.history = self.__storage.history
- self.load = self.__storage.load
- self.loadSerial = self.__storage.loadSerial
- self.modifiedInVersion = self.__storage.modifiedInVersion
+ self.versionEmpty = self.storage.versionEmpty
+ self.versions = self.storage.versions
+ self.history = self.storage.history
+ self.load = self.storage.load
+ self.loadSerial = self.storage.loadSerial
+ self.modifiedInVersion = self.storage.modifiedInVersion
- def _check_tid(self, tid, exc=None):
+ def check_tid(self, tid, exc=None):
caller = sys._getframe().f_back.f_code.co_name
- if self._transaction is None:
- self._log("no current transaction: %s()" % caller, zLOG.PROBLEM)
+ if self.transaction is None:
+ self.log("no current transaction: %s()" % caller, zLOG.PROBLEM)
if exc is not None:
raise exc(None, tid)
else:
return 0
- if self._transaction.id != tid:
- self._log("%s(%s) invalid; current transaction = %s" % \
- (caller, repr(tid), repr(self._transaction.id)), zLOG.PROBLEM)
+ if self.transaction.id != tid:
+ self.log("%s(%s) invalid; current transaction = %s" %
+ (caller, repr(tid), repr(self.transaction.id)),
+ zLOG.PROBLEM)
if exc is not None:
- raise exc(self._transaction.id, tid)
+ raise exc(self.transaction.id, tid)
else:
return 0
return 1
@@ -186,44 +186,44 @@
This method must be the first one called by the client.
"""
- self._log("register(%r, %s)" % (storage_id, read_only))
+ self.log("register(%r, %s)" % (storage_id, read_only))
storage = self.server.storages.get(storage_id)
if storage is None:
- self._log("unknown storage_id: %s" % storage_id)
+ self.log("unknown storage_id: %s" % storage_id)
raise ValueError, "unknown storage: %s" % storage_id
if not read_only and (self.server.read_only or storage.isReadOnly()):
raise ReadOnlyError()
- self.__storage_id = storage_id
- self.__storage = storage
+ self.storage_id = storage_id
+ self.storage = storage
self.setup_delegation()
self.server.register_connection(storage_id, self)
- self._log("registered storage %r: %s" % (storage_id, storage))
+ self.log("registered storage %r: %s" % (storage_id, storage))
def get_info(self):
- return {'length': len(self.__storage),
- 'size': self.__storage.getSize(),
- 'name': self.__storage.getName(),
- 'supportsUndo': self.__storage.supportsUndo(),
- 'supportsVersions': self.__storage.supportsVersions(),
+ return {'length': len(self.storage),
+ 'size': self.storage.getSize(),
+ 'name': self.storage.getName(),
+ 'supportsUndo': self.storage.supportsUndo(),
+ 'supportsVersions': self.storage.supportsVersions(),
'supportsTransactionalUndo':
- self.__storage.supportsTransactionalUndo(),
+ self.storage.supportsTransactionalUndo(),
}
def get_size_info(self):
- return {'length': len(self.__storage),
- 'size': self.__storage.getSize(),
+ return {'length': len(self.storage),
+ 'size': self.storage.getSize(),
}
def zeoLoad(self, oid):
- v = self.__storage.modifiedInVersion(oid)
+ v = self.storage.modifiedInVersion(oid)
if v:
- pv, sv = self.__storage.load(oid, v)
+ pv, sv = self.storage.load(oid, v)
else:
pv = sv = None
try:
- p, s = self.__storage.load(oid, '')
+ p, s = self.storage.load(oid, '')
except KeyError:
if sv:
# Created in version, no non-version data
@@ -250,29 +250,29 @@
def pack(self, time, wait=None):
if wait is not None:
- return run_in_thread(self._pack, time)
+ return run_in_thread(self.pack_impl, time)
else:
# If the client isn't waiting for a reply, start a thread
# and forget about it.
- t = threading.Thread(target=self._pack, args=(time,))
+ t = threading.Thread(target=self.pack_impl, args=(time,))
t.start()
return None
- def _pack(self, time):
- self.__storage.pack(time, referencesf)
+ def pack_impl(self, time):
+ self.storage.pack(time, referencesf)
# Broadcast new size statistics
- self.server.invalidate(0, self.__storage_id, (), self.get_size_info())
+ self.server.invalidate(0, self.storage_id, (), self.get_size_info())
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n <= 0:
n = 1
- return [self.__storage.new_oid() for i in range(n)]
+ return [self.storage.new_oid() for i in range(n)]
def undo(self, transaction_id):
- oids = self.__storage.undo(transaction_id)
+ oids = self.storage.undo(transaction_id)
if oids:
- self.server.invalidate(self, self.__storage_id,
+ self.server.invalidate(self, self.storage_id,
map(lambda oid: (oid, ''), oids))
return oids
return ()
@@ -280,26 +280,26 @@
# undoLog and undoInfo are potentially slow methods
def undoInfo(self, first, last, spec):
- return run_in_thread(self.__storage.undoInfo, first, last, spec)
+ return run_in_thread(self.storage.undoInfo, first, last, spec)
def undoLog(self, first, last):
- return run_in_thread(self.__storage.undoLog, first, last)
+ return run_in_thread(self.storage.undoLog, first, last)
def tpc_begin(self, id, user, description, ext, tid, status):
- if self._transaction is not None:
- if self._transaction.id == id:
- self._log("duplicate tpc_begin(%s)" % repr(id))
+ if self.transaction is not None:
+ if self.transaction.id == id:
+ self.log("duplicate tpc_begin(%s)" % repr(id))
return
else:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
# (This doesn't require a lock because we're using asyncore)
- if self.__storage._transaction is None:
- self.strategy = ImmediateCommitStrategy(self.__storage,
+ if self.storage._transaction is None:
+ self.strategy = ImmediateCommitStrategy(self.storage,
self.client)
else:
- self.strategy = DelayedCommitStrategy(self.__storage,
+ self.strategy = DelayedCommitStrategy(self.storage,
self.wait)
t = Transaction()
@@ -309,56 +309,56 @@
t._extension = ext
self.strategy.tpc_begin(t, tid, status)
- self._transaction = t
+ self.transaction = t
def tpc_finish(self, id):
- if not self._check_tid(id):
+ if not self.check_tid(id):
return
invalidated = self.strategy.tpc_finish()
if invalidated:
- self.server.invalidate(self, self.__storage_id,
+ self.server.invalidate(self, self.storage_id,
invalidated, self.get_size_info())
- self._transaction = None
+ self.transaction = None
self.strategy = None
- self._handle_waiting()
+ self.handle_waiting()
def tpc_abort(self, id):
- if not self._check_tid(id):
+ if not self.check_tid(id):
return
strategy = self.strategy
strategy.tpc_abort()
- self._transaction = None
+ self.transaction = None
self.strategy = None
# When ZEOStorage.notifyDisconnected() calls self.tpc_abort(),
# it is possible that self.strategy is DelayedCommitStrategy.
# In that case, ZEOStorage.tpc_abort() should *not* call
- # self._handle_waiting(), otherwise there could be two
+ # self.handle_waiting(), otherwise there could be two
# ZEOStorage instances whose strategy is
# ImmediateCommitStrategy!
if isinstance(strategy, ImmediateCommitStrategy):
- self._handle_waiting()
+ self.handle_waiting()
# XXX else, should we remove ourselves from storage._waiting???
# XXX handle new serialnos
def storea(self, oid, serial, data, version, id):
- self._check_tid(id, exc=StorageTransactionError)
+ self.check_tid(id, exc=StorageTransactionError)
self.strategy.store(oid, serial, data, version)
def vote(self, id):
- self._check_tid(id, exc=StorageTransactionError)
+ self.check_tid(id, exc=StorageTransactionError)
return self.strategy.tpc_vote()
def abortVersion(self, src, id):
- self._check_tid(id, exc=StorageTransactionError)
+ self.check_tid(id, exc=StorageTransactionError)
return self.strategy.abortVersion(src)
def commitVersion(self, src, dest, id):
- self._check_tid(id, exc=StorageTransactionError)
+ self.check_tid(id, exc=StorageTransactionError)
return self.strategy.commitVersion(src, dest)
def transactionalUndo(self, trans_id, id):
- self._check_tid(id, exc=StorageTransactionError)
+ self.check_tid(id, exc=StorageTransactionError)
return self.strategy.transactionalUndo(trans_id)
# When a delayed transaction is restarted, the dance is
@@ -372,36 +372,36 @@
# client will be blocked until it finishes.
def wait(self):
- if self.__storage._transaction:
+ if self.storage._transaction:
d = Delay()
- self.__storage._waiting.append((d, self))
- self._log("Transaction blocked waiting for storage. "
- "Clients waiting: %d." % len(self.__storage._waiting))
+ self.storage._waiting.append((d, self))
+ self.log("Transaction blocked waiting for storage. "
+ "Clients waiting: %d." % len(self.storage._waiting))
return d
else:
self.restart()
return None
- def _handle_waiting(self):
- while self.__storage._waiting:
- delay, zeo_storage = self.__storage._waiting.pop(0)
- if self._restart(zeo_storage, delay):
- if self.__storage._waiting:
- n = len(self.__storage._waiting)
- self._log("Blocked transaction restarted. "
- "Clients waiting: %d" % n)
+ def handle_waiting(self):
+ while self.storage._waiting:
+ delay, zeo_storage = self.storage._waiting.pop(0)
+ if self.restart_other(zeo_storage, delay):
+ if self.storage._waiting:
+ n = len(self.storage._waiting)
+ self.log("Blocked transaction restarted. "
+ "Clients waiting: %d" % n)
else:
- self._log("Blocked transaction restarted.")
+ self.log("Blocked transaction restarted.")
return
- def _restart(self, zeo_storage, delay):
+ def restart_other(self, zeo_storage, delay):
# Return True if the server restarted.
# call the restart() method on the appropriate server.
try:
zeo_storage.restart(delay)
except:
- self._log("Unexpected error handling waiting transaction",
- level=zLOG.WARNING, error=sys.exc_info())
+ self.log("Unexpected error handling waiting transaction",
+ level=zLOG.WARNING, error=sys.exc_info())
zeo_storage._conn.close()
return 0
else:
@@ -410,7 +410,7 @@
def restart(self, delay=None):
old_strategy = self.strategy
assert isinstance(old_strategy, DelayedCommitStrategy)
- self.strategy = ImmediateCommitStrategy(self.__storage,
+ self.strategy = ImmediateCommitStrategy(self.storage,
self.client)
resp = old_strategy.restart(self.strategy)
if delay is not None: