[Zodb-checkins] CVS: Zope3/src/zdaemon/tests - __init__.py:1.2.4.1 nokill.py:1.1.26.1 testDaemon.py:1.8.2.1 testzdoptions.py:1.1.4.1 testzdrun.py:1.1.4.1

Grégoire Weber zope at i-con.ch
Sun Jun 22 11:22:27 EDT 2003


Update of /cvs-repository/Zope3/src/zdaemon/tests
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zdaemon/tests

Added Files:
      Tag: cw-mail-branch
	__init__.py nokill.py testDaemon.py testzdoptions.py 
	testzdrun.py 
Log Message:
Synced up with HEAD

=== Added File Zope3/src/zdaemon/tests/__init__.py ===
# This file is needed to make this a package.


=== Added File Zope3/src/zdaemon/tests/nokill.py ===
#! /usr/bin/env python

import signal

# Ignore SIGTERM: a plain termination request must not stop this process.
signal.signal(signal.SIGTERM, signal.SIG_IGN)

# Sleep until some signal arrives, then go straight back to sleep.
while True:
    signal.pause()


=== Added File Zope3/src/zdaemon/tests/testDaemon.py ===
# This file contains unit tests and a simple script that is explicitly
# invoked by the tests.  The script block goes first so that I don't
# have to bother getting the imports right.

# The script just kills itself or exits in a variety of ways.
import os
import sys

if __name__ == "__main__":
    # Script mode: die in the way requested by the first command-line
    # argument.  "signal" kills this process with SIGKILL, "exit" leaves
    # with status 2, and anything else leaves with status 0.
    how = sys.argv[1]
    if how == "signal":
        import signal
        os.kill(os.getpid(), signal.SIGKILL)
    if how == "exit":
        os._exit(2)
    os._exit(0)

# The rest is unittest stuff that can be run by testrunner.py.

import logging
import unittest
import zdaemon
import zdaemon.tests
import zdaemon.Daemon

class TestDoneError(RuntimeError):
    """Raised by TestErrorHandler.emit() to carry a formatted log record."""

class TestErrorHandler(logging.Handler):
    """Logging handler that turns ERROR-or-worse records into exceptions."""

    def emit(self, record):
        # Records below ERROR are silently dropped.
        if record.levelno < logging.ERROR:
            return
        # Anything else aborts the caller, carrying the formatted message.
        raise TestDoneError(self.format(record))

class DaemonTest(unittest.TestCase):

    dir, file = os.path.split(zdaemon.tests.__file__)
    script = os.path.join(dir, "testDaemon.py")

    def setUp(self):
        self.handler = TestErrorHandler()
        logging.root.addHandler(self.handler)
        self.level = logging.root.level # is there a better way?
        logging.root.setLevel(logging.ERROR)
        os.environ["Z_DEBUG_MODE"] = ""
        if os.environ.has_key("ZDAEMON_MANAGED"):
            del os.environ["ZDAEMON_MANAGED"]

    def tearDown(self):
        logging.root.removeHandler(self.handler)
        logging.root.setLevel(self.level)

    def run(self, arg):
        try:
            zdaemon.Daemon.run((self.script, arg))
        except SystemExit:
            pass

    def testDeathBySignal(self):
        try:
            self.run("signal")
        except TestDoneError, msg:
            self.assert_(str(msg).find("terminated by signal") != -1)
        else:
            self.fail("signal was not caught")

    def testDeathByExit(self):
        try:
            self.run("exit")
        except TestDoneError, msg:
            self.assert_(str(msg).find("exit status") != -1)
        else:
            self.fail("exit didn't raise an exception")

def test_suite():
    """Return the DaemonTest suite, or an empty suite without os.setsid.

    An empty TestSuite is returned instead of None on platforms lacking
    os.setsid, so that callers (including unittest's own loader, which
    rejects None) always receive something they can run or add to another
    suite.
    """
    if hasattr(os, 'setsid'):
        return unittest.makeSuite(DaemonTest)
    return unittest.TestSuite()


=== Added File Zope3/src/zdaemon/tests/testzdoptions.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

"""Test suite for zdaemon.zdoptions."""

import os
import sys
import tempfile
import unittest
from StringIO import StringIO

import ZConfig
import zdaemon
from zdaemon.zdoptions import ZDOptions

class ZDOptionsTestBase(unittest.TestCase):

    OptionsClass = ZDOptions

    def save_streams(self):
        self.save_stdout = sys.stdout
        self.save_stderr = sys.stderr
        sys.stdout = self.stdout = StringIO()
        sys.stderr = self.stderr = StringIO()

    def restore_streams(self):
        sys.stdout = self.save_stdout
        sys.stderr = self.save_stderr

    def check_exit_code(self, options, args):
        save_sys_stderr = sys.stderr
        try:
            sys.stderr = StringIO()
            try:
                options.realize(args)
            except SystemExit, err:
                self.assertEqual(err.code, 2)
            else:
                self.fail("SystemExit expected")
        finally:
            sys.stderr = save_sys_stderr


class TestZDOptions(ZDOptionsTestBase):

    input_args = ["arg1", "arg2"]
    output_opts = []
    output_args = ["arg1", "arg2"]

    def test_basic(self):
        progname = "progname"
        doc = "doc"
        options = self.OptionsClass()
        options.positional_args_allowed = 1
        options.schemadir = os.path.dirname(zdaemon.__file__)
        options.realize(self.input_args, progname, doc)
        self.assertEqual(options.progname, "progname")
        self.assertEqual(options.doc, "doc")
        self.assertEqual(options.options, self.output_opts)
        self.assertEqual(options.args, self.output_args)

    def test_configure(self):
        configfile = os.path.join(os.path.dirname(zdaemon.__file__),
                                  "sample.conf")
        for arg in "-C", "--c", "--configure":
            options = self.OptionsClass()
            options.realize([arg, configfile])
            self.assertEqual(options.configfile, configfile)

    def test_help(self):
        for arg in "-h", "--h", "--help":
            options = self.OptionsClass()
            try:
                self.save_streams()
                try:
                    options.realize([arg])
                finally:
                    self.restore_streams()
            except SystemExit, err:
                self.assertEqual(err.code, 0)
            else:
                self.fail("%s didn't call sys.exit()" % repr(arg))

    def test_unrecognized(self):
        # Check that we get an error for an unrecognized option
        self.check_exit_code(self.OptionsClass(), ["-/"])


class TestBasicFunctionality(TestZDOptions):
    """Positional arguments, conflicting flags and option handlers."""

    def _positional_options(self):
        # Build an options object that accepts positional arguments.
        opts = self.OptionsClass()
        opts.positional_args_allowed = 1
        return opts

    def test_no_positional_args(self):
        # Positional args are an error unless explicitly enabled.
        self.check_exit_code(self.OptionsClass(), ["A"])

    def test_positional_args(self):
        opts = self._positional_options()
        opts.realize(["A", "B"])
        self.assertEqual(opts.args, ["A", "B"])

    def test_positional_args_empty(self):
        opts = self._positional_options()
        opts.realize([])
        self.assertEqual(opts.args, [])

    def test_positional_args_unknown_option(self):
        # An unknown option must not be mistaken for a positional arg.
        opts = self._positional_options()
        self.check_exit_code(opts, ["-o", "A", "B"])

    def test_conflicting_flags(self):
        # Two flags fighting over the same setting must be an error.
        opts = self.OptionsClass()
        for letter, value in (("a", 1), ("b", 2)):
            opts.add("setting", None, letter, flag=value)
        self.check_exit_code(opts, ["-a", "-b"])

    def test_handler_simple(self):
        # The handler runs, and its non-None result becomes the value.
        opts = self.OptionsClass()
        opts.add("setting", None, "a:", handler=int)
        opts.realize(["-a2"])
        self.assertEqual(opts.setting, 2)

    def test_handler_side_effect(self):
        # A handler returning None may be triggered repeatedly without a
        # conflict being signalled; the setting itself stays None.
        collected = []
        opts = self.OptionsClass()
        opts.add("setting", None, "a:", "append=", handler=collected.append)
        opts.realize(["-a2", "--append", "3"])
        self.assert_(opts.setting is None)
        self.assertEqual(collected, ["2", "3"])

    def test_handler_with_bad_value(self):
        # A value the handler rejects must be reported as an error.
        opts = self.OptionsClass()
        opts.add("setting", None, "a:", handler=int)
        self.check_exit_code(opts, ["-afoo"])


class EnvironmentOptions(ZDOptionsTestBase):
    """Base class for tests using an 'opt' option backed by $OPT.

    The nested OptionsClass supplies its own schema and, when a
    'configtext' attribute is set, writes that text to a temporary
    configuration file just before loading it.
    """

    # Parsed schema, shared by every OptionsClass instance once loaded.
    saved_schema = None

    class OptionsClass(ZDOptions):
        def __init__(self):
            ZDOptions.__init__(self)
            self.add("opt", "opt", "o:", "opt=",
                     default=42, handler=int, env="OPT")

        def load_schema(self):
            # Building the schema inline avoids needing a separate file.
            if self.schema is not None:
                return
            if EnvironmentOptions.saved_schema is None:
                parsed = ZConfig.loadSchemaFile(StringIO("""\
                        <schema>
                          <key name='opt' datatype='integer' default='12'/>
                        </schema>
                        """))
                EnvironmentOptions.saved_schema = parsed
            self.schema = EnvironmentOptions.saved_schema

        def load_configfile(self):
            # Without configtext, defer entirely to the base class.
            if not getattr(self, "configtext", None):
                ZDOptions.load_configfile(self)
                return
            # Otherwise materialize the text as a temporary file, load it,
            # and always remove the file afterwards.
            self.configfile = tempfile.mktemp()
            tmp = open(self.configfile, 'w')
            tmp.write(self.configtext)
            tmp.close()
            try:
                ZDOptions.load_configfile(self)
            finally:
                os.unlink(self.configfile)

    # Save and restore the environment around each test:

    def setUp(self):
        # Swap in a plain-dict copy of the environment for the test.
        self._oldenv = os.environ
        os.environ = dict(os.environ.items())

    def tearDown(self):
        os.environ = self._oldenv

    def create_with_config(self, text):
        # Return an options object; a non-empty text becomes its config.
        pkgdir = os.path.dirname(os.path.abspath(zdaemon.__file__))
        opts = self.OptionsClass()
        opts.schemadir = os.path.join(pkgdir, 'tests')
        opts.schemafile = "envtest.xml"
        if not text:
            return opts
        # configfile must be set for ZDOptions to use ZConfig:
        opts.configfile = "not used"
        opts.configtext = text
        return opts


class TestZDOptionsEnvironment(EnvironmentOptions):

    def test_with_environment(self):
        os.environ["OPT"] = "2"
        self.check_from_command_line()
        options = self.OptionsClass()
        options.realize([])
        self.assertEqual(options.opt, 2)

    def test_without_environment(self):
        self.check_from_command_line()
        options = self.OptionsClass()
        options.realize([])
        self.assertEqual(options.opt, 42)

    def check_from_command_line(self):
        for args in (["-o1"], ["--opt", "1"]):
            options = self.OptionsClass()
            options.realize(args)
            self.assertEqual(options.opt, 1)

    def test_with_bad_environment(self):
        os.environ["OPT"] = "Spooge!"
        # make sure the bad value is ignored if the command-line is used:
        self.check_from_command_line()
        options = self.OptionsClass()
        try:
            self.save_streams()
            try:
                options.realize([])
            finally:
                self.restore_streams()
        except SystemExit, e:
            self.assertEqual(e.code, 2)
        else:
            self.fail("expected SystemExit")

    def test_environment_overrides_configfile(self):
        options = self.create_with_config("opt 3")
        options.realize([])
        self.assertEqual(options.opt, 3)

        os.environ["OPT"] = "2"
        options = self.create_with_config("opt 3")
        options.realize([])
        self.assertEqual(options.opt, 2)


class TestCommandLineOverrides(EnvironmentOptions):
    """Tests for -X name=value configuration overrides."""

    def test_simple_override(self):
        # A single -X override must set the value.
        opts = self.create_with_config("# empty config")
        opts.realize(["-X", "opt=-2"])
        self.assertEqual(opts.opt, -2)

    def test_error_propogation(self):
        # Each of these argument lists must make realize() exit with 2:
        # the same key overridden twice, and an override of an unknown key.
        for argv in (["-Xopt=1", "-Xopt=2"], ["-Xunknown=foo"]):
            self.check_exit_code(self.create_with_config("# empty"), argv)


def test_suite():
    """Collect the three concrete test classes of this module in one suite."""
    classes = (TestBasicFunctionality,
               TestZDOptionsEnvironment,
               TestCommandLineOverrides)
    suite = unittest.TestSuite()
    for test_class in classes:
        suite.addTest(unittest.makeSuite(test_class))
    return suite

# When executed as a script, run the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zdaemon/tests/testzdrun.py ===
"""Test suite for zdrun.py."""

import os
import sys
import time
import signal
import tempfile
import unittest
from StringIO import StringIO

from zdaemon import zdrun, zdctl

class ZDaemonTests(unittest.TestCase):

    python = os.path.abspath(sys.executable)
    assert os.path.exists(python)
    here = os.path.abspath(os.path.dirname(__file__))
    assert os.path.isdir(here)
    nokill = os.path.join(here, "nokill.py")
    assert os.path.exists(nokill)
    parent = os.path.dirname(here)
    zdrun = os.path.join(parent, "zdrun.py")
    assert os.path.exists(zdrun)

    ppath = os.pathsep.join(sys.path)

    def setUp(self):
        self.zdsock = tempfile.mktemp()
        self.new_stdout = StringIO()
        self.save_stdout = sys.stdout
        sys.stdout = self.new_stdout
        self.expect = ""

    def tearDown(self):
        sys.stdout = self.save_stdout
        for sig in (signal.SIGTERM,
                    signal.SIGHUP,
                    signal.SIGINT,
                    signal.SIGCHLD):
            signal.signal(sig, signal.SIG_DFL)
        try:
            os.unlink(self.zdsock)
        except os.error:
            pass
        output = self.new_stdout.getvalue()
        self.assertEqual(self.expect, output)

    def quoteargs(self, args):
        for i in range(len(args)):
            if " " in args[i]:
                args[i] = '"%s"' % args[i]
        return " ".join(args)

    def rundaemon(self, args):
        # Add quotes, in case some pathname contains spaces (e.g. Mac OS X)
        args = self.quoteargs(args)
        cmd = ('PYTHONPATH="%s" "%s" "%s" -d -s "%s" %s' %
               (self.ppath, self.python, self.zdrun, self.zdsock, args))
        os.system(cmd)
        # When the daemon crashes, the following may help debug it:
        ##os.system("PYTHONPATH=%s %s %s -s %s %s &" %
        ##    (self.ppath, self.python, self.zdrun, self.zdsock, args))

    def run(self, args):
        if type(args) is type(""):
            args = args.split()
        try:
            zdctl.main(["-s", self.zdsock] + args)
        except SystemExit:
            pass

    def testSystem(self):
        self.rundaemon(["echo", "-n"])
        self.expect = ""

##     def testInvoke(self):
##         self.run("echo -n")
##         self.expect = ""

##     def testControl(self):
##         self.rundaemon(["sleep", "1000"])
##         time.sleep(1)
##         self.run("stop")
##         time.sleep(1)
##         self.run("exit")
##         self.expect = "Sent SIGTERM\nExiting now\n"

##     def testStop(self):
##         self.rundaemon([self.python, self.nokill])
##         time.sleep(1)
##         self.run("stop")
##         time.sleep(1)
##         self.run("exit")
##         self.expect = "Sent SIGTERM\nSent SIGTERM; will exit later\n"

    def testHelp(self):
        self.run("-h")
        import __main__
        self.expect = __main__.__doc__

    def testOptionsSysArgv(self):
        # Check that options are parsed from sys.argv by default
        options = zdrun.ZDRunOptions()
        save_sys_argv = sys.argv
        try:
            sys.argv = ["A", "B", "C"]
            options.realize()
        finally:
            sys.argv = save_sys_argv
        self.assertEqual(options.options, [])
        self.assertEqual(options.args, ["B", "C"])

    def testOptionsBasic(self):
        # Check basic option parsing
        options = zdrun.ZDRunOptions()
        options.realize(["B", "C"], "foo")
        self.assertEqual(options.options, [])
        self.assertEqual(options.args, ["B", "C"])
        self.assertEqual(options.progname, "foo")

    def testOptionsHelp(self):
        # Check that -h behaves properly
        options = zdrun.ZDRunOptions()
        try:
            options.realize(["-h"], doc=zdrun.__doc__)
        except SystemExit, err:
            self.failIf(err.code)
        else:
            self.fail("SystemExit expected")
        self.expect = zdrun.__doc__

    def testSubprocessBasic(self):
        # Check basic subprocess management: spawn, kill, wait
        options = zdrun.ZDRunOptions()
        options.realize(["sleep", "100"])
        proc = zdrun.Subprocess(options)
        self.assertEqual(proc.pid, 0)
        pid = proc.spawn()
        self.assertEqual(proc.pid, pid)
        msg = proc.kill(signal.SIGTERM)
        self.assertEqual(msg, None)
        wpid, wsts = os.waitpid(pid, 0)
        self.assertEqual(wpid, pid)
        self.assertEqual(os.WIFSIGNALED(wsts), 1)
        self.assertEqual(os.WTERMSIG(wsts), signal.SIGTERM)
        proc.setstatus(wsts)
        self.assertEqual(proc.pid, 0)

def test_suite():
    """Return the ZDaemonTests suite on POSIX, an empty suite elsewhere."""
    suite = unittest.TestSuite()
    if os.name != "posix":
        return suite
    suite.addTest(unittest.makeSuite(ZDaemonTests))
    return suite

# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    # NOTE(review): the ZDaemonTests class body reads __file__ when the
    # class is defined, which happens before this assignment runs --
    # confirm whether this line still has any effect.
    __file__ = sys.argv[0]
    unittest.main(defaultTest='test_suite')




More information about the Zodb-checkins mailing list