[Checkins] SVN: zope.sendmail/trunk/src/zope/sendmail/delivery.py
add code to allow several QueueProcessorThreads send messages
from the same
Benji York
benji at zope.com
Thu Oct 18 15:16:59 EDT 2007
Log message for revision 80928:
add code to allow several QueueProcessorThreads send messages from the same
maildir simultaneously; this is helpful if you run several instances of the
same app on a box
Changed:
U zope.sendmail/trunk/src/zope/sendmail/delivery.py
-=-
Modified: zope.sendmail/trunk/src/zope/sendmail/delivery.py
===================================================================
--- zope.sendmail/trunk/src/zope/sendmail/delivery.py 2007-10-18 19:15:26 UTC (rev 80927)
+++ zope.sendmail/trunk/src/zope/sendmail/delivery.py 2007-10-18 19:16:58 UTC (rev 80928)
@@ -19,12 +19,14 @@
"""
__docformat__ = 'restructuredtext'
+import atexit
+import logging
+import os
+import os.path
import rfc822
+import stat
import threading
-import logging
-import atexit
import time
-from os import unlink, getpid
from cStringIO import StringIO
from random import randrange
from time import strftime
@@ -37,6 +39,12 @@
import transaction
+# The longest time sending a file is expected to take. Longer than this and
+# the send attempt will be assumed to have failed. This means that sending
+# very large files or using very slow mail servers could result in duplicate
+# messages sent.
+MAX_SEND_TIME = 60*60*3
+
class MailDataManager(object):
implements(IDataManager)
@@ -86,7 +94,7 @@
"""Generates a new message ID according to RFC 2822 rules"""
randmax = 0x7fffffff
left_part = '%s.%d.%d' % (strftime('%Y%m%d%H%M%S'),
- getpid(),
+ os.getpid(),
randrange(0, randmax))
return "%s@%s" % (left_part, gethostname())
@@ -138,6 +146,59 @@
return MailDataManager(msg.commit, onAbort=msg.abort)
+# The below diagram depicts the operations performed while sending a message in
+# the ``run`` method of ``QueueProcessorThread``. This sequence of operations
+# will be performed for each file in the maildir each time the thread "wakes
+# up" to send messages.
+#
+# Any error conditions not depicted on the diagram will provoke the catch-all
+# exception logging of the ``run`` method.
+#
+# In the diagram the "message file" is the file in the maildir's "cur" directory
+# that contains the message and "tmp file" is a hard link to the message file
+# created in the maildir's "tmp" directory.
+#
+# ( start trying to deliver a message )
+# |
+# |
+# V
+# +-----( get tmp file mtime )
+# | |
+# | | file exists
+# | V
+# | ( check age )-----------------------------+
+# tmp file | | file is new |
+# does not | | file is old |
+# exist | | |
+# | ( unlink tmp file )-----------------------+ |
+# | | file does | |
+# | | file unlinked not exist | |
+# | V | |
+# +---->( touch message file )------------------+ | |
+# | file does | | |
+# | not exist | | |
+# V | | |
+# ( link message file to tmp file )----------+ | | |
+# | tmp file | | | |
+# | already exists | | | |
+# | | | | |
+# V V V V V
+# ( send message ) ( skip this message )
+# |
+# V
+# ( unlink message file )---------+
+# | |
+# | file unlinked | file disappeared
+# | |
+# | +-----------------+
+# | |
+# | V
+# ( unlink tmp file )-------------+
+# | |
+# | file unlinked | file disappeared
+# V |
+# ( message delivered )<----------+
+
class QueueProcessorThread(threading.Thread):
"""This thread is started at configuration time from the
`mail:queuedDelivery` directive handler.
@@ -149,7 +210,7 @@
def __init__(self, interval=3.0):
threading.Thread.__init__(self)
- self.interval = interval
+ self.interval = interval
def setMaildir(self, maildir):
"""Set the maildir.
@@ -196,13 +257,100 @@
for filename in self.maildir:
fromaddr = ''
toaddrs = ()
+ head, tail = os.path.split(filename)
+ tmp_filename = os.path.join(head, 'sending-' + tail)
try:
+ # perform a series of operations in an attempt to ensure
+ # that no two threads/processes send this message
+ # simultaneously as well as attempting to not generate
+ # spurious failure messages in the log; a diagram that
+ # represents these operations is included in
+ # send-mail-states.txt
+ try:
+ # find the age of the tmp file (if it exists)
+ age = None
+ mtime = os.stat(tmp_filename)[stat.ST_MTIME]
+ age = time.time() - mtime
+ except OSError, e:
+ if e.errno == 2: # file does not exist
+ # the tmp file could not be stat'ed because it
+ # doesn't exist, that's fine, keep going
+ pass
+ else:
+ # the tmp file could not be stat'ed for some reason
+ # other than not existing; we'll report the error
+ raise
+
+ # if the tmp file exists, check its age
+ if age is not None:
+ try:
+ if age > MAX_SEND_TIME:
+ # the tmp file is "too old" remove it
+ os.unlink(tmp_filename)
+ else:
+ # the tmp file is "new", so someone else may
+ # be sending this message, try again later
+ continue
+ # if we get here, the file existed, but was too
+ # old, so it was unlinked
+ except OSError, e:
+ if e.errno == 2: # file does not exist
+ # it looks like someone else removed the tmp file,
+ # that's fine, we'll try to deliver the message
+ # again later
+ continue
+
+ # now we know that the tmp file doesn't exist, we need to
+ # "touch" the message before we create the tmp file so the
+ # mtime will reflect the fact that the file is being
+ # processed (there is a race here, but it's OK for two or
+ # more processes to touch the file "simultaneously")
+ try:
+ os.utime(filename, None)
+ except OSError, e:
+ if e.errno == 2: # file does not exist
+ # someone removed the message before we could
+ # touch it, no need to complain, we'll just keep
+ # going
+ continue
+
+ # creating this hard link will fail if another process is
+ # also sending this message
+ try:
+ os.link(filename, tmp_filename)
+ except OSError, e:
+ if e.errno == 17: # file exists
+ # it looks like someone else is sending this
+ # message too; we'll try again later
+ continue
+
+ # read message file and send contents
file = open(filename)
message = file.read()
file.close()
fromaddr, toaddrs, message = self._parseMessage(message)
self.mailer.send(fromaddr, toaddrs, message)
- unlink(filename)
+
+ try:
+ os.unlink(filename)
+ except OSError, e:
+ if e.errno == 2: # file does not exist
+ # someone else unlinked the file; oh well
+ pass
+ else:
+ # something bad happened, log it
+ raise
+
+ try:
+ os.unlink(tmp_filename)
+ except OSError, e:
+ if e.errno == 2: # file does not exist
+ # someone else unlinked the file; oh well
+ pass
+ else:
+ # something bad happened, log it
+ raise
+
# TODO: maybe log the Message-Id of the message sent
self.log.info("Mail from %s to %s sent.",
fromaddr, ", ".join(toaddrs))
More information about the Checkins
mailing list