[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/scripts/ Merged the tseaver-better_repozo_tests branch.
Jim Fulton
jim at zope.com
Fri May 14 14:30:13 EDT 2010
Log message for revision 112314:
Merged the tseaver-better_repozo_tests branch.
Changed:
U ZODB/trunk/src/ZODB/scripts/repozo.py
U ZODB/trunk/src/ZODB/scripts/tests/test_repozo.py
-=-
Modified: ZODB/trunk/src/ZODB/scripts/repozo.py
===================================================================
--- ZODB/trunk/src/ZODB/scripts/repozo.py 2010-05-14 17:51:11 UTC (rev 112313)
+++ ZODB/trunk/src/ZODB/scripts/repozo.py 2010-05-14 18:30:13 UTC (rev 112314)
@@ -93,6 +93,9 @@
VERBOSE = False
+class WouldOverwriteFiles(Exception):
+ pass
+
def usage(code, msg=''):
outfp = sys.stderr
if code == 0:
@@ -301,7 +304,9 @@
ext = '.deltafs'
if options.gzip:
ext += 'z'
- t = time.gmtime()[:6] + (ext,)
+ # Hook for testing
+ now = getattr(options, 'test_now', time.gmtime()[:6])
+ t = now + (ext,)
return '%04d-%02d-%02d-%02d-%02d-%02d%s' % t
# Return a list of files needed to reproduce state at time options.date.
@@ -419,8 +424,7 @@
options.full = True
dest = os.path.join(options.repository, gen_filename(options))
if os.path.exists(dest):
- print >> sys.stderr, 'Cannot overwrite existing file:', dest
- sys.exit(2)
+ raise WouldOverwriteFiles('Cannot overwrite existing file: %s' % dest)
log('writing full backup: %s bytes to %s', pos, dest)
sum = copyfile(options, dest, 0, pos)
# Write the data file for this full backup
@@ -447,8 +451,7 @@
options.full = False
dest = os.path.join(options.repository, gen_filename(options))
if os.path.exists(dest):
- print >> sys.stderr, 'Cannot overwrite existing file:', dest
- sys.exit(2)
+ raise WouldOverwriteFiles('Cannot overwrite existing file: %s' % dest)
log('writing incremental: %s bytes to %s', pos-reposz, dest)
sum = copyfile(options, dest, reposz, pos - reposz)
# The first file in repofiles points to the last full backup. Use this to
@@ -570,7 +573,11 @@
argv = sys.argv[1:]
options = parseargs(argv)
if options.mode == BACKUP:
- do_backup(options)
+ try:
+ do_backup(options)
+ except WouldOverwriteFiles, e:
+ print >> sys.stderr, str(e)
+ sys.exit(2)
else:
assert options.mode == RECOVER
do_recover(options)
Modified: ZODB/trunk/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/trunk/src/ZODB/scripts/tests/test_repozo.py 2010-05-14 17:51:11 UTC (rev 112313)
+++ ZODB/trunk/src/ZODB/scripts/tests/test_repozo.py 2010-05-14 18:30:13 UTC (rev 112314)
@@ -13,12 +13,21 @@
##############################################################################
import unittest
import os
+try:
+ # the hashlib package is available from Python 2.5
+ from hashlib import md5
+except ImportError:
+ # the md5 package is deprecated in Python 2.6
+ from md5 import new as md5
+
import ZODB.tests.util # layer used at class scope
_NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT')
class OurDB:
+ _file_name = None
+
def __init__(self, dir):
from BTrees.OOBTree import OOBTree
import transaction
@@ -27,12 +36,13 @@
conn = self.db.open()
conn.root()['tree'] = OOBTree()
transaction.commit()
+ self.pos = self.db.storage._pos
self.close()
def getdb(self):
from ZODB import DB
from ZODB.FileStorage import FileStorage
- storage_filename = os.path.join(self.dir, 'Data.fs')
+ self._file_name = storage_filename = os.path.join(self.dir, 'Data.fs')
storage = FileStorage(storage_filename)
self.db = DB(storage)
@@ -63,11 +73,574 @@
if keys:
del tree[keys[0]]
transaction.commit()
+ self.pos = self.db.storage._pos
self.close()
-class RepozoTests(unittest.TestCase):
+class FileopsBase:
+ def _makeChunks(self):
+ from ZODB.scripts.repozo import READCHUNK
+ return ['x' * READCHUNK, 'y' * READCHUNK, 'z']
+
+ def _makeFile(self, text=None):
+ from StringIO import StringIO
+ if text is None:
+ text = ''.join(self._makeChunks())
+ return StringIO(text)
+
+
+class Test_dofile(unittest.TestCase, FileopsBase):
+
+ def _callFUT(self, func, fp, n):
+ from ZODB.scripts.repozo import dofile
+ return dofile(func, fp, n)
+
+ def test_empty_read_all(self):
+ chunks = []
+ file = self._makeFile('')
+ bytes = self._callFUT(chunks.append, file, None)
+ self.assertEqual(bytes, 0)
+ self.assertEqual(chunks, [])
+
+ def test_empty_read_count(self):
+ chunks = []
+ file = self._makeFile('')
+ bytes = self._callFUT(chunks.append, file, 42)
+ self.assertEqual(bytes, 0)
+ self.assertEqual(chunks, [])
+
+ def test_nonempty_read_all(self):
+ chunks = []
+ file = self._makeFile()
+ bytes = self._callFUT(chunks.append, file, None)
+ self.assertEqual(bytes, file.tell())
+ self.assertEqual(chunks, self._makeChunks())
+
+ def test_nonempty_read_count(self):
+ chunks = []
+ file = self._makeFile()
+ bytes = self._callFUT(chunks.append, file, 42)
+ self.assertEqual(bytes, 42)
+ self.assertEqual(chunks, ['x' * 42])
+
+
+class Test_checksum(unittest.TestCase, FileopsBase):
+
+ def _callFUT(self, fp, n):
+ from ZODB.scripts.repozo import checksum
+ return checksum(fp, n)
+
+ def test_empty_read_all(self):
+ file = self._makeFile('')
+ sum = self._callFUT(file, None)
+ self.assertEqual(sum, md5('').hexdigest())
+
+ def test_empty_read_count(self):
+ file = self._makeFile('')
+ sum = self._callFUT(file, 42)
+ self.assertEqual(sum, md5('').hexdigest())
+
+ def test_nonempty_read_all(self):
+ file = self._makeFile()
+ sum = self._callFUT(file, None)
+ self.assertEqual(sum, md5(''.join(self._makeChunks())).hexdigest())
+
+ def test_nonempty_read_count(self):
+ chunks = []
+ file = self._makeFile()
+ sum = self._callFUT(file, 42)
+ self.assertEqual(sum, md5('x' * 42).hexdigest())
+
+
+class OptionsTestBase:
+
+ _repository_directory = None
+ _data_directory = None
+
+ def tearDown(self):
+ if self._repository_directory is not None:
+ from shutil import rmtree
+ rmtree(self._repository_directory)
+ if self._data_directory is not None:
+ from shutil import rmtree
+ rmtree(self._data_directory)
+
+ def _makeOptions(self, **kw):
+ import tempfile
+ self._repository_directory = tempfile.mkdtemp()
+ class Options(object):
+ repository = self._repository_directory
+ def __init__(self, **kw):
+ self.__dict__.update(kw)
+ return Options(**kw)
+
+
+class Test_copyfile(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, options, dest, start, n):
+ from ZODB.scripts.repozo import copyfile
+ return copyfile(options, dest, start, n)
+
+ def test_no_gzip(self):
+ options = self._makeOptions(gzip=False)
+ source = options.file = os.path.join(self._repository_directory,
+ 'source.txt')
+ f = open(source, 'wb')
+ f.write('x' * 1000)
+ f.close()
+ target = os.path.join(self._repository_directory, 'target.txt')
+ sum = self._callFUT(options, target, 0, 100)
+ self.assertEqual(sum, md5('x' * 100).hexdigest())
+ self.assertEqual(open(target, 'rb').read(), 'x' * 100)
+
+ def test_w_gzip(self):
+ import gzip
+ options = self._makeOptions(gzip=True)
+ source = options.file = os.path.join(self._repository_directory,
+ 'source.txt')
+ f = open(source, 'wb')
+ f.write('x' * 1000)
+ f.close()
+ target = os.path.join(self._repository_directory, 'target.txt')
+ sum = self._callFUT(options, target, 0, 100)
+ self.assertEqual(sum, md5('x' * 100).hexdigest())
+ self.assertEqual(gzip.open(target, 'rb').read(), 'x' * 100)
+
+
+class Test_concat(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, files, ofp):
+ from ZODB.scripts.repozo import concat
+ return concat(files, ofp)
+
+ def _makeFile(self, name, text, gzip_file=False):
+ import gzip
+ import tempfile
+ if self._repository_directory is None:
+ self._repository_directory = tempfile.mkdtemp()
+ fqn = os.path.join(self._repository_directory, name)
+ if gzip_file:
+ f = gzip.open(fqn, 'wb')
+ else:
+ f = open(fqn, 'wb')
+ f.write(text)
+ f.flush()
+ f.close()
+ return fqn
+
+ def test_empty_list_no_ofp(self):
+ bytes, sum = self._callFUT([], None)
+ self.assertEqual(bytes, 0)
+ self.assertEqual(sum, md5('').hexdigest())
+
+ def test_w_plain_files_no_ofp(self):
+ files = [self._makeFile(x, x, False) for x in 'ABC']
+ bytes, sum = self._callFUT(files, None)
+ self.assertEqual(bytes, 3)
+ self.assertEqual(sum, md5('ABC').hexdigest())
+
+ def test_w_gzipped_files_no_ofp(self):
+ files = [self._makeFile('%s.fsz' % x, x, True) for x in 'ABC']
+ bytes, sum = self._callFUT(files, None)
+ self.assertEqual(bytes, 3)
+ self.assertEqual(sum, md5('ABC').hexdigest())
+
+ def test_w_ofp(self):
+
+ class Faux:
+ _closed = False
+ def __init__(self):
+ self._written = []
+ def write(self, data):
+ self._written.append(data)
+ def close(self):
+ self._closed = True
+
+ files = [self._makeFile(x, x, False) for x in 'ABC']
+ ofp = Faux()
+ bytes, sum = self._callFUT(files, ofp)
+ self.assertEqual(ofp._written, [x for x in 'ABC'])
+ self.failUnless(ofp._closed)
+
+_marker = object()
+class Test_gen_filename(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, options, ext=_marker):
+ from ZODB.scripts.repozo import gen_filename
+ if ext is _marker:
+ return gen_filename(options)
+ return gen_filename(options, ext)
+
+ def test_explicit_ext(self):
+ options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31))
+ fn = self._callFUT(options, '.txt')
+ self.assertEqual(fn, '2010-05-14-12-52-31.txt')
+
+ def test_full_no_gzip(self):
+ options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
+ full = True,
+ gzip = False,
+ )
+ fn = self._callFUT(options)
+ self.assertEqual(fn, '2010-05-14-12-52-31.fs')
+
+ def test_full_w_gzip(self):
+ options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
+ full = True,
+ gzip = True,
+ )
+ fn = self._callFUT(options)
+ self.assertEqual(fn, '2010-05-14-12-52-31.fsz')
+
+ def test_incr_no_gzip(self):
+ options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
+ full = False,
+ gzip = False,
+ )
+ fn = self._callFUT(options)
+ self.assertEqual(fn, '2010-05-14-12-52-31.deltafs')
+
+ def test_incr_w_gzip(self):
+ options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
+ full = False,
+ gzip = True,
+ )
+ fn = self._callFUT(options)
+ self.assertEqual(fn, '2010-05-14-12-52-31.deltafsz')
+
+
+class Test_find_files(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, options):
+ from ZODB.scripts.repozo import find_files
+ return find_files(options)
+
+ def _makeFile(self, hour, min, sec, ext):
+ # call _makeOptions first!
+ name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
+ fqn = os.path.join(self._repository_directory, name)
+ f = open(fqn, 'wb')
+ f.write(name)
+ f.flush()
+ f.close()
+ return fqn
+
+ def test_explicit_date(self):
+ options = self._makeOptions(date='2010-05-14-13-30-57')
+ files = []
+ for h, m, s, e in [(2, 13, 14, '.fs'),
+ (2, 13, 14, '.dat'),
+ (3, 14, 15, '.deltafs'),
+ (4, 14, 15, '.deltafs'),
+ (5, 14, 15, '.deltafs'),
+ (12, 13, 14, '.fs'),
+ (12, 13, 14, '.dat'),
+ (13, 14, 15, '.deltafs'),
+ (14, 15, 16, '.deltafs'),
+ ]:
+ files.append(self._makeFile(h, m, s, e))
+ found = self._callFUT(options)
+ # Older files, .dat file not included
+ self.assertEqual(found, [files[5], files[7]])
+
+ def test_using_gen_filename(self):
+ options = self._makeOptions(date=None,
+ test_now=(2010, 5, 14, 13, 30, 57))
+ files = []
+ for h, m, s, e in [(2, 13, 14, '.fs'),
+ (2, 13, 14, '.dat'),
+ (3, 14, 15, '.deltafs'),
+ (4, 14, 15, '.deltafs'),
+ (5, 14, 15, '.deltafs'),
+ (12, 13, 14, '.fs'),
+ (12, 13, 14, '.dat'),
+ (13, 14, 15, '.deltafs'),
+ (14, 15, 16, '.deltafs'),
+ ]:
+ files.append(self._makeFile(h, m, s, e))
+ found = self._callFUT(options)
+ # Older files, .dat file not included
+ self.assertEqual(found, [files[5], files[7]])
+
+
+class Test_scandat(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, repofiles):
+ from ZODB.scripts.repozo import scandat
+ return scandat(repofiles)
+
+ def test_no_dat_file(self):
+ options = self._makeOptions()
+ fsfile = os.path.join(self._repository_directory, 'foo.fs')
+ fn, startpos, endpos, sum = self._callFUT([fsfile])
+ self.assertEqual(fn, None)
+ self.assertEqual(startpos, None)
+ self.assertEqual(endpos, None)
+ self.assertEqual(sum, None)
+
+ def test_empty_dat_file(self):
+ options = self._makeOptions()
+ fsfile = os.path.join(self._repository_directory, 'foo.fs')
+ datfile = os.path.join(self._repository_directory, 'foo.dat')
+ open(datfile, 'wb').close()
+ fn, startpos, endpos, sum = self._callFUT([fsfile])
+ self.assertEqual(fn, None)
+ self.assertEqual(startpos, None)
+ self.assertEqual(endpos, None)
+ self.assertEqual(sum, None)
+
+ def test_single_line(self):
+ options = self._makeOptions()
+ fsfile = os.path.join(self._repository_directory, 'foo.fs')
+ datfile = os.path.join(self._repository_directory, 'foo.dat')
+ f = open(datfile, 'wb')
+ f.write('foo.fs 0 123 ABC\n')
+ f.flush()
+ f.close()
+ fn, startpos, endpos, sum = self._callFUT([fsfile])
+ self.assertEqual(fn, 'foo.fs')
+ self.assertEqual(startpos, 0)
+ self.assertEqual(endpos, 123)
+ self.assertEqual(sum, 'ABC')
+
+ def test_multiple_lines(self):
+ options = self._makeOptions()
+ fsfile = os.path.join(self._repository_directory, 'foo.fs')
+ datfile = os.path.join(self._repository_directory, 'foo.dat')
+ f = open(datfile, 'wb')
+ f.write('foo.fs 0 123 ABC\n')
+ f.write('bar.deltafs 123 456 DEF\n')
+ f.flush()
+ f.close()
+ fn, startpos, endpos, sum = self._callFUT([fsfile])
+ self.assertEqual(fn, 'bar.deltafs')
+ self.assertEqual(startpos, 123)
+ self.assertEqual(endpos, 456)
+ self.assertEqual(sum, 'DEF')
+
+
+class Test_delete_old_backups(OptionsTestBase, unittest.TestCase):
+
+ def _makeOptions(self, filenames=()):
+ options = super(Test_delete_old_backups, self)._makeOptions()
+ for filename in filenames:
+ fqn = os.path.join(options.repository, filename)
+ f = open(fqn, 'wb')
+ f.write('testing delete_old_backups')
+ f.close()
+ return options
+
+ def _callFUT(self, options=None, filenames=()):
+ from ZODB.scripts.repozo import delete_old_backups
+ if options is None:
+ options = self._makeOptions(filenames)
+ return delete_old_backups(options)
+
+ def test_empty_dir_doesnt_raise(self):
+ self._callFUT()
+ self.assertEqual(len(os.listdir(self._repository_directory)), 0)
+
+ def test_no_repozo_files_doesnt_raise(self):
+ FILENAMES = ['bogus.txt', 'not_a_repozo_file']
+ self._callFUT(filenames=FILENAMES)
+ remaining = os.listdir(self._repository_directory)
+ self.assertEqual(len(remaining), len(FILENAMES))
+ for name in FILENAMES:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failUnless(os.path.isfile(fqn))
+
+ def test_doesnt_remove_current_repozo_files(self):
+ FILENAMES = ['2009-12-20-10-08-03.fs', '2009-12-20-10-08-03.dat']
+ self._callFUT(filenames=FILENAMES)
+ remaining = os.listdir(self._repository_directory)
+ self.assertEqual(len(remaining), len(FILENAMES))
+ for name in FILENAMES:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failUnless(os.path.isfile(fqn))
+
+ def test_removes_older_repozo_files(self):
+ OLDER_FULL = ['2009-12-20-00-01-03.fs', '2009-12-20-00-01-03.dat']
+ DELTAS = ['2009-12-21-00-00-01.deltafs', '2009-12-22-00-00-01.deltafs']
+ CURRENT_FULL = ['2009-12-23-00-00-01.fs', '2009-12-23-00-00-01.dat']
+ FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
+ self._callFUT(filenames=FILENAMES)
+ remaining = os.listdir(self._repository_directory)
+ self.assertEqual(len(remaining), len(CURRENT_FULL))
+ for name in OLDER_FULL:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failIf(os.path.isfile(fqn))
+ for name in DELTAS:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failIf(os.path.isfile(fqn))
+ for name in CURRENT_FULL:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failUnless(os.path.isfile(fqn))
+
+ def test_removes_older_repozo_files_zipped(self):
+ OLDER_FULL = ['2009-12-20-00-01-03.fsz', '2009-12-20-00-01-03.dat']
+ DELTAS = ['2009-12-21-00-00-01.deltafsz',
+ '2009-12-22-00-00-01.deltafsz']
+ CURRENT_FULL = ['2009-12-23-00-00-01.fsz', '2009-12-23-00-00-01.dat']
+ FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
+ self._callFUT(filenames=FILENAMES)
+ remaining = os.listdir(self._repository_directory)
+ self.assertEqual(len(remaining), len(CURRENT_FULL))
+ for name in OLDER_FULL:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failIf(os.path.isfile(fqn))
+ for name in DELTAS:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failIf(os.path.isfile(fqn))
+ for name in CURRENT_FULL:
+ fqn = os.path.join(self._repository_directory, name)
+ self.failUnless(os.path.isfile(fqn))
+
+
+class Test_do_full_backup(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, options):
+ from ZODB.scripts.repozo import do_full_backup
+ return do_full_backup(options)
+
+ def _makeDB(self):
+ import tempfile
+ datadir = self._data_directory = tempfile.mkdtemp()
+ return OurDB(self._data_directory)
+
+ def test_dont_overwrite_existing_file(self):
+ from ZODB.scripts.repozo import WouldOverwriteFiles
+ from ZODB.scripts.repozo import gen_filename
+ db = self._makeDB()
+ options = self._makeOptions(full=True,
+ file=db._file_name,
+ gzip=False,
+ test_now = (2010, 5, 14, 10, 51, 22),
+ )
+ f = open(os.path.join(self._repository_directory,
+ gen_filename(options)), 'w')
+ f.write('TESTING')
+ f.flush()
+ f.close()
+ self.assertRaises(WouldOverwriteFiles, self._callFUT, options)
+
+ def test_empty(self):
+ from ZODB.scripts.repozo import gen_filename
+ db = self._makeDB()
+ options = self._makeOptions(file=db._file_name,
+ gzip=False,
+ killold=False,
+ test_now = (2010, 5, 14, 10, 51, 22),
+ )
+ self._callFUT(options)
+ target = os.path.join(self._repository_directory,
+ gen_filename(options))
+ original = open(db._file_name, 'rb').read()
+ self.assertEqual(open(target, 'rb').read(), original)
+ datfile = os.path.join(self._repository_directory,
+ gen_filename(options, '.dat'))
+ self.assertEqual(open(datfile).read(),
+ '%s 0 %d %s\n' %
+ (target, len(original), md5(original).hexdigest()))
+
+
+class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
+
+ def _callFUT(self, options, reposz, repofiles):
+ from ZODB.scripts.repozo import do_incremental_backup
+ return do_incremental_backup(options, reposz, repofiles)
+
+ def _makeDB(self):
+ import tempfile
+ datadir = self._data_directory = tempfile.mkdtemp()
+ return OurDB(self._data_directory)
+
+ def test_dont_overwrite_existing_file(self):
+ from ZODB.scripts.repozo import WouldOverwriteFiles
+ from ZODB.scripts.repozo import gen_filename
+ from ZODB.scripts.repozo import find_files
+ db = self._makeDB()
+ options = self._makeOptions(full=False,
+ file=db._file_name,
+ gzip=False,
+ test_now = (2010, 5, 14, 10, 51, 22),
+ date = None,
+ )
+ f = open(os.path.join(self._repository_directory,
+ gen_filename(options)), 'w')
+ f.write('TESTING')
+ f.flush()
+ f.close()
+ repofiles = find_files(options)
+ self.assertRaises(WouldOverwriteFiles,
+ self._callFUT, options, 0, repofiles)
+
+ def test_no_changes(self):
+ from ZODB.scripts.repozo import gen_filename
+ db = self._makeDB()
+ oldpos = db.pos
+ options = self._makeOptions(file=db._file_name,
+ gzip=False,
+ killold=False,
+ test_now = (2010, 5, 14, 10, 51, 22),
+ date = None,
+ )
+ fullfile = os.path.join(self._repository_directory,
+ '2010-05-14-00-00-00.fs')
+ original = open(db._file_name, 'rb').read()
+ last = len(original)
+ f = open(fullfile, 'wb')
+ f.write(original)
+ f.flush()
+ f.close()
+ datfile = os.path.join(self._repository_directory,
+ '2010-05-14-00-00-00.dat')
+ repofiles = [fullfile, datfile]
+ self._callFUT(options, oldpos, repofiles)
+ target = os.path.join(self._repository_directory,
+ gen_filename(options))
+ self.assertEqual(open(target, 'rb').read(), '')
+ self.assertEqual(open(datfile).read(),
+ '%s %d %d %s\n' %
+ (target, oldpos, oldpos, md5('').hexdigest()))
+
+ def test_w_changes(self):
+ from ZODB.scripts.repozo import gen_filename
+ db = self._makeDB()
+ oldpos = db.pos
+ options = self._makeOptions(file=db._file_name,
+ gzip=False,
+ killold=False,
+ test_now = (2010, 5, 14, 10, 51, 22),
+ date = None,
+ )
+ fullfile = os.path.join(self._repository_directory,
+ '2010-05-14-00-00-00.fs')
+ original = open(db._file_name, 'rb').read()
+ f = open(fullfile, 'wb')
+ f.write(original)
+ f.flush()
+ f.close()
+ datfile = os.path.join(self._repository_directory,
+ '2010-05-14-00-00-00.dat')
+ repofiles = [fullfile, datfile]
+ db.mutate()
+ newpos = db.pos
+ self._callFUT(options, oldpos, repofiles)
+ target = os.path.join(self._repository_directory,
+ gen_filename(options))
+ f = open(db._file_name, 'rb')
+ f.seek(oldpos)
+ increment = f.read()
+ self.assertEqual(open(target, 'rb').read(), increment)
+ self.assertEqual(open(datfile).read(),
+ '%s %d %d %s\n' %
+ (target, oldpos, newpos,
+ md5(increment).hexdigest()))
+
+
+class MonteCarloTests(unittest.TestCase):
+
layer = ZODB.tests.util.MininalTestLayer('repozo')
def setUp(self):
@@ -96,7 +669,7 @@
from ZODB.scripts.repozo import main
main(argv)
- def testRepozo(self):
+ def test_via_monte_carlo(self):
self.saved_snapshots = [] # list of (name, time) pairs for copies.
for i in range(100):
@@ -168,94 +741,18 @@
(correctpath, when, ' '.join(argv)))
self.assertEquals(fguts, gguts, msg)
-class Test_delete_old_backups(unittest.TestCase):
- _repository_directory = None
-
- def tearDown(self):
- if self._repository_directory is not None:
- from shutil import rmtree
- rmtree(self._repository_directory)
-
- def _callFUT(self, options=None, filenames=()):
- from ZODB.scripts.repozo import delete_old_backups
- if options is None:
- options = self._makeOptions(filenames)
- delete_old_backups(options)
-
- def _makeOptions(self, filenames=()):
- import tempfile
- dir = self._repository_directory = tempfile.mkdtemp()
- for filename in filenames:
- fqn = os.path.join(dir, filename)
- f = open(fqn, 'wb')
- f.write('testing delete_old_backups')
- f.close()
- class Options(object):
- repository = dir
- return Options()
-
- def test_empty_dir_doesnt_raise(self):
- self._callFUT()
- self.assertEqual(len(os.listdir(self._repository_directory)), 0)
-
- def test_no_repozo_files_doesnt_raise(self):
- FILENAMES = ['bogus.txt', 'not_a_repozo_file']
- self._callFUT(filenames=FILENAMES)
- remaining = os.listdir(self._repository_directory)
- self.assertEqual(len(remaining), len(FILENAMES))
- for name in FILENAMES:
- fqn = os.path.join(self._repository_directory, name)
- self.failUnless(os.path.isfile(fqn))
-
- def test_doesnt_remove_current_repozo_files(self):
- FILENAMES = ['2009-12-20-10-08-03.fs', '2009-12-20-10-08-03.dat']
- self._callFUT(filenames=FILENAMES)
- remaining = os.listdir(self._repository_directory)
- self.assertEqual(len(remaining), len(FILENAMES))
- for name in FILENAMES:
- fqn = os.path.join(self._repository_directory, name)
- self.failUnless(os.path.isfile(fqn))
-
- def test_removes_older_repozo_files(self):
- OLDER_FULL = ['2009-12-20-00-01-03.fs', '2009-12-20-00-01-03.dat']
- DELTAS = ['2009-12-21-00-00-01.deltafs', '2009-12-22-00-00-01.deltafs']
- CURRENT_FULL = ['2009-12-23-00-00-01.fs', '2009-12-23-00-00-01.dat']
- FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
- self._callFUT(filenames=FILENAMES)
- remaining = os.listdir(self._repository_directory)
- self.assertEqual(len(remaining), len(CURRENT_FULL))
- for name in OLDER_FULL:
- fqn = os.path.join(self._repository_directory, name)
- self.failIf(os.path.isfile(fqn))
- for name in DELTAS:
- fqn = os.path.join(self._repository_directory, name)
- self.failIf(os.path.isfile(fqn))
- for name in CURRENT_FULL:
- fqn = os.path.join(self._repository_directory, name)
- self.failUnless(os.path.isfile(fqn))
-
- def test_removes_older_repozo_files_zipped(self):
- OLDER_FULL = ['2009-12-20-00-01-03.fsz', '2009-12-20-00-01-03.dat']
- DELTAS = ['2009-12-21-00-00-01.deltafsz',
- '2009-12-22-00-00-01.deltafsz']
- CURRENT_FULL = ['2009-12-23-00-00-01.fsz', '2009-12-23-00-00-01.dat']
- FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
- self._callFUT(filenames=FILENAMES)
- remaining = os.listdir(self._repository_directory)
- self.assertEqual(len(remaining), len(CURRENT_FULL))
- for name in OLDER_FULL:
- fqn = os.path.join(self._repository_directory, name)
- self.failIf(os.path.isfile(fqn))
- for name in DELTAS:
- fqn = os.path.join(self._repository_directory, name)
- self.failIf(os.path.isfile(fqn))
- for name in CURRENT_FULL:
- fqn = os.path.join(self._repository_directory, name)
- self.failUnless(os.path.isfile(fqn))
-
def test_suite():
return unittest.TestSuite([
- unittest.makeSuite(RepozoTests),
+ unittest.makeSuite(Test_dofile),
+ unittest.makeSuite(Test_checksum),
+ unittest.makeSuite(Test_copyfile),
+ unittest.makeSuite(Test_concat),
+ unittest.makeSuite(Test_gen_filename),
+ unittest.makeSuite(Test_find_files),
+ unittest.makeSuite(Test_scandat),
unittest.makeSuite(Test_delete_old_backups),
+ unittest.makeSuite(Test_do_full_backup),
+ unittest.makeSuite(Test_do_incremental_backup),
+ unittest.makeSuite(MonteCarloTests),
])
More information about the Zodb-checkins
mailing list