[Zope3-checkins] CVS: Zope3/src/zope/app/fssync/tests - test_committer.py:1.1

Guido van Rossum guido@python.org
Tue, 27 May 2003 15:41:53 -0400


Update of /cvs-repository/Zope3/src/zope/app/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv1102

Added Files:
	test_committer.py 
Log Message:
A new committer, replacing fromFS().  With unit tests!


=== Added File Zope3/src/zope/app/fssync/tests/test_committer.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Committer class.

$Id: test_committer.py,v 1.1 2003/05/27 19:41:53 gvanrossum Exp $
"""

import os
import shutil
import tempfile
import unittest

from zope.component.service import serviceManager
from zope.component.adapter import provideAdapter
from zope.component.tests.placelesssetup import PlacelessSetup
from zope.testing.cleanup import CleanUp

from zope.xmlpickle import loads, dumps
from zope.fssync import fsutil

from zope.app.interfaces.container import IContainer
from zope.app.interfaces.file import IFileFactory
from zope.app.interfaces.fssync import IGlobalFSSyncService

from zope.app.fssync.committer import Committer, SynchronizationError
from zope.app.fssync.fsregistry import provideSynchronizer, fsRegistry
from zope.app.fssync.classes import Default

from zope.app.content.fssync import DirectoryAdapter

class Sample(object):
    """Featureless object for which no specific synchronizer is registered."""

class PretendContainer(object):
    """Minimal stand-in for an IContainer that folds names to lower case.

    Objects live in the ``holding`` dict under the lower-cased name, so the
    name returned by setObject() can differ from the one that was requested.
    """

    __implements__ = IContainer

    def __init__(self):
        self.holding = {}

    def setObject(self, name, value):
        """Store value under the lower-cased name and return that name.

        Raises KeyError when the lower-cased name is already in use.
        """
        key = name.lower()
        if key in self.holding:
            raise KeyError
        self.holding[key] = value
        return key

    def __delitem__(self, name):
        del self.holding[name.lower()]

    def __getitem__(self, name):
        return self.holding[name.lower()]

    def __contains__(self, name):
        return name.lower() in self.holding

    def items(self):
        """Return the (lower-cased name, object) pairs currently held."""
        return self.holding.items()

# Dotted name of PretendContainer; test_synch() stores it as the "factory"
# of metadata entries for directories it adds.
PCname = ".".join([PretendContainer.__module__, PretendContainer.__name__])

class DictAdapter(Default):
    """Synchronizer adapter for plain dicts, registered by test_synch()."""

    def setBody(self, body):
        """Make the adapted dict equal to the dict pickled in body.

        body is deserialized with zope.xmlpickle.loads().  The adapted
        dict (self.context) is modified in place rather than replaced, so
        other references to it observe the new contents.
        """
        old = self.context
        assert type(old) is dict
        new = loads(body)
        assert type(new) is dict
        # Collect the stale keys before mutating: deleting from a dict
        # while iterating over its own keys() is only safe where keys()
        # returns a list copy, and raises RuntimeError with dict views.
        stale = [key for key in old if key not in new]
        for key in stale:
            del old[key]
        old.update(new)

class TestCommitter(unittest.TestCase, PlacelessSetup):

    def setUp(self):
        """Set up placeless services, the fssync registry and a Committer."""
        # The standard placeless services have to exist first.
        PlacelessSetup.setUp(self)

        # Define and provide the fssync registry service, then make
        # Default the synchronizer registered for None.
        service = "FSRegistryService"
        serviceManager.defineService(service, IGlobalFSSyncService)
        serviceManager.provideService(service, fsRegistry)
        provideSynchronizer(None, Default)

        # Per-test state: paths to clean up, the Committer under test,
        # and shortcuts to its metadata accessors.
        self.tempfiles = []
        committer = Committer()
        metadata = committer.metadata
        self.com = committer
        self.metadata = metadata
        self.getentry = metadata.getentry
        self.getnames = metadata.getnames

    def tearDown(self):
        """Delete everything registered in self.tempfiles, then unregister."""
        # Registered paths may be directories, files, or already gone.
        for path in self.tempfiles:
            if os.path.isdir(path):
                shutil.rmtree(path)
                continue
            if os.path.exists(path):
                os.remove(path)

        # Undo the service registrations made in setUp().
        PlacelessSetup.tearDown(self)

    def writefile(self, fn, data):
        """Write data to file fn in binary mode.

        The directory containing fn is created first (including any
        missing intermediate directories) when it does not exist yet.
        """
        parent = os.path.dirname(fn)
        if not os.path.exists(parent):
            os.makedirs(parent)
        out = open(fn, "wb")
        try:
            out.write(data)
        finally:
            out.close()

    def tempfile(self, data):
        """Return a fresh temporary file name registered for cleanup.

        When data is None nothing is written and the returned path does
        not exist yet; otherwise the file is created with data in it.
        """
        path = tempfile.mktemp()
        self.tempfiles.append(path)
        if data is None:
            return path
        self.writefile(path, data)
        return path

    def tempdir(self):
        """Create a new, empty temporary directory and return its path.

        The directory is registered in self.tempfiles so that tearDown()
        removes it together with whatever the test put inside.
        """
        # mkdtemp() picks the name and creates the directory in one step;
        # mktemp() followed by os.mkdir() leaves a window in which another
        # process can claim the name.
        tfn = tempfile.mkdtemp()
        self.tempfiles.append(tfn)
        return tfn

    def test_get_adapter(self):
        # Sample has no synchronizer of its own, so the adapter must be
        # an instance of Default, which setUp() registered for None.
        adapter = self.com.get_adapter(Sample())
        self.assertEqual(adapter.__class__, Default)

    def test_remove(self):
        # A plain file goes away ...
        path = self.tempfile("12345")
        self.com.remove(path)
        self.failIf(os.path.exists(path))
        # ... and removing the now-missing path again must not raise.
        self.com.remove(path)

        # A directory is removed together with its nested contents.
        top = self.tempdir()
        nested = os.path.join(top, "foo", "bar")
        self.writefile(nested, "12345")
        self.com.remove(top)
        self.failIf(os.path.exists(top))

    def test_remove_all(self):
        # remove_all(foo) has to delete foo itself plus its original
        # copy, the file under its extra directory and the file under
        # its annotations directory.
        top = self.tempdir()
        foo = os.path.join(top, "foo")
        paths = [
            foo,
            fsutil.getoriginal(foo),
            os.path.join(fsutil.getextra(foo), "x"),
            os.path.join(fsutil.getannotations(foo), "a"),
        ]
        for path in paths:
            self.writefile(path, "12345")
        self.com.remove_all(foo)
        for path in paths:
            self.failIf(os.path.exists(path))

    def test_write_file(self):
        # The payload mixes \r, \n and \r\n; reading the file back in
        # binary mode must give exactly the string that was written.
        expected = "12345\rabcde\n12345\r\nabcde"
        path = self.tempfile(None)
        self.com.write_file(expected, path)
        stream = open(path, "rb")
        try:
            actual = stream.read()
        finally:
            stream.close()
        self.assertEqual(actual, expected)

    def test_write_file_and_original(self):
        # Both the file itself and its "original" copy must end up with
        # exactly the data passed in (compared in binary mode).
        expected = "12345\rabcde\n12345\r\nabcde"
        target = os.path.join(self.tempdir(), "foo")
        self.com.write_file_and_original(expected, target)
        for path in (target, fsutil.getoriginal(target)):
            stream = open(path, "rb")
            try:
                actual = stream.read()
            finally:
                stream.close()
            self.assertEqual(actual, expected)

    def test_read_file(self):
        # read_file() must hand back the mixed line endings untouched.
        expected = "12345\rabcde\n12345\r\nabcde"
        path = self.tempfile(expected)
        self.assertEqual(self.com.read_file(path), expected)

    def test_load_file(self):
        # load_file() must turn a file written with xmlpickle dumps()
        # back into an equal object.
        expected = {"foo": [42]}
        path = self.tempfile(dumps(expected))
        self.assertEqual(self.com.load_file(path), expected)

    def test_set_item_non_icontainer(self):
        # A plain dict stands in for a container that is not an
        # IContainer; the item must simply appear under its name.
        target = {}
        self.com.set_item(target, "foo", 42)
        self.assertEqual(target, {"foo": 42})

    def test_set_item_icontainer_new(self):
        # Adding a new name to an IContainer stores the value under it.
        target = PretendContainer()
        self.com.set_item(target, "foo", 42)
        self.assertEqual(target.holding, {"foo": 42})

    def test_set_item_icontainer_replace(self):
        # With replace=True an existing item is overwritten.
        target = PretendContainer()
        set_item = self.com.set_item
        set_item(target, "foo", 42)
        set_item(target, "foo", 24, replace=True)
        self.assertEqual(target.holding, {"foo": 24})

    def test_set_item_icontainer_error_existing(self):
        # Without replace, setting a name that already exists must
        # raise KeyError.
        target = PretendContainer()
        set_item = self.com.set_item
        set_item(target, "foo", 42)
        self.assertRaises(KeyError, set_item, target, "foo", 42)

    def test_set_item_icontainer_error_nonexisting(self):
        # replace=True on a name that is not in the container must
        # raise KeyError.
        target = PretendContainer()
        set_item = self.com.set_item
        self.assertRaises(KeyError, set_item, target, "foo", 42, replace=True)

    def test_set_item_icontainer_error_newname(self):
        # PretendContainer.setObject() lower-cases names, so "Foo" comes
        # back as "foo"; the committer must report that mismatch as a
        # SynchronizationError.
        target = PretendContainer()
        set_item = self.com.set_item
        self.assertRaises(SynchronizationError, set_item, target, "Foo", 42)

    def test_create_object_factory(self):
        # The entry names a factory (int); no file is written at the
        # path in this test.  The container must receive int() == 0 and
        # the entry must end up with just a cleared factory and the type.
        holder = {}
        meta = {"flag": "added", "factory": "__builtin__.int"}
        path = os.path.join(self.tempdir(), "foo")
        self.com.create_object(holder, "foo", meta, path)
        self.assertEqual(holder, {"foo": 0})
        self.assertEqual(meta, {"factory": None, "type": "__builtin__.int"})

    def test_create_object_default(self):
        # No factory in the entry: the object has to come from the
        # xmlpickle written to the file, and the entry records its type.
        holder = {}
        meta = {"flag": "added"}
        path = os.path.join(self.tempdir(), "foo")
        self.writefile(path, dumps(["hello", "world"]))
        self.com.create_object(holder, "foo", meta, path)
        self.assertEqual(holder, {"foo": ["hello", "world"]})
        self.assertEqual(meta, {"factory": None, "type": "__builtin__.list"})

    def test_create_object_ifilefactory(self):
        # Register an IContainer -> IFileFactory adapter whose factory
        # unpickles the data it is given, then check that an object
        # created in a PretendContainer is the unpickled file content.
        def make_factory(container):
            return lambda name, content_type, data: loads(data)
        provideAdapter(IContainer, IFileFactory, make_factory)
        holder = PretendContainer()
        meta = {"flag": "added"}
        path = os.path.join(self.tempdir(), "foo")
        self.writefile(path, dumps(["hello", "world"]))
        self.com.create_object(holder, "foo", meta, path)
        self.assertEqual(holder.holding, {"foo": ["hello", "world"]})
        self.assertEqual(meta, {"factory": None, "type": "__builtin__.list"})

    def test_synch(self):
        # This is one very large test that exercises various aspects of
        # synch(), including synch_dir(), synch_new(), and
        # synch_old().  It's such a pain to set things up that I don't
        # split it up in a separate test method for each case in each
        # method.  Nevertheless I expect to get good coverage.
        #
        # NOTE: the scenarios below are cumulative.  Each one starts
        # from the container, file system and metadata state left
        # behind by the previous one, so they cannot be reordered or
        # run in isolation.

        # Fixture: parent holds child, child holds the dict foo.  On
        # disk this is mirrored as parentdir/child/foo, with an
        # "original" copy of foo and a metadata entry for each level.
        provideSynchronizer(PretendContainer, DirectoryAdapter)
        provideSynchronizer(dict, DictAdapter)
        parent = PretendContainer()
        child = PretendContainer()
        parent.setObject("child", child)
        foo = {}
        child.setObject("foo", foo)
        parentdir = self.tempdir()
        childdir = os.path.join(parentdir, "child")
        foofile = os.path.join(childdir, "foo")
        # writefile() creates childdir (and the directory holding the
        # original copy) as a side effect.
        self.writefile(foofile, dumps(foo))
        originalfoofile = fsutil.getoriginal(foofile)
        self.writefile(originalfoofile, dumps(foo))
        parententry = self.getentry(parentdir)
        parententry["@"] = "@" # To make it non-empty
        childentry = self.getentry(childdir)
        childentry["@"] = "@"
        fooentry = self.getentry(foofile)
        fooentry["type"] = "__builtin__.dict"

        # Test a no-op
        # Nothing differs between disk and containers, so nothing may
        # change and no errors may be reported.
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [])
        self.assertEqual(parent.holding, {"child": child})
        self.assertEqual(child.holding, {"foo": foo})
        self.assertEqual(self.com.read_file(foofile), dumps(foo))
        self.assertEqual(self.com.read_file(originalfoofile), dumps(foo))

        # Test modifying a file
        # Only the working file is rewritten; after synch() the object
        # and the original copy must both reflect the new content.
        newfoo = {"x": 42}
        self.writefile(foofile, dumps(newfoo))
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [])
        self.assertEqual(parent.holding, {"child": child})
        self.assertEqual(child.holding, {"foo": newfoo})
        self.assertEqual(self.com.read_file(foofile), dumps(newfoo))
        self.assertEqual(self.com.read_file(originalfoofile), dumps(newfoo))

        # Test adding a file
        # The entry is flagged "added" and no original copy exists yet;
        # synch() must create the object and the original copy and
        # clear the flag.
        bar = {"y": 42}
        barfile = os.path.join(childdir, "bar")
        originalbarfile = fsutil.getoriginal(barfile)
        barentry = self.getentry(barfile)
        barentry["flag"] = "added"
        self.writefile(barfile, dumps(bar))
        self.com.synch(parent, "", parentdir)
        self.assertEqual(child.holding, {"foo": newfoo, "bar": bar})
        self.assertEqual(self.com.read_file(barfile), dumps(bar))
        self.assertEqual(self.com.read_file(originalbarfile), dumps(bar))
        self.assertEqual(barentry.get("flag"), None)

        # Test removing a file
        # The working file is deleted and the entry flagged "removed";
        # synch() must drop the object, empty the entry and delete the
        # original copy.
        os.remove(barfile)
        barentry["flag"] = "removed"
        self.com.synch(parent, "", parentdir)
        self.assertEqual(child.holding, {"foo": newfoo})
        self.assertEqual(barentry, {})
        self.failIf(os.path.exists(barfile))
        self.failIf(os.path.exists(originalbarfile))

        # Test changing the type of an object
        # foo turns from a dict into an int, both in the file content
        # and in the "type" recorded in its entry.
        altfoo = 42
        self.writefile(foofile, dumps(altfoo))
        fooentry["type"] = "__builtin__.int"
        self.com.synch(parent, "", parentdir)
        self.assertEqual(child.holding, {"foo": altfoo})
        self.assertEqual(self.com.read_file(foofile), dumps(altfoo))
        self.assertEqual(self.com.read_file(originalfoofile), dumps(altfoo))

        # Test adding an empty directory
        # The entry names PretendContainer (via PCname) as the factory
        # for the new directory.
        kwikdir = os.path.join(childdir, "kwik")
        os.mkdir(kwikdir)
        kwikentry = self.getentry(kwikdir)
        kwikentry["factory"] = PCname
        kwikentry["flag"] = "added"
        self.com.synch(parent, "", parentdir)
        self.failUnless("kwik" in child)
        self.assertEqual(child["kwik"].__class__, PretendContainer)
        self.assertEqual(kwikentry.get("flag"), None)

        # Test removing an empty directory
        kwikentry["flag"] = "removed"
        shutil.rmtree(kwikdir)
        self.com.synch(parent, "", parentdir)
        self.failIf("kwik" in child)
        self.assertEqual(kwikentry, {})

        # Test adding a non-empty directory tree (parent/child/kwik/kwek/kwak)
        # kwikdir was deleted above; writefile(kwakfile, ...) recreates
        # kwikdir and kwekdir as a side effect, which the plain asserts
        # below double-check before synch() is called.
        kwikentry["flag"] = "added"
        kwikentry["factory"] = PCname
        kwekdir = os.path.join(kwikdir, "kwek")
        kwekentry = self.getentry(kwekdir)
        kwekentry["flag"] = "added"
        kwekentry["factory"] = PCname
        kwakfile = os.path.join(kwekdir, "kwak")
        kwakentry = self.getentry(kwakfile)
        kwakentry["flag"] = "added"
        data = dumps(["Google", "Kwik", "Kwek", "Kwak"])
        self.writefile(kwakfile, data)
        assert os.path.isdir(kwikdir)
        assert os.path.isdir(kwekdir)
        assert os.path.isfile(kwakfile)
        self.com.synch(parent, "", parentdir)
        self.assertEqual(kwikentry.get("flag"), None)
        self.assertEqual(kwekentry.get("flag"), None)
        self.assertEqual(kwakentry.get("flag"), None)
        self.failUnless(os.path.isfile(kwakfile))
        self.assertEqual(self.com.read_file(kwakfile), data)
        self.failUnless(os.path.isfile(fsutil.getoriginal(kwakfile)))

        # Test removing the subtree rooted at kwek
        # Only kwakfile is deleted by hand; kwekdir itself must be gone
        # after synch(), while its parent kwikdir has to survive.
        kwekentry["flag"] = "removed"
        kwakentry["flag"] = "removed"
        os.remove(kwakfile)
        self.com.synch(parent, "", parentdir)
        self.assertEqual(kwekentry, {})
        self.assertEqual(kwakentry, {})
        self.failIf(os.path.exists(kwakfile))
        self.failIf(os.path.exists(kwekdir))
        self.failUnless(os.path.exists(kwikdir))

        # Test conflict reporting for object modified in both places
        # The original copy is overwritten so that it no longer matches;
        # synch() must report foofile and leave the working file and
        # the original copy with equal content.
        self.writefile(originalfoofile, "something else")
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [foofile])
        self.assertEqual(self.com.read_file(foofile),
                         self.com.read_file(originalfoofile))

        # Test conflict reporting for object added in both places
        # NOTE(review): com.conflicts is cleared before each of the
        # following scenarios, presumably because get_errors() reports
        # that list and it would otherwise still contain the conflict
        # from the previous synch() -- confirm against Committer.
        os.remove(originalfoofile)
        fooentry["flag"] = "added"
        self.com.conflicts = []
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [foofile])
        self.assertEqual(fooentry.get("flag"), None)

        # Test conflict reporting for object removed from container
        del child["foo"]
        self.com.conflicts = []
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [foofile])

        # Test conflict reporting for existing file marked as removed
        fooentry["flag"] = "removed"
        self.com.conflicts = []
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [foofile])
        self.assertEqual(fooentry.get("flag"), None)

        # Test conflict reporting for non-existing file marked as added
        fooentry["flag"] = "added"
        self.com.conflicts = []
        self.com.synch(parent, "", parentdir)
        self.assertEqual(self.com.get_errors(), [foofile])
        self.assertEqual(fooentry.get("flag"), None)

        # XXX Manipulation of Extra and Annotations is not tested

        # XXX Changing directories into files or vice versa is not supported

def test_suite():
    """Return a TestSuite containing every test method of TestCommitter."""
    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python versions; the default loader collects the same "test*"
    # methods and is available in old and new versions alike.
    loader = unittest.defaultTestLoader
    s = unittest.TestSuite()
    s.addTest(loader.loadTestsFromTestCase(TestCommitter))
    return s

def test_main():
    """Run the suite from test_suite() with a plain text test runner."""
    runner = unittest.TextTestRunner()
    runner.run(test_suite())

# Run the tests when this module is executed as a script.
if __name__ == "__main__":
    test_main()