[Zope3-checkins] CVS: Zope3/src/zope/app/fssync/tests - test_committer.py:1.15.2.1 sampleclass.py:1.2.4.1

Grégoire Weber zope@i-con.ch
Sun, 22 Jun 2003 10:24:05 -0400


Update of /cvs-repository/Zope3/src/zope/app/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zope/app/fssync/tests

Modified Files:
      Tag: cw-mail-branch
	sampleclass.py 
Added Files:
      Tag: cw-mail-branch
	test_committer.py 
Log Message:
Synced up with HEAD

=== Added File Zope3/src/zope/app/fssync/tests/test_committer.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the Committer class.

$Id: test_committer.py,v 1.15.2.1 2003/06/22 14:23:04 gregweb Exp $
"""

import os
import shutil
import unittest

from zope.component.service import serviceManager
from zope.component.adapter import provideAdapter
from zope.app.tests.placelesssetup import PlacelessSetup
from zope.exceptions import NotFoundError
from zope.testing.cleanup import CleanUp

from zope.xmlpickle import loads, dumps
from zope.fssync import fsutil
from zope.fssync.tests.mockmetadata import MockMetadata
from zope.fssync.tests.tempfiles import TempFiles

from zope.app.interfaces.container import IContainer, IZopeContainer
from zope.app.interfaces.file import IFileFactory, IDirectoryFactory
from zope.app.interfaces.fssync import IGlobalFSSyncService
from zope.app.interfaces.traversing import ITraversable

from zope.app.fssync import committer # The module
from zope.app.fssync.committer import Checker, Committer, SynchronizationError
from zope.app.fssync.fsregistry import provideSynchronizer, fsRegistry
from zope.app.fssync.classes import Default

from zope.app.content.fssync import DirectoryAdapter

from zope.interface import implements


class Sample(object):
    pass


class PretendContainer(object):

    implements(IContainer, ITraversable, IZopeContainer)

    def __init__(self):
        self.holding = {}

    def setObject(self, name, value):
        name = name.lower()
        if name in self.holding:
            raise KeyError
        self.holding[name] = value
        return name

    def __delitem__(self, name):
        name = name.lower()
        del self.holding[name]

    def __getitem__(self, name):
        name = name.lower()
        return self.holding[name]

    def get(self, name):
        name = name.lower()
        return self.holding.get(name)

    def __contains__(self, name):
        name = name.lower()
        return name in self.holding

    def keys(self):
        return self.holding.keys()

    def items(self):
        return self.holding.items()

    def traverse(self, name, parameters, pname, furtherPath):
        try:
            return self[name]
        except KeyError:
            raise NotFoundError

PCname = PretendContainer.__module__ + "." + PretendContainer.__name__


class DictAdapter(Default):

    def setBody(self, body):
        old = self.context
        assert type(old) is dict
        new = loads(body)
        assert type(new) is dict
        old.update(new)
        for key in old.keys():
            if key not in new:
                del old[key]


class TestBase(TempFiles, PlacelessSetup):

    # Base class for test classes

    def setUp(self):
        # Set up standard services
        PlacelessSetup.setUp(self)

        # Set up FSRegistryService
        serviceManager.defineService("FSRegistryService", IGlobalFSSyncService)
        serviceManager.provideService("FSRegistryService", fsRegistry)
        provideSynchronizer(None, Default)

        # Set up temporary name administration
        TempFiles.setUp(self)

    def tearDown(self):
        # Clean up temporary files and directories
        TempFiles.tearDown(self)

        # Clean up service registrations etc.
        PlacelessSetup.tearDown(self)


def file_factory_maker(container):
    def file_factory(name, content_type, data):
        return loads(data)
    return file_factory

def directory_factory_maker(container):
    def directory_factory(name):
        return PretendContainer()
    return directory_factory

def sort(lst):
    lst.sort()
    return lst


class TestCommitterModule(TestBase):

    def test_get_adapter(self):
        obj = Sample()
        adapter = committer.get_adapter(obj)
        self.assertEqual(adapter.__class__, Default)

    def test_read_file(self):
        data = "12345\rabcde\n12345\r\nabcde"
        tfn = self.tempfile(data, "wb")
        x = committer.read_file(tfn)
        self.assertEqual(x, data)

    def test_load_file(self):
        data = {"foo": [42]}
        tfn = self.tempfile(dumps(data))
        x = committer.load_file(tfn)
        self.assertEqual(x, data)

    def test_set_item_non_icontainer(self):
        container = {}
        committer.set_item(container, "foo", 42)
        self.assertEqual(container, {"foo": 42})

    def test_set_item_icontainer_new(self):
        container = PretendContainer()
        committer.set_item(container, "foo", 42)
        self.assertEqual(container.holding, {"foo": 42})

    def test_set_item_icontainer_replace(self):
        container = PretendContainer()
        committer.set_item(container, "foo", 42)
        committer.set_item(container, "foo", 24, replace=True)
        self.assertEqual(container.holding, {"foo": 24})

    def test_set_item_icontainer_error_existing(self):
        container = PretendContainer()
        committer.set_item(container, "foo", 42)
        self.assertRaises(KeyError, committer.set_item,
                          container, "foo", 42)

    def test_set_item_icontainer_error_nonexisting(self):
        container = PretendContainer()
        self.assertRaises(KeyError, committer.set_item,
                          container, "foo", 42, replace=True)

    def test_set_item_icontainer_error_newname(self):
        container = PretendContainer()
        self.assertRaises(SynchronizationError, committer.set_item,
                          container, "Foo", 42)

    def test_create_object_factory_file(self):
        provideSynchronizer(dict, DictAdapter)
        container = {}
        entry = {"flag": "added", "factory": "__builtin__.dict"}
        tfn = os.path.join(self.tempdir(), "foo")
        data = {"hello": "world"}
        self.writefile(dumps(data), tfn)
        committer.create_object(container, "foo", entry, tfn)
        self.assertEqual(container, {"foo": data})

    def test_create_object_factory_directory(self):
        provideSynchronizer(PretendContainer, DirectoryAdapter)
        container = {}
        entry = {"flag": "added", "factory": PCname}
        tfn = os.path.join(self.tempdir(), "foo")
        os.mkdir(tfn)
        committer.create_object(container, "foo", entry, tfn)
        self.assertEqual(container.keys(), ["foo"])
        self.assertEqual(container["foo"].__class__, PretendContainer)

    def test_create_object_default(self):
        container = {}
        entry = {"flag": "added"}
        data = ["hello", "world"]
        tfn = os.path.join(self.tempdir(), "foo")
        self.writefile(dumps(data), tfn, "wb")
        committer.create_object(container, "foo", entry, tfn)
        self.assertEqual(container, {"foo": ["hello", "world"]})

    def test_create_object_ifilefactory(self):
        provideAdapter(IContainer, IFileFactory, file_factory_maker)
        container = PretendContainer()
        entry = {"flag": "added"}
        data = ["hello", "world"]
        tfn = os.path.join(self.tempdir(), "foo")
        self.writefile(dumps(data), tfn, "wb")
        committer.create_object(container, "foo", entry, tfn)
        self.assertEqual(container.holding, {"foo": ["hello", "world"]})

    def test_create_object_idirectoryfactory(self):
        provideAdapter(IContainer, IDirectoryFactory, directory_factory_maker)
        container = PretendContainer()
        entry = {"flag": "added"}
        tfn = os.path.join(self.tempdir(), "foo")
        os.mkdir(tfn)
        committer.create_object(container, "foo", entry, tfn)
        self.assertEqual(container.holding["foo"].__class__, PretendContainer)


class TestCheckerClass(TestBase):

    def setUp(self):
        # Set up base class (PlacelessSetup and TempNames)
        TestBase.setUp(self)

        # Set up environment
        provideSynchronizer(PretendContainer, DirectoryAdapter)
        provideSynchronizer(dict, DictAdapter)
        provideAdapter(IContainer, IFileFactory, file_factory_maker)
        provideAdapter(IContainer, IDirectoryFactory, directory_factory_maker)

        # Set up fixed part of object tree
        self.parent = PretendContainer()
        self.child = PretendContainer()
        self.grandchild = PretendContainer()
        self.parent.setObject("child", self.child)
        self.child.setObject("grandchild", self.grandchild)
        self.foo = ["hello", "world"]
        self.child.setObject("foo", self.foo)

        # Set up fixed part of filesystem tree
        self.parentdir = self.tempdir()
        self.childdir = os.path.join(self.parentdir, "child")
        os.mkdir(self.childdir)
        self.foofile = os.path.join(self.childdir, "foo")
        self.writefile(dumps(self.foo), self.foofile, "wb")
        self.originalfoofile = fsutil.getoriginal(self.foofile)
        self.writefile(dumps(self.foo), self.originalfoofile, "wb")
        self.grandchilddir = os.path.join(self.childdir, "grandchild")
        os.mkdir(self.grandchilddir)

        # Set up metadata
        self.metadata = MockMetadata()
        self.getentry = self.metadata.getentry

        # Set up fixed part of entries
        self.parententry = self.getentry(self.parentdir)
        self.parententry["path"] = "/parent"
        self.childentry = self.getentry(self.childdir)
        self.childentry["path"] = "/parent/child"
        self.grandchildentry = self.getentry(self.grandchilddir)
        self.grandchildentry["path"] = "/parent/child/grandchild"
        self.fooentry = self.getentry(self.foofile)
        self.fooentry["path"] = "/parent/child/foo"

        # Set up checker
        self.checker = Checker(self.metadata)

    def check_errors(self, expected_errors):
        # Helper to call the checker and assert a given set of errors
        self.checker.check(self.parent, "", self.parentdir)
        self.assertEqual(sort(self.checker.errors()), sort(expected_errors))

    def check_no_errors(self):
        # Helper to call the checker and assert there are no errors
        self.check_errors([])

    def test_vanilla(self):
        # The vanilla situation should not be an error
        self.check_no_errors()

    def test_file_changed(self):
        # Changing a file is okay
        self.newfoo = self.foo + ["news"]
        self.writefile(dumps(self.newfoo), self.foofile, "wb")
        self.check_no_errors()

    def test_file_type_changed(self):
        # Changing a file's type is okay
        self.newfoo = ("one", "two")
        self.fooentry["type"] = "__builtin__.tuple"
        self.writefile(dumps(self.newfoo), self.foofile, "wb")
        self.check_no_errors()

    def test_file_conflict(self):
        # A real conflict is an error
        newfoo = self.foo + ["news"]
        self.writefile(dumps(newfoo), self.foofile, "wb")
        self.foo.append("something else")
        self.check_errors([self.foofile])

    def test_file_sticky_conflict(self):
        # A sticky conflict is an error
        self.fooentry["conflict"] = 1
        self.check_errors([self.foofile])

    def test_file_added(self):
        # Adding a file properly is okay
        self.bar = ["this", "is", "bar"]
        barfile = os.path.join(self.childdir, "bar")
        self.writefile(dumps(self.bar), barfile, "wb")
        barentry = self.getentry(barfile)
        barentry["flag"] = "added"
        self.check_no_errors()

    def test_file_added_no_file(self):
        # Flagging a non-existing file as added is an error
        barfile = os.path.join(self.childdir, "bar")
        barentry = self.getentry(barfile)
        barentry["flag"] = "added"
        self.check_errors([barfile])

    def test_file_spurious(self):
        # A spurious file (empty entry) is okay
        bar = ["this", "is", "bar"]
        barfile = os.path.join(self.childdir, "bar")
        self.writefile(dumps(bar), barfile, "wb")
        self.check_no_errors()

    def test_file_added_no_flag(self):
        # Adding a file without setting the "added" flag is an error
        bar = ["this", "is", "bar"]
        barfile = os.path.join(self.childdir, "bar")
        self.writefile(dumps(bar), barfile, "wb")
        barentry = self.getentry(barfile)
        barentry["path"] = "/parent/child/bar"
        self.check_errors([barfile])

    def test_file_added_twice(self):
        # Adding a file in both places is an error
        bar = ["this", "is", "bar"]
        self.child.setObject("bar", bar)
        barfile = os.path.join(self.childdir, "bar")
        self.writefile(dumps(bar), barfile, "wb")
        barentry = self.getentry(barfile)
        barentry["path"] = "/parent/child/bar"
        self.check_errors([barfile])

    def test_file_lost(self):
        # Losing a file is an error
        os.remove(self.foofile)
        self.check_errors([self.foofile])

    def test_file_lost_originial(self):
        # Losing the original file is an error
        os.remove(self.originalfoofile)
        self.check_errors([self.foofile])

    def test_file_removed(self):
        # Removing a file properly is okay
        os.remove(self.foofile)
        self.fooentry["flag"] = "removed"
        self.check_no_errors()

    def test_file_removed_conflict(self):
        # Removing a file that was changed in the database is an error
        os.remove(self.foofile)
        self.fooentry["flag"] = "removed"
        self.foo.append("news")
        self.check_errors([self.foofile])

    def test_file_removed_twice(self):
        # Removing a file in both places is an error
        os.remove(self.foofile)
        self.fooentry["flag"] = "removed"
        del self.child["foo"]
        self.check_errors([self.foofile])

    def test_file_removed_object(self):
        # Removing the object should cause a conflict
        del self.child["foo"]
        self.check_errors([self.foofile])

    def test_file_entry_cleared(self):
        # Clearing out a file's entry is an error
        self.fooentry.clear()
        self.check_errors([self.foofile])

    def test_dir_added(self):
        # Adding a directory is okay
        bardir = os.path.join(self.childdir, "bar")
        os.mkdir(bardir)
        barentry = self.getentry(bardir)
        barentry["flag"] = "added"
        self.check_no_errors()

    def test_dir_spurious(self):
        # A spurious directory is okay
        bardir = os.path.join(self.childdir, "bar")
        os.mkdir(bardir)
        self.check_no_errors()

    def test_dir_added_no_flag(self):
        # Adding a directory without setting the "added" flag is an error
        bardir = os.path.join(self.childdir, "bar")
        os.mkdir(bardir)
        barentry = self.getentry(bardir)
        barentry["path"] = "/parent/child/bar"
        self.check_errors([bardir])

    def test_dir_lost(self):
        # Losing a directory is an error
        shutil.rmtree(self.grandchilddir)
        self.check_errors([self.grandchilddir])

    def test_dir_removed(self):
        # Removing a directory properly is okay
        shutil.rmtree(self.grandchilddir)
        self.grandchildentry["flag"] = "removed"
        self.check_no_errors()

    def test_dir_entry_cleared(self):
        # Clearing ot a directory's entry is an error
        self.grandchildentry.clear()
        self.check_errors([self.grandchilddir])

    def test_tree_added(self):
        # Adding a subtree is okay
        bardir = os.path.join(self.childdir, "bar")
        os.mkdir(bardir)
        barentry = self.getentry(bardir)
        barentry["path"] = "/parent/child/bar"
        barentry["flag"] = "added"
        bazfile = os.path.join(bardir, "baz")
        self.baz = ["baz"]
        self.writefile(dumps(self.baz), bazfile, "wb")
        bazentry = self.getentry(bazfile)
        bazentry["flag"] = "added"
        burpdir = os.path.join(bardir, "burp")
        os.mkdir(burpdir)
        burpentry = self.getentry(burpdir)
        burpentry["flag"] = "added"
        self.check_no_errors()

    def test_tree_added_no_flag(self):
        # Adding a subtree without flagging everything as added is an error
        bardir = os.path.join(self.childdir, "bar")
        os.mkdir(bardir)
        barentry = self.getentry(bardir)
        barentry["path"] = "/parent/child/bar"
        barentry["flag"] = "added"
        bazfile = os.path.join(bardir, "baz")
        baz = ["baz"]
        self.writefile(dumps(baz), bazfile, "wb")
        bazentry = self.getentry(bazfile)
        bazentry["path"] = "/parent/child/bar/baz"
        burpdir = os.path.join(bardir, "burp")
        os.mkdir(burpdir)
        burpentry = self.getentry(burpdir)
        burpentry["path"] = "/parent/child/bar/burp"
        self.check_errors([bazfile, burpdir])

    def test_tree_removed(self):
        # Removing a subtree is okay
        shutil.rmtree(self.childdir)
        self.childentry["flag"] = "removed"
        self.grandchildentry.clear()
        self.fooentry.clear()
        self.check_no_errors()

    # XXX Extra and Annotations is not tested directly

    # XXX Changing directories into files or vice versa is not tested



class TestCommitterClass(TestCheckerClass):

    # This class extends all tests from TestCheckerClass that call
    # self.check_no_errors() to carry out the change and check on it.
    # Yes, this means that all the tests that call check_errors() are
    # repeated.  Big deal. :-)

    def __init__(self, name):
        TestCheckerClass.__init__(self, name)
        self.name = name

    def setUp(self):
        TestCheckerClass.setUp(self)
        self.committer = Committer(self.metadata)

    def check_no_errors(self):
        TestCheckerClass.check_no_errors(self)
        self.committer.synch(self.parent, "", self.parentdir)
        name = "verify" + self.name[4:]
        method = getattr(self, name, None)
        if method:
            method()
        else:
            print "?", name

    def verify_vanilla(self):
        self.assertEqual(self.parent.keys(), ["child"])
        self.assertEqual(self.parent["child"], self.child)
        self.assertEqual(sort(self.child.keys()), ["foo", "grandchild"])
        self.assertEqual(self.child["foo"], self.foo)
        self.assertEqual(self.child["grandchild"], self.grandchild)
        self.assertEqual(self.grandchild.keys(), [])

    def verify_file_added(self):
        self.assertEqual(self.child["bar"], self.bar)
        self.assertEqual(sort(self.child.keys()), ["bar", "foo", "grandchild"])

    def verify_file_changed(self):
        self.assertEqual(self.child["foo"], self.newfoo)
    
    def verify_file_removed(self):
        self.assertEqual(self.child.keys(), ["grandchild"])

    def verify_file_spurious(self):
        self.verify_vanilla()

    def verify_file_type_changed(self):
        self.assertEqual(self.child["foo"], self.newfoo)

    def verify_dir_removed(self):
        self.assertEqual(self.child.keys(), ["foo"])

    def verify_dir_added(self):
        self.assertEqual(sort(self.child.keys()), ["bar", "foo", "grandchild"])

    def verify_dir_spurious(self):
        self.verify_vanilla()

    def verify_tree_added(self):
        self.assertEqual(sort(self.child.keys()), ["bar", "foo", "grandchild"])
        bar = self.child["bar"]
        self.assertEqual(bar.__class__, PretendContainer)
        baz = bar["baz"]
        self.assertEqual(self.baz, baz)

    def verify_tree_removed(self):
        self.assertEqual(self.parent.keys(), [])


def test_suite():
    s = unittest.TestSuite()
    s.addTest(unittest.makeSuite(TestCommitterModule))
    s.addTest(unittest.makeSuite(TestCheckerClass))
    s.addTest(unittest.makeSuite(TestCommitterClass))
    return s

def test_main():
    unittest.TextTestRunner().run(test_suite())

if __name__=='__main__':
    test_main()


=== Zope3/src/zope/app/fssync/tests/sampleclass.py 1.2 => 1.2.4.1 ===
--- Zope3/src/zope/app/fssync/tests/sampleclass.py:1.2	Mon May  5 14:01:02 2003
+++ Zope3/src/zope/app/fssync/tests/sampleclass.py	Sun Jun 22 10:23:04 2003
@@ -17,6 +17,7 @@
 """
 
 from zope.app.interfaces.fssync import IObjectDirectory, IObjectFile
+from zope.interface import implements
 
 class C1: "C1 Doc"
 class C2: "C2 Doc"
@@ -25,7 +26,7 @@
 class CDefaultAdapter:
     """Default File-system representation for object
     """
-    __implements__ =  IObjectFile
+    implements(IObjectFile)
 
     def __init__(self, object):
          self.context = object
@@ -41,16 +42,16 @@
 
     def getBody(self):
          return self.context.__doc__
-     
+
     def setBody(self):
          pass
-                                                        
+
 class CDirAdapter:
     """Directory Adapter
     """
 
-    __implements__ = IObjectDirectory
-    	
+    implements(IObjectDirectory)
+
     def __init__(self, object):
         self.context = object
 
@@ -64,18 +65,18 @@
         return "Folder Factory"
 
     def contents(self):
-    	return []
+        return []
 
 class CFileAdapter:
     """File Adapter
     """
 
-    __implements__ = IObjectFile
+    implements(IObjectFile)
 
     def __init__(self, object):
         self.context = object
 
-    def extra(self):    
+    def extra(self):
         pass
 
     def typeIdentifier(self):
@@ -83,7 +84,7 @@
 
     def factory(self):
         return "File Factory"
-    
+
     def getBody(self):
         return self.context.__doc__