[Zope3-checkins] CVS: Zope3/src/zope/fssync/tests - mockmetadata.py:1.2.4.1 tempfiles.py:1.3.4.1 test_snarf.py:1.2.4.1 test_fsmerger.py:1.2.2.1 test_fsutil.py:1.2.2.1 test_merger.py:1.9.2.1 test_metadata.py:1.2.4.1 test_network.py:1.3.2.1

Grégoire Weber zope@i-con.ch
Sun, 22 Jun 2003 10:24:15 -0400


Update of /cvs-repository/Zope3/src/zope/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zope/fssync/tests

Modified Files:
      Tag: cw-mail-branch
	test_fsmerger.py test_fsutil.py test_merger.py 
	test_metadata.py test_network.py 
Added Files:
      Tag: cw-mail-branch
	mockmetadata.py tempfiles.py test_snarf.py 
Log Message:
Synced up with HEAD

=== Added File Zope3/src/zope/fssync/tests/mockmetadata.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Mock Metadata class used for testing.

$Id: mockmetadata.py,v 1.2.4.1 2003/06/22 14:23:42 gregweb Exp $
"""

import os

class MockMetadata(object):
    """In-memory stand-in for a metadata database, used for testing.

    Entries are plain dicts kept in ``self.database``, keyed by the
    normalized real path of the file they describe (see makekey()).
    An empty entry dict counts as "no entry": getnames() and dump()
    both skip it.
    """

    def __init__(self):
        # Maps makekey(path)[0] -> entry dict
        self.database = {}

    def getentry(self, filename):
        """Return the entry dict for filename, creating an empty one.

        The stored dict itself is returned, so modifications made by
        the caller are seen by later lookups.
        """
        key, filename = self.makekey(filename)
        return self.database.setdefault(key, {})

    def getnames(self, dirpath):
        """Return the base names of the non-empty entries in dirpath.

        Only entries whose key has dirpath's key as its directory part
        are reported; the order of the result is the dict's order.
        """
        dirkey, dirpath = self.makekey(dirpath)
        names = []
        # items() instead of iteritems(): same result here, and it does
        # not tie this helper to Python 2.
        for key, entry in self.database.items():
            if entry:
                head, tail = os.path.split(key)
                if head == dirkey:
                    names.append(tail)
        return names

    def flush(self):
        """Do nothing; there is no backing store to write to."""
        pass

    # These only exist for the test framework

    def makekey(self, path):
        """Return (key, realpath) for path.

        The key is the real path passed through os.path.normcase().
        """
        path = os.path.realpath(path)
        key = os.path.normcase(path)
        return key, path

    def setmetadata(self, filename, metadata=None):
        """Merge metadata into the entry for filename.

        A missing entry is first created as {"path": <real path>}.  An
        entry that already exists (even an empty one made by
        getentry()) is updated in place and gets no "path" key added.
        Omitting metadata leaves the entry's other keys untouched.
        """
        key, filename = self.makekey(filename)
        if key not in self.database:
            self.database[key] = {"path": filename}
        # None sentinel instead of a shared mutable {} default.
        if metadata is not None:
            self.database[key].update(metadata)

    def delmetadata(self, filename):
        """Remove the entry for filename; a missing entry is ignored."""
        key, filename = self.makekey(filename)
        if key in self.database:
            del self.database[key]

    def dump(self):
        """Return a new dict holding only the non-empty entries."""
        return dict([(k, v) for (k, v) in self.database.items() if v])


=== Added File Zope3/src/zope/fssync/tests/tempfiles.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Handy mixin for test case classes to manipulate temporary files.

$Id: tempfiles.py,v 1.3.4.1 2003/06/22 14:23:42 gregweb Exp $
"""

import os
import shutil
import tempfile
import unittest

class TempFiles(unittest.TestCase):
    """TestCase base class that manages temporary files and directories.

    Every name handed out by tempdir() or tempfile() is recorded in
    self.tempnames and removed again by tearDown().  Subclasses that
    override setUp() or tearDown() must call the versions defined here.
    """

    def setUp(self):
        """Initialize the list of temporary names."""
        self.tempnames = []

    def tearDown(self):
        """Clean up temporary names."""
        for fn in self.tempnames:
            if os.path.isdir(fn):
                shutil.rmtree(fn)
            elif os.path.isfile(fn):
                os.remove(fn)

    def tempdir(self):
        """Create and register a temporary directory."""
        # mkdtemp() picks the name and creates the directory in one
        # step, so nothing else can grab the name in between (which
        # mktemp() followed by mkdir() allowed).
        dn = tempfile.mkdtemp()
        self.tempnames.append(dn)
        return dn

    def tempfile(self, data=None, mode="w"):
        """Create and register a temporary file.

        When data is None only a fresh name is registered and returned;
        nothing is created on disk.  Callers depend on that to test
        code that must create the file or directory itself, which is
        why mktemp() is still used here.
        """
        tfn = tempfile.mktemp()
        self.tempnames.append(tfn)
        if data is not None:
            self.writefile(data, tfn, mode)
        return tfn

    def cmpfile(self, fn1, fn2, mode="r"):
        """Compare two files for equality; they must exist.

        Both files are read using mode ("r" or "rb").
        """
        assert mode in ("r", "rb")
        # Pass mode through; it used to be accepted but ignored, so a
        # binary comparison was silently done in text mode.
        data1 = self.readfile(fn1, mode)
        data2 = self.readfile(fn2, mode)
        return data1 == data2

    def readfile(self, fn, mode="r"):
        """Read data from a given file."""
        assert mode in ("r", "rb")
        f = open(fn, mode)
        try:
            data = f.read()
        finally:
            f.close()
        return data

    def writefile(self, data, fn, mode="w"):
        """Write data to a given file.

        Missing parent directories of fn are created first.
        """
        assert mode in ("w", "wb")
        dn = os.path.dirname(fn)
        # A bare file name has no directory part; os.makedirs("") would
        # raise, and the current directory exists anyway.
        if dn:
            self.ensuredir(dn)
        f = open(fn, mode)
        try:
            f.write(data)
        finally:
            f.close()

    def ensuredir(self, dn):
        """Ensure that a given directory exists."""
        if not os.path.exists(dn):
            os.makedirs(dn)


=== Added File Zope3/src/zope/fssync/tests/test_snarf.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Snarfer and Unsnarfer classes.

$Id: test_snarf.py,v 1.2.4.1 2003/06/22 14:23:42 gregweb Exp $
"""

import os
import unittest

from StringIO import StringIO

from zope.fssync.snarf import copybytes, Snarfer, Unsnarfer
from zope.fssync.tests.tempfiles import TempFiles

class TestCopyBytes(unittest.TestCase):
    """Exercise copybytes() with enough and with too little input."""

    def copy(self, count, payload):
        # Feed payload to copybytes(count, ...) through in-memory
        # streams and return whatever ended up in the output stream.
        source = StringIO(payload)
        sink = StringIO()
        copybytes(count, source, sink)
        return sink.getvalue()

    def test_copybytes_short(self):
        # Asking for fewer bytes than available copies just a prefix
        payload = "12345"*25
        self.assertEqual(self.copy(100, payload), payload[:100])

    def test_copybytes_long(self):
        # Copying the full length reproduces the input
        payload = "12345"*9000
        self.assertEqual(self.copy(len(payload), payload), payload)

    def test_copybytes_fail_1(self):
        # Far more bytes requested than the input holds
        self.assertRaises(IOError, self.copy, 9000, "12345")

    def test_copybytes_fail_2(self):
        # One byte more requested than the input holds
        self.assertRaises(IOError, self.copy, 6, "12345")


class TestSnarfer(TempFiles):
    """Check the byte stream written by the Snarfer add* methods."""

    def setUp(self):
        TempFiles.setUp(self)
        # The Snarfer under test writes into this in-memory stream
        self.ostr = StringIO()
        self.snf = Snarfer(self.ostr)

    def test_addstream(self):
        self.snf.addstream(StringIO("12345"), 5, "foo")
        written = self.ostr.getvalue()
        self.assertEqual(written, "5 foo\n12345")

    def test_addfile(self):
        path = self.tempfile("12345")
        self.snf.addfile(path, "foo")
        written = self.ostr.getvalue()
        self.assertEqual(written, "5 foo\n12345")

    def test_addtree(self):
        root = self.maketree()
        self.snf.addtree(root)
        expected = ("8 d1/f1\n"   "d1f1data"
                    "6 f1\n"      "f1data"
                    "7 f1~\n"     "f1adata"
                    "6 f2\n"      "f2data")
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_addtree_prefix(self):
        root = self.maketree()
        self.snf.addtree(root, "top/")
        expected = ("8 top/d1/f1\n"   "d1f1data"
                    "6 top/f1\n"      "f1data"
                    "7 top/f1~\n"     "f1adata"
                    "6 top/f2\n"      "f2data")
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_addtree_filter(self):
        root = self.maketree()
        # The filter drops the backup file "f1~"
        self.snf.addtree(root, filter=lambda x: not x.endswith("~"))
        expected = ("8 d1/f1\n"   "d1f1data"
                    "6 f1\n"      "f1data"
                    "6 f2\n"      "f2data")
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_add_addfile(self):
        path = self.tempfile("12345")
        self.snf.add(path, "top")
        expected = ("5 top\n"   "12345")
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_add_addtree(self):
        root = self.maketree()
        self.snf.add(root, "top")
        expected = ("8 top/d1/f1\n"   "d1f1data"
                    "6 top/f1\n"      "f1data"
                    "7 top/f1~\n"     "f1adata"
                    "6 top/f2\n"      "f2data")
        self.assertEqual(self.ostr.getvalue(), expected)

    def maketree(self):
        # Build the fixture tree in a fresh temporary directory and
        # return its path.  Layout: f1, f1~, f2 and d1/f1.
        root = self.tempdir()
        toplevel = [("f1", "f1data"), ("f1~", "f1adata"), ("f2", "f2data")]
        for name, data in toplevel:
            self.writefile(data, os.path.join(root, name))
        subdir = os.path.join(root, "d1")
        os.mkdir(subdir)
        self.writefile("d1f1data", os.path.join(subdir, "f1"))
        return root

class TestUnsnarfer(unittest.TestCase):
    """Tests for the Unsnarfer class."""

    def test_translatepath(self):
        unsnarfer = Unsnarfer(StringIO(""))
        unsnarfer.root = "root"
        # A well-formed relative path is mapped below the root
        expected = os.path.join("root", "a", "b", "c")
        self.assertEqual(unsnarfer.translatepath("a/b/c"), expected)
        # Each of these paths must be rejected with IOError
        rejected = ("a/./b", "a/../b", "a//b", "/a", "a/", "")
        for path in rejected:
            self.assertRaises(IOError, unsnarfer.translatepath, path)

    # XXX More to add...

def test_suite():
    """Return a suite with the tests of every test class in this module."""
    suite = unittest.TestSuite()
    for testclass in (TestCopyBytes, TestSnarfer, TestUnsnarfer):
        suite.addTest(unittest.makeSuite(testclass))
    return suite

def test_main():
    """Run this module's test suite with the plain text runner."""
    runner = unittest.TextTestRunner()
    runner.run(test_suite())

# Run the tests when this module is executed as a script.
if __name__ == '__main__':
    test_main()


=== Zope3/src/zope/fssync/tests/test_fsmerger.py 1.2 => 1.2.2.1 ===
--- Zope3/src/zope/fssync/tests/test_fsmerger.py:1.2	Thu May 15 07:38:39 2003
+++ Zope3/src/zope/fssync/tests/test_fsmerger.py	Sun Jun 22 10:23:42 2003
@@ -11,100 +11,31 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-"""Tests for the FSMerger class.
+"""Tests for the (high-level) FSMerger class.
 
 $Id$
 """
 
 import os
-import shutil
+import sys
 import unittest
-import tempfile
 
 from os.path import exists, isdir, isfile, realpath, normcase, split, join
 
 from zope.fssync.fsmerger import FSMerger
 
-class MockMetadata(object):
+from zope.fssync.tests.mockmetadata import MockMetadata
+from zope.fssync.tests.tempfiles import TempFiles
 
-    def __init__(self):
-        self.database = {}
-
-    def getentry(self, filename):
-        key, filename = self.makekey(filename)
-        if key not in self.database:
-            self.database[key] = {}
-        return self.database[key]
-
-    def getnames(self, dirpath):
-        dirkey, dirpath = self.makekey(dirpath)
-        names = []
-        for key in self.database:
-            head, tail = split(key)
-            if head == dirkey:
-                names.append(tail)
-        return names
-
-    def flush(self):
-        pass
-
-    # These only exist for the test framework
-
-    def makekey(self, path):
-        path = realpath(path)
-        key = normcase(path)
-        return key, path
-
-    def setmetadata(self, filename, metadata={}):
-        key, filename = self.makekey(filename)
-        if key not in self.database:
-            self.database[key] = {"path": filename}
-        self.database[key].update(metadata)
-
-    def delmetadata(self, filename):
-        key, filename = self.makekey(filename)
-        if key in self.database:
-            del self.database[key]
-
-    def dump(self):
-        return dict([(k, v) for (k, v) in self.database.iteritems() if v])
-
-class TestFSMerger(unittest.TestCase):
+class TestFSMerger(TempFiles):
 
     def setUp(self):
+        TempFiles.setUp(self)
         # Create a mock metadata database
         self.metadata = MockMetadata()
-        # Create a list of temporary names to be removed in tearDown
-        self.tempnames = []
         # Create a handy entry
         self.entry = {"path": "/foo"}
 
-    def tearDown(self):
-        # Clean up temporary files and directories
-        for fn in self.tempnames:
-            if isdir(fn):
-                shutil.rmtree(fn)
-            elif isfile(fn):
-                os.remove(fn)
-
-    def addtempdir(self):
-        # Create and register a temporary directory
-        dir = tempfile.mktemp()
-        self.tempnames.append(dir)
-        os.mkdir(dir)
-        return dir
-
-    def addtempfile(self, data):
-        # Create and register a temporary file
-        filename = tempfile.mktemp()
-        self.tempnames.append(filename)
-        f = open(filename, "w")
-        try:
-            f.write(data)
-        finally:
-            f.close()
-        return filename
-
     diff3ok = None
 
     def check_for_diff3(self):
@@ -114,19 +45,18 @@
 
     def diff3_check(self):
         if not hasattr(os, "popen"):
+            sys.stderr.write("\nos.popen() not found, diff3 tests disabled\n")
             return False
-        f1 = self.addtempfile("a")
-        f2 = self.addtempfile("b")
-        f3 = self.addtempfile("b")
+        f1 = self.tempfile("a")
+        f2 = self.tempfile("a")
+        f3 = self.tempfile("b")
         pipe = os.popen("diff3 -m -E %s %s %s" % (f1, f2, f3), "r")
         output = pipe.read()
         sts = pipe.close()
-        return output == "b" and not sts
-
-    def ensuredir(self, dir):
-        # Ensure that a given directory exists
-        if not isdir(dir):
-            os.makedirs(dir)
+        ok = output == "b" and not sts
+        if not ok:
+            sys.stderr.write("\ndiff3 doesn't work, diff3 tests disabled\n")
+        return ok
 
     def addfile(self, dir, path, data, entry=None):
         # Create a file or directory and write some data to it.  If
@@ -136,11 +66,13 @@
         # not None, entries are also synthesized for the directory
         # contents.
         path = join(dir, path)
-        if entry is not None:
+        if entry is not None and not callable(entry):
             self.addentry(path, entry)
         if isinstance(data, dict):
             self.ensuredir(path)
             pentry = self.metadata.getentry(path)
+            if entry is not None and "flag" in entry:
+                pentry["flag"] = entry["flag"]
             for x in data:
                 if entry is not None:
                     newentry = entry.copy()
@@ -154,6 +86,8 @@
                 f.write(data)
             finally:
                 f.close()
+            if callable(entry):
+                self.addentry(path, entry(path))
         return path
 
     def addorigfile(self, dir, path, data):
@@ -277,8 +211,8 @@
         reports = []
         m = FSMerger(self.metadata, reports.append)
 
-        localtopdir = self.addtempdir()
-        remotetopdir = self.addtempdir()
+        localtopdir = self.tempdir()
+        remotetopdir = self.tempdir()
         localdir = join(localtopdir, "local")
         remotedir = join(remotetopdir, "remote")
         os.mkdir(localdir)
@@ -292,6 +226,7 @@
 
         expected_reports = []
         for er in expected_reports_template:
+            er = er.replace("/", os.sep)
             er = er.replace("%l", localfile)
             er = er.replace("%r", remotefile)
             expected_reports.append(er)
@@ -344,17 +279,197 @@
                        self.make_conflict_entry, self.entry)
 
     def make_conflict_entry(self, local):
+        # Helper for test_*_conflict
         e = {"conflict": os.path.getmtime(local)}
         e.update(self.entry)
         return e
 
-    def test_new_directory(self):
-        if not self.check_for_diff3():
-            return
-        self.mergetest("foo", None, None, {"x": "x"}, None, self.entry,
+    # Tests for sticky conflict reporting
+
+    def test_sticky_conflict(self):
+        conflict = "<<<<<<< foo\nl\n=======\nr\n>>>>>>> foo\n"
+        self.mergetest("foo", conflict, "r\n", "r\n",
+                       self.make_conflict_entry, self.entry,
+                       ["C %l"], conflict, "r\n", "r\n",
+                       self.make_conflict_entry, self.entry)
+
+    def test_unstuck_conflict(self):
+        conflict_entry = {"conflict": 12345}
+        conflict_entry.update(self.entry)
+        self.mergetest("foo", "resolved\n", "r\n", "r\n",
+                       conflict_entry, self.entry,
+                       ["M %l"], "resolved\n", "r\n", "r\n",
+                       self.entry, self.entry)
+
+    def test_cleared_conflict(self):
+        conflict_entry = {"conflict": 12345}
+        conflict_entry.update(self.entry)
+        self.mergetest("foo", "r\n", "r\n", "r\n",
+                       conflict_entry, self.entry,
+                       [], "r\n", "r\n", "r\n",
+                       self.entry, self.entry)
+
+    # Tests for added files: local, remote, both
+
+    def test_added_file_local(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", "x", None, None,
+                       added_entry, None,
+                       ["A %l"],
+                       "x", None, None,
+                       added_entry, {})
+
+    def test_added_file_remote(self):
+        self.mergetest("foo", None, None, "x",
+                       None, self.entry,
+                       ["U %l"],
+                       "x", "x", "x",
+                       self.entry, self.entry)
+
+    def test_added_file_both(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", "x", None, "x",
+                       added_entry, self.entry,
+                       ["U %l"],
+                       "x", "x", "x",
+                       self.entry, self.entry)
+
+    # Tests for removed files: local, remote, both
+
+    def test_removed_file_local(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, "x", "x",
+                       removed_entry, self.entry,
+                       ["R %l"],
+                       None, "x", "x",
+                       removed_entry, self.entry)
+
+    def test_removed_file_remote(self):
+        self.mergetest("foo", "x", "x", None,
+                       self.entry, {},
+                       ["D %l"],
+                       None, None, None,
+                       {}, {})
+
+    def test_removed_file_both(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, "x", None,
+                       removed_entry, {},
+                       ["D %l"],
+                       None, None, None,
+                       {}, {})
+
+    # Tests for added empty directories: local, remote, both
+
+    def test_added_dir_local(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", {}, None, None,
+                       added_entry, None,
+                       ["A %l/"],
+                       {}, None, None,
+                       added_entry, {})
+
+    def test_added__dir_remote(self):
+        self.mergetest("foo", None, None, {},
+                       None, self.entry,
+                       ["N %l/"],
+                       {}, {}, {},
+                       self.entry, self.entry)
+
+    def test_added_dir_both(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", {}, None, {},
+                       added_entry, self.entry,
+                       ["U %l/"],
+                       {}, None, {},
+                       self.entry, self.entry)
+
+    # Tests for added directory trees: local, remote, both
+
+    def test_added_tree_local(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", {"x": "x"}, None, None,
+                       added_entry, None,
+                       ["A %l/", "A %l/x"],
+                       {"x": "x"}, None, None,
+                       added_entry, {})
+
+    def test_added_tree_remote(self):
+        self.mergetest("foo", None, None, {"x": "x"},
+                       None, self.entry,
                        ["N %l/", "U %l/x"],
                        {"x": "x"}, {"x": "x"}, {"x": "x"},
                        self.entry, self.entry)
+
+    def test_added_tree_both(self):
+        added_entry = {"flag": "added"}
+        added_entry.update(self.entry)
+        self.mergetest("foo", {"x": "x"}, None, {"x": "x"},
+                       added_entry, self.entry,
+                       ["U %l/", "U %l/x"],
+                       {"x": "x"}, {"x": "x"}, {"x": "x"},
+                       self.entry, self.entry)
+
+    # Tests for removed empty directories: local, remote, both
+
+    def test_removed_dir_local(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, None, {},
+                       removed_entry, self.entry,
+                       ["R %l/"],
+                       None, None, {},
+                       removed_entry, self.entry)
+
+    def test_removed_dir_remote(self):
+        self.mergetest("foo", {}, None, None,
+                       self.entry, {},
+                       ["D %l/"],
+                       None, None, None,
+                       {}, {})
+
+    def test_removed_dir_both(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, None, None,
+                       removed_entry, {},
+                       ["D %l"],
+                       None, None, None,
+                       {}, {})
+
+    # Tests for removed non-empty directories: local, remote, both
+
+    def test_removed_tree_local(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, None, {"x": "x"},
+                       removed_entry, self.entry,
+                       ["R %l/"],
+                       None, None, {"x": "x"},
+                       removed_entry, self.entry)
+
+    def test_removed_tree_remote(self):
+        self.mergetest("foo", {"x": "x"}, {"x": "x"}, None,
+                       self.entry, {},
+                       ["D %l/x", "D %l/"],
+                       None, None, None,
+                       {}, {})
+
+    def test_removed_tree_both(self):
+        removed_entry = {"flag": "removed"}
+        removed_entry.update(self.entry)
+        self.mergetest("foo", None, None, None,
+                       removed_entry, {},
+                       ["D %l"],
+                       None, None, None,
+                       {}, {})
 
 def test_suite():
     s = unittest.TestSuite()


=== Zope3/src/zope/fssync/tests/test_fsutil.py 1.2 => 1.2.2.1 ===
--- Zope3/src/zope/fssync/tests/test_fsutil.py:1.2	Thu May 15 07:25:40 2003
+++ Zope3/src/zope/fssync/tests/test_fsutil.py	Sun Jun 22 10:23:42 2003
@@ -17,12 +17,12 @@
 """
 
 import os
-import tempfile
 import unittest
 
 from os.path import split, join, exists, isdir, isfile
 
 from zope.fssync import fsutil
+from zope.fssync.tests.tempfiles import TempFiles
 
 def FIX(path):
     # This fixes only relative paths
@@ -31,7 +31,7 @@
     parts = [mapping.get(x, x) for x in parts]
     return os.path.join(*parts)
 
-class TestFSUtil(unittest.TestCase):
+class TestFSUtil(TempFiles):
 
     def test_split(self):
         self.assertEqual(fsutil.split(FIX("foo/bar")), ("foo", "bar"))
@@ -58,30 +58,17 @@
                          FIX("foo/@@Zope/Annotations/bar"))
 
     def test_ensuredir(self):
-        tmpdir = tempfile.mktemp()
-        try:
-            self.assertEqual(exists(tmpdir), False)
-            self.assertEqual(isdir(tmpdir), False)
-            fsutil.ensuredir(tmpdir)
-            self.assertEqual(isdir(tmpdir), True)
-            fsutil.ensuredir(tmpdir)
-            self.assertEqual(isdir(tmpdir), True)
-        finally:
-            if isdir(tmpdir):
-                os.rmdir(tmpdir)
+        tmpdir = self.tempfile(None)
+        self.assertEqual(exists(tmpdir), False)
+        self.assertEqual(isdir(tmpdir), False)
+        fsutil.ensuredir(tmpdir)
+        self.assertEqual(isdir(tmpdir), True)
+        fsutil.ensuredir(tmpdir)
+        self.assertEqual(isdir(tmpdir), True)
 
     def test_ensuredir_error(self):
-        tmpfile = tempfile.mktemp()
-        try:
-            f = open(tmpfile, "w")
-            try:
-                f.write("x\n")
-            finally:
-                f.close()
-            self.assertRaises(OSError, fsutil.ensuredir, tmpfile)
-        finally:
-            if isfile(tmpfile):
-                os.remove(tmpfile)
+        tmpfile = self.tempfile("x\n")
+        self.assertRaises(OSError, fsutil.ensuredir, tmpfile)
 
 def test_suite():
     loader = unittest.TestLoader()


=== Zope3/src/zope/fssync/tests/test_merger.py 1.9 => 1.9.2.1 ===
--- Zope3/src/zope/fssync/tests/test_merger.py:1.9	Thu May 15 08:05:12 2003
+++ Zope3/src/zope/fssync/tests/test_merger.py	Sun Jun 22 10:23:42 2003
@@ -17,85 +17,19 @@
 """
 
 import os
-import shutil
 import unittest
-import tempfile
 
-from os.path import exists, isdir, isfile, realpath, normcase
+from os.path import exists
 
 from zope.fssync.merger import Merger
 
-class MockMetadatabase(object):
-
-    def __init__(self):
-        self.database = {}
-
-    def makekey(self, file):
-        file = realpath(file)
-        key = normcase(file)
-        return key, file
-
-    def getentry(self, file):
-        key, file = self.makekey(file)
-        if key not in self.database:
-            self.database[key] = {}
-        return self.database[key]
-
-    def setmetadata(self, file, metadata={}):
-        key, file = self.makekey(file)
-        if key not in self.database:
-            self.database[key] = {"path": file}
-        self.database[key].update(metadata)
-
-    def delmetadata(self, file):
-        key, file = self.makekey(file)
-        if key in self.database:
-            del self.database[key]
+from zope.fssync.tests.mockmetadata import MockMetadata
+from zope.fssync.tests.tempfiles import TempFiles
 
 added = {"flag": "added"}
 removed = {"flag": "removed"}
 
-class TestMerger(unittest.TestCase):
-
-    def setUp(self):
-        unittest.TestCase.setUp(self)
-        # Create a list of temporary files to clean up at the end
-        self.tempfiles = []
-
-    def tearDown(self):
-        # Clean up temporary files (or directories)
-        for fn in self.tempfiles:
-            if isdir(fn):
-                shutil.rmtree(fn)
-            elif isfile(fn):
-                os.remove(fn)
-        unittest.TestCase.tearDown(self)
-
-    def addfile(self, data, suffix="", mode="w"):
-        # Register a temporary file; write data to it if given
-        file = tempfile.mktemp(suffix)
-        self.tempfiles.append(file)
-        if data is not None:
-            f = open(file, mode)
-            try:
-                f.write(data)
-            finally:
-                f.close()
-        return file
-
-    def cmpfile(self, file1, file2, mode="r"):
-        # Compare two files; they must exist
-        f1 = open(file1, mode)
-        try:
-            data1 = f1.read()
-        finally:
-            f1.close()
-        f2 = open(file2, mode)
-        try:
-            data2 = f2.read()
-        finally:
-            f2.close()
-        return data1 == data2
+class TestMerger(TempFiles):
 
     diff3ok = None
 
@@ -107,9 +41,9 @@
     def diff3_check(self):
         if not hasattr(os, "popen"):
             return False
-        f1 = self.addfile("a")
-        f2 = self.addfile("b")
-        f3 = self.addfile("b")
+        f1 = self.tempfile("a")
+        f2 = self.tempfile("b")
+        f3 = self.tempfile("b")
         pipe = os.popen("diff3 -m -E %s %s %s" % (f1, f2, f3), "r")
         output = pipe.read()
         sts = pipe.close()
@@ -118,10 +52,10 @@
     def runtest(self, localdata, origdata, remotedata,
                 localmetadata, remotemetadata, exp_localdata,
                 exp_action, exp_state, exp_merge_state=None):
-        local = self.addfile(localdata)
-        orig = self.addfile(origdata)
-        remote = self.addfile(remotedata)
-        md = MockMetadatabase()
+        local = self.tempfile(localdata)
+        orig = self.tempfile(origdata)
+        remote = self.tempfile(remotedata)
+        md = MockMetadata()
         if localmetadata is not None:
             md.setmetadata(local, localmetadata)
         if remotemetadata is not None:
@@ -249,6 +183,21 @@
 
     def test_spurious(self):
         self.runtest("a", None, None, None, None, "a", "Nothing", "Spurious")
+
+    def test_conflict_report(self):
+        local = self.tempfile("CONFLICT")
+        orig = self.tempfile("x")
+        remote = self.tempfile("x")
+        md = MockMetadata()
+        m = Merger(md)
+        lentry = md.getentry(local)
+        lentry["path"] = "/foo"
+        lentry["conflict"] = mtime = os.path.getmtime(local)
+        rentry = md.getentry(remote)
+        rentry["path"] = "/foo"
+        action, state = m.classify_files(local, orig, remote)
+        self.assertEqual((action, state), ("Nothing", "Conflict"))
+        self.assertEqual(lentry.get("conflict"), mtime)
 
     # XXX need test cases for anomalies, e.g. files missing or present
     # in spite of metadata, or directories instead of files, etc.


=== Zope3/src/zope/fssync/tests/test_metadata.py 1.2 => 1.2.4.1 ===
--- Zope3/src/zope/fssync/tests/test_metadata.py:1.2	Mon May 12 16:41:23 2003
+++ Zope3/src/zope/fssync/tests/test_metadata.py	Sun Jun 22 10:23:42 2003
@@ -17,42 +17,20 @@
 """
 
 import os
-import shutil
 import unittest
-import tempfile
 
-from os.path import exists, isdir, isfile, split, join, realpath, normcase
+from os.path import exists, dirname, isfile, join
 
 from zope.xmlpickle import loads, dumps
 
 from zope.fssync.metadata import Metadata
+from zope.fssync.tests.tempfiles import TempFiles
 
-class TestMetadata(unittest.TestCase):
-
-    def setUp(self):
-        unittest.TestCase.setUp(self)
-        # Create a list of temporary files to clean up at the end
-        self.tempfiles = []
-
-    def tearDown(self):
-        # Clean up temporary files (or directories)
-        for fn in self.tempfiles:
-            if isdir(fn):
-                shutil.rmtree(fn)
-            elif isfile(fn):
-                os.remove(fn)
-        unittest.TestCase.tearDown(self)
-
-    def adddir(self):
-        # Register and create a temporary directory
-        dir = tempfile.mktemp()
-        self.tempfiles.append(dir)
-        os.mkdir(dir)
-        return dir
+class TestMetadata(TempFiles):
 
     def test_initial_state(self):
         md = Metadata()
-        dir = self.adddir()
+        dir = self.tempdir()
         self.assertEqual(md.getnames(dir), [])
         foo = join(dir, "foo")
         self.assertEqual(md.getentry(foo), {})
@@ -60,7 +38,7 @@
 
     def test_adding(self):
         md = Metadata()
-        dir = self.adddir()
+        dir = self.tempdir()
         foo = join(dir, "foo")
         e = md.getentry(foo)
         e["hello"] = "world"
@@ -71,25 +49,26 @@
 
     def test_deleting(self):
         md, foo = self.test_adding()
-        dir = os.path.dirname(foo)
+        dir = dirname(foo)
         md.getentry(foo).clear()
         self.assertEqual(md.getentry(foo), {})
         self.assertEqual(md.getnames(dir), [])
 
     def test_flush(self):
         md, foo = self.test_adding()
-        dir = os.path.dirname(foo)
+        dir = dirname(foo)
         md.flush()
         efile = join(dir, "@@Zope", "Entries.xml")
         self.assert_(isfile(efile))
-        f = open(efile)
-        data = f.read()
-        f.close()
+        data = self.readfile(efile)
         entries = loads(data)
         self.assertEqual(entries, {"foo": {"hello": "world"}})
         md.getentry(foo).clear()
         md.flush()
-        self.assert_(not exists(efile))
+        self.assert_(isfile(efile))
+        data = self.readfile(efile)
+        entries = loads(data)
+        self.assertEqual(entries, {})
 
 def test_suite():
     loader = unittest.TestLoader()


=== Zope3/src/zope/fssync/tests/test_network.py 1.3 => 1.3.2.1 ===
--- Zope3/src/zope/fssync/tests/test_network.py:1.3	Thu May 15 16:03:05 2003
+++ Zope3/src/zope/fssync/tests/test_network.py	Sun Jun 22 10:23:42 2003
@@ -17,55 +17,82 @@
 """
 
 import os
-import shutil
+import select
+import socket
 import unittest
-import tempfile
+import threading
 
 from StringIO import StringIO
 
 from os.path import isdir, isfile, join
 
 from zope.fssync.fssync import Network, Error
+from zope.fssync.tests.tempfiles import TempFiles
 
 sample_rooturl = "http://user:passwd@host:8080/path"
 
-class TestNetwork(unittest.TestCase):
+HOST = "127.0.0.1"     # localhost
+PORT = 60841           # arbitrary fixed unprivileged port
+RESPONSE = """HTTP/1.0 404 Not found\r
+Content-type: text/plain\r
+Content-length: 0\r
+\r
+"""
+
+class DummyServer(threading.Thread):
+
+    """A server that can handle one HTTP request (returning a 404 error)."""
+
+    def __init__(self, ready):
+        self.ready = ready     # Event signaling we're listening
+        self.stopping = False
+        threading.Thread.__init__(self)
+
+    def run(self):
+        svr = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        svr.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        svr.bind((HOST, PORT))
+        svr.listen(1)
+        self.ready.set()
+        conn = None
+        sent_response = False
+        while not self.stopping:
+            if conn is None:
+                r = [svr]
+            else:
+                r = [conn]
+            r, w, x = select.select(r, [], [], 0.01)
+            if not r:
+                continue
+            s = r[0]
+            if s is svr:
+                conn, addr = svr.accept()
+                ##print "connect from", `addr`
+            else:
+                assert s is conn
+                data = conn.recv(1000)
+                ##print "received", `data`
+                if not data:
+                    break
+                if not sent_response:
+                    conn.send(RESPONSE)
+                    conn.close()
+                    conn = None
+                    sent_response = True
+        if conn is not None:
+            conn.close()
+        svr.close()
+        ##print "stopped"
+        
+    def stop(self):
+        ##print "stopping"
+        self.stopping = True
+
+class TestNetwork(TempFiles):
 
     def setUp(self):
-        unittest.TestCase.setUp(self)
+        TempFiles.setUp(self)
         self.network = Network()
-        # Create a list of temporary files to clean up at the end
-        self.tempfiles = []
-
-    def tearDown(self):
-        # Clean up temporary files (or directories)
-        for fn in self.tempfiles:
-            if isdir(fn):
-                shutil.rmtree(fn)
-            elif isfile(fn):
-                os.remove(fn)
-        unittest.TestCase.tearDown(self)
-
-    def adddir(self):
-        # Create and register a temporary directory
-        dir = tempfile.mktemp()
-        self.tempfiles.append(dir)
-        os.mkdir(dir)
-        return dir
-
-    def cmpfile(self, file1, file2, mode="r"):
-        # Compare two files; they must exist
-        f1 = open(file1, mode)
-        try:
-            data1 = f1.read()
-        finally:
-            f1.close()
-        f2 = open(file2, mode)
-        try:
-            data2 = f2.read()
-        finally:
-            f2.close()
-        return data1 == data2
 
     def test_initial_state(self):
         self.assertEqual(self.network.rooturl, None)
@@ -93,11 +120,11 @@
 
     def test_findrooturl_notfound(self):
         # XXX This test will fail if a file /tmp/@@Zope/Root exists :-(
-        target = self.adddir()
+        target = self.tempdir()
         self.assertEqual(self.network.findrooturl(target), None)
 
     def test_findrooturl_found(self):
-        target = self.adddir()
+        target = self.tempdir()
         zdir = join(target, "@@Zope")
         os.mkdir(zdir)
         rootfile = join(zdir, "Root")
@@ -108,7 +135,7 @@
 
     def test_saverooturl(self):
         self.network.setrooturl(sample_rooturl)
-        target = self.adddir()
+        target = self.tempdir()
         zdir = join(target, "@@Zope")
         os.mkdir(zdir)
         rootfile = join(zdir, "Root")
@@ -119,7 +146,7 @@
         self.assertEqual(data.strip(), sample_rooturl)
 
     def test_loadrooturl(self):
-        target = self.adddir()
+        target = self.tempdir()
         self.assertRaises(Error, self.network.loadrooturl, target)
         zdir = join(target, "@@Zope")
         os.mkdir(zdir)
@@ -130,11 +157,15 @@
         self.assertEqual(new.rooturl, sample_rooturl)
 
     def test_httpreq(self):
-        # XXX I don't want to write up a dummy server just to test
-        # this so I'll just send a request to python.org that I know
-        # will fail.
-        self.network.setrooturl("http://python.org")
-        self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+        ready = threading.Event()
+        svr = DummyServer(ready)
+        svr.start()
+        ready.wait()
+        try:
+            self.network.setrooturl("http://%s:%s" % (HOST, PORT))
+            self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+        finally:
+            svr.stop()
 
     def test_slurptext_html(self):
         fp = StringIO("<p>This is some\n\ntext.</p>\n")