[Zope-Checkins] CVS: ZODB3/ZEO - start.py:1.52 StorageServer.py:1.79 ClientStorage.py:1.78
Jeremy Hylton
jeremy@zope.com
Mon, 18 Nov 2002 18:17:41 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv21897/ZEO
Modified Files:
start.py StorageServer.py ClientStorage.py
Log Message:
Merge ZODB 3.1 changes to the trunk.
XXX Not sure if Berkeley still works.
=== ZODB3/ZEO/start.py 1.51 => 1.52 ===
--- ZODB3/ZEO/start.py:1.51 Fri Nov 1 15:36:28 2002
+++ ZODB3/ZEO/start.py Mon Nov 18 18:17:41 2002
@@ -17,6 +17,8 @@
import sys, os, getopt
import types
+import errno
+import socket
def directory(p, n=1):
d = p
=== ZODB3/ZEO/StorageServer.py 1.78 => 1.79 ===
--- ZODB3/ZEO/StorageServer.py:1.78 Wed Nov 13 06:24:36 2002
+++ ZODB3/ZEO/StorageServer.py Mon Nov 18 18:17:41 2002
@@ -206,17 +206,16 @@
def __init__(self, server, read_only=0):
self.server = server
+ self.connection = None
self.client = None
self.storage = None
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
- self.timeout = TimeoutThread()
- self.timeout.start()
def notifyConnected(self, conn):
+ self.connection = conn # For restart_other() below
self.client = self.ClientStorageStubClass(conn)
- self.timeout.notifyConnected(conn)
def notifyDisconnected(self):
# When this storage closes, we must ensure that it aborts
@@ -226,7 +225,6 @@
self.abort()
else:
self.log("disconnected")
- self.timeout.notifyDisconnected()
def __repr__(self):
tid = self.transaction and repr(self.transaction.id)
@@ -416,13 +414,8 @@
" requests from one client.")
# (This doesn't require a lock because we're using asyncore)
- if self.storage._transaction is None:
- self.strategy = self.ImmediateCommitStrategyClass(self.storage,
- self.client)
- self.timeout.begin()
- else:
- self.strategy = self.DelayedCommitStrategyClass(self.storage,
- self.wait)
+ self.strategy = self.DelayedCommitStrategyClass(self.storage,
+ self.wait)
t = Transaction()
t.id = id
@@ -436,7 +429,6 @@
def tpc_finish(self, id):
if not self.check_tid(id):
return
- self.timeout.end()
invalidated = self.strategy.tpc_finish()
if invalidated:
self.server.invalidate(self, self.storage_id,
@@ -448,7 +440,6 @@
def tpc_abort(self, id):
if not self.check_tid(id):
return
- self.timeout.end()
strategy = self.strategy
strategy.tpc_abort()
self.transaction = None
@@ -469,9 +460,7 @@
def vote(self, id):
self.check_tid(id, exc=StorageTransactionError)
- r = self.strategy.tpc_vote()
- self.timeout.begin()
- return r
+ return self.strategy.tpc_vote()
def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError)
@@ -503,8 +492,10 @@
"Clients waiting: %d." % len(self.storage._waiting))
return d
else:
- self.restart()
- return None
+ return self.restart()
+
+ def dontwait(self):
+ return self.restart()
def handle_waiting(self):
while self.storage._waiting:
@@ -526,7 +517,7 @@
except:
self.log("Unexpected error handling waiting transaction",
level=zLOG.WARNING, error=sys.exc_info())
- zeo_storage._conn.close()
+ zeo_storage.connection.close()
return 0
else:
return 1
@@ -539,6 +530,8 @@
resp = old_strategy.restart(self.strategy)
if delay is not None:
delay.reply(resp)
+ else:
+ return resp
# A ZEOStorage instance can use different strategies to commit a
# transaction. The current implementation uses different strategies
@@ -767,79 +760,6 @@
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
-
-class TimeoutThread(threading.Thread):
- # A TimeoutThread is associated with a ZEOStorage. It tracks
- # how long transactions take to commit. If a transaction takes
- # too long, it will close the connection.
-
- TIMEOUT = 30
-
- def __init__(self):
- threading.Thread.__init__(self)
- self._lock = threading.Lock()
- self._timestamp = None
- self._conn = None
-
- def begin(self):
- self._lock.acquire()
- try:
- self._timestamp = time.time()
- finally:
- self._lock.release()
-
- def end(self):
- self._lock.acquire()
- try:
- self._timestamp = None
- finally:
- self._lock.release()
-
- # There's a race here, but I hope it is harmless.
-
- def notifyConnected(self, conn):
- self._conn = conn
-
- def notifyDisconnected(self):
- self._conn = None
-
- def run(self):
- timeout = self.TIMEOUT
- while self._conn is not None:
- time.sleep(timeout)
-
- self._lock.acquire()
- try:
- if self._timestamp is not None:
- deadline = self._timestamp + self.TIMEOUT
- else:
- log("TimeoutThread no current transaction",
- zLOG.BLATHER)
- timeout = self.TIMEOUT
- continue
- finally:
- self._lock.release()
-
- timeout = deadline - time.time()
- if deadline < time.time():
- self._abort()
- break
- else:
- elapsed = self.TIMEOUT - timeout
- log("TimeoutThread transaction has %0.2f sec to complete"
- " (%.2f elapsed)" % (timeout, elapsed), zLOG.BLATHER)
- log("TimeoutThread exiting. Connection closed.", zLOG.BLATHER)
-
- def _abort(self):
- # It's possible for notifyDisconnected to remove the connection
- # just before we use it. I think that's harmless, since it means
- # the connection was closed.
- log("TimeoutThread aborting transaction", zLOG.WARNING)
- try:
- self._conn.close()
- except AttributeError, msg:
- log(msg)
-
# Patch up class references
StorageServer.ZEOStorageClass = ZEOStorage
=== ZODB3/ZEO/ClientStorage.py 1.77 => 1.78 ===
--- ZODB3/ZEO/ClientStorage.py:1.77 Wed Nov 13 06:24:36 2002
+++ ZODB3/ZEO/ClientStorage.py Mon Nov 18 18:17:41 2002
@@ -28,9 +28,11 @@
import cPickle
import os
+import socket
import tempfile
import threading
import time
+import types
from ZEO import ClientCache, ServerStub
from ZEO.TransactionBuffer import TransactionBuffer
@@ -204,6 +206,8 @@
self._storage = storage
self._read_only_fallback = read_only_fallback
self._connection = None
+ # _server_addr is used by sortKey()
+ self._server_addr = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
@@ -339,6 +343,7 @@
log2(INFO, "Reconnected to storage")
else:
log2(INFO, "Connected to storage")
+ self.set_server_addr(conn.get_addr())
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
@@ -350,6 +355,33 @@
self._connection = conn
self._server = stub
+ def set_server_addr(self, addr):
+ # Normalize server address and convert to string
+ if isinstance(addr, types.StringType):
+ self._server_addr = addr
+ else:
+ assert isinstance(addr, types.TupleType)
+ # If the server is on a remote host, we need to guarantee
+ # that all clients used the same name for the server. If
+ # they don't, the sortKey() may be different for each client.
+ # The best solution seems to be the official name reported
+ # by gethostbyaddr().
+ host = addr[0]
+ try:
+ canonical, aliases, addrs = socket.gethostbyaddr(host)
+ except socket.error, err:
+ log2(BLATHER, "Error resoving host: %s (%s)" % (host, err))
+ canonical = host
+ self._server_addr = str((canonical, addr[1]))
+
+ def sortKey(self):
+ # If the client isn't connected to anything, it can't have a
+ # valid sortKey(). Raise an error to stop the transaction early.
+ if self._server_addr is None:
+ raise ClientDisconnected
+ else:
+ return self._server_addr
+
def verify_cache(self, server):
"""Internal routine called to verify the cache."""
# XXX beginZeoVerify ends up calling back to beginVerify() below.
@@ -622,10 +654,14 @@
"""Internal helper to end a transaction."""
# the right way to set self._transaction to None
# calls notify() on _tpc_cond in case there are waiting threads
+ self._ltid = self._serial
self._tpc_cond.acquire()
self._transaction = None
self._tpc_cond.notify()
self._tpc_cond.release()
+
+ def lastTransaction(self):
+ return self._ltid
def tpc_abort(self, transaction):
"""Storage API: abort a transaction."""