[Zope3-checkins] CVS: Zope3/src/zope/fssync/tests - mockmetadata.py:1.2.4.1 tempfiles.py:1.3.4.1 test_snarf.py:1.2.4.1 test_fsmerger.py:1.2.2.1 test_fsutil.py:1.2.2.1 test_merger.py:1.9.2.1 test_metadata.py:1.2.4.1 test_network.py:1.3.2.1
Grégoire Weber
zope@i-con.ch
Sun, 22 Jun 2003 10:24:15 -0400
Update of /cvs-repository/Zope3/src/zope/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zope/fssync/tests
Modified Files:
Tag: cw-mail-branch
test_fsmerger.py test_fsutil.py test_merger.py
test_metadata.py test_network.py
Added Files:
Tag: cw-mail-branch
mockmetadata.py tempfiles.py test_snarf.py
Log Message:
Synced up with HEAD
=== Added File Zope3/src/zope/fssync/tests/mockmetadata.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Mock Metadata class used for testing.
$Id: mockmetadata.py,v 1.2.4.1 2003/06/22 14:23:42 gregweb Exp $
"""
import os
class MockMetadata(object):
    """In-memory stand-in for a metadata database, used for testing.

    Entries are plain dicts stored in ``self.database``, keyed by the
    case-normalized real path of the file they describe (see makekey()).
    """

    def __init__(self):
        # Maps normalized path -> entry dict.
        self.database = {}

    def getentry(self, filename):
        """Return the entry dict for filename.

        An empty entry is created (and stored) if none exists yet, so
        the returned dict can be mutated in place by the caller.
        """
        key, filename = self.makekey(filename)
        return self.database.setdefault(key, {})

    def getnames(self, dirpath):
        """Return the base names of all non-empty entries directly in dirpath."""
        dirkey, dirpath = self.makekey(dirpath)
        names = []
        # items() instead of iteritems() so this works on Python 2 and 3.
        for key, entry in self.database.items():
            if not entry:
                # Empty entries (e.g. created by getentry()) do not count.
                continue
            head, tail = os.path.split(key)
            if head == dirkey:
                names.append(tail)
        return names

    def flush(self):
        """Do nothing; there is no backing store to write to."""
        pass

    # These only exist for the test framework

    def makekey(self, path):
        """Return (key, realpath) for path.

        The key is the real path passed through os.path.normcase(); the
        second item is the real path before case normalization.
        """
        path = os.path.realpath(path)
        key = os.path.normcase(path)
        return key, path

    def setmetadata(self, filename, metadata=None):
        """Merge metadata into the entry for filename.

        If no entry exists yet, one is created holding the real path
        under the "path" key.  metadata defaults to None (nothing to
        merge) rather than a shared mutable dict.
        """
        key, filename = self.makekey(filename)
        entry = self.database.setdefault(key, {"path": filename})
        if metadata:
            entry.update(metadata)

    def delmetadata(self, filename):
        """Remove the entry for filename, if there is one."""
        key, filename = self.makekey(filename)
        if key in self.database:
            del self.database[key]

    def dump(self):
        """Return a new dict holding only the non-empty entries."""
        return dict([(k, v) for (k, v) in self.database.items() if v])
=== Added File Zope3/src/zope/fssync/tests/tempfiles.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Handy mixin for test case classes to manipulate temporary files.
$Id: tempfiles.py,v 1.3.4.1 2003/06/22 14:23:42 gregweb Exp $
"""
import os
import shutil
import tempfile
import unittest
class TempFiles(unittest.TestCase):
    """Mixin for test case classes that manipulate temporary files.

    Every name handed out by tempdir() or tempfile() is recorded in
    self.tempnames and removed again by tearDown().
    """

    def setUp(self):
        """Initialize the list of temporary names."""
        self.tempnames = []

    def tearDown(self):
        """Clean up temporary names."""
        for fn in self.tempnames:
            if os.path.isdir(fn):
                shutil.rmtree(fn)
            elif os.path.isfile(fn):
                os.remove(fn)

    def tempdir(self):
        """Create and register a temporary directory."""
        # mkdtemp() picks the name and creates the directory in one step,
        # avoiding the race between mktemp() and a separate mkdir().
        dirname = tempfile.mkdtemp()
        self.tempnames.append(dirname)
        return dirname

    def tempfile(self, data=None, mode="w"):
        """Create and register a temporary file.

        The file is only created when data is not None; otherwise just
        the (registered) name is returned.
        """
        # mktemp() is kept on purpose: with data=None the name must NOT
        # exist yet, which mkstemp() cannot provide.
        tfn = tempfile.mktemp()
        self.tempnames.append(tfn)
        if data is not None:
            self.writefile(data, tfn, mode)
        return tfn

    def cmpfile(self, fn1, fn2, mode="r"):
        """Compare two files for equality; they must exist."""
        assert mode in ("r", "rb")
        # Pass the requested mode through so "rb" really compares bytes.
        data1 = self.readfile(fn1, mode)
        data2 = self.readfile(fn2, mode)
        return data1 == data2

    def readfile(self, fn, mode="r"):
        """Read data from a given file."""
        assert mode in ("r", "rb")
        f = open(fn, mode)
        try:
            data = f.read()
        finally:
            f.close()
        return data

    def writefile(self, data, fn, mode="w"):
        """Write data to a given file, creating its directory if needed."""
        assert mode in ("w", "wb")
        dn = os.path.dirname(fn)
        # A bare filename has an empty dirname; there is nothing to create.
        if dn:
            self.ensuredir(dn)
        f = open(fn, mode)
        try:
            f.write(data)
        finally:
            f.close()

    def ensuredir(self, dn):
        """Ensure that a given directory exists."""
        if not os.path.exists(dn):
            os.makedirs(dn)
=== Added File Zope3/src/zope/fssync/tests/test_snarf.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Snarfer and Unsnarfer classes.
$Id: test_snarf.py,v 1.2.4.1 2003/06/22 14:23:42 gregweb Exp $
"""
import os
import unittest
from StringIO import StringIO
from zope.fssync.snarf import copybytes, Snarfer, Unsnarfer
from zope.fssync.tests.tempfiles import TempFiles
class TestCopyBytes(unittest.TestCase):
    """Tests for copybytes() with sufficient and insufficient input."""

    def test_copybytes_short(self):
        # 125 characters are available; only the first 100 are requested.
        payload = "12345" * 25
        source, sink = StringIO(payload), StringIO()
        copybytes(100, source, sink)
        self.assertEqual(sink.getvalue(), payload[:100])

    def test_copybytes_long(self):
        # Copy the whole of a larger input in one call.
        payload = "12345" * 9000
        source, sink = StringIO(payload), StringIO()
        copybytes(len(payload), source, sink)
        self.assertEqual(sink.getvalue(), payload)

    def test_copybytes_fail_1(self):
        # Requesting far more than the input holds is expected to fail.
        source, sink = StringIO("12345"), StringIO()
        self.assertRaises(IOError, copybytes, 9000, source, sink)

    def test_copybytes_fail_2(self):
        # Requesting a single character too many is expected to fail too.
        source, sink = StringIO("12345"), StringIO()
        self.assertRaises(IOError, copybytes, 6, source, sink)
class TestSnarfer(TempFiles):
    """Tests for Snarfer; output is captured in the StringIO self.ostr."""

    def setUp(self):
        TempFiles.setUp(self)
        self.ostr = StringIO()
        self.snf = Snarfer(self.ostr)

    def test_addstream(self):
        source = StringIO("12345")
        self.snf.addstream(source, 5, "foo")
        self.assertEqual(self.ostr.getvalue(), "5 foo\n12345")

    def test_addfile(self):
        path = self.tempfile("12345")
        self.snf.addfile(path, "foo")
        self.assertEqual(self.ostr.getvalue(), "5 foo\n12345")

    def test_addtree(self):
        root = self.maketree()
        self.snf.addtree(root)
        expected = "".join(["8 d1/f1\n", "d1f1data",
                            "6 f1\n", "f1data",
                            "7 f1~\n", "f1adata",
                            "6 f2\n", "f2data"])
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_addtree_prefix(self):
        root = self.maketree()
        self.snf.addtree(root, "top/")
        expected = "".join(["8 top/d1/f1\n", "d1f1data",
                            "6 top/f1\n", "f1data",
                            "7 top/f1~\n", "f1adata",
                            "6 top/f2\n", "f2data"])
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_addtree_filter(self):
        root = self.maketree()
        # The filter rejects names ending in "~", so f1~ must be absent.
        self.snf.addtree(root, filter=lambda x: not x.endswith("~"))
        expected = "".join(["8 d1/f1\n", "d1f1data",
                            "6 f1\n", "f1data",
                            "6 f2\n", "f2data"])
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_add_addfile(self):
        path = self.tempfile("12345")
        self.snf.add(path, "top")
        expected = "".join(["5 top\n", "12345"])
        self.assertEqual(self.ostr.getvalue(), expected)

    def test_add_addtree(self):
        root = self.maketree()
        self.snf.add(root, "top")
        expected = "".join(["8 top/d1/f1\n", "d1f1data",
                            "6 top/f1\n", "f1data",
                            "7 top/f1~\n", "f1adata",
                            "6 top/f2\n", "f2data"])
        self.assertEqual(self.ostr.getvalue(), expected)

    def maketree(self):
        """Build a small tree (f1, f1~, f2, d1/f1) and return its root."""
        root = self.tempdir()
        toplevel = [("f1", "f1data"), ("f1~", "f1adata"), ("f2", "f2data")]
        for name, data in toplevel:
            self.writefile(data, os.path.join(root, name))
        subdir = os.path.join(root, "d1")
        os.mkdir(subdir)
        self.writefile("d1f1data", os.path.join(subdir, "f1"))
        return root
class TestUnsnarfer(unittest.TestCase):
    """Tests for Unsnarfer."""

    def test_translatepath(self):
        unsnarfer = Unsnarfer(StringIO(""))
        unsnarfer.root = "root"
        # A clean relative path is joined onto the root.
        expected = os.path.join("root", "a", "b", "c")
        self.assertEqual(unsnarfer.translatepath("a/b/c"), expected)
        # Each of these paths is expected to be rejected with IOError.
        for badpath in ("a/./b", "a/../b", "a//b", "/a", "a/", ""):
            self.assertRaises(IOError, unsnarfer.translatepath, badpath)
# XXX More to add...
def test_suite():
    """Return a suite holding the tests of all test classes in this module."""
    suite = unittest.TestSuite()
    for testclass in (TestCopyBytes, TestSnarfer, TestUnsnarfer):
        suite.addTest(unittest.makeSuite(testclass))
    return suite
def test_main():
    """Run the module's test suite with a plain text runner."""
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


if __name__ == '__main__':
    test_main()
=== Zope3/src/zope/fssync/tests/test_fsmerger.py 1.2 => 1.2.2.1 ===
--- Zope3/src/zope/fssync/tests/test_fsmerger.py:1.2 Thu May 15 07:38:39 2003
+++ Zope3/src/zope/fssync/tests/test_fsmerger.py Sun Jun 22 10:23:42 2003
@@ -11,100 +11,31 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Tests for the FSMerger class.
+"""Tests for the (high-level) FSMerger class.
$Id$
"""
import os
-import shutil
+import sys
import unittest
-import tempfile
from os.path import exists, isdir, isfile, realpath, normcase, split, join
from zope.fssync.fsmerger import FSMerger
-class MockMetadata(object):
+from zope.fssync.tests.mockmetadata import MockMetadata
+from zope.fssync.tests.tempfiles import TempFiles
- def __init__(self):
- self.database = {}
-
- def getentry(self, filename):
- key, filename = self.makekey(filename)
- if key not in self.database:
- self.database[key] = {}
- return self.database[key]
-
- def getnames(self, dirpath):
- dirkey, dirpath = self.makekey(dirpath)
- names = []
- for key in self.database:
- head, tail = split(key)
- if head == dirkey:
- names.append(tail)
- return names
-
- def flush(self):
- pass
-
- # These only exist for the test framework
-
- def makekey(self, path):
- path = realpath(path)
- key = normcase(path)
- return key, path
-
- def setmetadata(self, filename, metadata={}):
- key, filename = self.makekey(filename)
- if key not in self.database:
- self.database[key] = {"path": filename}
- self.database[key].update(metadata)
-
- def delmetadata(self, filename):
- key, filename = self.makekey(filename)
- if key in self.database:
- del self.database[key]
-
- def dump(self):
- return dict([(k, v) for (k, v) in self.database.iteritems() if v])
-
-class TestFSMerger(unittest.TestCase):
+class TestFSMerger(TempFiles):
def setUp(self):
+ TempFiles.setUp(self)
# Create a mock metadata database
self.metadata = MockMetadata()
- # Create a list of temporary names to be removed in tearDown
- self.tempnames = []
# Create a handy entry
self.entry = {"path": "/foo"}
- def tearDown(self):
- # Clean up temporary files and directories
- for fn in self.tempnames:
- if isdir(fn):
- shutil.rmtree(fn)
- elif isfile(fn):
- os.remove(fn)
-
- def addtempdir(self):
- # Create and register a temporary directory
- dir = tempfile.mktemp()
- self.tempnames.append(dir)
- os.mkdir(dir)
- return dir
-
- def addtempfile(self, data):
- # Create and register a temporary file
- filename = tempfile.mktemp()
- self.tempnames.append(filename)
- f = open(filename, "w")
- try:
- f.write(data)
- finally:
- f.close()
- return filename
-
diff3ok = None
def check_for_diff3(self):
@@ -114,19 +45,18 @@
def diff3_check(self):
if not hasattr(os, "popen"):
+ sys.stderr.write("\nos.popen() not found, diff3 tests disabled\n")
return False
- f1 = self.addtempfile("a")
- f2 = self.addtempfile("b")
- f3 = self.addtempfile("b")
+ f1 = self.tempfile("a")
+ f2 = self.tempfile("a")
+ f3 = self.tempfile("b")
pipe = os.popen("diff3 -m -E %s %s %s" % (f1, f2, f3), "r")
output = pipe.read()
sts = pipe.close()
- return output == "b" and not sts
-
- def ensuredir(self, dir):
- # Ensure that a given directory exists
- if not isdir(dir):
- os.makedirs(dir)
+ ok = output == "b" and not sts
+ if not ok:
+ sys.stderr.write("\ndiff3 doesn't work, diff3 tests disabled\n")
+ return ok
def addfile(self, dir, path, data, entry=None):
# Create a file or directory and write some data to it. If
@@ -136,11 +66,13 @@
# not None, entries are also synthesized for the directory
# contents.
path = join(dir, path)
- if entry is not None:
+ if entry is not None and not callable(entry):
self.addentry(path, entry)
if isinstance(data, dict):
self.ensuredir(path)
pentry = self.metadata.getentry(path)
+ if entry is not None and "flag" in entry:
+ pentry["flag"] = entry["flag"]
for x in data:
if entry is not None:
newentry = entry.copy()
@@ -154,6 +86,8 @@
f.write(data)
finally:
f.close()
+ if callable(entry):
+ self.addentry(path, entry(path))
return path
def addorigfile(self, dir, path, data):
@@ -277,8 +211,8 @@
reports = []
m = FSMerger(self.metadata, reports.append)
- localtopdir = self.addtempdir()
- remotetopdir = self.addtempdir()
+ localtopdir = self.tempdir()
+ remotetopdir = self.tempdir()
localdir = join(localtopdir, "local")
remotedir = join(remotetopdir, "remote")
os.mkdir(localdir)
@@ -292,6 +226,7 @@
expected_reports = []
for er in expected_reports_template:
+ er = er.replace("/", os.sep)
er = er.replace("%l", localfile)
er = er.replace("%r", remotefile)
expected_reports.append(er)
@@ -344,17 +279,197 @@
self.make_conflict_entry, self.entry)
def make_conflict_entry(self, local):
+ # Helper for test_*_conflict
e = {"conflict": os.path.getmtime(local)}
e.update(self.entry)
return e
- def test_new_directory(self):
- if not self.check_for_diff3():
- return
- self.mergetest("foo", None, None, {"x": "x"}, None, self.entry,
+ # Tests for sticky conflict reporting
+
+ def test_sticky_conflict(self):
+ conflict = "<<<<<<< foo\nl\n=======\nr\n>>>>>>> foo\n"
+ self.mergetest("foo", conflict, "r\n", "r\n",
+ self.make_conflict_entry, self.entry,
+ ["C %l"], conflict, "r\n", "r\n",
+ self.make_conflict_entry, self.entry)
+
+ def test_unstuck_conflict(self):
+ conflict_entry = {"conflict": 12345}
+ conflict_entry.update(self.entry)
+ self.mergetest("foo", "resolved\n", "r\n", "r\n",
+ conflict_entry, self.entry,
+ ["M %l"], "resolved\n", "r\n", "r\n",
+ self.entry, self.entry)
+
+ def test_cleared_conflict(self):
+ conflict_entry = {"conflict": 12345}
+ conflict_entry.update(self.entry)
+ self.mergetest("foo", "r\n", "r\n", "r\n",
+ conflict_entry, self.entry,
+ [], "r\n", "r\n", "r\n",
+ self.entry, self.entry)
+
+ # Tests for added files: local, remote, both
+
+ def test_added_file_local(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", "x", None, None,
+ added_entry, None,
+ ["A %l"],
+ "x", None, None,
+ added_entry, {})
+
+ def test_added_file_remote(self):
+ self.mergetest("foo", None, None, "x",
+ None, self.entry,
+ ["U %l"],
+ "x", "x", "x",
+ self.entry, self.entry)
+
+ def test_added_file_both(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", "x", None, "x",
+ added_entry, self.entry,
+ ["U %l"],
+ "x", "x", "x",
+ self.entry, self.entry)
+
+ # Tests for removed files: local, remote, both
+
+ def test_removed_file_local(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, "x", "x",
+ removed_entry, self.entry,
+ ["R %l"],
+ None, "x", "x",
+ removed_entry, self.entry)
+
+ def test_removed_file_remote(self):
+ self.mergetest("foo", "x", "x", None,
+ self.entry, {},
+ ["D %l"],
+ None, None, None,
+ {}, {})
+
+ def test_removed_file_both(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, "x", None,
+ removed_entry, {},
+ ["D %l"],
+ None, None, None,
+ {}, {})
+
+ # Tests for added empty directories: local, remote, both
+
+ def test_added_dir_local(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", {}, None, None,
+ added_entry, None,
+ ["A %l/"],
+ {}, None, None,
+ added_entry, {})
+
+ def test_added__dir_remote(self):
+ self.mergetest("foo", None, None, {},
+ None, self.entry,
+ ["N %l/"],
+ {}, {}, {},
+ self.entry, self.entry)
+
+ def test_added_dir_both(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", {}, None, {},
+ added_entry, self.entry,
+ ["U %l/"],
+ {}, None, {},
+ self.entry, self.entry)
+
+ # Tests for added directory trees: local, remote, both
+
+ def test_added_tree_local(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", {"x": "x"}, None, None,
+ added_entry, None,
+ ["A %l/", "A %l/x"],
+ {"x": "x"}, None, None,
+ added_entry, {})
+
+ def test_added_tree_remote(self):
+ self.mergetest("foo", None, None, {"x": "x"},
+ None, self.entry,
["N %l/", "U %l/x"],
{"x": "x"}, {"x": "x"}, {"x": "x"},
self.entry, self.entry)
+
+ def test_added_tree_both(self):
+ added_entry = {"flag": "added"}
+ added_entry.update(self.entry)
+ self.mergetest("foo", {"x": "x"}, None, {"x": "x"},
+ added_entry, self.entry,
+ ["U %l/", "U %l/x"],
+ {"x": "x"}, {"x": "x"}, {"x": "x"},
+ self.entry, self.entry)
+
+ # Tests for removed empty directories: local, remote, both
+
+ def test_removed_dir_local(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, None, {},
+ removed_entry, self.entry,
+ ["R %l/"],
+ None, None, {},
+ removed_entry, self.entry)
+
+ def test_removed_dir_remote(self):
+ self.mergetest("foo", {}, None, None,
+ self.entry, {},
+ ["D %l/"],
+ None, None, None,
+ {}, {})
+
+ def test_removed_dir_both(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, None, None,
+ removed_entry, {},
+ ["D %l"],
+ None, None, None,
+ {}, {})
+
+ # Tests for removed non-empty directories: local, remote, both
+
+ def test_removed_tree_local(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, None, {"x": "x"},
+ removed_entry, self.entry,
+ ["R %l/"],
+ None, None, {"x": "x"},
+ removed_entry, self.entry)
+
+ def test_removed_tree_remote(self):
+ self.mergetest("foo", {"x": "x"}, {"x": "x"}, None,
+ self.entry, {},
+ ["D %l/x", "D %l/"],
+ None, None, None,
+ {}, {})
+
+ def test_removed_tree_both(self):
+ removed_entry = {"flag": "removed"}
+ removed_entry.update(self.entry)
+ self.mergetest("foo", None, None, None,
+ removed_entry, {},
+ ["D %l"],
+ None, None, None,
+ {}, {})
def test_suite():
s = unittest.TestSuite()
=== Zope3/src/zope/fssync/tests/test_fsutil.py 1.2 => 1.2.2.1 ===
--- Zope3/src/zope/fssync/tests/test_fsutil.py:1.2 Thu May 15 07:25:40 2003
+++ Zope3/src/zope/fssync/tests/test_fsutil.py Sun Jun 22 10:23:42 2003
@@ -17,12 +17,12 @@
"""
import os
-import tempfile
import unittest
from os.path import split, join, exists, isdir, isfile
from zope.fssync import fsutil
+from zope.fssync.tests.tempfiles import TempFiles
def FIX(path):
# This fixes only relative paths
@@ -31,7 +31,7 @@
parts = [mapping.get(x, x) for x in parts]
return os.path.join(*parts)
-class TestFSUtil(unittest.TestCase):
+class TestFSUtil(TempFiles):
def test_split(self):
self.assertEqual(fsutil.split(FIX("foo/bar")), ("foo", "bar"))
@@ -58,30 +58,17 @@
FIX("foo/@@Zope/Annotations/bar"))
def test_ensuredir(self):
- tmpdir = tempfile.mktemp()
- try:
- self.assertEqual(exists(tmpdir), False)
- self.assertEqual(isdir(tmpdir), False)
- fsutil.ensuredir(tmpdir)
- self.assertEqual(isdir(tmpdir), True)
- fsutil.ensuredir(tmpdir)
- self.assertEqual(isdir(tmpdir), True)
- finally:
- if isdir(tmpdir):
- os.rmdir(tmpdir)
+ tmpdir = self.tempfile(None)
+ self.assertEqual(exists(tmpdir), False)
+ self.assertEqual(isdir(tmpdir), False)
+ fsutil.ensuredir(tmpdir)
+ self.assertEqual(isdir(tmpdir), True)
+ fsutil.ensuredir(tmpdir)
+ self.assertEqual(isdir(tmpdir), True)
def test_ensuredir_error(self):
- tmpfile = tempfile.mktemp()
- try:
- f = open(tmpfile, "w")
- try:
- f.write("x\n")
- finally:
- f.close()
- self.assertRaises(OSError, fsutil.ensuredir, tmpfile)
- finally:
- if isfile(tmpfile):
- os.remove(tmpfile)
+ tmpfile = self.tempfile("x\n")
+ self.assertRaises(OSError, fsutil.ensuredir, tmpfile)
def test_suite():
loader = unittest.TestLoader()
=== Zope3/src/zope/fssync/tests/test_merger.py 1.9 => 1.9.2.1 ===
--- Zope3/src/zope/fssync/tests/test_merger.py:1.9 Thu May 15 08:05:12 2003
+++ Zope3/src/zope/fssync/tests/test_merger.py Sun Jun 22 10:23:42 2003
@@ -17,85 +17,19 @@
"""
import os
-import shutil
import unittest
-import tempfile
-from os.path import exists, isdir, isfile, realpath, normcase
+from os.path import exists
from zope.fssync.merger import Merger
-class MockMetadatabase(object):
-
- def __init__(self):
- self.database = {}
-
- def makekey(self, file):
- file = realpath(file)
- key = normcase(file)
- return key, file
-
- def getentry(self, file):
- key, file = self.makekey(file)
- if key not in self.database:
- self.database[key] = {}
- return self.database[key]
-
- def setmetadata(self, file, metadata={}):
- key, file = self.makekey(file)
- if key not in self.database:
- self.database[key] = {"path": file}
- self.database[key].update(metadata)
-
- def delmetadata(self, file):
- key, file = self.makekey(file)
- if key in self.database:
- del self.database[key]
+from zope.fssync.tests.mockmetadata import MockMetadata
+from zope.fssync.tests.tempfiles import TempFiles
added = {"flag": "added"}
removed = {"flag": "removed"}
-class TestMerger(unittest.TestCase):
-
- def setUp(self):
- unittest.TestCase.setUp(self)
- # Create a list of temporary files to clean up at the end
- self.tempfiles = []
-
- def tearDown(self):
- # Clean up temporary files (or directories)
- for fn in self.tempfiles:
- if isdir(fn):
- shutil.rmtree(fn)
- elif isfile(fn):
- os.remove(fn)
- unittest.TestCase.tearDown(self)
-
- def addfile(self, data, suffix="", mode="w"):
- # Register a temporary file; write data to it if given
- file = tempfile.mktemp(suffix)
- self.tempfiles.append(file)
- if data is not None:
- f = open(file, mode)
- try:
- f.write(data)
- finally:
- f.close()
- return file
-
- def cmpfile(self, file1, file2, mode="r"):
- # Compare two files; they must exist
- f1 = open(file1, mode)
- try:
- data1 = f1.read()
- finally:
- f1.close()
- f2 = open(file2, mode)
- try:
- data2 = f2.read()
- finally:
- f2.close()
- return data1 == data2
+class TestMerger(TempFiles):
diff3ok = None
@@ -107,9 +41,9 @@
def diff3_check(self):
if not hasattr(os, "popen"):
return False
- f1 = self.addfile("a")
- f2 = self.addfile("b")
- f3 = self.addfile("b")
+ f1 = self.tempfile("a")
+ f2 = self.tempfile("b")
+ f3 = self.tempfile("b")
pipe = os.popen("diff3 -m -E %s %s %s" % (f1, f2, f3), "r")
output = pipe.read()
sts = pipe.close()
@@ -118,10 +52,10 @@
def runtest(self, localdata, origdata, remotedata,
localmetadata, remotemetadata, exp_localdata,
exp_action, exp_state, exp_merge_state=None):
- local = self.addfile(localdata)
- orig = self.addfile(origdata)
- remote = self.addfile(remotedata)
- md = MockMetadatabase()
+ local = self.tempfile(localdata)
+ orig = self.tempfile(origdata)
+ remote = self.tempfile(remotedata)
+ md = MockMetadata()
if localmetadata is not None:
md.setmetadata(local, localmetadata)
if remotemetadata is not None:
@@ -249,6 +183,21 @@
def test_spurious(self):
self.runtest("a", None, None, None, None, "a", "Nothing", "Spurious")
+
+ def test_conflict_report(self):
+ local = self.tempfile("CONFLICT")
+ orig = self.tempfile("x")
+ remote = self.tempfile("x")
+ md = MockMetadata()
+ m = Merger(md)
+ lentry = md.getentry(local)
+ lentry["path"] = "/foo"
+ lentry["conflict"] = mtime = os.path.getmtime(local)
+ rentry = md.getentry(remote)
+ rentry["path"] = "/foo"
+ action, state = m.classify_files(local, orig, remote)
+ self.assertEqual((action, state), ("Nothing", "Conflict"))
+ self.assertEqual(lentry.get("conflict"), mtime)
# XXX need test cases for anomalies, e.g. files missing or present
# in spite of metadata, or directories instead of files, etc.
=== Zope3/src/zope/fssync/tests/test_metadata.py 1.2 => 1.2.4.1 ===
--- Zope3/src/zope/fssync/tests/test_metadata.py:1.2 Mon May 12 16:41:23 2003
+++ Zope3/src/zope/fssync/tests/test_metadata.py Sun Jun 22 10:23:42 2003
@@ -17,42 +17,20 @@
"""
import os
-import shutil
import unittest
-import tempfile
-from os.path import exists, isdir, isfile, split, join, realpath, normcase
+from os.path import exists, dirname, isfile, join
from zope.xmlpickle import loads, dumps
from zope.fssync.metadata import Metadata
+from zope.fssync.tests.tempfiles import TempFiles
-class TestMetadata(unittest.TestCase):
-
- def setUp(self):
- unittest.TestCase.setUp(self)
- # Create a list of temporary files to clean up at the end
- self.tempfiles = []
-
- def tearDown(self):
- # Clean up temporary files (or directories)
- for fn in self.tempfiles:
- if isdir(fn):
- shutil.rmtree(fn)
- elif isfile(fn):
- os.remove(fn)
- unittest.TestCase.tearDown(self)
-
- def adddir(self):
- # Register and create a temporary directory
- dir = tempfile.mktemp()
- self.tempfiles.append(dir)
- os.mkdir(dir)
- return dir
+class TestMetadata(TempFiles):
def test_initial_state(self):
md = Metadata()
- dir = self.adddir()
+ dir = self.tempdir()
self.assertEqual(md.getnames(dir), [])
foo = join(dir, "foo")
self.assertEqual(md.getentry(foo), {})
@@ -60,7 +38,7 @@
def test_adding(self):
md = Metadata()
- dir = self.adddir()
+ dir = self.tempdir()
foo = join(dir, "foo")
e = md.getentry(foo)
e["hello"] = "world"
@@ -71,25 +49,26 @@
def test_deleting(self):
md, foo = self.test_adding()
- dir = os.path.dirname(foo)
+ dir = dirname(foo)
md.getentry(foo).clear()
self.assertEqual(md.getentry(foo), {})
self.assertEqual(md.getnames(dir), [])
def test_flush(self):
md, foo = self.test_adding()
- dir = os.path.dirname(foo)
+ dir = dirname(foo)
md.flush()
efile = join(dir, "@@Zope", "Entries.xml")
self.assert_(isfile(efile))
- f = open(efile)
- data = f.read()
- f.close()
+ data = self.readfile(efile)
entries = loads(data)
self.assertEqual(entries, {"foo": {"hello": "world"}})
md.getentry(foo).clear()
md.flush()
- self.assert_(not exists(efile))
+ self.assert_(isfile(efile))
+ data = self.readfile(efile)
+ entries = loads(data)
+ self.assertEqual(entries, {})
def test_suite():
loader = unittest.TestLoader()
=== Zope3/src/zope/fssync/tests/test_network.py 1.3 => 1.3.2.1 ===
--- Zope3/src/zope/fssync/tests/test_network.py:1.3 Thu May 15 16:03:05 2003
+++ Zope3/src/zope/fssync/tests/test_network.py Sun Jun 22 10:23:42 2003
@@ -17,55 +17,82 @@
"""
import os
-import shutil
+import select
+import socket
import unittest
-import tempfile
+import threading
from StringIO import StringIO
from os.path import isdir, isfile, join
from zope.fssync.fssync import Network, Error
+from zope.fssync.tests.tempfiles import TempFiles
sample_rooturl = "http://user:passwd@host:8080/path"
-class TestNetwork(unittest.TestCase):
+HOST = "127.0.0.1" # localhost
+PORT = 60841 # random number
+RESPONSE = """HTTP/1.0 404 Not found\r
+Content-type: text/plain\r
+Content-length: 0\r
+\r
+"""
+
+class DummyServer(threading.Thread):
+
+ """A server that can handle one HTTP request (returning a 404 error)."""
+
+ def __init__(self, ready):
+ self.ready = ready # Event signaling we're listening
+ self.stopping = False
+ threading.Thread.__init__(self)
+
+ def run(self):
+ svr = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ svr.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ svr.bind((HOST, PORT))
+ svr.listen(1)
+ self.ready.set()
+ conn = None
+ sent_response = False
+ while not self.stopping:
+ if conn is None:
+ r = [svr]
+ else:
+ r = [conn]
+ r, w, x = select.select(r, [], [], 0.01)
+ if not r:
+ continue
+ s = r[0]
+ if s is svr:
+ conn, addr = svr.accept()
+ ##print "connect from", `addr`
+ else:
+ assert s is conn
+ data = conn.recv(1000)
+ ##print "received", `data`
+ if not data:
+ break
+ if not sent_response:
+ conn.send(RESPONSE)
+ conn.close()
+ conn = None
+ sent_response = True
+ if conn is not None:
+ conn.close()
+ svr.close()
+ ##print "stopped"
+
+ def stop(self):
+ ##print "stopping"
+ self.stopping = True
+
+class TestNetwork(TempFiles):
def setUp(self):
- unittest.TestCase.setUp(self)
+ TempFiles.setUp(self)
self.network = Network()
- # Create a list of temporary files to clean up at the end
- self.tempfiles = []
-
- def tearDown(self):
- # Clean up temporary files (or directories)
- for fn in self.tempfiles:
- if isdir(fn):
- shutil.rmtree(fn)
- elif isfile(fn):
- os.remove(fn)
- unittest.TestCase.tearDown(self)
-
- def adddir(self):
- # Create and register a temporary directory
- dir = tempfile.mktemp()
- self.tempfiles.append(dir)
- os.mkdir(dir)
- return dir
-
- def cmpfile(self, file1, file2, mode="r"):
- # Compare two files; they must exist
- f1 = open(file1, mode)
- try:
- data1 = f1.read()
- finally:
- f1.close()
- f2 = open(file2, mode)
- try:
- data2 = f2.read()
- finally:
- f2.close()
- return data1 == data2
def test_initial_state(self):
self.assertEqual(self.network.rooturl, None)
@@ -93,11 +120,11 @@
def test_findrooturl_notfound(self):
# XXX This test will fail if a file /tmp/@@Zope/Root exists :-(
- target = self.adddir()
+ target = self.tempdir()
self.assertEqual(self.network.findrooturl(target), None)
def test_findrooturl_found(self):
- target = self.adddir()
+ target = self.tempdir()
zdir = join(target, "@@Zope")
os.mkdir(zdir)
rootfile = join(zdir, "Root")
@@ -108,7 +135,7 @@
def test_saverooturl(self):
self.network.setrooturl(sample_rooturl)
- target = self.adddir()
+ target = self.tempdir()
zdir = join(target, "@@Zope")
os.mkdir(zdir)
rootfile = join(zdir, "Root")
@@ -119,7 +146,7 @@
self.assertEqual(data.strip(), sample_rooturl)
def test_loadrooturl(self):
- target = self.adddir()
+ target = self.tempdir()
self.assertRaises(Error, self.network.loadrooturl, target)
zdir = join(target, "@@Zope")
os.mkdir(zdir)
@@ -130,11 +157,15 @@
self.assertEqual(new.rooturl, sample_rooturl)
def test_httpreq(self):
- # XXX I don't want to write up a dummy server just to test
- # this so I'll just send a request to python.org that I know
- # will fail.
- self.network.setrooturl("http://python.org")
- self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+ ready = threading.Event()
+ svr = DummyServer(ready)
+ svr.start()
+ ready.wait()
+ try:
+ self.network.setrooturl("http://%s:%s" % (HOST, PORT))
+ self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+ finally:
+ svr.stop()
def test_slurptext_html(self):
fp = StringIO("<p>This is some\n\ntext.</p>\n")