[Zope-Checkins] CVS: ZODB3/zdaemon/tests - testzdrun.py:1.5.16.3
Jeremy Hylton
jeremy at zope.com
Fri Sep 19 17:03:08 EDT 2003
Update of /cvs-repository/ZODB3/zdaemon/tests
In directory cvs.zope.org:/tmp/cvs-serv11598/zdaemon/tests
Modified Files:
Tag: Zope-2_7-branch
testzdrun.py
Log Message:
Restore test deleted by the merge of ZODB3-3_2-branch.
=== ZODB3/zdaemon/tests/testzdrun.py 1.5.16.2 => 1.5.16.3 ===
--- ZODB3/zdaemon/tests/testzdrun.py:1.5.16.2 Mon Sep 15 14:02:54 2003
+++ ZODB3/zdaemon/tests/testzdrun.py Fri Sep 19 17:03:08 2003
@@ -6,8 +6,8 @@
import signal
import tempfile
import unittest
+import socket
from StringIO import StringIO
-
from zdaemon import zdrun, zdctl
class ZDaemonTests(unittest.TestCase):
@@ -145,6 +145,49 @@
self.assertEqual(os.WTERMSIG(wsts), signal.SIGTERM)
proc.setstatus(wsts)
self.assertEqual(proc.pid, 0)
+
+    def testRunIgnoresParentSignals(self):
+        # Spawn a process which will in turn spawn a zdrun process.
+        # We make sure that the zdrun process is still running even if
+        # its parent process receives an interrupt signal (it should
+        # not be passed to zdrun).
+        zdrun_socket = os.path.join(self.here, 'testsock')
+        zdctlpid = os.spawnvp(
+            os.P_NOWAIT,
+            sys.executable,
+            [sys.executable, os.path.join(self.here, 'parent.py')]
+            )
+        time.sleep(2) # race condition possible here; presumably gives parent.py time to start zdrun
+        os.kill(zdctlpid, signal.SIGINT) # interrupt only the spawned parent process
+        try:
+            response = send_action('status\n', zdrun_socket) or '' # send_action returns None on socket errors; "or ''" keeps split() below working
+        except socket.error, msg:
+            response = ''
+        params = response.split('\n')
+        self.assert_(len(params) > 1) # ''.split('\n') has one element, so this fails unless the reply contained a newline
+        # kill the process
+        send_action('exit\n', zdrun_socket) # NOTE(review): not reached when the assertion above fails, so zdrun may be left running - confirm
+
+def send_action(action, sockname):
+    """Send an action to the zdrun server and return the response.
+
+    action is the command text; a newline is appended to it before it
+    is sent.  sockname is the path of the Unix domain socket to
+    connect to.
+
+    Return everything the server sent back, read until it closes the
+    connection.  Return None if the server is not up or any other
+    socket error happened.
+    """
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    # try/except is nested inside try/finally (rather than combined)
+    # so the code keeps working on Python versions older than 2.5.
+    try:
+        try:
+            sock.connect(sockname)
+            # sendall() keeps writing until the whole command is out;
+            # a plain send() may transmit only part of it.
+            sock.sendall(action + "\n")
+            sock.shutdown(1) # We're not writing any more
+            chunks = []
+            while 1:
+                data = sock.recv(1000)
+                if not data:
+                    # Empty read: the server closed its end.
+                    break
+                chunks.append(data)
+            return "".join(chunks)
+        except socket.error:
+            return None
+    finally:
+        # Close on every path, including errors, so the socket is
+        # never leaked.
+        sock.close()
def test_suite():
suite = unittest.TestSuite()
More information about the Zope-Checkins
mailing list