[CMF-checkins] SVN: CMF/trunk/CMFCore/ Add adapters for content
export / import.
Tres Seaver
tseaver at palladion.com
Sun Sep 25 18:42:33 EDT 2005
Log message for revision 38636:
Add adapters for content export / import.
o StructureFolderWalkingAdapter traverses the content tree, creating folders,
and dispatching to the content objects.
o CSVAwareFileAdapter maps to / from CSV files.
o INIAwareFileAdapter maps to / from INI files.
Changed:
A CMF/trunk/CMFCore/exportimport.py
U CMF/trunk/CMFCore/interfaces/__init__.py
U CMF/trunk/CMFCore/interfaces/_content.py
A CMF/trunk/CMFCore/interfaces/_exportimport.py
U CMF/trunk/CMFCore/tests/base/dummy.py
A CMF/trunk/CMFCore/tests/conformance.py
A CMF/trunk/CMFCore/tests/test_exportimport.py
-=-
Added: CMF/trunk/CMFCore/exportimport.py
===================================================================
--- CMF/trunk/CMFCore/exportimport.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/exportimport.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -0,0 +1,269 @@
+""" CMFCore filesystem exporter / importer adapters.
+
+Filesystem Representation of Site Structure
+===========================================
+
+"Folderish" Types
+-----------------
+
+Folderish instances are mapped to directories within the 'structure'
+portion of the profile, where the folder's relative path within the site
+corresponds to the path of its directory under 'structure'.
+
+The subobjects of a folderish instance are enumerated in the '.objects' file
+in the corresponding directory. This file is a CSV file, with one row per
+subobject, with the following wtructure::
+
+ "<subobject id>","<subobject portal_type>"
+
+Subobjects themselves are represented as individual files or subdirectories
+within the parent's directory.
+
+$Id$
+"""
+from csv import excel
+from csv import reader
+from csv import register_dialect
+from csv import writer
+from ConfigParser import ConfigParser
+import re
+from StringIO import StringIO
+
+from Acquisition import aq_inner
+from Acquisition import aq_parent
+
+from zope.interface import implements
+from zope.interface import directlyProvides
+
+from Products.GenericSetup.interfaces import ISetupTool
+from Products.GenericSetup.tool import SetupTool
+
+from Products.CMFCore.interfaces import IFilesystemExporter
+from Products.CMFCore.interfaces import IFilesystemImporter
+from Products.CMFCore.interfaces import ISiteRoot
+from Products.CMFCore.PortalFolder import PortalFolder
+from Products.CMFCore.utils import getToolByName
+
+#
+# setup_tool handlers
+#
+def exportSiteStructure(context):
+ IFilesystemExporter(context.getSite()).export(context, 'structure')
+
+def importSiteStructure(context):
+ IFilesystemImporter(context.getSite()).import_(context, 'structure')
+
+
+_FSDUMP_OPTION_PATTERN = re.compile( r'(\w+):\w+=(.*)')
+
+class excel_colon(excel):
+ delimiter = ':'
+
+register_dialect('excel_colon', excel_colon)
+
+#
+# Filesystem export/import adapters
+#
+class StructureFolderWalkingAdapter(object):
+
+ implements(IFilesystemExporter, IFilesystemImporter)
+
+ def __init__(self, context):
+ self.context = context
+
+ def export(self, export_context, subdir):
+ """ See IFilesystemExporter.
+ """
+ # Enumerate exportable children
+ exportable = self.context.contentItems()
+ exportable = [x + (IFilesystemExporter(x, None),) for x in exportable]
+ exportable = [x for x in exportable if x[1] is not None]
+
+ stream = StringIO()
+ csv_writer = writer(stream)
+
+ for object_id, object, ignored in exportable:
+ csv_writer.writerow((object_id, object.getPortalTypeName()))
+
+ if not ISiteRoot.providedBy(self.context):
+ subdir = '%s/%s' % (subdir, self.context.getId())
+
+ export_context.writeDataFile('.objects',
+ text=stream.getvalue(),
+ content_type='text/comma-separated-values',
+ subdir=subdir,
+ )
+
+ parser = ConfigParser()
+
+ parser.set('DEFAULT', 'Title', self.context.Title())
+ parser.set('DEFAULT', 'Description', self.context.Description())
+ stream = StringIO()
+ parser.write(stream)
+
+ export_context.writeDataFile('.properties',
+ text=stream.getvalue(),
+ content_type='text/plain',
+ subdir=subdir,
+ )
+
+ for id, object in self.context.objectItems():
+
+ adapter = IFilesystemExporter(object, None)
+
+ if adapter is not None:
+ adapter.export(export_context, subdir)
+
+ def import_(self, import_context, subdir):
+ """ See IFilesystemImporter.
+ """
+ context = self.context
+ if not ISiteRoot.providedBy(context):
+ subdir = '%s/%s' % (subdir, context.getId())
+
+ preserve = import_context.readDataFile('.preserve', subdir)
+
+ prior = context.contentIds()
+
+ if not preserve:
+ preserve = []
+ else:
+ preserve = _globtest(preserve, prior)
+
+ for id in prior:
+ if id not in preserve:
+ context._delObject(id)
+
+ objects = import_context.readDataFile('.objects', subdir)
+ if objects is None:
+ return
+
+ dialect = 'excel'
+ stream = StringIO(objects)
+
+ rowiter = reader(stream, dialect)
+
+ existing = context.objectIds()
+
+ for object_id, portal_type in rowiter:
+
+ if object_id not in existing:
+ object = self._makeInstance(object_id, portal_type,
+ subdir, import_context)
+ if object is None:
+ message = "Couldn't make instance: %s/%s" % (subdir,
+ object_id)
+ import_context.note('SFWA', message)
+ continue
+
+ wrapped = context._getOb(object_id)
+
+ IFilesystemImporter(wrapped).import_(import_context, subdir)
+
+ def _makeInstance(self, id, portal_type, subdir, import_context):
+
+ context = self.context
+ properties = import_context.readDataFile('.properties',
+ '%s/%s' % (subdir, id))
+ tool = getToolByName(context, 'portal_types')
+
+ try:
+ tool.constructContent(portal_type, context, id)
+ except ValueError: # invalid type
+ return None
+
+ content = context._getOb(id)
+
+ if properties is not None:
+ lines = properties.splitlines()
+
+ stream = StringIO('\n'.join(lines))
+ parser = ConfigParser(defaults={'title': '', 'description': 'NONE'})
+ parser.readfp(stream)
+
+ title = parser.get('DEFAULT', 'title')
+ description = parser.get('DEFAULT', 'description')
+
+ content.setTitle(title)
+ content.setDescription(description)
+
+ return content
+
+class CSVAwareFileAdapter(object):
+
+ implements(IFilesystemExporter, IFilesystemImporter)
+
+ def __init__(self, context):
+ self.context = context
+
+ def export(self, export_context, subdir):
+ """ See IFilesystemExporter.
+ """
+ export_context.writeDataFile('%s.csv' % self.context.getId(),
+ self.context.as_csv(),
+ 'text/comma-separated-values',
+ subdir,
+ )
+
+ def listExportableItems(self):
+ """ See IFilesystemExporter.
+ """
+ return ()
+
+ def import_(self, import_context, subdir):
+ """ See IFilesystemImporter.
+ """
+ cid = self.context.getId()
+ data = import_context.readDataFile('%s.csv' % cid, subdir)
+ if data is None:
+ import_context.note('CSAFA',
+ 'no .csv file for %s/%s' % (subdir, cid))
+ else:
+ stream = StringIO(data)
+ self.context.put_csv(stream)
+
+class INIAwareFileAdapter(object):
+
+ implements(IFilesystemExporter, IFilesystemImporter)
+
+ def __init__(self, context):
+ self.context = context
+
+ def export(self, export_context, subdir):
+ """ See IFilesystemExporter.
+ """
+ export_context.writeDataFile('%s.ini' % self.context.getId(),
+ self.context.as_ini(),
+ 'text/plain',
+ subdir,
+ )
+
+ def listExportableItems(self):
+ """ See IFilesystemExporter.
+ """
+ return ()
+
+ def import_(self, import_context, subdir):
+ """ See IFilesystemImporter.
+ """
+ cid = self.context.getId()
+ data = import_context.readDataFile('%s.ini' % cid, subdir)
+ if data is None:
+ import_context.note('SGAIFA',
+ 'no .ini file for %s/%s' % (subdir, cid))
+ else:
+ self.context.put_ini(data)
+
+
+def _globtest(globpattern, namelist):
+ """ Filter names in 'namelist', returning those which match 'globpattern'.
+ """
+ import re
+ pattern = globpattern.replace(".", r"\.") # mask dots
+ pattern = pattern.replace("*", r".*") # change glob sequence
+ pattern = pattern.replace("?", r".") # change glob char
+ pattern = '|'.join(pattern.split()) # 'or' each line
+
+ compiled = re.compile(pattern)
+
+ return filter(compiled.match, namelist)
Modified: CMF/trunk/CMFCore/interfaces/__init__.py
===================================================================
--- CMF/trunk/CMFCore/interfaces/__init__.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/interfaces/__init__.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -17,10 +17,8 @@
from _content import *
from _tools import *
+from _exportimport import *
-# BBB: will be removed in CMF 2.2
-# create zope2 interfaces
-from Interface.bridge import createZope3Bridge
import CachingPolicyManager
import Contentish
import ContentTypeRegistry
@@ -44,6 +42,9 @@
import portal_workflow
import Syndicatable
+# BBB: will be removed in CMF 2.2
+# create zope2 interfaces
+from Interface.bridge import createZope3Bridge
createZope3Bridge(ICachingPolicyManager, CachingPolicyManager,
'CachingPolicyManager')
createZope3Bridge(IContentish, Contentish, 'Contentish')
Modified: CMF/trunk/CMFCore/interfaces/_content.py
===================================================================
--- CMF/trunk/CMFCore/interfaces/_content.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/interfaces/_content.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -541,6 +541,9 @@
o Permission -- List folder contents
"""
+class ISiteRoot(IFolderish):
+ """ Marker interface for the object which serves as the root of a site.
+ """
#
# IOpaqueItems interfaces
Added: CMF/trunk/CMFCore/interfaces/_exportimport.py
===================================================================
--- CMF/trunk/CMFCore/interfaces/_exportimport.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/interfaces/_exportimport.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -0,0 +1,76 @@
+""" Interfaces for content import / export, based on GenericSetup
+
+$Id$
+"""
+from zope.interface import Interface
+from zope.interface import Attribute
+
+class IFilesystemExporter(Interface):
+ """ Plugin interface for site structure export.
+ """
+ def export(export_context, subdir):
+ """ Export our 'context' using the API of 'export_context'.
+
+ o 'export_context' must implement
+ Products.GenericSupport.interfaces.IExportContext.
+
+ o 'subdir', if passed, is the relative subdirectory containing our
+ context within the site.
+ """
+
+ def listExportableItems():
+ """ Return a sequence of the child items to be exported.
+
+ o Each item in the returned sequence will be a tuple,
+ (id, object, adapter) where adapter must implement
+ IFilesystemExporter.
+ """
+
+class IFilesystemImporter(Interface):
+ """ Plugin interface for site structure export.
+ """
+ def import_(import_context, subdir):
+ """ Import our 'context' using the API of 'import_context'.
+
+ o 'import_context' must implement
+ Products.GenericSupport.interfaces.IImportContext.
+
+ o 'subdir', if passed, is the relative subdirectory containing our
+ context within the site.
+ """
+
+class ICSVAware(Interface):
+ """ Interface for objects which dump / load 'text/comma-separated-values'.
+ """
+ def getId():
+ """ Return the Zope id of the object.
+ """
+
+ def as_csv():
+ """ Return a string representing the object as CSV.
+ """
+
+ def put_csv(fd):
+ """ Parse CSV and update the object.
+
+ o 'fd' must be a file-like object whose 'read' method returns
+ CSV text parseable by the 'csv.reader'.
+ """
+
+class IINIAware(Interface):
+ """ Interface for objects which dump / load INI-format files..
+ """
+ def getId():
+ """ Return the Zope id of the object.
+ """
+
+ def as_ini():
+ """ Return a string representing the object as INI.
+ """
+
+ def put_ini(stream_or_text):
+ """ Parse INI-formatted text and update the object.
+
+ o 'stream_or_text' must be either a string, or else a stream
+ directly parseable by ConfigParser.
+ """
Modified: CMF/trunk/CMFCore/tests/base/dummy.py
===================================================================
--- CMF/trunk/CMFCore/tests/base/dummy.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/tests/base/dummy.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -51,6 +51,7 @@
class DummyType(DummyObject):
""" A Dummy Type object """
+ _isTypeInformation = True
def __init__(self, id='Dummy Content', title='Dummy Content', actions=()):
""" To fake out some actions, pass in a sequence of tuples where the
@@ -59,7 +60,7 @@
a page template.
"""
- self.id = id
+ self.id = self._id = id
self.title = title
self._actions = {}
Added: CMF/trunk/CMFCore/tests/conformance.py
===================================================================
--- CMF/trunk/CMFCore/tests/conformance.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/tests/conformance.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -0,0 +1,46 @@
+""" Mix-in classes for testing interface conformance.
+
+$Id$
+"""
+
+class ConformsToISimpleItem:
+
+ def test_conforms_to_Five_ISimpleItem(self):
+ from zope.interface.verify import verifyClass
+ from Products.Five.interfaces import ISimpleItem
+
+ verifyClass(ISimpleItem, self._getTargetClass())
+
+class ConformsToIINIAware:
+
+ def test_conforms_to_IINIAware(self):
+ from zope.interface.verify import verifyClass
+ from Products.CMFCore.interfaces import IINIAware
+
+ verifyClass(IINIAware, self._getTargetClass())
+
+class ConformsToICSVAware:
+
+ def test_conforms_to_ICSVAware(self):
+ from zope.interface.verify import verifyClass
+ from Products.CMFCore.interfaces import ICSVAware
+
+ verifyClass(ICSVAware, self._getTargetClass())
+
+class ConformsToIFilesystemExporter:
+ """ Mix-in for test cases whose target class implements IFilesystemExporter.
+ """
+ def test_conforms_to_IFilesystemExporter(self):
+ from zope.interface.verify import verifyClass
+ from Products.CMFCore.interfaces import IFilesystemExporter
+
+ verifyClass(IFilesystemExporter, self._getTargetClass())
+
+class ConformsToIFilesystemImporter:
+ """ Mix-in for test cases whose target class implements IFilesystemImporter.
+ """
+ def test_conforms_to_IFilesystemImporter(self):
+ from zope.interface.verify import verifyClass
+ from Products.CMFCore.interfaces import IFilesystemImporter
+
+ verifyClass(IFilesystemImporter, self._getTargetClass())
Added: CMF/trunk/CMFCore/tests/test_exportimport.py
===================================================================
--- CMF/trunk/CMFCore/tests/test_exportimport.py 2005-09-25 22:38:27 UTC (rev 38635)
+++ CMF/trunk/CMFCore/tests/test_exportimport.py 2005-09-25 22:42:33 UTC (rev 38636)
@@ -0,0 +1,778 @@
+""" Unit tests for Products.CMFCore.exportimport
+
+$Id$
+"""
+import unittest
+from csv import reader
+from ConfigParser import ConfigParser
+from StringIO import StringIO
+
+from zope.app.tests.placelesssetup import PlacelessSetup
+
+from Products.GenericSetup.tests.common import DummyExportContext
+from Products.GenericSetup.tests.common import DummyImportContext
+
+from conformance import ConformsToIFilesystemExporter
+from conformance import ConformsToIFilesystemImporter
+
+TEST_CSV_AWARE = 'Test CSV Aware'
+KNOWN_CSV = """\
+one,two,three
+four,five,six
+"""
+
+def _makeCSVAware(id):
+ from OFS.SimpleItem import SimpleItem
+ from zope.interface import implements
+ from Products.CMFCore.interfaces import IDynamicType
+ from Products.CMFCore.interfaces import ICSVAware
+
+ class _TestCSVAware(SimpleItem):
+ implements(IDynamicType, ICSVAware)
+ _was_put = None
+ portal_type = TEST_CSV_AWARE
+
+ def getPortalTypeName(self):
+ return self.portal_type
+
+ def as_csv(self):
+ return KNOWN_CSV
+
+ def put_csv(self, text):
+ self._was_put = text
+
+ aware = _TestCSVAware()
+ aware._setId(id)
+
+ return aware
+
+TEST_INI_AWARE = 'Test INI Aware'
+KNOWN_INI = """\
+[DEFAULT]
+title = %s
+description = %s
+"""
+
+def _makeINIAware(id):
+ from OFS.SimpleItem import SimpleItem
+ from zope.interface import implements
+ from Products.CMFCore.interfaces import IDynamicType
+ from Products.CMFCore.interfaces import IINIAware
+
+ class _TestINIAware(SimpleItem):
+ implements(IDynamicType, IINIAware)
+ _was_put = None
+ title = 'INI title'
+ description = 'INI description'
+ portal_type = TEST_INI_AWARE
+
+ def getPortalTypeName(self):
+ return self.portal_type
+
+ def as_ini(self):
+ return KNOWN_INI % (self.title, self.description)
+
+ def put_ini(self, text):
+ self._was_put = text
+
+ aware = _TestINIAware()
+ aware._setId(id)
+
+ return aware
+
+TEST_CONTENT = 'Test Content'
+
+def _makeItem(self):
+ from OFS.SimpleItem import SimpleItem
+ from zope.interface import implements
+ from Products.CMFCore.interfaces import IDynamicType
+
+ class _TestContent(SimpleItem):
+ implements(IDynamicType)
+ portal_type = TEST_CONTENT
+
+ def getPortalTypeName(self):
+ return self.portal_type
+
+ aware = _TestContent()
+ aware._setId(id)
+
+ return aware
+
+TEST_FOLDER = 'Test Folder'
+
+def _makeFolder(id, site_folder=False):
+ from zope.interface import directlyProvides
+ from zope.interface import providedBy
+ from Products.CMFCore.PortalFolder import PortalFolder
+ from Products.CMFCore.interfaces import ISiteRoot
+ from Products.CMFCore.TypesTool import TypesTool
+ from Products.CMFCore.tests.base.dummy import DummyType
+
+ class _TypeInfo(DummyType):
+ def _getId(self):
+ return self._id
+ def constructInstance(self, container, id, *args, **kw):
+ portal_type = self._getId()
+ if portal_type == TEST_FOLDER:
+ content = PortalFolder(id)
+ elif portal_type == TEST_CONTENT:
+ content = _makeItem()
+ content._setId(id)
+ elif portal_type == TEST_INI_AWARE:
+ content = _makeINIAware(id)
+ elif portal_type == TEST_CSV_AWARE:
+ content = _makeCSVAware(id)
+ else:
+ raise ValueError, 'Ugh'
+ content.portal_type = portal_type
+ container._setObject(id, content)
+ return container._getOb(id)
+
+ folder = PortalFolder(id)
+ folder.portal_type = TEST_FOLDER
+ if site_folder:
+ directlyProvides(folder, ISiteRoot + providedBy(folder))
+ tool = folder.portal_types = TypesTool()
+ tool._setObject(TEST_CSV_AWARE, _TypeInfo(TEST_CSV_AWARE))
+ tool._setObject(TEST_INI_AWARE, _TypeInfo(TEST_INI_AWARE))
+ tool._setObject(TEST_CONTENT, _TypeInfo(TEST_CONTENT))
+ tool._setObject(TEST_FOLDER, _TypeInfo(TEST_FOLDER))
+
+ return folder
+
+class SiteStructureExporterTests(PlacelessSetup,
+ unittest.TestCase,
+ ):
+
+ def _getExporter(self):
+ from Products.CMFCore.exportimport import exportSiteStructure
+ return exportSiteStructure
+
+ def _getImporter(self):
+ from Products.CMFCore.exportimport import importSiteStructure
+ return importSiteStructure
+
+ def _makeSetupTool(self):
+ from Products.GenericSetup.tool import SetupTool
+ return SetupTool()
+
+ def _setUpAdapters(self):
+ from zope.app.tests import ztapi
+ from OFS.Image import File
+ from zope.interface import classImplements
+
+ from Products.CMFCore.interfaces import IFilesystemExporter
+ from Products.CMFCore.interfaces import IFilesystemImporter
+ from Products.CMFCore.interfaces import IFolderish
+ from Products.CMFCore.interfaces import ICSVAware
+ from Products.CMFCore.interfaces import IINIAware
+
+ from Products.CMFCore.exportimport import \
+ StructureFolderWalkingAdapter
+ from Products.CMFCore.exportimport import \
+ CSVAwareFileAdapter
+ from Products.CMFCore.exportimport import \
+ INIAwareFileAdapter
+ from Products.CMFCore.exportimport import \
+ OFSFileAdapter
+
+ ztapi.provideAdapter(IFolderish,
+ IFilesystemExporter,
+ StructureFolderWalkingAdapter,
+ )
+
+ ztapi.provideAdapter(IFolderish,
+ IFilesystemImporter,
+ StructureFolderWalkingAdapter,
+ )
+
+ ztapi.provideAdapter(ICSVAware,
+ IFilesystemExporter,
+ CSVAwareFileAdapter,
+ )
+
+ ztapi.provideAdapter(ICSVAware,
+ IFilesystemImporter,
+ CSVAwareFileAdapter,
+ )
+
+ ztapi.provideAdapter(IINIAware,
+ IFilesystemExporter,
+ INIAwareFileAdapter,
+ )
+
+ ztapi.provideAdapter(IINIAware,
+ IFilesystemImporter,
+ INIAwareFileAdapter,
+ )
+
+
+ def test_export_empty_site(self):
+ self._setUpAdapters()
+ site = _makeFolder('site', site_folder=True)
+ site.title = 'test_export_empty_site'
+ site.description = 'Testing export of an empty site.'
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 2)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 0)
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'Title'),
+ site.title)
+ self.assertEqual(parser.get('DEFAULT', 'Description'),
+ site.description)
+
+ def test_export_empty_site_with_setup_tool(self):
+ self._setUpAdapters()
+ site = _makeFolder('site', site_folder=True)
+ site._setObject('setup_tool', self._makeSetupTool())
+ site.title = 'test_export_empty_site_with_setup_tool'
+ site.description = 'Testing export of an empty site with setup tool.'
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 2)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 0)
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'Title'),
+ site.title)
+ self.assertEqual(parser.get('DEFAULT', 'Description'),
+ site.description)
+
+ def test_export_site_with_non_exportable_simple_items(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ site.title = 'AAA'
+ site.description = 'BBB'
+ for id in ITEM_IDS:
+ site._setObject(id, _makeItem(id))
+
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 2)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 3)
+ for index in range(len(ITEM_IDS)):
+ self.assertEqual(objects[index][0], ITEM_IDS[index])
+ self.assertEqual(objects[index][1], TEST_CONTENT)
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'title'), 'AAA')
+ self.assertEqual(parser.get('DEFAULT', 'description'), 'BBB')
+
+ def test_export_site_with_exportable_simple_items(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ site.title = 'AAA'
+ site.description = 'BBB'
+ for id in ITEM_IDS:
+ site._setObject(id, _makeINIAware(id))
+
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 2 + len(ITEM_IDS))
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 3)
+ for index in range(len(ITEM_IDS)):
+ self.assertEqual(objects[index][0], ITEM_IDS[index])
+ self.assertEqual(objects[index][1], TEST_INI_AWARE)
+
+ filename, text, content_type = context._wrote[index+2]
+ self.assertEqual(filename, 'structure/%s.ini' % ITEM_IDS[index])
+ object = site._getOb(ITEM_IDS[index])
+ self.assertEqual(text.strip(),
+ object.as_ini().strip())
+ self.assertEqual(content_type, 'text/plain')
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'title'), 'AAA')
+ self.assertEqual(parser.get('DEFAULT', 'description'), 'BBB')
+
+ def test_export_site_with_subfolders(self):
+ self._setUpAdapters()
+ FOLDER_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ site.title = 'AAA'
+ site.description = 'BBB'
+ for id in FOLDER_IDS:
+ folder = _makeFolder(id)
+ folder.title = 'Title: %s' % id
+ folder.description = 'xyzzy'
+ site._setObject(id, folder)
+
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 2 + (2 *len(FOLDER_IDS)))
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 3)
+
+ for index in range(len(FOLDER_IDS)):
+ id = FOLDER_IDS[index]
+ self.assertEqual(objects[index][0], id)
+ self.assertEqual(objects[index][1], TEST_FOLDER)
+
+ filename, text, content_type = context._wrote[2 + (2 * index)]
+ self.assertEqual(filename, '/'.join(('structure', id, '.objects')))
+ self.assertEqual(content_type, 'text/comma-separated-values')
+ subobjects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(subobjects), 0)
+
+ filename, text, content_type = context._wrote[2 + (2 * index) + 1]
+ self.assertEqual(filename,
+ '/'.join(('structure', id, '.properties')))
+ self.assertEqual(content_type, 'text/plain')
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'Title'), 'Title: %s' % id)
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'title'), 'AAA')
+ self.assertEqual(parser.get('DEFAULT', 'description'), 'BBB')
+
+ def test_export_site_with_csvaware(self):
+ self._setUpAdapters()
+
+ site = _makeFolder('site', site_folder=True)
+ site.title = 'test_export_site_with_csvaware'
+ site.description = 'Testing export of an site with CSV-aware content.'
+
+ site._setObject('aware', _makeCSVAware('aware'))
+
+ context = DummyExportContext(site)
+ exporter = self._getExporter()
+ exporter(context)
+
+ self.assertEqual(len(context._wrote), 3)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'structure/.objects')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ objects = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(objects), 1)
+ self.assertEqual(objects[0][0], 'aware')
+ self.assertEqual(objects[0][1], TEST_CSV_AWARE)
+
+ filename, text, content_type = context._wrote[1]
+ self.assertEqual(filename, 'structure/.properties')
+ self.assertEqual(content_type, 'text/plain')
+
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+
+ self.assertEqual(parser.get('DEFAULT', 'Title'),
+ site.title)
+ self.assertEqual(parser.get('DEFAULT', 'Description'),
+ site.description)
+
+ filename, text, content_type = context._wrote[2]
+ self.assertEqual(filename, 'structure/aware.csv')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+ rows = [x for x in reader(StringIO(text))]
+ self.assertEqual(len(rows), 2)
+ self.assertEqual(rows[0][0], 'one')
+ self.assertEqual(rows[0][1], 'two')
+ self.assertEqual(rows[0][2], 'three')
+ self.assertEqual(rows[1][0], 'four')
+ self.assertEqual(rows[1][1], 'five')
+ self.assertEqual(rows[1][2], 'six')
+
+ def test_import_empty_site(self):
+ self._setUpAdapters()
+ site = _makeFolder('site', site_folder=True)
+ context = DummyImportContext(site)
+ context._files['structure/.objects'] = ''
+ importer = self._getImporter()
+ self.assertEqual(len(site.objectIds()), 0)
+ importer(context)
+ self.assertEqual(len(site.objectIds()), 0)
+
+ def test_import_empty_site_with_setup_tool(self):
+ self._setUpAdapters()
+ site = _makeFolder('site', site_folder=True)
+ site._setObject('setup_tool', self._makeSetupTool())
+ context = DummyImportContext(site)
+ importer = self._getImporter()
+
+ self.assertEqual(len(site.objectIds()), 1)
+ self.assertEqual(site.objectIds()[0], 'setup_tool')
+ importer(context)
+ self.assertEqual(len(site.objectIds()), 1)
+ self.assertEqual(site.objectIds()[0], 'setup_tool')
+
+ def test_import_site_with_subfolders(self):
+ self._setUpAdapters()
+ FOLDER_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+
+ context = DummyImportContext(site)
+
+ for id in FOLDER_IDS:
+ context._files['structure/%s/.objects' % id] = ''
+ context._files['structure/%s/.properties' % id] = (
+ _PROPERTIES_TEMPLATE % id )
+
+ _ROOT_OBJECTS = '\n'.join(['%s,%s' % (id, TEST_FOLDER)
+ for id in FOLDER_IDS])
+
+ context._files['structure/.objects'] = _ROOT_OBJECTS
+ context._files['structure/.properties'] = (
+ _PROPERTIES_TEMPLATE % 'Test Site')
+
+ importer = self._getImporter()
+ importer(context)
+
+ content = site.contentValues()
+ self.assertEqual(len(content), len(FOLDER_IDS))
+
+ def test_import_site_with_subitems(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = '\n'.join(
+ ['%s,%s' % (x, TEST_INI_AWARE) for x in ITEM_IDS])
+ for index in range(len(ITEM_IDS)):
+ id = ITEM_IDS[index]
+ context._files[
+ 'structure/%s.ini' % id] = KNOWN_INI % ('Title: %s' % id,
+ 'xyzzy',
+ )
+ importer = self._getImporter()
+ importer(context)
+
+ after = site.objectIds()
+ self.assertEqual(len(after), len(ITEM_IDS))
+ for found_id, expected_id in zip(after, ITEM_IDS):
+ self.assertEqual(found_id, expected_id)
+
+ def test_import_site_with_subitem_unknown_portal_type(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = '\n'.join(
+ ['%s,Unknown Type' % x for x in ITEM_IDS])
+ for index in range(len(ITEM_IDS)):
+ id = ITEM_IDS[index]
+ context._files[
+ 'structure/%s.ini' % id] = KNOWN_INI % ('Title: %s' % id,
+ 'xyzzy',
+ )
+
+ importer = self._getImporter()
+ importer(context)
+
+ after = site.objectIds()
+ self.assertEqual(len(after), 0)
+ self.assertEqual(len(context._notes), len(ITEM_IDS))
+ for component, message in context._notes:
+ self.assertEqual(component, 'SFWA')
+ self.failUnless(message.startswith("Couldn't make"))
+
+ def test_import_site_with_subitems_and_no_preserve(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ for id in ITEM_IDS:
+ site._setObject(id, _makeItem(id))
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = ''
+
+ importer = self._getImporter()
+ importer(context)
+
+ self.assertEqual(len(site.objectIds()), 0)
+
+ def test_import_site_with_subitemss_and_preserve(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ for id in ITEM_IDS:
+ site._setObject(id, _makeItem(id))
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = ''
+ context._files['structure/.preserve'] = '*'
+
+ importer = self._getImporter()
+ importer(context)
+
+ after = site.objectIds()
+ self.assertEqual(len(after), len(ITEM_IDS))
+ for i in range(len(ITEM_IDS)):
+ self.assertEqual(after[i], ITEM_IDS[i])
+
+ def test_import_site_with_subitemss_and_preserve_partial(self):
+ self._setUpAdapters()
+ ITEM_IDS = ('foo', 'bar', 'baz')
+
+ site = _makeFolder('site', site_folder=True)
+ for id in ITEM_IDS:
+ site._setObject(id, _makeItem(id))
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = ''
+ context._files['structure/.preserve'] = 'b*'
+
+ importer = self._getImporter()
+ importer(context)
+
+ after = site.objectIds()
+ self.assertEqual(len(after), 2)
+ self.assertEqual(after[0], 'bar')
+ self.assertEqual(after[1], 'baz')
+
+ def test_import_site_with_subfolders_and_preserve(self):
+ self._setUpAdapters()
+
+ site = _makeFolder('site', site_folder=True)
+ site._setObject('foo', _makeFolder('foo'))
+ site.foo._setObject('bar', _makeFolder('bar'))
+
+ context = DummyImportContext(site)
+ # We want to add 'baz' to 'foo', without losing 'bar'
+ context._files['structure/.objects'] = 'foo,%s' % TEST_FOLDER
+ context._files['structure/.preserve'] = '*'
+ context._files['structure/foo/.objects'] = 'baz,%s' % TEST_FOLDER
+ context._files['structure/foo/.preserve'] = '*'
+ context._files['structure/foo/baz/.objects'] = ''
+
+ importer = self._getImporter()
+ importer(context)
+
+ self.assertEqual(len(site.objectIds()), 1)
+ self.assertEqual(site.objectIds()[0], 'foo')
+
+ self.assertEqual(len(site.foo.objectIds()), 2, site.foo.objectIds())
+ self.assertEqual(site.foo.objectIds()[0], 'bar')
+ self.assertEqual(site.foo.objectIds()[1], 'baz')
+
+
+class CSVAwareFileAdapterTests(unittest.TestCase,
+ ConformsToIFilesystemExporter,
+ ConformsToIFilesystemImporter,
+ ):
+
+ def _getTargetClass(self):
+ from Products.CMFCore.exportimport import CSVAwareFileAdapter
+ return CSVAwareFileAdapter
+
+ def _makeOne(self, context, *args, **kw):
+ return self._getTargetClass()(context, *args, **kw)
+
+ def _makeCSVAware(self, sheet_id, csv=''):
+ class Foo:
+ def getId(self):
+ return self._id
+ def as_csv(self):
+ return self.csv
+ def put_csv(self, stream):
+ self.new_csv = stream.getvalue()
+
+ foo = Foo()
+ foo._id = sheet_id
+ foo.csv = csv
+ foo.new_csv = None
+
+ return foo
+
+
+ def test_export_with_known_CSV(self):
+ KNOWN_CSV = """\
+one,two,three
+four,five,six
+"""
+ sheet = self._makeCSVAware('config', KNOWN_CSV)
+
+ adapter = self._makeOne(sheet)
+ context = DummyExportContext(None)
+ adapter.export(context, 'subpath/to/sheet')
+
+ self.assertEqual(len(context._wrote), 1)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'subpath/to/sheet/config.csv')
+ self.assertEqual(content_type, 'text/comma-separated-values')
+
+ self.assertEqual(text.strip(), KNOWN_CSV.strip())
+
+ def test_import_with_known_CSV(self):
+ ORIG_CSV = """\
+one,two,three
+four,five,six
+"""
+ NEW_CSV = """\
+four,five,six
+one,two,three
+"""
+ sheet = self._makeCSVAware('config', ORIG_CSV)
+
+ adapter = self._makeOne(sheet)
+ context = DummyImportContext(None)
+ context._files['subpath/to/sheet/config.csv'] = NEW_CSV
+ adapter.import_(context, 'subpath/to/sheet')
+
+ self.assertEqual(sheet.new_csv.strip(), NEW_CSV.strip())
+
+
+_PROPERTIES_TEMPLATE = """
+[DEFAULT]
+Title = %s
+Description = This is a test
+"""
+
+class INIAwareFileAdapterTests(unittest.TestCase,
+ ConformsToIFilesystemExporter,
+ ConformsToIFilesystemImporter,
+ ):
+
+ def _getTargetClass(self):
+ from Products.CMFCore.exportimport import INIAwareFileAdapter
+ return INIAwareFileAdapter
+
+ def _makeOne(self, context, *args, **kw):
+ return self._getTargetClass()(context, *args, **kw)
+
+ def test_export_ini_file(self):
+ ini_file = _makeINIAware('ini_file.html')
+ adapter = self._makeOne(ini_file)
+ context = DummyExportContext(None)
+ adapter.export(context, 'subpath/to')
+
+ self.assertEqual(len(context._wrote), 1)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'subpath/to/ini_file.html.ini')
+ self.assertEqual(content_type, 'text/plain')
+
+ self.assertEqual(text.strip(), ini_file.as_ini().strip())
+
+ def test_import_ini_file(self):
+ ini_file = _makeINIAware('ini_file.html')
+ adapter = self._makeOne(ini_file)
+ context = DummyImportContext(None)
+ context._files['subpath/to/ini_file.html.ini'] = (
+ KNOWN_INI % ('Title: ini_file', 'abc'))
+
+ adapter.import_(context, 'subpath/to')
+ text = ini_file._was_put
+ parser = ConfigParser()
+ parser.readfp(StringIO(text))
+ self.assertEqual(parser.get('DEFAULT', 'title'), 'Title: ini_file')
+ self.assertEqual(parser.get('DEFAULT', 'description'), 'abc')
+
+class Test_globpattern(unittest.TestCase):
+
+ NAMELIST = ('foo', 'bar', 'baz', 'bam', 'qux', 'quxx', 'quxxx')
+
+ def _checkResults(self, globpattern, namelist, expected):
+ from Products.CMFCore.exportimport import _globtest
+ found = _globtest(globpattern, namelist)
+ self.assertEqual(len(found), len(expected))
+ for found_item, expected_item in zip(found, expected):
+ self.assertEqual(found_item, expected_item)
+
+ def test_star(self):
+ self._checkResults('*', self.NAMELIST, self.NAMELIST)
+
+ def test_simple(self):
+ self._checkResults('b*', self.NAMELIST,
+ [x for x in self.NAMELIST if x.startswith('b')])
+
+ def test_multiple(self):
+ self._checkResults('b*\n*x', self.NAMELIST,
+ [x for x in self.NAMELIST
+ if x.startswith('b') or x.endswith('x')])
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SiteStructureExporterTests))
+ suite.addTest(unittest.makeSuite(CSVAwareFileAdapterTests))
+ suite.addTest(unittest.makeSuite(INIAwareFileAdapterTests))
+ suite.addTest(unittest.makeSuite(Test_globpattern))
+ return suite
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
+
More information about the CMF-checkins
mailing list