[CMF-checkins] SVN: CMF/branches/mj-cmf14-compat-branch/CMFSetup/
Backport TarballImportContext from GS 1.1.
Tres Seaver
tseaver at palladion.com
Mon Mar 10 17:02:45 EDT 2008
Log message for revision 84577:
Backport TarballImportContext from GS 1.1.
Changed:
U CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py
U CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py
-=-
Modified: CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py
===================================================================
--- CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py 2008-03-10 20:41:54 UTC (rev 84576)
+++ CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py 2008-03-10 21:02:40 UTC (rev 84577)
@@ -186,6 +186,112 @@
InitializeClass( DirectoryExportContext )
+class TarballImportContext( Implicit ):
+
+ __implements__ = ( IImportContext, )
+
+ security = ClassSecurityInfo()
+
+ def __init__( self, tool, archive_bits, encoding=None, should_purge=False ):
+
+ self._site = aq_parent( aq_inner( tool ) )
+ timestamp = time.gmtime()
+ self._archive_stream = StringIO(archive_bits)
+ self._archive = TarFile.open( 'foo.bar', 'r:gz'
+ , self._archive_stream )
+ self._encoding = encoding
+ self._should_purge = bool( should_purge )
+
+ security.declareProtected( ManagePortal, 'getSite' )
+ def getSite( self ):
+
+ """ See ISetupContext.
+ """
+ return aq_self(self._site)
+
+ security.declareProtected( ManagePortal, 'getEncoding' )
+ def getEncoding( self ):
+
+ """ See IImportContext.
+ """
+ return self._encoding
+
+ def readDataFile( self, filename, subdir=None ):
+
+ """ See IImportContext.
+ """
+ if subdir is not None:
+ filename = '/'.join( ( subdir, filename ) )
+
+ try:
+ file = self._archive.extractfile( filename )
+ except KeyError:
+ return None
+
+ return file.read()
+
+ def getLastModified( self, path ):
+
+ """ See IImportContext.
+ """
+ info = self._getTarInfo( path )
+ return info and info.mtime or None
+
+ def isDirectory( self, path ):
+
+ """ See IImportContext.
+ """
+ info = self._getTarInfo( path )
+
+ if info is not None:
+ return info.isdir()
+
+ def listDirectory(self, path, skip=()):
+
+ """ See IImportContext.
+ """
+ if path is None: # root is special case: no leading '/'
+ path = ''
+ else:
+ if not self.isDirectory(path):
+ return None
+
+ if path[-1] != '/':
+ path = path + '/'
+
+ pfx_len = len(path)
+
+ names = []
+ for name in self._archive.getnames():
+ if name == path or not name.startswith(path):
+ continue
+ name = name[pfx_len:]
+ if '/' in name or name in skip:
+ continue
+ names.append(name)
+
+ return names
+
+ def shouldPurge( self ):
+
+ """ See IImportContext.
+ """
+ return self._should_purge
+
+ def _getTarInfo( self, path ):
+ if path[-1] == '/':
+ path = path[:-1]
+ try:
+ return self._archive.getmember( path )
+ except KeyError:
+ pass
+ try:
+ return self._archive.getmember( path + '/' )
+ except KeyError:
+ return None
+
+
+
class TarballExportContext( Implicit ):
__implements__ = ( IExportContext, )
Modified: CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py
===================================================================
--- CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py 2008-03-10 20:41:54 UTC (rev 84576)
+++ CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py 2008-03-10 21:02:40 UTC (rev 84577)
@@ -23,6 +23,8 @@
import os
import time
from StringIO import StringIO
+from tarfile import TarFile
+from tarfile import TarInfo
from DateTime.DateTime import DateTime
from OFS.Folder import Folder
@@ -380,6 +382,304 @@
self.assertEqual( open( fqname, 'rb' ).read(), digits )
+class TarballImportContextTests( SecurityRequestTest
+ , ConformsToISetupContext
+ , ConformsToIImportContext
+ ):
+
+ def _getTargetClass( self ):
+
+ from Products.CMFSetup.context import TarballImportContext
+ return TarballImportContext
+
+ def _makeOne( self, file_dict={}, mod_time=None, *args, **kw ):
+
+ archive_stream = StringIO()
+ archive = TarFile.open('test.tar.gz', 'w:gz', archive_stream)
+
+ def _addOneMember(path, data, modtime):
+ stream = StringIO(v)
+ info = TarInfo(k)
+ info.size = len(v)
+ info.mtime = mod_time
+ archive.addfile(info, stream)
+
+ def _addMember(path, data, modtime):
+ from tarfile import DIRTYPE
+ elements = path.split('/')
+ parents = filter(None, [elements[x] for x in range(len(elements))])
+ for parent in parents:
+ info = TarInfo()
+ info.name = parent
+ info.size = 0
+ info.mtime = mod_time
+ info.type = DIRTYPE
+ archive.addfile(info, StringIO())
+ _addOneMember(path, data, modtime)
+
+ file_items = file_dict.items() or [('dummy', '')] # empty archive barfs
+
+ if mod_time is None:
+ mod_time = time.time()
+
+ for k, v in file_items:
+ _addMember(k, v, mod_time)
+
+ archive.close()
+ bits = archive_stream.getvalue()
+
+ site = DummySite( 'site' ).__of__( self.root )
+ site._setObject( 'setup_tool', Folder( 'setup_tool' ) )
+ tool = site._getOb( 'setup_tool' )
+
+ ctx = self._getTargetClass()( tool, bits, *args, **kw )
+
+ return site, tool, ctx.__of__( tool )
+
+ def test_ctorparms( self ):
+
+ ENCODING = 'latin-1'
+ site, tool, ctx = self._makeOne( encoding=ENCODING
+ , should_purge=True
+ )
+
+ self.assertEqual( ctx.getEncoding(), ENCODING )
+ self.assertEqual( ctx.shouldPurge(), True )
+
+ def test_empty( self ):
+
+ site, tool, ctx = self._makeOne()
+
+ self.assertEqual( ctx.getSite(), site )
+ self.assertEqual( ctx.getEncoding(), None )
+ self.assertEqual( ctx.shouldPurge(), False )
+
+ # These methods are all specified to return 'None' for non-existing
+ # paths / entities
+ self.assertEqual( ctx.isDirectory( 'nonesuch/path' ), None )
+ self.assertEqual( ctx.listDirectory( 'nonesuch/path' ), None )
+
+ def test_readDataFile_nonesuch( self ):
+
+ FILENAME = 'nonesuch.txt'
+
+ site, tool, ctx = self._makeOne()
+
+ self.assertEqual( ctx.readDataFile( FILENAME ), None )
+ self.assertEqual( ctx.readDataFile( FILENAME, 'subdir' ), None )
+
+ def test_readDataFile_simple( self ):
+
+ from string import printable
+
+ FILENAME = 'simple.txt'
+
+ site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+ self.assertEqual( ctx.readDataFile( FILENAME ), printable )
+
+ def test_readDataFile_subdir( self ):
+
+ from string import printable
+
+ FILENAME = 'subdir.txt'
+ SUBDIR = 'subdir'
+
+ site, tool, ctx = self._makeOne( { '%s/%s' % (SUBDIR, FILENAME):
+ printable } )
+
+ self.assertEqual( ctx.readDataFile( FILENAME, SUBDIR ), printable )
+
+ def test_getLastModified_nonesuch( self ):
+
+ FILENAME = 'nonesuch.txt'
+
+ site, tool, ctx = self._makeOne()
+
+ self.assertEqual( ctx.getLastModified( FILENAME ), None )
+
+ def test_getLastModified_simple( self ):
+
+ from string import printable
+
+ FILENAME = 'simple.txt'
+ WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+ site, tool, ctx = self._makeOne( { FILENAME : printable }
+ , mod_time=WHEN )
+
+ self.assertEqual( ctx.getLastModified( FILENAME ), WHEN )
+
+ def test_getLastModified_subdir( self ):
+
+ from string import printable
+
+ FILENAME = 'subdir.txt'
+ SUBDIR = 'subdir'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+ WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+ site, tool, ctx = self._makeOne( { PATH: printable }
+ , mod_time=WHEN )
+
+ self.assertEqual( ctx.getLastModified( PATH ), WHEN )
+
+ def test_getLastModified_directory( self ):
+
+ from string import printable
+
+ FILENAME = 'subdir.txt'
+ SUBDIR = 'subdir'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+ WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+ site, tool, ctx = self._makeOne( { PATH: printable }
+ , mod_time=WHEN
+ )
+
+ self.assertEqual( ctx.getLastModified( SUBDIR ), WHEN )
+
+ def test_isDirectory_nonesuch( self ):
+
+ FILENAME = 'nonesuch.txt'
+
+ site, tool, ctx = self._makeOne()
+
+ self.assertEqual( ctx.isDirectory( FILENAME ), None )
+
+ def test_isDirectory_simple( self ):
+
+ from string import printable
+
+ FILENAME = 'simple.txt'
+
+ site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+ self.assertEqual( ctx.isDirectory( FILENAME ), False )
+
+ def test_isDirectory_nested( self ):
+
+ from string import printable
+
+ SUBDIR = 'subdir'
+ FILENAME = 'nested.txt'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+ site, tool, ctx = self._makeOne( { PATH: printable } )
+
+ self.assertEqual( ctx.isDirectory( PATH ), False )
+
+ def test_isDirectory_subdir( self ):
+
+ from string import printable
+
+ SUBDIR = 'subdir'
+ FILENAME = 'nested.txt'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+ site, tool, ctx = self._makeOne( { PATH: printable } )
+
+ self.assertEqual( ctx.isDirectory( SUBDIR ), True )
+
+ def test_listDirectory_nonesuch( self ):
+
+ SUBDIR = 'nonesuch/path'
+
+ site, tool, ctx = self._makeOne()
+
+ self.assertEqual( ctx.listDirectory( SUBDIR ), None )
+
+ def test_listDirectory_root( self ):
+
+ from string import printable
+
+ FILENAME = 'simple.txt'
+
+ site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+ self.assertEqual( len( ctx.listDirectory( None ) ), 1 )
+ self.failUnless( FILENAME in ctx.listDirectory( None ) )
+
+ def test_listDirectory_simple( self ):
+
+ from string import printable
+
+ FILENAME = 'simple.txt'
+
+ site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+ self.assertEqual( ctx.listDirectory( FILENAME ), None )
+
+ def test_listDirectory_nested( self ):
+
+ from string import printable
+
+ SUBDIR = 'subdir'
+ FILENAME = 'nested.txt'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+ site, tool, ctx = self._makeOne( { PATH: printable } )
+
+ self.assertEqual( ctx.listDirectory( PATH ), None )
+
+ def test_listDirectory_single( self ):
+
+ from string import printable
+
+ SUBDIR = 'subdir'
+ FILENAME = 'nested.txt'
+ PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+ site, tool, ctx = self._makeOne( { PATH: printable } )
+
+ names = ctx.listDirectory( SUBDIR )
+ self.assertEqual( len( names ), 1 )
+ self.failUnless( FILENAME in names )
+
+ def test_listDirectory_multiple( self ):
+
+ from string import printable, uppercase
+
+ SUBDIR = 'subdir'
+ FILENAME1 = 'nested.txt'
+ PATH1 = '%s/%s' % ( SUBDIR, FILENAME1 )
+ FILENAME2 = 'another.txt'
+ PATH2 = '%s/%s' % ( SUBDIR, FILENAME2 )
+
+ site, tool, ctx = self._makeOne( { PATH1: printable
+ , PATH2: uppercase
+ } )
+
+ names = ctx.listDirectory( SUBDIR )
+ self.assertEqual( len( names ), 2 )
+ self.failUnless( FILENAME1 in names )
+ self.failUnless( FILENAME2 in names )
+
+ def test_listDirectory_skip( self ):
+
+ from string import printable, uppercase
+
+ SUBDIR = 'subdir'
+ FILENAME1 = 'nested.txt'
+ PATH1 = '%s/%s' % ( SUBDIR, FILENAME1 )
+ FILENAME2 = 'another.txt'
+ PATH2 = '%s/%s' % ( SUBDIR, FILENAME2 )
+ FILENAME3 = 'another.bak'
+ PATH3 = '%s/%s' % ( SUBDIR, FILENAME3 )
+
+ site, tool, ctx = self._makeOne( { PATH1: printable
+ , PATH2: uppercase
+ , PATH3: 'xyz'
+ } )
+
+ names = ctx.listDirectory(SUBDIR, skip=(FILENAME1,))
+ self.assertEqual( len( names ), 2 )
+ self.failIf( FILENAME1 in names )
+ self.failUnless( FILENAME2 in names )
+ self.failUnless( FILENAME3 in names )
+
+
+
class TarballExportContextTests( FilesystemTestBase
, TarballTester
, ConformsToISetupContext
@@ -977,6 +1277,7 @@
unittest.makeSuite( DirectoryImportContextTests ),
unittest.makeSuite( DirectoryExportContextTests ),
unittest.makeSuite( TarballExportContextTests ),
+ unittest.makeSuite( TarballImportContextTests ),
unittest.makeSuite( SnapshotExportContextTests ),
unittest.makeSuite( SnapshotImportContextTests ),
))
More information about the CMF-checkins
mailing list