[Zodb-checkins] CVS: StandaloneZODB/ZEO - ClientStorage.py:1.35.6.3 StorageServer.py:1.32.6.2 zrpc2.py:NONE
Jeremy Hylton
jeremy@zope.com
Wed, 16 Jan 2002 09:58:39 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv23840
Modified Files:
Tag: Standby-branch
ClientStorage.py StorageServer.py
Removed Files:
Tag: Standby-branch
zrpc2.py
Log Message:
Various cleanups and simplifications.
Convert from zrpc2 module to zrpc package. This includes some changes
to client code:
- Dispatcher factory (StorageServer.newConnection) doesn't have an
unused third argument anymore.
Remove unused imports.
Remove fixup_storage(). I think this backwards compatibility feature
is not needed. Happy to add it back if someone misses it.
Fix possible leaks of traceback (sys.exc_info() stored in local).
Cleanup comments in ClientStorage and removed unreachable code.
Replace uses of thread module with threading module.
=== StandaloneZODB/ZEO/ClientStorage.py 1.35.6.2 => 1.35.6.3 ===
##############################################################################
"""Network ZODB storage client
-
-XXX support multiple outstanding requests up until the vote?
-XXX is_connected() vis ClientDisconnected error
"""
__version__='$Revision$'[11:-2]
import cPickle
import os
-import socket
-import string
-import struct
-import sys
import tempfile
-import thread
import threading
import time
-from types import TupleType, StringType
-from struct import pack, unpack
-import ExtensionClass, Sync, ThreadLock
-import ClientCache
-import zrpc2
-import ServerStub
-from TransactionBuffer import TransactionBuffer
+from ZEO import ClientCache, ServerStub
+from ZEO.TransactionBuffer import TransactionBuffer
+from ZEO.Exceptions import Disconnected
+from ZEO.zrpc.client import ConnectionManager
from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO, BLATHER
-from Exceptions import Disconnected
def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
LOG(subsys, type, msg)
@@ -130,10 +118,11 @@
class ClientDisconnected(ClientStorageError, Disconnected):
"""The database storage is disconnected from the storage."""
-def get_timestamp(prev_ts):
+def get_timestamp(prev_ts=None):
t = time.time()
t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
- t = t.laterThan(prev_ts)
+ if prev_ts is not None:
+ t = t.laterThan(prev_ts)
return t
class DisconnectedServerStub:
@@ -161,7 +150,9 @@
self._tbuf = TransactionBuffer()
self._db = None
self._oids = []
- # XXX It's confusing to have _serial, _serials, and _seriald.
+ # _serials: stores (oid, serialno) as returned by server
+ # _seriald: _check_serials() moves from _serials to _seriald,
+ # which maps oid to serialno
self._serials = []
self._seriald = {}
@@ -171,17 +162,15 @@
client = client or os.environ.get('ZEO_CLIENT', '')
self._cache = ClientCache.ClientCache(storage, cache_size,
client=client, var=var)
- self._cache.open() # XXX
+ self._cache.open() # XXX open now?
- self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
- #debug=debug,
- tmin=min_disconnect_poll,
- tmax=max_disconnect_poll)
+ self._rpc_mgr = ConnectionManager(addr, self,
+ tmin=min_disconnect_poll,
+ tmax=max_disconnect_poll)
# XXX What if we can only get a read-only connection and we
# want a read-write connection? Looks like the current code
# will block forever.
-
if wait_for_server_on_startup:
self._rpc_mgr.connect(sync=1)
else:
@@ -191,6 +180,7 @@
def _basic_init(self, name):
"""Handle initialization activites of BaseStorage"""
+ # XXX does anything depend on attr being __name__
self.__name__ = name
# A ClientStorage only allows one client to commit at a time.
@@ -205,15 +195,19 @@
# oid_cond.
self.oid_cond = threading.Condition()
- commit_lock = thread.allocate_lock()
+ commit_lock = threading.Lock()
self._commit_lock_acquire = commit_lock.acquire
self._commit_lock_release = commit_lock.release
- t = time.time()
- t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+ t = self._ts = get_timestamp()
self._serial = `t`
self._oid='\0\0\0\0\0\0\0\0'
+ def close(self):
+ self._rpc_mgr.close()
+ if self._cache is not None:
+ self._cache.close()
+
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB."""
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
@@ -253,10 +247,6 @@
### notifyDisconnected had to get the instance lock. There's
### nothing to gain by getting the instance lock.
- ### Note that we *don't* have to worry about getting connected
- ### in the middle of notifyDisconnected, because *it's*
- ### responsible for starting the thread that makes the connection.
-
def notifyDisconnected(self):
log2(PROBLEM, "Disconnected from storage")
self._server = disconnected_stub
@@ -298,14 +288,6 @@
return 1
def _check_tid(self, tid, exc=None):
- # XXX Is all this locking unnecessary? The only way to
- # begin a transaction is to call tpc_begin(). If we assume
- # clients are single-threaded and well-behaved, i.e. they call
- # tpc_begin() first, then there appears to be no need for
- # locking. If _check_tid() is called and self.tpc_tid != tid,
- # then there is no way it can be come equal during the call.
- # Thus, there should be no race.
-
if self.tpc_tid != tid:
if exc is None:
return 0
@@ -313,19 +295,6 @@
raise exc(self.tpc_tid, tid)
return 1
- # XXX But I'm not sure
-
- self.tpc_cond.acquire()
- try:
- if self.tpc_tid != tid:
- if exc is None:
- return 0
- else:
- raise exc(self.tpc_tid, tid)
- return 1
- finally:
- self.tpc_cond.release()
-
def abortVersion(self, src, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -388,8 +357,7 @@
def new_oid(self, last=None):
if self._is_read_only:
raise POSException.ReadOnlyError()
- # We want to avoid a situation where multiple oid requests are
- # made at the same time.
+ # avoid multiple oid requests to server at the same time
self.oid_cond.acquire()
if not self._oids:
self._oids = self._server.new_oids()
@@ -402,8 +370,7 @@
def pack(self, t=None, rf=None, wait=0, days=0):
if self._is_read_only:
raise POSException.ReadOnlyError()
- # Note that we ignore the rf argument. The server
- # will provide it's own implementation.
+ # rf argument ignored; server will provide it's own implementation
if t is None:
t = time.time()
t = t - (days * 86400)
@@ -467,9 +434,8 @@
transaction.description,
transaction._extension)
except:
- # If _server is None, then the client disconnected during
- # the tpc_begin() and notifyDisconnected() will have
- # released the lock.
+ # Client may have disconnected during the tpc_begin().
+ # Then notifyDisconnected() will have released the lock.
if self._server is not disconnected_stub:
self.tpc_cond.release()
raise
@@ -481,7 +447,7 @@
def tpc_finish(self, transaction, f=None):
if transaction is not self._transaction:
return
- if f is not None: # XXX what is f()?
+ if f is not None:
f()
self._server.tpc_finish(self._serial)
@@ -575,7 +541,6 @@
if self._pickler is None:
return
self._pickler.dump((0,0))
-## self._pickler.dump = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
self._tfile = None
@@ -588,7 +553,6 @@
self._db.invalidate(oid, version=version)
def Invalidate(self, args):
- # XXX _db could be None
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
@@ -598,4 +562,3 @@
"Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
repr(version),
msg))
-
=== StandaloneZODB/ZEO/StorageServer.py 1.32.6.1 => 1.32.6.2 ===
import sys
import threading
-import types
-import ClientStub
-import zrpc2
-import zLOG
+from ZEO import ClientStub
+from ZEO.zrpc.server import Dispatcher
+from ZEO.zrpc.connection import ManagedServerConnection, Handler, Delay
-from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
+import zLOG
from ZODB.POSException import StorageError, StorageTransactionError, \
TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
@@ -129,14 +128,14 @@
self.storages = storages
self.read_only = read_only
self.connections = {}
- for name, store in storages.items():
- fixup_storage(store)
self.dispatcher = Dispatcher(addr, factory=self.newConnection,
reuse_addr=1)
- def newConnection(self, sock, addr, nil):
+ def newConnection(self, sock, addr):
+ # XXX figure out how to do the connection / proxy dance better
c = ManagedServerConnection(sock, addr, None, self)
c.register_object(StorageProxy(self, c))
+ log("new connection %s: %s" % (addr, `c`))
return c
def register(self, storage_id, proxy):
@@ -187,6 +186,7 @@
self.server = server
self.client = ClientStub.ClientStorage(conn)
self.__storage = None
+ self.__storage_id = "uninitialized"
self.__invalidated = []
self._transaction = None
@@ -201,7 +201,7 @@
stid)
def _log(self, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
- zLOG.LOG("ZEO Server %s %X" % (pid, id(self)),
+ zLOG.LOG("ZEO Server:%s:%s" % (pid, self.__storage_id),
level, msg, error=error)
def setup_delegation(self):
@@ -313,6 +313,7 @@
if wait:
raise
else:
+ # XXX Why doesn't we broadcast on wait?
if not wait:
# Broadcast new size statistics
self.server.invalidate(0, self.__storage_id, (),
@@ -338,7 +339,6 @@
self._check_tid(id, exc=StorageTransactionError)
try:
# XXX does this stmt need to be in the try/except?
-
newserial = self.__storage.store(oid, serial, data, version,
self._transaction)
except TransactionError, v:
@@ -354,11 +354,13 @@
error = sys.exc_info()
self._log('store error: %s: %s' % (error[0], error[1]),
zLOG.ERROR, error=error)
- newserial = sys.exc_info()[1]
+ newserial = error[1]
+ del error
else:
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version))
+ # Is all this error checking necessary?
try:
nil = dump(newserial, 1)
except:
@@ -415,7 +417,7 @@
t._extension = ext
if self.__storage._transaction is not None:
- d = zrpc2.Delay()
+ d = Delay()
self.__storage.__waiting.append((d, self, t))
return d
@@ -468,8 +470,3 @@
if n < 0:
n = 1
return [self.__storage.new_oid() for i in range(n)]
-
-def fixup_storage(storage):
- # backwards compatibility hack
- if not hasattr(storage,'tpc_vote'):
- storage.tpc_vote = lambda *args: None
=== Removed File StandaloneZODB/ZEO/zrpc2.py ===