[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/POP3/tests - foo_mb.py:1.1.2.1 testPOP3Message.py:1.1.2.1 testPOP3MessageList.py:1.1.2.1 testPOP3Server.py:1.1.2.1
Stephan Richter
srichter@cbu.edu
Sun, 7 Apr 2002 17:39:17 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server/POP3/tests
In directory cvs.zope.org:/tmp/cvs-serv25992/POP3/tests
Added Files:
Tag: Zope3-Server-Branch
foo_mb.py testPOP3Message.py testPOP3MessageList.py
testPOP3Server.py
Log Message:
I just finished the POP3 server implementation. Right now it only works
with Unix-like mailboxes, but I believe it is not limited to running on
Unix. It will not make much sense to write the Zope connection until we
have the SMTP implemented and a MailMessageService has been written.
Please try the code and let me know what could be improved. BTW, I tested
the server with Eudora and it worked great, which included the APOP
authentication mechanism!
=== Added File Zope3/lib/python/Zope/Server/POP3/tests/foo_mb.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: foo_mb.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""
# Sample Unix-style mailbox holding three test messages.  The POP3 tests
# slice this string by absolute character offset (for example
# mailbox[1:663]) and compare against fixed sizes, so its content must not
# be altered.
mailbox = """
>From srichter@iuveno-net.de Sun Apr 7 11:14:50 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GEnM20286
for <srichter@cbu.edu>; Sun, 7 Apr 2002 11:14:50 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111422.00bc5fd0@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:14:41 -0500
To: srichter@cbu.edu
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 1
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 114
Message 1 Body
--
Stephan Richter
>From srichter@iuveno-net.de Sun Apr 7 11:15:29 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GFSM20303;
Sun, 7 Apr 2002 11:15:28 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111444.00bc5b20@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:15:20 -0500
To: Stephan Richter <srichter@cbu.edu>
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 2
Cc: Stephan Richter <srichter@cbu.edu>
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 121
One more test message
--
Stephan Richter
>From srichter@iuveno-net.de Sun Apr 7 11:16:00 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GFxM20326;
Sun, 7 Apr 2002 11:16:00 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111522.02331ac0@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:15:52 -0500
To: Stephan Richter <srichter@cbu.edu>, Stephan Richter <srichter@cbu.edu>
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 3
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 119
The last one, okay?
--
Stephan Richter
"""
=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3Message.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: testPOP3Message.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""
import unittest
import rfc822
import md5
import tempfile
from Interface.Verify import verifyClass
from Zope.Server.POP3.IPOP3Message import IPOP3Message
from Zope.Server.POP3.POP3Message import POP3Message
import foo_mb
class Tests(unittest.TestCase):
    """Unit tests for POP3Message.

    The message under test is built from the slice of foo_mb.mailbox that
    starts right after the leading newline and is 662 characters long.
    """

    # Slice bounds into foo_mb.mailbox used throughout the tests.
    MSG_START = 1      # skips the newline that opens the mailbox string
    BODY_START = 626   # start of the slice the body/UID tests expect
    MSG_END = 663      # end (exclusive) of the message slice

    def setUp(self):
        """Write the sample message to a scratch file and wrap it."""
        # NOTE: mktemp() only picks a name and is therefore race-prone;
        # switch to tempfile.mkstemp() where it is available.
        self._fn = tempfile.mktemp()

        # Close the writer explicitly so the data is flushed before the
        # file is read back, instead of relying on garbage collection.
        out = open(self._fn, 'w')
        try:
            out.write(foo_mb.mailbox[self.MSG_START:self.MSG_END])
        finally:
            out.close()

        # The read handle is handed to rfc822.Message, so it is kept open
        # until tearDown.
        self._fp = open(self._fn, 'r')
        self.msg = POP3Message(rfc822.Message(self._fp))

    def tearDown(self):
        """Close the message file and delete the scratch file."""
        import os
        self._fp.close()
        os.remove(self._fn)

    def testIsDeleted(self):
        """A fresh message is not flagged as deleted."""
        self.assertEqual(self.msg.isDeleted(), 0)

    def testSetDeleted(self):
        """setDeleted(1) flips the deleted flag from 0 to 1."""
        self.assertEqual(self.msg.isDeleted(), 0)
        self.msg.setDeleted(1)
        self.assertEqual(self.msg.isDeleted(), 1)

    def testGetEntireMessage(self):
        """getEntireMessage() returns exactly the text written in setUp."""
        expected = foo_mb.mailbox[self.MSG_START:self.MSG_END]
        self.assertEqual(self.msg.getEntireMessage(), expected)

    def testGetBody(self):
        """getBody() returns the tail of the message slice."""
        expected = foo_mb.mailbox[self.BODY_START:self.MSG_END]
        self.assertEqual(self.msg.getBody(), expected)

    def testGetSize(self):
        """getSize() reports the length of the message slice."""
        self.assertEqual(self.msg.getSize(), 662)

    def testGetUID(self):
        """getUID() equals the MD5 hex digest of the body slice."""
        body = foo_mb.mailbox[self.BODY_START:self.MSG_END]
        expected = md5.md5(body).hexdigest()
        self.assertEqual(self.msg.getUID(), expected)

    def testInterface(self):
        """POP3Message declares and satisfies IPOP3Message."""
        self.failUnless(IPOP3Message.isImplementedByInstancesOf(POP3Message))
        self.failUnless(verifyClass(IPOP3Message, POP3Message))
def test_suite():
    """Return a suite containing every test method of Tests."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)
# Run the suite with the plain-text runner when executed as a script.
if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(test_suite())
=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3MessageList.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: testPOP3MessageList.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""
import unittest
import mailbox
import tempfile
import os
from Interface.Verify import verifyClass
from Zope.Server.POP3.IPOP3MessageList import IPOP3MessageList
from Zope.Server.POP3.POP3MessageList import POP3MessageList
from Zope.Server.VFS.UnixFileSystem import UnixFileSystem
import foo_mb
class Tests(unittest.TestCase):
    """Unit tests for POP3MessageList, backed by the foo_mb sample mailbox."""

    def setUp(self):
        """Write the sample mailbox as 'foo' in the temp dir and open it."""
        tmp_dir = tempfile.gettempdir()
        self._path = os.path.join(tmp_dir, 'foo')

        # Close the writer explicitly so the mailbox is fully on disk
        # before POP3MessageList reads it.
        out = open(self._path, 'w')
        try:
            out.write(foo_mb.mailbox)
        finally:
            out.close()

        self.msg_list = POP3MessageList(UnixFileSystem(tmp_dir), 'foo')
        self.msg_list.open()

    def tearDown(self):
        """Remove the mailbox file written by setUp (best effort)."""
        try:
            os.remove(self._path)
        except OSError:
            # Cleanup only; a file that is already gone or still in use
            # must not turn a passing test into an error.
            pass

    def testOpen(self):
        """open() repopulates the list with the three sample messages."""
        # Reset the list on the object under test (the original assigned
        # to the test case itself, which had no effect).
        self.msg_list._messagelist = []
        self.msg_list.open()
        self.assertEqual(len(self.msg_list._messagelist), 3)

    def testExists(self):
        """exists() is true for message 1 and false for message 4."""
        self.assertEqual(self.msg_list.exists(1), 1)
        self.assertEqual(self.msg_list.exists(4), 0)

    def testGetMessages(self):
        """getMessages() returns all three messages."""
        self.assertEqual(len(self.msg_list.getMessages()), 3)

    def testGetMessage(self):
        """getMessage(1) returns the first entry of the internal list."""
        first = self.msg_list._messagelist[0]
        self.assertEqual(self.msg_list.getMessage(1), first)

    def testGetTotal(self):
        """getTotalSize() reports 2062 for the sample mailbox."""
        self.assertEqual(self.msg_list.getTotalSize(), 2062)

    def testGetIndex(self):
        """getIndex() maps the first internal entry to index 1."""
        first = self.msg_list._messagelist[0]
        self.assertEqual(self.msg_list.getIndex(first), 1)

    def testInterface(self):
        """POP3MessageList declares and satisfies IPOP3MessageList."""
        self.failUnless(
            IPOP3MessageList.isImplementedByInstancesOf(POP3MessageList))
        self.failUnless(verifyClass(IPOP3MessageList, POP3MessageList))
def test_suite():
    """Build the unittest suite from the Tests case defined in this module."""
    suite = unittest.TestLoader().loadTestsFromTestCase(Tests)
    return suite
# Script entry point: run the suite with the text runner.
if __name__ == '__main__':
    suite = test_suite()
    unittest.TextTestRunner().run(suite)
=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3Server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: testPOP3Server.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""
import unittest
import tempfile
import os
from asyncore import socket_map, poll
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.POP3.POP3Server import POP3Server
from Zope.Server.POP3.POP3ServerChannel import POP3ServerChannel
from Zope.Server.POP3.POP3StatusMessages import status_msgs
from Zope.Server.Adjustments import Adjustments
from Zope.Server.ITask import ITask
from Zope.Server.VFS.UnixFileSystem import UnixFileSystem
from Zope.Server.Authentication.DictionaryAuthentication import \
DictionaryAuthentication
import poplib
import foo_mb
from time import sleep, time
# Task dispatcher used by the test case's setUp/tearDown.
td = ThreadedTaskDispatcher()

# Address the test server binds to and the poplib client connects to.
LOCALHOST = '127.0.0.1'

# Leave both port numbers at 0 to auto-bind, or set specific numbers to
# inspect the traffic using TCPWatch.
SERVER_PORT = 0
CONNECT_TO_PORT = 0

# Server adjustments with reduced overflow limits to make testing easier.
my_adj = Adjustments()
my_adj.outbuf_overflow = my_adj.inbuf_overflow = 10000
class Tests(unittest.TestCase):
    """Functional tests that drive a live POP3Server through poplib.

    Each test starts a server on LOCALHOST that serves a scratch directory
    containing one mailbox file, 'foo', filled from foo_mb.mailbox.  The
    asyncore loop is pumped from a background thread for the duration of
    the test.  The only known credentials are user 'foo' / password 'bar'.
    """

    def setUp(self):
        """Create the scratch mailbox, start the server and the poll thread."""
        td.setThreadCount(1)
        self.orig_map_size = len(socket_map)

        # NOTE: mktemp() only picks a name and is therefore race-prone;
        # switch to tempfile.mkdtemp() where it is available.
        self.root_path = tempfile.mktemp()
        os.mkdir(self.root_path)
        self.root_dir = UnixFileSystem(self.root_path)
        self._writeMailbox(open(os.path.join(self.root_path, 'foo'), 'w'))

        self.server = POP3Server(LOCALHOST, SERVER_PORT, self.root_path,
                                 DictionaryAuthentication({'foo': 'bar'}),
                                 task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            # Auto-bound: ask the listening socket which port it got.
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT

        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.

    def tearDown(self):
        """Stop the poll thread and server, then delete the scratch dir."""
        import shutil
        try:
            self.run_loop = 0
            self.thread.join()
            td.shutdown()
            self.server.close()
            # Make sure all sockets get closed by asyncore normally.
            # timeout = time() + 5
            # while 1:
            #     if len(socket_map) == self.orig_map_size:
            #         # Clean!
            #         break
            #     if time() >= timeout:
            #         print 'Leaked a socket: %s' % `socket_map`
            #         break
            #         #self.fail('Leaked a socket: %s' % `socket_map`)
            #     poll(0.1, socket_map)
        finally:
            # Best-effort removal so repeated runs do not pile up scratch
            # directories.
            shutil.rmtree(self.root_path, ignore_errors=1)

    def loop(self):
        """Poll the asyncore socket map until run_loop is cleared.

        Runs on the background thread started in setUp.
        """
        while self.run_loop:
            self.counter = self.counter + 1
            # print 'loop', self.counter
            poll(0.1, socket_map)

    def _writeMailbox(self, fp):
        """Write the sample mailbox to the open file `fp` and close it."""
        try:
            fp.write(foo_mb.mailbox)
        finally:
            fp.close()

    def _assertProtoError(self, expected, command, *args):
        """Assert that command(*args) raises poplib.error_proto.

        The exception text must equal `expected`.  Unlike a bare
        try/except, this fails the test when no exception is raised.
        """
        import sys
        try:
            command(*args)
        except poplib.error_proto:
            # Fetch the exception via sys.exc_info() so no version-specific
            # binding syntax is needed in the except clause.
            error = sys.exc_info()[1]
            self.assertEqual(str(error), expected)
        else:
            self.fail('%r did not raise poplib.error_proto' % (command,))

    def getPOP3Connection(self, login=1):
        """Return a poplib.POP3 client connected to the test server.

        login -- when true, authenticate as 'foo'/'bar' via USER/PASS and
                 check both replies before returning.
        """
        # Refresh the file every single time, since some operations might
        # modify it.
        self._writeMailbox(self.root_dir.open('foo', 'w'))

        pop = poplib.POP3(LOCALHOST, self.port)
        if login:
            self.assertEqual(pop.user('foo'),
                             status_msgs['OK_USER'] % 'foo')
            self.assertEqual(pop.pass_('bar'),
                             status_msgs['OK_LOGIN'])
        return pop

    def testAPOP(self):
        """APOP with the right credentials returns OK_LOGIN."""
        pop = self.getPOP3Connection(0)
        self.assertEqual(pop.apop('foo', 'bar'),
                         status_msgs['OK_LOGIN'])
        pop.quit()

    def testDELE(self):
        """DELE on message 1 returns OK_DELETE."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.dele(1),
                         status_msgs['OK_DELETE'])
        pop.quit()

    def testLIST(self):
        """LIST reports three messages with their individual sizes."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.list(),
                         (status_msgs['OK_MSG_LIST'] % (3, 2062),
                          ['1 662', '2 703', '3 697'], 21))
        pop.quit()

    def testNOOP(self):
        """NOOP returns OK_GENERAL."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.noop(),
                         status_msgs['OK_GENERAL'])
        pop.quit()

    def testPASS(self):
        """PASS succeeds with 'bar' and is rejected with a wrong password."""
        pop = self.getPOP3Connection(0)
        pop.user('foo')
        self.assertEqual(pop.pass_('bar'),
                         status_msgs['OK_LOGIN'])
        pop.quit()

        pop = self.getPOP3Connection(0)
        pop.user('foo')
        self._assertProtoError(status_msgs['ERR_LOGIN_MISMATCH'],
                               pop.pass_, 'blah')

    def testQUIT(self):
        """QUIT returns OK_QUIT."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.quit(),
                         status_msgs['OK_QUIT'])

    def testRETR(self):
        """RETR returns message 1 line by line; message 4 is unknown."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.retr(1),
            (status_msgs['OK_RETR'] % 662, [
            'From srichter@iuveno-net.de Sun Apr 7 11:14:50 2002',
            'Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])',
            '\tby cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GEnM20286',
            '\tfor <srichter@cbu.edu>; Sun, 7 Apr 2002 11:14:50 -0500 (CDT)',
            'Message-Id: <5.1.0.14.2.20020407111422.00bc5fd0@imail.iuveno-net.de>',
            'X-Sender: srichter@imail.iuveno-net.de',
            'X-Mailer: QUALCOMM Windows Eudora Version 5.1',
            'Date: Sun, 07 Apr 2002 11:14:41 -0500',
            'To: srichter@cbu.edu', 'From: Stephan Richter <srichter@iuveno-net.de>',
            'Subject: Test Message 1', 'Mime-Version: 1.0',
            'Content-Type: text/plain; charset="us-ascii"; format=flowed',
            'Content-Length: 114', '',
            'Message 1 Body', '', '--', 'Stephan Richter', '', '', ''], 664))
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'], pop.retr, 4)
        pop.quit()

    def testRSET(self):
        """RSET restores a message that was marked deleted by DELE."""
        pop = self.getPOP3Connection(1)
        pop.dele(1)
        self.assertEqual(len(pop.list()[1]), 2)
        pop.rset()
        self.assertEqual(len(pop.list()[1]), 3)
        pop.quit()

    def testSTAT(self):
        """STAT reports the (2062, 3) pair for the sample mailbox."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.stat(),
                         (2062, 3))
        pop.quit()

    def testTOP(self):
        """TOP 1 2 returns the first two body lines; message 4 is unknown."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.top(1, 2),
                         (status_msgs['OK_TOP'] % 2,
                          ['Message 1 Body', ''], 18))
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'], pop.top, 4, 1)
        pop.quit()

    def testUIDL(self):
        """UIDL lists all UIDs, returns a single UID, rejects message 4."""
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.uidl(),
                         (status_msgs['OK_MSG_UIDL'],
                          ['1 f10230af2271c19b56442b4422d30244',
                           '2 f35cffda322101a0aaf354c1ca4a7840',
                           '3 059fa436bc14db93c2094c11ba224227'], 108))
        # NOTE(review): the UID expected here for message 1 differs from
        # the one in the listing above and is the MD5 digest of the empty
        # string -- this looks like it pins a server quirk; confirm.
        self.assertEqual(pop.uidl(1),
                         status_msgs['OK_SINGLE_UIDL'] % (
                             1, 'd41d8cd98f00b204e9800998ecf8427e'))
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'], pop.uidl, 4)
        pop.quit()

    def testUSER(self):
        """USER accepts 'foo' and rejects an unknown user name."""
        pop = self.getPOP3Connection(0)
        self.assertEqual(pop.user('foo'),
                         status_msgs['OK_USER'] % 'foo')
        self._assertProtoError(status_msgs['ERR_NOT_USER'], pop.user, 'blah')
        pop.quit()
def test_suite():
    """Load all tests of the Tests case into a suite and return it."""
    test_loader = unittest.TestLoader()
    suite = test_loader.loadTestsFromTestCase(Tests)
    return suite
# When run directly, execute the suite and print results as text.
if __name__ == '__main__':
    text_runner = unittest.TextTestRunner()
    text_runner.run(test_suite())