[Zope3-checkins]
SVN: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/
Finially integrated Twisted ftp tests with Zope. This way we
can run Twisted's
Michael Kerrin
michael.kerrin at openapp.biz
Fri Oct 7 07:38:56 EDT 2005
Log message for revision 38870:
Finially integrated Twisted ftp tests with Zope. This way we can run Twisted's
functional FTP tests on the Zope server.
To run these tests run:
trial.py zope.app.twisted.ftp.test
Changed:
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/__init__.py
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/test_zope_ftp.py
-=-
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/__init__.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/__init__.py 2005-10-07 11:38:20 UTC (rev 38869)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/__init__.py 2005-10-07 11:38:55 UTC (rev 38870)
@@ -0,0 +1 @@
+## This package contains a Twisted trial test for Zope 3.
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/__init__.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/test_zope_ftp.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/test_zope_ftp.py 2005-10-07 11:38:20 UTC (rev 38869)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/test_zope_ftp.py 2005-10-07 11:38:55 UTC (rev 38870)
@@ -0,0 +1,195 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""This is Zope's first trial test.
+
+This file basically contains FTP functional tests.
+
+To run these tests run::
+
+ $INSTANCE_HOME/trial.py zope.app.twisted.ftp.test
+"""
+__docformat__="restructuredtext"
+from cStringIO import StringIO
+import posixpath
+
+from twisted.test import test_ftp
+from twisted.internet import reactor, protocol, defer
+from twisted.protocols import ftp
+from twisted.trial.util import wait
+
+from zope.app.twisted.ftp.server import FTPRealm, FTPFactory
+from zope.app.twisted.ftp.tests.test_publisher import RequestFactory
+from zope.app.twisted.ftp.tests import demofs
+
+class DemoFileSystem(demofs.DemoFileSystem):
+ def mkdir_nocheck(self, path):
+ path, name = posixpath.split(path)
+ d = self.getdir(path)
+ if name in d.files:
+ raise OSError("Already exists:", name)
+ newdir = self.Directory()
+ newdir.grant(self.user, demofs.read | demofs.write)
+ d.files[name] = newdir
+
+ def writefile_nocheck(self, path, instream, start = None,
+ end = None, append = False):
+ path, name = posixpath.split(path)
+ d = self.getdir(path)
+ f = d.files.get(name)
+ if f is None:
+ d = self.getdir(path)
+ f = d.files[name] = self.File()
+ f.grant(self.user, demofs.read | demofs.write)
+ elif f.type != 'f':
+ raise OSError("Can't overwrite a directory")
+
+ if append:
+ f.data += instream.read()
+ else:
+ f.data = instream.read()
+
+class FTPServerTestCase(test_ftp.FTPServerTestCase):
+ def tearDown(self):
+ # Clean up sockets
+ self.client.transport.loseConnection()
+ d = self.port.stopListening()
+ if d is not None:
+ wait(d)
+
+ del self.serverProtocol
+
+ def setUp(self):
+ root = demofs.Directory()
+ # the tuple has a user name is used by ZopeSimpleAuthentication to
+ # authenticate users.
+ root.grant(('root', 'root'), demofs.write)
+ self.rootfs = rootfs = DemoFileSystem(root, ('root', 'root'))
+
+ # Start the server
+ self.factory = FTPFactory(request_factory = RequestFactory(rootfs))
+ self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
+
+ # Hook the server's buildProtocol to make the protocol instance
+ # accessible to tests.
+ buildProtocol = self.factory.buildProtocol
+ def _rememberProtocolInstance(addr):
+ protocol = buildProtocol(addr)
+ self.serverProtocol = protocol.wrappedProtocol
+ return protocol
+ self.factory.buildProtocol = _rememberProtocolInstance
+
+ # Connect a client to it
+ portNum = self.port.getHost().port
+ clientCreator = protocol.ClientCreator(reactor, ftp.FTPClientBasic)
+ self.client = wait(clientCreator.connectTCP("127.0.0.1", portNum))
+
+ def _anonymousLogin(self):
+ responseLines = wait(self.client.queueStringCommand('USER anonymous'))
+ self.assertEquals(
+ ['331 Password required for anonymous.'],
+ responseLines
+ )
+
+ responseLines = wait(self.client.queueStringCommand(
+ 'PASS test at twistedmatrix.com')
+ )
+ self.assertEquals(
+ ['230 User logged in, proceed'],
+ responseLines
+ )
+
+class BasicFTPServerTestCase(FTPServerTestCase, test_ftp.BasicFTPServerTestCase):
+ pass
+
+class FTPServerPasvDataConnectionTestCase(FTPServerTestCase,
+ test_ftp.FTPServerPasvDataConnectionTestCase):
+
+ def testLIST(self):
+ # Login
+ self._anonymousLogin()
+
+ # Download a listing
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('LIST')
+ wait(defer.gatherResults([d, downloader.d]))
+
+ # No files, so the file listing should be empty
+ self.assertEqual('', downloader.buffer)
+
+ # Make some directories
+ self.rootfs.mkdir_nocheck('/foo')
+ self.rootfs.mkdir_nocheck('/bar')
+## os.mkdir(os.path.join(self.directory, 'foo'))
+## os.mkdir(os.path.join(self.directory, 'bar'))
+
+ # Download a listing again
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('LIST')
+ wait(defer.gatherResults([d, downloader.d]))
+
+ # Now we expect 2 lines because there are two files.
+ self.assertEqual(2, len(downloader.buffer[:-2].split('\r\n')))
+
+ # Download a names-only listing
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('NLST ')
+ wait(defer.gatherResults([d, downloader.d]))
+ filenames = downloader.buffer[:-2].split('\r\n')
+ filenames.sort()
+ self.assertEqual(['bar', 'foo'], filenames)
+
+ # Download a listing of the 'foo' subdirectory
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('LIST foo')
+ wait(defer.gatherResults([d, downloader.d]))
+
+ # 'foo' has no files, so the file listing should be empty
+ self.assertEqual('', downloader.buffer)
+
+ # Change the current working directory to 'foo'
+ wait(self.client.queueStringCommand('CWD foo'))
+
+ # Download a listing from within 'foo', and again it should be empty
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('LIST')
+ wait(defer.gatherResults([d, downloader.d]))
+ self.assertEqual('', downloader.buffer)
+
+ def testManyLargeDownloads(self):
+ # Login
+ self._anonymousLogin()
+
+
+ # Download a range of different size files
+ for size in range(100000, 110000, 500):
+ self.rootfs.writefile_nocheck('/%d.txt' % (size,), StringIO('x' * size))
+## fObj = file(os.path.join(self.directory, '%d.txt' % (size,)), 'wb')
+## fObj.write('x' * size)
+## fObj.close()
+
+ downloader = self._makeDataConnection()
+ d = self.client.queueStringCommand('RETR %d.txt' % (size,))
+ wait(defer.gatherResults([d, downloader.d]))
+ self.assertEqual('x' * size, downloader.buffer)
+
+
+class FTPServerPortDataConnectionTestCaes(FTPServerPasvDataConnectionTestCase, test_ftp.FTPServerPortDataConnectionTestCase):
+ def setUp(self):
+ FTPServerPasvDataConnectionTestCase.setUp(self)
+ self.dataPorts = []
+
+ def tearDown(self):
+ l = [defer.maybeDeferred(port.stopListening) for port in self.dataPorts]
+ wait(defer.DeferredList(l, fireOnOneErrback=True))
+ return FTPServerPasvDataConnectionTestCase.tearDown(self)
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/test/test_zope_ftp.py
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the Zope3-Checkins
mailing list