[Zodb-checkins] SVN: ZODB/trunk/src/ The FileStorage iterator now handles large files better. When
Jim Fulton
jim at zope.com
Mon Dec 22 17:48:31 EST 2008
Log message for revision 94254:
The FileStorage iterator now handles large files better. When
iterating from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the
starting point.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
A ZODB/trunk/src/ZODB/FileStorage/iterator.test
U ZODB/trunk/src/ZODB/FileStorage/tests.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/CHANGES.txt 2008-12-22 22:48:30 UTC (rev 94254)
@@ -42,6 +42,11 @@
- As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor.
+- The FileStorage iterator now handles large files better. When
+  iterating from a starting transaction near the end of the file, the
+  iterator will scan backward from the end of the file to find the
+  starting point.
+
3.9.0a8 (2008-12-15)
====================
@@ -54,7 +59,7 @@
blob-cache-size * (100 - blob-cache-size-check) / 100
The makes it far more likely (but doesn't guarantee) that the blob
- cache size will remain under the maximum.
+ cache size will remain under the maximum.
The blob-cache-size check was reduced to 10%.
@@ -79,7 +84,7 @@
cache will periodically be reduced to the target size.
The client blob directory layout has changed. If you have existing
- non-shared blob directories, you will have to remove them.
+ non-shared blob directories, you will have to remove them.
Bugs Fixed
----------
@@ -119,7 +124,7 @@
you would otherwise pass to ZEO.ClientStorage.ClientStorage::
import ZEO
- db = ZEO.DB(('some_host', 8200))
+ db = ZEO.DB(('some_host', 8200))
- Object saves are a little faster
@@ -297,7 +302,7 @@
- Fixed bug 153316: persistent and BTrees were using `int`
for memory sizes which caused errors on x86_64 Intel Xeon machines
(using 64-bit Linux).
-
+
- Fixed small bug that the Connection.isReadOnly method didn't
work after a savepoint.
@@ -390,7 +395,7 @@
Bugs Fixed:
- Fixed several bugs that caused ZEO cache corruption when connecting
- to servers. These bugs affected both persistent and non-persistent caches.
+ to servers. These bugs affected both persistent and non-persistent caches.
- Improved the the ZEO client shutdown support to try to
avoid spurious errors on exit, especially for scripts, such as zeopack.
@@ -416,7 +421,7 @@
Bugs Fixed:
- The cache used an excessive amount of memory, causing applications
- with large caches to exhaust available memory.
+ with large caches to exhaust available memory.
3.8.1b1 (2008-05-08)
====================
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-12-22 22:48:30 UTC (rev 94254)
@@ -21,7 +21,7 @@
from struct import pack, unpack
from types import StringType
from zc.lockfile import LockFile
-from ZODB.FileStorage.format import CorruptedDataError
+from ZODB.FileStorage.format import CorruptedError, CorruptedDataError
from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
@@ -50,7 +50,6 @@
logger = logging.getLogger('ZODB.FileStorage')
-
def panic(message, *data):
logger.critical(message, *data)
raise CorruptedTransactionError(message)
@@ -210,7 +209,7 @@
self.blob_dir = os.path.abspath(blob_dir)
if create and os.path.exists(self.blob_dir):
ZODB.blob.remove_committed_dir(self.blob_dir)
-
+
self._blob_init(blob_dir)
zope.interface.alsoProvides(self,
ZODB.interfaces.IBlobStorageRestoreable)
@@ -484,7 +483,7 @@
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
assert not version
-
+
self._lock_acquire()
try:
if oid > self._oid:
@@ -532,7 +531,7 @@
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
-
+
self._lock_acquire()
try:
old = self._index_get(oid, 0)
@@ -544,7 +543,7 @@
if oldserial != committed_tid:
raise POSException.ConflictError(
oid=oid, serials=(committed_tid, oldserial))
-
+
pos = self._pos
here = pos + self._tfile.tell() + self._thl
self._tindex[oid] = here
@@ -748,7 +747,7 @@
def _finish_finish(self, tid):
# This is a separate method to allow tests to replace it with
# something broken. :)
-
+
self._file.flush()
if fsync is not None:
fsync(self._file.fileno())
@@ -825,7 +824,7 @@
# Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are.
ctid, cdataptr, cdata = self._undoDataInfo(oid, ipos, tpos)
-
+
if cdataptr != pos:
# We aren't sure if we are talking about the same data
try:
@@ -994,7 +993,7 @@
self.openCommittedBlobFile(h.oid, userial),
open(tmp, 'wb'))
self._blob_storeblob(h.oid, self._tid, tmp)
-
+
new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
# TODO: This seek shouldn't be necessary, but some other
@@ -1177,7 +1176,7 @@
# Helpers that remove an oid dir or revision file.
handle_file = ZODB.blob.remove_committed
handle_dir = ZODB.blob.remove_committed_dir
-
+
# Fist step: move or remove oids or revisions
for line in open(os.path.join(self.blob_dir, '.removed')):
line = line.strip().decode('hex')
@@ -1191,10 +1190,10 @@
handle_dir(path)
maybe_remove_empty_dir_containing(path)
continue
-
+
if len(line) != 16:
raise ValueError("Bad record in ", self.blob_dir, '.removed')
-
+
oid, tid = line[:8], line[8:]
path = fshelper.getBlobFilename(oid, tid)
if not os.path.exists(path):
@@ -1208,7 +1207,7 @@
if not self.pack_keep_old:
return
-
+
# Second step, copy remaining files.
for path, dir_names, file_names in os.walk(self.blob_dir):
for file_name in file_names:
@@ -1219,7 +1218,7 @@
if not os.path.exists(dest):
os.makedirs(dest, 0700)
link_or_copy(file_path, old+file_path[lblob_dir:])
-
+
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)
@@ -1244,8 +1243,8 @@
for trans in FileIterator(self._file_name, pos=pos)]
finally:
self._lock_release()
-
+
def lastTid(self, oid):
"""Return last serialno committed for object oid.
@@ -1641,16 +1640,23 @@
assert isinstance(filename, str)
file = open(filename, 'rb')
self._file = file
+ self._file_name = filename
if file.read(4) != packed_version:
raise FileStorageFormatError(file.name)
file.seek(0,2)
self._file_size = file.tell()
+ if (pos < 4) or pos > self._file_size:
+ raise ValueError("Given position is greater than the file size",
+ pos, self._file_size)
self._pos = pos
assert start is None or isinstance(start, str)
assert stop is None or isinstance(stop, str)
+ self._start = start
+ self._stop = stop
if start:
+ if self._file_size <= 4:
+ return
self._skip_to_start(start)
- self._stop = stop
def __len__(self):
# Define a bogus __len__() to make the iterator work
@@ -1674,32 +1680,87 @@
file.close()
def _skip_to_start(self, start):
- # Scan through the transaction records doing almost no sanity
- # checks.
file = self._file
+ pos1 = self._pos
+ file.seek(pos1)
+ tid1 = file.read(8)
+ if len(tid1) < 8:
+ raise CorruptedError("Couldn't read tid.")
+ if start < tid1:
+ pos2 = pos1
+ tid2 = tid1
+ file.seek(4)
+ tid1 = file.read(8)
+ if start <= tid1:
+ self._pos = 4
+ return
+ pos1 = 4
+ else:
+ if start == tid1:
+ return
+
+ # Try to read the last transaction. We could be unlucky and
+ # opened the file while committing a transaction. In that
+ # case, we'll just scan from the beginning if the file is
+ # small enough, otherwise we'll fail.
+ file.seek(self._file_size-8)
+ l = u64(file.read(8))
+ if not (l + 12 <= self._file_size and
+ self._read_num(self._file_size-l) == l):
+ if self._file_size < (1<<20):
+ return self._scan_forward(start)
+ raise ValueError("Can't find last transaction in large file")
+ pos2 = self._file_size-l-8
+ file.seek(pos2)
+ tid2 = file.read(8)
+ if tid2 < tid1:
+ raise CorruptedError("Tids out of order.")
+ if tid2 <= start:
+ if tid2 == start:
+ self._pos = pos2
+ else:
+ self._pos = self._file_size
+ return
+
+ t1 = ZODB.TimeStamp.TimeStamp(tid1).timeTime()
+ t2 = ZODB.TimeStamp.TimeStamp(tid2).timeTime()
+ ts = ZODB.TimeStamp.TimeStamp(start).timeTime()
+ if (ts - t1) < (t2 - ts):
+ return self._scan_forward(pos1, start)
+ else:
+ return self._scan_backward(pos2, start)
+
+ def _scan_forward(self, pos, start):
+ logger.debug("Scan forward %s:%s looking for %r",
+ self._file_name, pos, start)
+ file = self._file
+ while 1:
+ # Read the transaction record
+ h = self._read_txn_header(pos)
+ if h.tid >= start:
+ self._pos = pos
+ return
+
+ pos += h.tlen + 8
+
+ def _scan_backward(self, pos, start):
+ logger.debug("Scan backward %s:%s looking for %r",
+ self._file_name, pos, start)
+ file = self._file
+ seek = file.seek
read = file.read
- seek = file.seek
while 1:
- seek(self._pos)
- h = read(16)
- if len(h) < 16:
+ pos -= 8
+ seek(pos)
+ tlen = ZODB.utils.u64(read(8))
+ pos -= tlen
+ h = self._read_txn_header(pos)
+ if h.tid <= start:
+ if h.tid == start:
+ self._pos = pos
+ else:
+ self._pos = pos + tlen + 8
return
- tid, stl = unpack(">8s8s", h)
- if tid >= start:
- return
- tl = u64(stl)
- try:
- self._pos += tl + 8
- except OverflowError:
- self._pos = long(self._pos) + tl + 8
- if __debug__:
- # Sanity check
- seek(self._pos - 8, 0)
- rtl = read(8)
- if rtl != stl:
- pos = file.tell() - 8
- panic("%s has inconsistent transaction length at %s "
- "(%s != %s)", file.name, pos, u64(rtl), u64(stl))
# Iterator protocol
def __iter__(self):
Added: ZODB/trunk/src/ZODB/FileStorage/iterator.test
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/iterator.test (rev 0)
+++ ZODB/trunk/src/ZODB/FileStorage/iterator.test 2008-12-22 22:48:30 UTC (rev 94254)
@@ -0,0 +1,142 @@
+FileStorage-specific iterator tests
+===================================
+
+The FileStorage iterator has some special features that deserve some
+special tests.
+
+We'll make some assertions about time, so we'll take it over:
+
+ >>> now = 1229959248
+ >>> def faux_time():
+ ... global now
+ ... now += 0.1
+ ... return now
+ >>> import time
+ >>> time_time = time.time
+ >>> time.time = faux_time
+
+Commit a bunch of transactions:
+
+ >>> import ZODB.FileStorage, transaction
+ >>> db = ZODB.DB('data.fs')
+ >>> tids = [db.storage.lastTransaction()]
+ >>> poss = [db.storage._pos]
+ >>> conn = db.open()
+ >>> for i in range(100):
+ ... conn.root()[i] = conn.root().__class__()
+ ... transaction.commit()
+ ... tids.append(db.storage.lastTransaction())
+ ... poss.append(db.storage._pos)
+
+Deciding where to start
+-----------------------
+
+By default, we start at the beginning:
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs')
+ >>> it.next().tid == tids[0]
+ True
+
+The file iterator has an optimization to deal with large files. It
+can search from either the front or the back of the file, depending
+on the starting transaction given. To see this, we'll turn on debug
+logging:
+
+ >>> import logging, sys
+ >>> old_log_level = logging.getLogger().getEffectiveLevel()
+ >>> logging.getLogger().setLevel(logging.DEBUG)
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> logging.getLogger().addHandler(handler)
+
+If we specify a start transaction, we'll scan forward or backward, as
+seems best and set the next record to that:
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0])
+ >>> it.next().tid == tids[0]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[1])
+ Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xd06\x9c\xcc'
+ >>> it.next().tid == tids[1]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[30])
+ Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xdc\x96.\xcc'
+ >>> it.next().tid == tids[30]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[70])
+ Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xed\xa7>\xcc'
+ >>> it.next().tid == tids[70]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-2])
+ Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xfa\x06\xd0\xcc'
+ >>> it.next().tid == tids[-2]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1])
+ >>> it.next().tid == tids[-1]
+ True
+
+We can also supply a file position. This can speed up finding the
+starting point, or just pick up where another iterator left off:
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', pos=poss[50])
+ >>> it.next().tid == tids[51]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0], pos=4)
+ >>> it.next().tid == tids[0]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1], pos=poss[-2])
+ >>> it.next().tid == tids[-1]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[50], pos=poss[50])
+ Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe5\x1e\xb6\xcc'
+ >>> it.next().tid == tids[50]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[49], pos=poss[50])
+ Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe4\xb1|\xcc'
+ >>> it.next().tid == tids[49]
+ True
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[51], pos=poss[50])
+ >>> it.next().tid == tids[51]
+ True
+
+ >>> logging.getLogger().setLevel(old_log_level)
+ >>> logging.getLogger().removeHandler(handler)
+
+
+If a starting transaction is before the first transaction in the file,
+then the first transaction is returned.
+
+ >>> from ZODB.utils import p64, u64
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[0])-1))
+ >>> it.next().tid == tids[0]
+ True
+
+If it is after the last transaction, then iteration will be empty:
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
+ >>> list(it)
+ []
+
+Even if we write more transactions:
+
+ >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
+ >>> for i in range(10):
+ ... conn.root()[i] = conn.root().__class__()
+ ... transaction.commit()
+ >>> list(it)
+ []
+
+.. Cleanup
+
+ >>> time.time = time_time
+ >>> it.close()
+ >>> db.close()
Property changes on: ZODB/trunk/src/ZODB/FileStorage/iterator.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZODB/FileStorage/tests.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/tests.py 2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/ZODB/FileStorage/tests.py 2008-12-22 22:48:30 UTC (rev 94254)
@@ -93,7 +93,7 @@
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite(
- 'zconfig.txt',
+ 'zconfig.txt', 'iterator.test',
setUp=ZODB.tests.util.setUp, tearDown=ZODB.tests.util.tearDown,
),
doctest.DocTestSuite(
More information about the Zodb-checkins
mailing list