[Zope3-checkins] CVS: Zope3/src/zope/fssync/tests - __init__.py:1.1 test_merger.py:1.1 test_metadata.py:1.1
Guido van Rossum
guido@python.org
Mon, 12 May 2003 16:19:40 -0400
Update of /cvs-repository/Zope3/src/zope/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv334/tests
Added Files:
__init__.py test_merger.py test_metadata.py
Log Message:
The complexity of the merge algorithm was driving me crazy. Start
over using test-driven design. Also use a more reasonable approach to
loading and storing the "entries" file -- this is now abstracted away in a
metadata database. There's still much to do, but this is a better
foundation for sure!
=== Added File Zope3/src/zope/fssync/tests/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/fssync/tests/test_merger.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Merger class.
$Id: test_merger.py,v 1.1 2003/05/12 20:19:39 gvanrossum Exp $
"""
import os
import shutil
import unittest
import tempfile
from os.path import exists, isdir, isfile, realpath, normcase
from zope.fssync.merger import Merger
class MockMetadatabase(object):
    """In-memory stand-in for the metadata database handed to Merger.

    Entries are plain dicts kept in ``self.database``, keyed by the
    case-normalized real path of the file they describe.
    """

    def __init__(self):
        # Maps normcase(realpath(file)) -> entry dict.
        self.database = {}

    def makekey(self, file):
        """Return ``(key, path)`` for *file*.

        *path* is ``realpath(file)``; *key* is ``normcase(path)`` and is
        what ``self.database`` is indexed by.
        """
        file = realpath(file)
        key = normcase(file)
        return key, file

    def getentry(self, file):
        """Return the entry dict for *file*, storing an empty one if absent.

        The stored dict itself is returned, so changes made by the caller
        are seen by later lookups.
        """
        key, file = self.makekey(file)
        if key not in self.database:
            self.database[key] = {}
        return self.database[key]

    def setmetadata(self, file, metadata=None):
        """Merge *metadata* into the entry for *file*.

        A missing entry is first created as ``{"path": realpath(file)}``.
        *metadata* defaults to None (nothing to merge) rather than a
        shared mutable ``{}`` default.
        """
        key, file = self.makekey(file)
        if key not in self.database:
            self.database[key] = {"path": file}
        if metadata is not None:
            self.database[key].update(metadata)

    def delmetadata(self, file):
        """Drop the entry for *file*; do nothing if there is none."""
        key, file = self.makekey(file)
        if key in self.database:
            del self.database[key]
# Metadata fragments passed to runtest() to mark a file as locally
# added or locally removed.
added = {
    "flag": "added",
}
removed = {
    "flag": "removed",
}
class TestMerger(unittest.TestCase):
    """Tests for Merger.classify_files() and Merger.merge_files().

    Every test case describes a local/original/remote file triple plus
    its metadata and delegates the actual checking to runtest().
    """

    def setUp(self):
        unittest.TestCase.setUp(self)
        # Paths (files or directories) to remove again in tearDown()
        self.tempfiles = []

    def tearDown(self):
        # Best-effort cleanup: a registered path may never have been
        # created or may already be gone, so removal errors are ignored.
        for fn in self.tempfiles:
            try:
                if isdir(fn):
                    shutil.rmtree(fn)
                else:
                    os.unlink(fn)
            except OSError:
                pass
        unittest.TestCase.tearDown(self)

    def addfile(self, data, suffix="", mode="w"):
        """Return the path of a fresh temporary file, registered for cleanup.

        The file is created (opened with *mode*) and filled with *data*
        unless *data* is None; in that case the returned path does not
        exist.  The file name ends with *suffix*.
        """
        # tempfile.mktemp() only picks a name and is open to races, so
        # each file gets its own private directory instead; tearDown()
        # removes that directory together with whatever is left in it.
        tmpdir = tempfile.mkdtemp()
        self.tempfiles.append(tmpdir)
        file = os.path.join(tmpdir,
                            "tmp%d%s" % (len(self.tempfiles), suffix))
        if data is not None:
            f = open(file, mode)
            try:
                f.write(data)
            finally:
                f.close()
        return file

    def cmpfile(self, file1, file2, mode="r"):
        """Return whether two files have equal contents; both must exist."""
        contents = []
        for name in (file1, file2):
            f = open(name, mode)
            try:
                contents.append(f.read())
            finally:
                f.close()
        return contents[0] == contents[1]

    def runtest(self, localdata, origdata, remotedata,
                localmetadata, remotemetadata, exp_action, exp_state,
                exp_merge_state=None):
        """Classify and merge one file triple and verify the outcome.

        localdata, origdata, remotedata -- contents of the three files,
            or None to leave that file nonexistent.
        localmetadata, remotemetadata -- dict to register in the mock
            metadata database for the local/remote file, or None to
            register nothing.
        exp_action, exp_state -- expected result of classify_files().
        exp_merge_state -- expected result of merge_files(); defaults
            to exp_state.
        """
        local = self.addfile(localdata)
        orig = self.addfile(origdata)
        remote = self.addfile(remotedata)
        md = MockMetadatabase()
        if localmetadata is not None:
            md.setmetadata(local, localmetadata)
        if remotemetadata is not None:
            md.setmetadata(remote, remotemetadata)
        m = Merger(md, verbose=False)

        action, state = m.classify_files(local, orig, remote)
        self.assertEqual((action, state), (exp_action, exp_state))

        # Now try the actual merge
        state = m.merge_files(local, orig, remote, action, state)
        self.assertEqual(state, exp_merge_state or exp_state)
        self.assertTrue(md.getentry(remote).get("flag") is None)

        # Verify that the returned state matches reality
        if state == "Uptodate":
            self.assertTrue(self.cmpfile(local, orig))
            self.assertTrue(self.cmpfile(orig, remote))
            self.assertTrue(md.getentry(local))
            self.assertFalse(md.getentry(local).get("flag"),
                             md.getentry(local))
            self.assertTrue(md.getentry(remote))
        elif state == "Modified":
            self.assertFalse(self.cmpfile(local, orig))
            self.assertTrue(self.cmpfile(orig, remote))
            self.assertTrue(md.getentry(local))
            self.assertFalse(md.getentry(local).get("flag"))
            self.assertTrue(md.getentry(remote))
        elif state == "Added":
            self.assertTrue(exists(local))
            self.assertFalse(exists(orig))
            self.assertFalse(exists(remote))
            self.assertFalse(md.getentry(remote))
            self.assertEqual(md.getentry(local).get("flag"), "added")
        elif state == "Removed":
            self.assertFalse(exists(local))
            self.assertTrue(self.cmpfile(orig, remote))
            self.assertEqual(md.getentry(local).get("flag"), "removed")
            self.assertTrue(md.getentry(remote))
        elif state == "Nonexistent":
            self.assertFalse(exists(local))
            self.assertFalse(exists(orig))
            self.assertFalse(exists(remote))
            self.assertFalse(md.getentry(local))
            self.assertFalse(md.getentry(remote))
        elif state == "Conflict":
            self.assertTrue("conflict" in md.getentry(local))
            # No other checks; there are many kinds of conflicts
        elif state == "Spurious":
            self.assertTrue(exists(local))
            # Don't care about orig
            self.assertFalse(exists(remote))
            self.assertFalse(md.getentry(local))
            self.assertFalse(md.getentry(remote))
        else:
            self.fail("unexpected merge state: %r" % (state,))

    # Test cases for files

    def test_all_equal(self):
        self.runtest("a", "a", "a", {}, {}, "Nothing", "Uptodate")

    def test_local_modified(self):
        self.runtest("ab", "a", "a", {}, {}, "Nothing", "Modified")

    def test_remote_modified(self):
        self.runtest("a", "a", "ab", {}, {}, "Copy", "Uptodate")

    def test_both_modified_resolved(self):
        self.runtest("l\na\n", "a\n", "a\nr\n", {}, {}, "Merge", "Modified")

    def test_both_modified_conflict(self):
        self.runtest("ab", "a", "ac", {}, {}, "Merge", "Modified", "Conflict")

    def test_local_added(self):
        self.runtest("a", None, None, added, None, "Nothing", "Added")

    def test_remote_added(self):
        self.runtest(None, None, "a", None, {}, "Copy", "Uptodate")

    def test_both_added_same(self):
        self.runtest("a", None, "a", added, {}, "Fix", "Uptodate")

    def test_both_added_different(self):
        self.runtest("a", None, "b", added, {},
                     "Merge", "Modified", "Conflict")

    def test_local_removed(self):
        self.runtest(None, "a", "a", removed, {}, "Nothing", "Removed")

    def test_remote_removed(self):
        self.runtest("a", "a", None, {}, None, "Remove", "Nonexistent")

    def test_both_removed(self):
        self.runtest(None, "a", None, removed, None, "Remove", "Nonexistent")

    def test_local_lost_remote_unchanged(self):
        self.runtest(None, "a", "a", {}, {}, "Copy", "Uptodate")

    def test_local_lost_remote_modified(self):
        self.runtest(None, "a", "b", {}, {}, "Copy", "Uptodate")

    def test_local_lost_remote_removed(self):
        self.runtest(None, "a", None, {}, None, "Remove", "Nonexistent")

    def test_spurious(self):
        self.runtest("a", None, None, None, None, "Nothing", "Spurious")

    # XXX need test cases for anomalies, e.g. files missing or present
    # in spite of metadata, or directories instead of files, etc.
def test_suite():
    """Return a suite holding every test method of TestMerger."""
    return unittest.TestLoader().loadTestsFromTestCase(TestMerger)
def test_main():
    """Run the suite from test_suite() with a plain text runner."""
    runner = unittest.TextTestRunner()
    suite = test_suite()
    runner.run(suite)
# Run the tests when this module is executed as a script.
if __name__ == '__main__':
    test_main()
=== Added File Zope3/src/zope/fssync/tests/test_metadata.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Metadata class.
$Id: test_metadata.py,v 1.1 2003/05/12 20:19:39 gvanrossum Exp $
"""
import os
import shutil
import unittest
import tempfile
from os.path import exists, isdir, isfile, split, join, realpath, normcase
from zope.xmlpickle import loads, dumps
from zope.fssync.metadata import Metadata
class TestMetadata(unittest.TestCase):
    """Tests for Metadata: getentry(), getnames() and flush()."""

    def setUp(self):
        unittest.TestCase.setUp(self)
        # Paths (files or directories) to remove again in tearDown()
        self.tempfiles = []

    def tearDown(self):
        # Best-effort cleanup: a registered path may already be gone,
        # so removal errors are ignored.
        for fn in self.tempfiles:
            try:
                if isdir(fn):
                    shutil.rmtree(fn)
                else:
                    os.unlink(fn)
            except OSError:
                pass
        unittest.TestCase.tearDown(self)

    def adddir(self):
        """Create a temporary directory, register it for cleanup, return it."""
        # mkdtemp() picks the name and creates the directory in one step,
        # unlike mktemp() followed by mkdir(), which is open to races.
        dir = tempfile.mkdtemp()
        self.tempfiles.append(dir)
        return dir

    def _add_entry(self):
        """Store {"hello": "world"} for <tempdir>/foo and check it is visible.

        Returns ``(md, foo)``: the Metadata instance and the entry's path.
        Shared by several tests so that no test method has to return a
        value to act as a fixture for another one.
        """
        md = Metadata()
        dir = self.adddir()
        foo = join(dir, "foo")
        e = md.getentry(foo)
        e["hello"] = "world"
        self.assertEqual(md.getentry(foo), {"hello": "world"})
        self.assertTrue(md.getentry(foo) is e)
        self.assertEqual(md.getnames(dir), ["foo"])
        return md, foo

    def test_initial_state(self):
        md = Metadata()
        dir = self.adddir()
        self.assertEqual(md.getnames(dir), [])
        foo = join(dir, "foo")
        # Merely looking up an entry must not make its name show up.
        self.assertEqual(md.getentry(foo), {})
        self.assertEqual(md.getnames(dir), [])

    def test_adding(self):
        self._add_entry()

    def test_deleting(self):
        md, foo = self._add_entry()
        dir = os.path.dirname(foo)
        # Emptying the entry is expected to remove its name again.
        md.getentry(foo).clear()
        self.assertEqual(md.getentry(foo), {})
        self.assertEqual(md.getnames(dir), [])

    def test_flush(self):
        md, foo = self._add_entry()
        dir = os.path.dirname(foo)
        md.flush()
        efile = join(dir, "@@Zope", "Entries.xml")
        self.assertTrue(isfile(efile))
        f = open(efile)
        try:
            data = f.read()
        finally:
            f.close()
        entries = loads(data)
        self.assertEqual(entries, {"foo": {"hello": "world"}})
        # Once the only entry is emptied, flushing must remove the file.
        md.getentry(foo).clear()
        md.flush()
        self.assertFalse(exists(efile))
def test_suite():
    """Return a suite holding every test method of TestMetadata."""
    return unittest.TestLoader().loadTestsFromTestCase(TestMetadata)
def test_main():
    """Run the suite from test_suite() with a plain text runner."""
    runner = unittest.TextTestRunner()
    suite = test_suite()
    runner.run(suite)
# Run the tests when this module is executed as a script.
if __name__ == '__main__':
    test_main()