[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.4
plugins.py:1.3
Gary Poster
gary at zope.com
Thu Oct 21 15:30:00 EDT 2004
Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv6303/client/zasync
Modified Files:
client.py plugins.py
Log Message:
Fix bug: if the tool disappeared, zasync was unable to recover because of a number of problems. Most importantly, it was holding on to connections without closing them. Fixed this bug and similar problems elsewhere.
More paranoid about starting up now, and more paranoid about telling the manager about the available plugins when the tool has been re-found.
=== Packages/zasync/client/zasync/client.py 1.3 => 1.4 ===
--- Packages/zasync/client/zasync/client.py:1.3 Tue Oct 19 12:45:44 2004
+++ Packages/zasync/client/zasync/client.py Thu Oct 21 15:29:59 2004
@@ -74,11 +74,14 @@
# helpers
def housekeeping(interval=4):
+ global chores
+ log = logging.getLogger('zasync')
+ log.debug('housekeeping: performing %d chores', len(chores))
for (f, args, kwargs) in chores:
reactor.callLater(0, f, *args, **kwargs)
reactor.callLater(interval, housekeeping, interval)
-def getRequestApp():
+def getRequestApp(application):
response = HTTPResponse(stdout=sys.stdout)
request = HTTPRequest(
sys.stdin,
@@ -86,7 +89,7 @@
'SERVER_PORT':'80',
'REQUEST_METHOD':'GET'},
response)
- return app().__of__(RequestContainer(REQUEST=request))
+ return application.__of__(RequestContainer(REQUEST=request))
def logException(msg='', log=None):
global verbose_traceback
@@ -168,12 +171,31 @@
def retryTool(path, delay):
global retry_exponential_backoff, max_total_retry
- global tool_retry_start, app
- tool = None
+ global tool_retry_start, app, DB
+ application = None
try:
try:
- tool = app().unrestrictedTraverse(path)
+ try:
+ sync = DB._storage.sync # important
+ except AttributeError:
+ pass
+ else:
+ sync()
+ application = app()
+ tool = application.unrestrictedTraverse(path)
+ get_transaction().commit()
+ except ConflictError:
+ logging.getLogger('zasync').info(
+ 'Received ConflictError while trying to retry tool; retrying.')
+ get_transaction().abort()
+ reactor.callLater(delay, retryTool, path, delay)
+ except ClientDisconnected:
+ logging.getLogger('zasync').critical(
+ 'ZEO server disconnected while trying to retry tool')
+ get_transaction().abort()
+ scheduleServerRetry(retryTool, path, delay)
except (AttributeError, LookupError):
+ get_transaction().abort()
delay = pow(delay, retry_exponential_backoff)
diff = datetime.datetime.now() - tool_retry_start
seconds = diff.seconds + 86400 * diff.days
@@ -193,14 +215,21 @@
retries = tool_retries.pop(path)
delay = 0
interval = 0.25
+ # schedule all the past calls
+ found = False
for call, args, kwargs in retries:
reactor.callLater(delay, call, *args, **kwargs)
delay += interval
+ if not found and call is _setPlugins:
+ found = True
+ if not found:
+ # make sure tool (possibly new) knows about my plugins
+ setPlugins(path) # returns a deferred, which we ignore
logging.getLogger('zasync').info(
'tool /%s found again; recommencing calls.', '/'.join(path))
finally:
- if tool is not None:
- tool._p_jar.close()
+ if application is not None:
+ application._p_jar.close()
def timeoutErrback(deferred):
if not deferred.called:
@@ -212,11 +241,11 @@
return value
# pollZope schedules makeCall which schedules returnResult, possibly using
-# prepareDeferredCall if the result is a deferred.
+# prepareDeferredCall if the result is a deferred (and it usually is).
def pollZope(path):
"polls Zope to check for new calls, timeout changes, and expired calls"
global active, app, DB
- tool = None
+ application = None
try:
if is_connected():
log = logging.getLogger('zasync')
@@ -228,8 +257,9 @@
pass
else:
sync()
+ application = app()
try:
- tool = app().unrestrictedTraverse(path)
+ tool = application.unrestrictedTraverse(path)
except (AttributeError, LookupError):
scheduleToolRetry(path, pollZope, path)
return
@@ -281,8 +311,8 @@
else:
scheduleServerRetry(pollZope, path)
finally:
- if tool is not None:
- tool._p_jar.close()
+ if application is not None:
+ application._p_jar.close()
def makeCall(path, zopeDeferredId, name, args, kwargs, count=0):
# zopeDeferredId is an id of a deferred in the asynchronous tool
@@ -346,12 +376,13 @@
log = logging.getLogger('zasync')
log.debug('prepareDeferredCall called for %s (%s)',
zopeDeferredId, path)
- tool = None
+ application = None
try:
if is_connected():
try:
+ application = app()
try:
- tool = app().unrestrictedTraverse(path)
+ tool = application.unrestrictedTraverse(path)
except (AttributeError, LookupError):
scheduleToolRetry(
path, prepareDeferredCall, path, zopeDeferredId,
@@ -405,11 +436,11 @@
scheduleServerRetry(
makeCall, path, zopeDeferredId, name, args, kwargs, count)
finally:
- if tool is not None:
- tool._p_jar.close()
+ if application is not None:
+ application._p_jar.close()
def returnResult(value, path, zopeDeferredId, error=False, count=0):
- global active
+ global active, app
if isinstance(value, failure.Failure):
value.cleanFailure()
log = logging.getLogger('zasync')
@@ -419,12 +450,13 @@
del active[zopeDeferredId]
except KeyError:
pass
- tool = None
+ application = None
try:
if is_connected():
try:
+ application = app()
try:
- tool = getRequestApp().unrestrictedTraverse(path)
+ tool = getRequestApp(application).unrestrictedTraverse(path)
except (AttributeError, LookupError):
scheduleToolRetry(
path, returnResult, value, path, zopeDeferredId,
@@ -467,63 +499,155 @@
scheduleServerRetry(
returnResult, value, path, zopeDeferredId, error, count)
finally:
- if tool is not None:
- tool._p_jar.close()
+ if application is not None:
+ application._p_jar.close()
return res
def run(path=None):
# to be called after config.initialize
- global app, tool_path, plugins
+ d = setPlugins(path)
+ d.addCallbacks(handlePastCalls, stop)
+ d.addErrback(stop)
+ log = logging.getLogger('zasync')
+ log.debug('beginning reactor')
+ try:
+ reactor.run()
+ finally:
+ shutdown(None)
+
+def stop(ignored=None):
+ raise SystemExit()
+
+def setPlugins(path=None):
+ global tool_path
if path is None:
path = tool_path
- path = tuple(path)
- tool = app().unrestrictedTraverse(tool_path)
- tool.setPlugins([(n, p['description']) for n, p in plugins.items()])
- for attempt in range(max_conflict_resolution_attempts):
+ tool_path = path = tuple(path)
+ d = defer.Deferred()
+ reactor.callLater(0, _setPlugins, d, path)
+ return d
+
+def _setPlugins(deferred, path, count=0):
+ global app, plugins, max_conflict_resolution_attempts
+ log = logging.getLogger('zasync')
+ application = None
+ try:
try:
- info = [(d.getSignature(), d.key) for d in
- tool.getAcceptedCalls()]
- get_transaction().commit()
+ application = app()
+ try:
+ tool = application.unrestrictedTraverse(tool_path)
+ except (AttributeError, LookupError):
+ scheduleToolRetry(path, _setPlugins, deferred, path, count)
+ return
+ else:
+ tool.setPlugins(
+ [(n, p['description']) for n, p in plugins.items()])
+ get_transaction().commit()
except ConflictError:
- pass
+ get_transaction().abort()
+ if count == max_conflict_resolution_attempts-1:
+ log.critical(
+ "Too many conflicts trying to setPlugins!", exc_info=True)
+ deferred.errback(failure.Failure())
+ else:
+ count += 1
+ log.info("Conflict error %d trying to setPlugins", count)
+ reactor.callLater(count, _setPlugins, deferred, path, count)
+ except ClientDisconnected:
+ scheduleServerRetry(_setPlugins, deferred, path, count)
+ return
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ get_transaction().abort()
+ log.critical("Unexpected error: probably serious", exc_info=True)
+ deferred.errback(failure.Failure())
else:
- break
- else:
- raise RuntimeError("Too many conflicts")
- for (name, args, kwargs), key in info:
+ deferred.callback(path)
+ finally:
+ if application is not None:
+ application._p_jar.close()
+
+def handlePastCalls(path, count=0):
+ global app
+ log = logging.getLogger('zasync')
+ application = None
+ try:
try:
- call_info = plugins[name]
- except KeyError:
- retry = False
- else:
- retry = call_info.get('retry', False)
- if retry: # XXX don't retry if timed out
- reactor.callLater(
- 0, makeCall, tool_path, key, name, args, kwargs)
+ application = app()
+ try:
+ tool = application.unrestrictedTraverse(path)
+ except (AttributeError, LookupError):
+ scheduleToolRetry(path, handlePastCalls, path, count)
+ return
+ else:
+ info = [(d.getSignature(), d.key) for d in
+ tool.getAcceptedCalls()]
+ get_transaction().commit()
+ except ConflictError:
+ get_transaction().abort()
+ if count==max_conflict_resolution_attempts-1:
+ log.critical(
+ "Too many conflicts trying to handlePastCalls!",
+ exc_info=True)
+ raise SystemExit("Too many conflicts trying to handlePastCalls")
+ else:
+ count+=1
+ log.info("Conflict error %d trying to handlePastCalls", count)
+ reactor.callLater(count, handlePastCalls, path, count)
+ except ClientDisconnected:
+ scheduleServerRetry(handlePastCalls, path, count)
+ return
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ get_transaction().abort()
+ log.critical("Unexpected error: probably serious", exc_info=True)
+ deferred.errback(failure.Failure())
else:
- reactor.callLater(
- 0,
- returnResult,
- failure.Failure(
- defer.TimeoutError(
- 'zasync was disconnected (now reconnected)')),
- tool_path,
- key,
- error=True)
- tool._p_jar.close()
- reactor.callLater(0, pollZope, tool_path)
- reactor.callLater(0, housekeeping)
+ for (name, args, kwargs), key in info:
+ try:
+ call_info = plugins[name]
+ except KeyError:
+ retry = False
+ else:
+ retry = call_info.get('retry', False)
+ if retry: # XXX don't retry if timed out
+ reactor.callLater(
+ 0, makeCall, tool_path, key, name, args, kwargs)
+ else:
+ reactor.callLater(
+ 0,
+ returnResult,
+ failure.Failure(
+ defer.TimeoutError(
+ 'zasync was disconnected (now reconnected)')),
+ tool_path,
+ key,
+ error=True)
+ log.debug('scheduling first pollZope')
+ reactor.callLater(0, pollZope, path)
+ log.debug('scheduling first housekeeping')
+ reactor.callLater(0, housekeeping)
+ finally:
+ if application is not None:
+ application._p_jar.close()
+
+def shutdown(ignored=None):
+ global tool_path, app
+ application = None
try:
- reactor.run()
- finally:
get_transaction().abort()
+ application = app()
try:
- tool = app().unrestrictedTraverse(tool_path)
- except (AttributeError, KeyError, ConflictError, ClientDisconnected):
- pass
- else:
+ tool = application.unrestrictedTraverse(tool_path)
tool.setPlugins(())
get_transaction().commit()
- tool._p_jar.close()
+ except (AttributeError, KeyError, ConflictError, ClientDisconnected):
+ get_transaction().abort()
+ finally:
+ if application is not None:
+ application._p_jar.close()
logging.getLogger('zasync').critical('Shutting down')
logging.shutdown()
+ return ignored
=== Packages/zasync/client/zasync/plugins.py 1.2 => 1.3 ===
--- Packages/zasync/client/zasync/plugins.py:1.2 Tue Oct 19 12:51:38 2004
+++ Packages/zasync/client/zasync/plugins.py Thu Oct 21 15:29:59 2004
@@ -374,7 +374,7 @@
'zope_exec: beginning worker thread %r', thread_id)
global taskQueue, taskStatus, threadIds, callbacks, serverDown
from zasync.client import app, max_conflict_resolution_attempts
- root = None
+ application = None
try:
while 1: # keep on looking for tasks
zopeDeferredTuple, homepath, actions = taskQueue.get() # blocks
@@ -391,7 +391,14 @@
try:
try:
deferred = None
- home = root = client.getRequestApp()
+ try:
+ sync = client.DB._storage.sync # important
+ except AttributeError:
+ pass
+ else:
+ sync()
+ application = client.app()
+ home = root = client.getRequestApp(application)
tool = root.unrestrictedTraverse(toolpath)
zopeDeferred = tool.getDeferred(zopeDeferredId)
user = zopeDeferred.getWrappedOwner()
@@ -607,8 +614,9 @@
callbacks.put((deferred, fail))
break
finally:
- if root is not None:
- root._p_jar.close()
+ if application is not None:
+ application._p_jar.close()
+ application = None
else: # too many conflict resolution attempts
logger.debug(
'zope_exec: worker %s got too many conflict errors: '
More information about the Zope-CVS
mailing list