[Zodb-checkins] SVN: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py put files in separate directories
Godefroid Chapelle
gotcha at bubblenet.be
Wed Dec 2 12:17:27 EST 2009
Log message for revision 106185:
put files in separate directories
use TestCase API
Changed:
U ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py
-=-
Modified: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py 2009-12-02 16:45:39 UTC (rev 106184)
+++ ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py 2009-12-02 17:17:27 UTC (rev 106185)
@@ -18,36 +18,22 @@
import sys
import random
import time
-import glob
import shutil
import ZODB
from ZODB import FileStorage
import transaction
+from BTrees.OOBTree import OOBTree
from ZODB.scripts import tests as tests_module
REPOZO = os.path.join(os.path.dirname(sys.argv[0]), 'repozo')
-def cleanup(basedir):
- globData = os.path.join(basedir, 'Data.*')
- globCopy = os.path.join(basedir, 'Copy.*')
- backupDir = os.path.join(basedir, 'backup')
- for fname in glob.glob(globData) + glob.glob(globCopy):
- os.remove(fname)
-
- if os.path.isdir(backupDir):
- for fname in os.listdir(backupDir):
- os.remove(os.path.join(backupDir, fname))
- os.rmdir(backupDir)
-
-
class OurDB:
- def __init__(self, basedir):
- from BTrees.OOBTree import OOBTree
- self.basedir = basedir
+ def __init__(self, dir):
+ self.dir = dir
self.getdb()
conn = self.db.open()
conn.root()['tree'] = OOBTree()
@@ -55,7 +41,7 @@
self.close()
def getdb(self):
- storage_filename = os.path.join(self.basedir, 'Data.fs')
+ storage_filename = os.path.join(self.dir, 'Data.fs')
storage = FileStorage.FileStorage(storage_filename)
self.db = ZODB.DB(storage)
@@ -88,84 +74,101 @@
# Do recovery to time 'when', and check that it's identical to correctpath.
-def check(correctpath='Data.fs', when=None):
- if when is None:
- extra = ''
- else:
- extra = ' -D ' + when
- cmd = REPOZO + ' -vRr backup -o Copy.fs' + extra
- os.system(cmd)
- f = file(correctpath, 'rb')
- g = file('Copy.fs', 'rb')
- fguts = f.read()
- gguts = g.read()
- f.close()
- g.close()
- if fguts != gguts:
- raise ValueError("guts don't match\n"
- " correctpath=%r when=%r\n"
- " cmd=%r" % (correctpath, when, cmd))
+class RepozoTest(unittest.TestCase):
+ def setUp(self):
+ # compute directory names
+ self.basedir = os.path.dirname(tests_module.__file__)
+ self.backupdir = os.path.join(self.basedir, 'backup')
+ self.datadir = os.path.join(self.basedir, 'data')
+ self.restoredir = os.path.join(self.basedir, 'restore')
+ self.copydir = os.path.join(self.basedir, 'copy')
+ self.currdir = os.getcwd()
+ # ensure they have all been deleted
+ self.cleanup()
+ # create empty directories
+ os.mkdir(self.backupdir)
+ os.mkdir(self.datadir)
+ os.mkdir(self.restoredir)
+ os.mkdir(self.copydir)
+ os.chdir(self.datadir)
+ self.db = OurDB(self.datadir)
-def main(basedir, d):
- # Every 9th time thru the loop, we save a full copy of Data.fs,
- # and at the end we ensure we can reproduce those too.
- saved_snapshots = [] # list of (name, time) pairs for copies.
+ def tearDown(self):
+ self.cleanup()
+ os.chdir(self.currdir)
- for i in range(100):
- print i
- # Make some mutations.
- d.mutate()
+ def testRepozo(self):
+ self.saved_snapshots = [] # list of (name, time) pairs for copies.
+ for i in range(100):
+ self.mutate_pack_backup(i)
+
+ # Verify snapshots can be reproduced exactly.
+ for copyname, copytime in self.saved_snapshots:
+ print "Checking that", copyname, "at", copytime, "is reproducible."
+ self.assertRestored(copyname, copytime)
+
+ def mutate_pack_backup(self, i):
+ self.db.mutate()
+
# Pack about each tenth time.
if random.random() < 0.1:
print "packing"
- d.pack()
- d.close()
+ self.db.pack()
+ self.db.close()
# Make an incremental backup, half the time with gzip (-z).
if random.random() < 0.5:
- os.system(REPOZO + ' -vBQr backup -f Data.fs')
+ cmd = REPOZO + ' -vBQr %s -f Data.fs'
else:
- os.system(REPOZO + ' -zvBQr backup -f Data.fs')
+ cmd = REPOZO + ' -zvBQr %s -f Data.fs'
+ os.system(cmd % self.backupdir)
+ # Save snapshots to assert that dated restores are possible
if i % 9 == 0:
+ srcname = os.path.join(self.datadir, 'Data.fs')
copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (time.gmtime()[:6])
- copyname = os.path.join(basedir, 'backup', "Data%d" % i) + '.fs'
- srcname = os.path.join(basedir, 'Data.fs')
+ copyname = os.path.join(self.copydir, "Data%d.fs" % i)
shutil.copyfile(srcname, copyname)
- saved_snapshots.append((copyname, copytime))
+ self.saved_snapshots.append((copyname, copytime))
# Make sure the clock moves at least a second.
time.sleep(1.01)
# Verify current Data.fs can be reproduced exactly.
- check()
+ self.assertRestored()
- # Verify snapshots can be reproduced exactly.
- for copyname, copytime in saved_snapshots:
- print "Checking that", copyname, "at", copytime, "is reproducible."
- check(copyname, copytime)
+ def assertRestored(self, correctpath='Data.fs', when=None):
+ if when is None:
+ extra = ''
+ else:
+ extra = ' -D ' + when
+ # restore to Restored.fs
+ restoredfile = os.path.join(self.restoredir, 'Restored.fs')
+ cmd = REPOZO + ' -vRr %s -o ' + restoredfile + extra
+ os.system(cmd % self.backupdir)
+ # check restored file content is equal to file that was backed up
+ f = file(correctpath, 'rb')
+ g = file(restoredfile, 'rb')
+ fguts = f.read()
+ gguts = g.read()
+ f.close()
+ g.close()
+ msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
+ (correctpath, when, cmd))
+ self.assertEquals(fguts, gguts, msg)
-class RepozoTest(unittest.TestCase):
+ def cleanup(self):
+ for dir in [self.datadir, self.backupdir, self.restoredir,
+ self.copydir]:
+ if os.path.isdir(dir):
+ for fname in os.listdir(dir):
+ os.remove(os.path.join(dir, fname))
+ os.rmdir(dir)
- def setUp(self):
- self.basedir = os.path.dirname(tests_module.__file__)
- self.currdir = os.getcwd()
- os.chdir(self.basedir)
- cleanup(self.basedir)
- os.mkdir(os.path.join(self.basedir, 'backup'))
- self.d = OurDB(self.basedir)
- def tearDown(self):
- cleanup(self.basedir)
- os.chdir(self.currdir)
-
- def testDummy(self):
- main(self.basedir, self.d)
-
-
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(RepozoTest))
More information about the Zodb-checkins
mailing list