[Zope3-checkins] CVS: Zope3/src/zope/server/ftp/tests - __init__.py:1.1.2.1 test_ftpserver.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:21 -0500


Update of /cvs-repository/Zope3/src/zope/server/ftp/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server/ftp/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py test_ftpserver.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/server/ftp/tests/__init__.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.

""" Unit tests for Zope.Server """


=== Added File Zope3/src/zope/server/ftp/tests/test_ftpserver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: test_ftpserver.py,v 1.1.2.1 2002/12/23 19:33:20 jim Exp $
"""

import asyncore
import unittest
import tempfile
import os
import socket
import shutil
import sys
import traceback
from types import StringType
from StringIO import StringIO

from threading import Thread, Event
from zope.server.taskthreads import ThreadedTaskDispatcher
from zope.server.ftp.ftpserver import FTPServer
from zope.server.ftp.ftpstatusmessages import status_msgs
from zope.server.adjustments import Adjustments
from zope.server.interfaces.interfaces import ITask

from zope.server.vfs.osfilesystem import OSFileSystem
from zope.server.ftp.testfilesystemaccess import TestFilesystemAccess

from zope.server.tests.asyncerror import AsyncoreErrorHook

import ftplib

from time import sleep, time

td = ThreadedTaskDispatcher()

LOCALHOST = '127.0.0.1'
SERVER_PORT = 0      # Set these port numbers to 0 to auto-bind, or
CONNECT_TO_PORT = 0  # use specific numbers to inspect using TCPWatch.


my_adj = Adjustments()


def retrlines(ftpconn, cmd):
    res = []
    ftpconn.retrlines(cmd, res.append)
    return ''.join(res)


class Tests(unittest.TestCase, AsyncoreErrorHook):

    def setUp(self):
        td.setThreadCount(1)
        self.orig_map_size = len(asyncore.socket_map)
        self.hook_asyncore_error()

        self.root_dir = tempfile.mktemp()
        os.mkdir(self.root_dir)
        os.mkdir(os.path.join(self.root_dir, 'test'))

        fs = OSFileSystem(self.root_dir)
        fs_access = TestFilesystemAccess(fs)

        self.server = FTPServer(LOCALHOST, SERVER_PORT, fs_access,
                                task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT
        self.run_loop = 1
        self.counter = 0
        self.thread_started = Event()
        self.thread = Thread(target=self.loop)
        self.thread.start()
        self.thread_started.wait(10.0)
        self.assert_(self.thread_started.isSet())

    def tearDown(self):
        self.run_loop = 0
        self.thread.join()
        td.shutdown()
        self.server.close()
        # Make sure all sockets get closed by asyncore normally.
        timeout = time() + 2
        while 1:
            if len(asyncore.socket_map) == self.orig_map_size:
                # Clean!
                break
            if time() >= timeout:
                self.fail('Leaked a socket: %s' % `asyncore.socket_map`)
                break
            asyncore.poll(0.1)

        self.unhook_asyncore_error()

        if os.path.exists(self.root_dir):
            shutil.rmtree(self.root_dir)

    def loop(self):
        self.thread_started.set()
        import select
        from errno import EBADF
        while self.run_loop:
            self.counter = self.counter + 1
            # print 'loop', self.counter
            try:
                asyncore.poll(0.1)
            except select.error, data:
                if data[0] == EBADF:
                    print "exception polling in loop(): ", data
                else:
                    raise

    def getFTPConnection(self, login=1):
        ftp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ftp.connect((LOCALHOST, self.port))
        result = ftp.recv(10000).split()[0]
        self.assertEqual(result, '220')
        if login:
            ftp.send('USER foo\r\n')
            self.assertEqual(ftp.recv(1024),
                             status_msgs['PASS_REQUIRED'] +'\r\n')
            ftp.send('PASS bar\r\n')
            self.assertEqual(ftp.recv(1024),
                             status_msgs['LOGIN_SUCCESS'] +'\r\n')

        return ftp


    def execute(self, commands, login=1):
        ftp = self.getFTPConnection(login)

        try:
            if type(commands) is StringType:
                commands = (commands,)

            for command in commands:
                ftp.send('%s\r\n' %command)
                result = ftp.recv(10000)

            self.failUnless(result.endswith('\r\n'))
        finally:
            ftp.close()
        return result.split('\r\n')[0]


    def testABOR(self):
        self.assertEqual(self.execute('ABOR', 1),
                         status_msgs['TRANSFER_ABORTED'])


    def testCDUP(self):
        self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
                         status_msgs['SUCCESS_250'] %'CDUP')
        self.assertEqual(self.execute('CDUP', 1),
                         status_msgs['SUCCESS_250'] %'CDUP')


    def testCWD(self):
        self.assertEqual(self.execute('CWD test', 1),
                         status_msgs['SUCCESS_250'] %'CWD')
        self.assertEqual(self.execute('CWD foo', 1),
                         status_msgs['ERR_NO_DIR'] %'/foo')


    def testDELE(self):
        open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
        self.assertEqual(self.execute('DELE foo', 1),
                         status_msgs['SUCCESS_250'] %'DELE')
        res = self.execute('DELE bar', 1).split()[0]
        self.assertEqual(res, '550')
        self.assertEqual(self.execute('DELE', 1),
                         status_msgs['ERR_ARGS'])


    def testHELP(self):
        result = status_msgs['HELP_START'] #+ '\r\n'
        #result += 'Help goes here somewhen.\r\n'
        #result += status_msgs['HELP_END']

        self.assertEqual(self.execute('HELP', 1), result)


    def testLIST(self):
        conn = ftplib.FTP()
        try:
            conn.connect(LOCALHOST, self.port)
            conn.login('foo', 'bar')
            self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
            listing = retrlines(conn, 'LIST')
            self.assert_(len(listing) > 0)
        finally:
            conn.close()
        # Make sure no garbage was left behind.
        self.testNOOP()


    def testNOOP(self):
        self.assertEqual(self.execute('NOOP', 0),
                         status_msgs['SUCCESS_200'] %'NOOP')
        self.assertEqual(self.execute('NOOP', 1),
                         status_msgs['SUCCESS_200'] %'NOOP')


    def testPASS(self):
        self.assertEqual(self.execute('PASS', 0),
                         status_msgs['LOGIN_MISMATCH'])
        self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
                         status_msgs['LOGIN_MISMATCH'])


    def testQUIT(self):
        self.assertEqual(self.execute('QUIT', 0),
                         status_msgs['GOODBYE'])
        self.assertEqual(self.execute('QUIT', 1),
                         status_msgs['GOODBYE'])


    def testSTOR(self):
        conn = ftplib.FTP()
        try:
            conn.connect(LOCALHOST, self.port)
            conn.login('foo', 'bar')
            fp = StringIO('Speak softly')
            # Can't overwrite directory
            self.assertRaises(
                ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
            fp = StringIO('Charity never faileth')
            # Successful write
            conn.storbinary('STOR /stuff', fp)
        finally:
            conn.close()
        # Make sure no garbage was left behind.
        self.testNOOP()


    def testUSER(self):
        self.assertEqual(self.execute('USER foo', 0),
                         status_msgs['PASS_REQUIRED'])
        self.assertEqual(self.execute('USER', 0),
                         status_msgs['ERR_ARGS'])



def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Tests)

if __name__=='__main__':
    unittest.TextTestRunner().run( test_suite() )