[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP/tests - __init__.py:1.1.2.1 testFTPServer.py:1.1.2.1 testPublisherServer.py:1.1.2.1
Stephan Richter
srichter@cbu.edu
Sat, 6 Apr 2002 14:57:10 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP/tests
In directory cvs.zope.org:/tmp/cvs-serv2221/tests
Added Files:
Tag: Zope3-Server-Branch
__init__.py testFTPServer.py testPublisherServer.py
Log Message:
Started writing FTP Server tests. Some are fairly easy, but others are
hard, since FTP opens other connections as well.
=== Added File Zope3/lib/python/Zope/Server/FTP/tests/__init__.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
""" Unit tests for Zope.Server """
=== Added File Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: testFTPServer.py,v 1.1.2.1 2002/04/06 19:57:09 srichter Exp $
"""
import unittest
import tempfile
import os
from asyncore import socket_map, poll
import socket
from types import StringType
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.FTP.FTPServer import FTPServer
from Zope.Server.FTP.FTPStatusMessages import status_msgs
from Zope.Server.Adjustments import Adjustments
from Zope.Server.ITask import ITask
from Zope.Server.Authentication.DictionaryAuthentication import \
DictionaryAuthentication
import ftplib
from time import sleep, time
# Address the test server binds to and the test clients connect to.
LOCALHOST = '127.0.0.1'

# Leave both port numbers at 0 to auto-bind, or set specific numbers to
# inspect the traffic using TCPWatch.
SERVER_PORT = 0
CONNECT_TO_PORT = 0

# Task dispatcher shared by the tests in this module.
td = ThreadedTaskDispatcher()

# Server adjustments with reduced overflow thresholds to make testing easier.
my_adj = Adjustments()
my_adj.outbuf_overflow = my_adj.inbuf_overflow = 10000
class Tests(unittest.TestCase):
    """Exercise the FTP server's control connection over a real socket.

    Every test starts an FTPServer rooted at a fresh scratch directory and
    a background thread that polls the asyncore socket map.  The tests then
    send raw FTP commands and compare the replies with the message
    templates in status_msgs.
    """

    def setUp(self):
        """Create the scratch directory and start server and poll thread."""
        td.setThreadCount(1)
        self.orig_map_size = len(socket_map)

        # NOTE(review): tempfile.mktemp() only returns a name, so another
        # process could claim it before os.mkdir() runs; tempfile.mkdtemp()
        # avoids that where it is available.
        self.root_dir = tempfile.mktemp()
        os.mkdir(self.root_dir)
        os.mkdir(os.path.join(self.root_dir, 'test'))

        self.server = FTPServer(LOCALHOST, SERVER_PORT, self.root_dir,
                                DictionaryAuthentication({'foo': 'bar'}),
                                task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            # Ask the listening socket which port it actually got.
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT

        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.

    def tearDown(self):
        """Stop the poll thread and the server, then remove the scratch tree."""
        import shutil  # Local import: only this cleanup step needs it.

        try:
            self.run_loop = 0
            self.thread.join()
            td.shutdown()
            self.server.close()
        finally:
            # Remove what setUp created so repeated runs do not pile up
            # temporary directories.  Best effort: a cleanup problem must
            # not mask the outcome of the test itself.
            shutil.rmtree(self.root_dir, ignore_errors=1)

        # Make sure all sockets get closed by asyncore normally.
        # (This check is currently disabled.)
        #timeout = time() + 5
        #while 1:
        #    if len(socket_map) == self.orig_map_size:
        #        # Clean!
        #        break
        #    if time() >= timeout:
        #        print 'Leaked a socket: %s' % `socket_map`
        #        break
        #        #self.fail('Leaked a socket: %s' % `socket_map`)
        #    poll(0.1, socket_map)

    def loop(self):
        """Poll the asyncore socket map until tearDown clears run_loop.

        Runs in the thread started by setUp.
        """
        while self.run_loop:
            self.counter += 1
            # print 'loop', self.counter
            poll(0.1, socket_map)

    def getFTPConnection(self, login=1):
        """Open a control connection and return the connected socket.

        The server greeting is checked first.  If login is true, the
        connection is also logged in as user 'foo' with password 'bar'.
        The socket is closed again if any of these checks fails.
        """
        ftp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            ftp.connect((LOCALHOST, self.port))
            greeting = ftp.recv(10000)
            self.assertEqual(
                greeting,
                status_msgs['SERVER_READY'] % 'localhost.localdomain'
                + '\r\n')
            if login:
                ftp.send('USER foo\r\n')
                self.assertEqual(ftp.recv(1024),
                                 status_msgs['PASS_REQUIRED'] + '\r\n')
                ftp.send('PASS bar\r\n')
                self.assertEqual(ftp.recv(1024),
                                 status_msgs['LOGIN_SUCCESS'] + '\r\n')
        except:
            # Do not leak the socket when a check fails; the original
            # error is re-raised unchanged.
            ftp.close()
            raise
        return ftp

    def execute(self, commands, login=1):
        """Send one or more commands and return the last reply.

        commands is either a single command string or a sequence of
        command strings; each is sent terminated by CRLF.  The returned
        reply has its trailing CRLF removed.  login is passed on to
        getFTPConnection.
        """
        if isinstance(commands, StringType):
            commands = (commands,)

        ftp = self.getFTPConnection(login)
        try:
            for command in commands:
                ftp.send('%s\r\n' % command)
                result = ftp.recv(10000)
                #print result[:-2]
            self.failUnless(result.endswith('\r\n'))
        finally:
            # Close the connection even when an assertion fails.
            ftp.close()
        return result[:-2]

    def testABOR(self):
        self.assertEqual(self.execute('ABOR', 1),
                         status_msgs['TRANSFER_ABORTED'])

    def testCDUP(self):
        # CDUP is expected to succeed both from a subdirectory and from
        # the directory a fresh login starts in.
        self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
                         status_msgs['SUCCESS_250'] % 'CDUP')
        self.assertEqual(self.execute('CDUP', 1),
                         status_msgs['SUCCESS_250'] % 'CDUP')

    def testCWD(self):
        # 'test' is created by setUp; 'foo' does not exist.
        self.assertEqual(self.execute('CWD test', 1),
                         status_msgs['SUCCESS_250'] % 'CWD')
        self.assertEqual(self.execute('CWD foo', 1),
                         status_msgs['ERR_NO_DIR'] % '/foo')

    def testDELE(self):
        # Create the file that the first DELE is expected to remove.
        f = open(os.path.join(self.root_dir, 'foo'), 'w')
        try:
            f.write('blah')
        finally:
            f.close()

        self.assertEqual(self.execute('DELE foo', 1),
                         status_msgs['SUCCESS_250'] % 'DELE')
        self.assertEqual(self.execute('DELE bar', 1),
                         status_msgs['ERR_DELETE_FILE'])
        self.assertEqual(self.execute('DELE', 1),
                         status_msgs['CMD_UNKNOWN'] % 'DELE')

    def testHELP(self):
        result = status_msgs['HELP_START']  #+ '\r\n'
        #result += 'Help goes here somewhen.\r\n'
        #result += status_msgs['HELP_END']
        self.assertEqual(self.execute('HELP', 1), result)

    def testLIST(self):
        path = os.path.join(self.root_dir, 'foo')
        result = "[Errno 2] No such file or directory: '%s'" % path
        self.assertEqual(self.execute('LIST /foo', 1),
                         status_msgs['ERR_NO_LIST'] % result)
        # NOTE(review): expecting the NOOP success message for a bare LIST
        # looks like a placeholder -- confirm against FTPServer.
        self.assertEqual(self.execute('LIST', 1),
                         status_msgs['SUCCESS_200'] % 'NOOP')

    def testNOOP(self):
        # NOOP is expected to work with and without a login.
        self.assertEqual(self.execute('NOOP', 0),
                         status_msgs['SUCCESS_200'] % 'NOOP')
        self.assertEqual(self.execute('NOOP', 1),
                         status_msgs['SUCCESS_200'] % 'NOOP')

    def testPASS(self):
        self.assertEqual(self.execute('PASS', 0),
                         status_msgs['LOGIN_MISMATCH'])
        self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
                         status_msgs['LOGIN_MISMATCH'])

    def testQUIT(self):
        self.assertEqual(self.execute('QUIT', 0),
                         status_msgs['GOODBYE'])
        self.assertEqual(self.execute('QUIT', 1),
                         status_msgs['GOODBYE'])

    def testUSER(self):
        self.assertEqual(self.execute('USER foo', 0),
                         status_msgs['PASS_REQUIRED'])
        self.assertEqual(self.execute('USER', 0),
                         status_msgs['CMD_UNKNOWN'] % 'USER')
def test_suite():
    """Return a suite containing the test methods of Tests."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)
if __name__ == '__main__':
    # Run the suite with the plain text runner when executed as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())
=== Added File Zope3/lib/python/Zope/Server/FTP/tests/testPublisherServer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
##############################################################################
"""
$Id: testPublisherServer.py,v 1.1.2.1 2002/04/06 19:57:09 srichter Exp $
"""
#import unittest
#from asyncore import socket_map, poll
#import sys
#
#from threading import Thread
#from Zope.Server.TaskThreads import ThreadedTaskDispatcher
#from Zope.Server.HTTP.PublisherHTTPServer import PublisherHTTPServer
#from Zope.Publisher.Browser.BrowserRequest import BrowserRequest
#
#from Zope.Publisher.DefaultPublication import DefaultPublication
#from Zope.Publisher.Exceptions import Redirect, Retry
#from Zope.Publisher.HTTP import HTTPRequest
#
#from httplib import HTTPConnection
#
#from time import sleep, time
#
#td = ThreadedTaskDispatcher()
#
#LOCALHOST = '127.0.0.1'
#
#HTTPRequest.STAGGER_RETRIES = 0 # Don't pause.
#
#
#class Conflict (Exception):
# """
# Pseudo ZODB conflict error.
# """
#
#
#class PublicationWithConflict(DefaultPublication):
#
# def handleException(self, request, exc_info, retry_allowed=1):
# if exc_info[0] is Conflict and retry_allowed:
# # This simulates a ZODB retry.
# raise Retry(exc_info)
# else:
# DefaultPublication.handleException(self, request, exc_info,
# retry_allowed)
#
#
#class tested_object:
# " "
# tries = 0
#
# def __call__(self, REQUEST):
# return 'URL invoked: %s' % REQUEST.URL
#
# def redirect_method(self, REQUEST):
# "Generates a redirect using the redirect() method."
# REQUEST.getResponse().redirect("http://somewhere.com/redirect")
#
# def redirect_exception(self):
# "Generates a redirect using an exception."
# raise Redirect("http://somewhere.com/exception")
#
# def conflict(self, REQUEST, wait_tries):
# """
# Returns 202 status only after (wait_tries) tries.
# """
# if self.tries >= int(wait_tries):
# raise "Accepted"
# else:
# self.tries += 1
# raise Conflict
#
#
#
#class Tests(unittest.TestCase):
#
# def setUp(self):
#
# obj = tested_object()
# obj.folder = tested_object()
# obj.folder.item = tested_object()
#
# obj._protected = tested_object()
#
# pub = PublicationWithConflict(obj)
#
# def request_factory(input_stream, output_steam, env):
# request = BrowserRequest(input_stream, output_steam, env)
# request.setPublication(pub)
# return request
#
# td.setThreadCount(4)
# # Bind to any port on localhost.
# self.server = PublisherHTTPServer(request_factory, 'Browser',
# LOCALHOST, 0, task_dispatcher=td)
# self.port = self.server.socket.getsockname()[1]
# self.run_loop = 1
# self.thread = Thread(target=self.loop)
# self.thread.start()
# sleep(0.1) # Give the thread some time to start.
#
# def tearDown(self):
# self.run_loop = 0
# self.thread.join()
# td.shutdown()
# self.server.close()
#
# def loop(self):
# while self.run_loop:
# poll(0.1, socket_map)
#
# def testResponse(self, path='/', status_expected=200,
# add_headers=None, request_body=''):
# h = HTTPConnection(LOCALHOST, self.port)
# h.putrequest('GET', path)
# h.putheader('Accept', 'text/plain')
# if add_headers:
# for k, v in add_headers.items():
# h.putheader(k, v)
# if request_body:
# h.putheader('Content-Length', str(int(len(request_body))))
# h.endheaders()
# if request_body:
# h.send(request_body)
# response = h.getresponse()
# length = int(response.getheader('Content-Length', '0'))
# if length:
# response_body = response.read(length)
# else:
# response_body = ''
# # XXX How to test this now that we don't set the response code?
# ##self.failUnlessEqual(int(response.status), status_expected)
# self.failUnlessEqual(length, len(response_body))
#
# if (status_expected == 200):
# if path == '/': path = ''
# expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
# self.port, path)
# self.failUnlessEqual(response_body, expect_response)
#
# def testDeeperPath(self):
# self.testResponse(path='/folder/item')
#
# def testNotFound(self):
# self.testResponse(path='/foo/bar', status_expected=404)
#
# def testUnauthorized(self):
# self.testResponse(path='/_protected', status_expected=401)
#
# def testRedirectMethod(self):
# self.testResponse(path='/redirect_method', status_expected=302)
#
# def testRedirectException(self):
# self.testResponse(path='/redirect_exception', status_expected=302)
# self.testResponse(path='/folder/redirect_exception',
# status_expected=302)
#
# def testConflictRetry(self):
# # Expect the "Accepted" response since the retries will succeed.
# self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
#
# def testFailedConflictRetry(self):
# # Expect a "Conflict" response since there will be too many
# # conflicts.
# self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
#
#
#
#def test_suite():
# loader = unittest.TestLoader()
# return loader.loadTestsFromTestCase(Tests)
#
#if __name__=='__main__':
# unittest.TextTestRunner().run( test_suite() )