[ZODB-Dev] BerkeleyStorage Pack Error
Barry A. Warsaw
barry@zope.com
Thu, 19 Sep 2002 11:03:43 -0400
--9ozcRBB/Df
Content-Type: text/plain; charset=iso-8859-1
Content-Description: message body text
Content-Transfer-Encoding: quoted-printable
>>>>> "JDS" == Júlio Dinis Silva <juliodinis@hotmail.com> writes:
JDS> I have a zodb in berkeley storage using:
| -python2.1.3
| -zope-2.5.1-src
| -bsddb3Storage beta 5
| -berkeley 3.3.11
| -pybsddb 3.3.0
Later on you mention that you're using Packless.py. I don't plan on
spending much effort supporting Packless now that I believe we have
better implementation strategies for BerkeleyDB based storages.
E.g. Minimal.py in cvs on the bdb-nolocks branch is, IMO, a better
no-undo, no-version, no-pack storage. It eliminates the need for a
temporary commit log file, can't run out of Berkeley locks, and should
perform quite well -- if your system is configured properly.
Minimal.py doesn't handle cyclic data, so we'll do one of two things.
Either we'll implement a mark-and-sweep pack() method to collect
those, or more likely we'll integrate Tim's cyclic gc detection code.
To fix your specific problem, I've wrapped the failing line in a
try/except. This is a bandaid and may not be sufficient (make a
backup of your data first, please!). Maybe we need to implement a way
to migrate data from Packless to Minimal, once the branch is merged
back into the trunk. Attached is the new (untested) Packless.py file.
-Barry
--9ozcRBB/Df
Content-Type: text/plain
Content-Disposition: inline;
filename="Packless.py"
Content-Transfer-Encoding: 7bit
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
An implementation of a BerkeleyDB-backed storage that uses a reference-
counting garbage-collection strategy which necessitates packing only when
the stored data has cyclically-referenced garbage.
"""
__version__ ='$Revision: 1.7 $'[11:-2]
from base import Base, DBError
from base import BerkeleyDBError
from bsddb3 import db
from struct import pack, unpack
from ZODB.referencesf import referencesf
from ZODB import POSException
MAXTEMPFSIZE = 999999
class ReferenceCountError(POSException.POSError):
    """An error occurred while decrementing a reference to an object in
    the commit phase.  The object's reference count was below zero."""
class TemporaryLogCorruptedError(POSException.POSError):
    """An error occurred due to temporary log file corruption."""
class OutOfTempSpaceError(POSException.POSError):
    """An out-of-disk-space error occurred when writing a temporary log
    file."""
class Packless(Base):
    """A BerkeleyDB storage that reclaims unreferenced objects eagerly.

    Per-object reference counts are maintained in a dedicated table at
    commit time, so storage for an object is reclaimed as soon as its
    count drops to zero.  pack() is only needed to collect cyclic
    garbage, which reference counting alone cannot detect.

    NOTE(review): this class relies on attributes presumably provided by
    Base (not visible in this file): _lock_acquire/_lock_release, _tmp
    (the temporary commit log file), _tempdir, _serial, _transaction,
    _env (the BerkeleyDB environment), _len, and _setupDB -- confirm
    against base.py.
    """

    def _setupDbs(self):
        # Supports Base framework: open the four tables this storage
        # uses.  'oreferences' is opened with DB_DUP so one oid key can
        # hold many duplicate records, one per outgoing reference.
        self._index=self._setupDB('current')
        self._setupDB('referenceCount')
        self._setupDB('oreferences', flags=db.DB_DUP)
        self._setupDB('opickle')

    def _dbnames(self):
        """
        current -- mapping of oid to current serial
        referenceCount -- mapping of oid to count
        oreferences -- mapping of oid to a sequence of its referenced oids
        opickle -- mapping of oid to pickle
        """
        return 'current', 'referenceCount', 'oreferences', 'opickle'

    def _abort(self):
        # Nothing to undo: store() only appends to the temp log, which
        # is simply not replayed if the transaction aborts.
        pass

    def load(self, oid, version):
        # Return the (pickle, serial) pair for oid.  The version
        # argument is ignored; this storage does not support versions
        # (store() rejects them).  Raises BerkeleyDBError on any
        # underlying database failure.
        self._lock_acquire()
        try:
            try:
                s=self._index[oid]
                p=self._opickle[oid]
                return p, s # pickle, serial
            except DBError, msg:
                raise BerkeleyDBError, (
                    "%s (%s)" % (BerkeleyDBError.__doc__, msg)
                    )
        finally: self._lock_release()

    def store(self, oid, serial, data, version, transaction):
        # Buffer one object store into the temporary commit log.  The
        # actual database writes happen later, in _finish().  Raises
        # StorageTransactionError for a foreign transaction,
        # Unsupported for versioned stores, ConflictError on a stale
        # serial, and OutOfTempSpaceError if the temp log can't be
        # written.  Returns the new serial for the object.
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        if version:
            raise POSException.Unsupported, "Versions aren't supported"
        self._lock_acquire()
        try:
            if self._index.has_key(oid):
                oserial=self._index[oid]
                if serial != oserial:
                    # Caller's serial is stale: someone else committed a
                    # newer revision of this object.
                    raise POSException.ConflictError(serials=(oserial, serial))
            serial=self._serial
            try:
                # write the metadata to the temp log: the 8-byte oid
                # followed by the big-endian pickle length
                self._tmp.write(oid+pack(">i", len(data)))
                # write the pickle to the temp log
                self._tmp.write(data)
            except IOError:
                raise OutOfTempSpaceError, (
                    "%s (%s)" % (OutOfTempSpaceError.__doc__, self._tempdir)
                    )
        finally: self._lock_release()
        return serial

    def _finish(self, tid, u, d, e):
        # Second phase of commit: replay the temp log into the real
        # tables inside a single BerkeleyDB transaction, maintaining
        # reference counts, and reclaim any object whose count dropped
        # to zero.  tid/u/d/e are presumably the transaction id, user,
        # description and extension passed by the Base framework --
        # confirm against base.py; they are unused here.
        txn = self._env.txn_begin()
        try:
            # oids whose refcount reached zero during this commit;
            # candidates for garbage collection at the end.
            zeros={}
            # Bind hot attribute lookups to locals for the replay loop.
            referenceCount=self._referenceCount
            referenceCount_get=referenceCount.get
            referenceCount_put=referenceCount.put
            oreferences=self._oreferences
            oreferences_put=oreferences.put
            serial_put=self._index.put
            opickle_put=self._opickle.put
            serial=self._serial
            tmp=self._tmp
            oidlen=8 # length in bytes of oid string rep
            intlen=4 # length in bytes of struct.packed integer string rep
            fsize=tmp.tell()
            tmp.seek(0)
            read=tmp.read
            l=0
            # Walk the temp log record by record: [oid][len][pickle]...
            while l < fsize:
                sdata = read(oidlen+intlen)
                oid, ldata = unpack(">%ssi" % oidlen, sdata)
                data=read(ldata)
                # get references: oids reachable from this new pickle
                referencesl=[]
                referencesf(data, referencesl)
                references={}
                for roid in referencesl: references[roid]=1
                referenced=references.has_key
                # Create refcnt record for a brand-new object, starting
                # at zero; stores below will increment it.
                if not referenceCount_get(oid, txn=txn):
                    referenceCount_put(oid, '\0'*intlen, txn=txn)
                    # zeros[oid]=1
                    # ^^^^^^^^^^^^
                    # this should happen when ZODB import is fixed
                    # to commit an import in a subtransaction. we rely
                    # on pack to get rid of unreferenced objects added
                    # via an aborted import now. this is only slightly
                    # lame.
                # update stored references: diff the previously stored
                # outgoing references against the new pickle's set.
                c=oreferences.cursor(txn)
                try:
                    # c.set raises (rather than returning None) when the
                    # oid has no stored references yet; treat that as an
                    # empty set.
                    try: roid = c.set(oid)
                    except:
                        pass
                    else:
                        while roid:
                            roid=roid[1]
                            if referenced(roid):
                                # still referenced, so no need to update
                                del references[roid]
                            else:
                                # Delete the stored ref, since we no longer
                                # have it
                                c.delete()
                                # decrement refcnt:
                                rc=unpack(">i",
                                          referenceCount_get(roid,txn=txn))[0]
                                rc=rc-1
                                if rc < 0:
                                    # This should never happen
                                    rce = ReferenceCountError
                                    raise rce, (
                                        "%s (Oid %s had refcount %s)" %
                                        (rce.__doc__,`roid`,rc)
                                        )
                                referenceCount_put(roid, pack(">i", rc), txn)
                                if rc==0: zeros[roid]=1
                            roid=c.get(db.DB_NEXT_DUP)
                finally: c.close()
                # Now add any references that weren't already stored:
                for roid in references.keys():
                    oreferences_put(oid, roid, txn)
                    # Create/update refcnt
                    rcs=referenceCount_get(roid, txn=txn)
                    if rcs:
                        rc=unpack(">i", rcs)[0]
                        if rc==0:
                            # Newly re-referenced: no longer a GC candidate.
                            try: del zeros[roid]
                            except: pass
                        referenceCount_put(roid, pack(">i", rc+1), txn)
                    else:
                        referenceCount_put(roid, pack(">i", 1), txn)
                # Advance past this record's header + pickle.
                l=l+ldata+oidlen+intlen
                if ldata > fsize:
                    # this should never happen.
                    raise TemporaryLogCorruptedError, (
                        TemporaryLogCorruptedError.__doc__
                        )
                serial_put(oid, serial, txn)
                opickle_put(oid, data, txn)
            # Reclaim everything whose count hit zero, except the root.
            if zeros:
                for oid in zeros.keys():
                    if oid == '\0\0\0\0\0\0\0\0': continue
                    self._takeOutGarbage(oid, txn)
            tmp.seek(0)
            # Only shrink the temp file when it has grown large; small
            # files are cheaper to reuse as-is.
            if fsize > MAXTEMPFSIZE: tmp.truncate()
        except DBError, msg:
            # Abort the BerkeleyDB transaction; an abort failure is
            # itself reported as a BerkeleyDBError.
            try:
                txn.abort()
            except db.error, msg:
                raise BerkeleyDBError, "%s (%s)" % (BerkeleyDBError.__doc__,
                                                    msg)
            raise BerkeleyDBError, "%s (%s)" % (BerkeleyDBError.__doc__,
                                                msg)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def _takeOutGarbage(self, oid, txn):
        # take out the garbage: delete oid's records from every table
        # and decrement the counts of everything it referenced,
        # recursing into any object whose count drops to zero in turn.
        # Runs inside the caller's BerkeleyDB transaction.
        referenceCount=self._referenceCount
        try:
            referenceCount.delete(oid, txn)
        except db.DBNotFoundError:
            # NOTE(review): tolerating a missing refcount record is the
            # band-aid described in the accompanying email; the record
            # should normally exist.
            pass
        self._opickle.delete(oid, txn)
        self._current.delete(oid, txn)
        # Remove/decref references
        referenceCount_get=referenceCount.get
        referenceCount_put=referenceCount.put
        c=self._oreferences.cursor(txn)
        try:
            # As in _finish, c.set raises when oid has no stored
            # references; treat as no outgoing references.
            try: roid = c.set(oid)
            except:
                pass
            else:
                while roid:
                    c.delete()
                    roid=roid[1]
                    # decrement refcnt:
                    rc=referenceCount_get(roid, txn=txn)
                    if rc:
                        rc=unpack(">i", rc)[0]-1
                        if rc < 0:
                            rce = ReferenceCountError
                            raise rce, (
                                "%s (Oid %s had refcount %s)" %
                                (rce.__doc__,`roid`,rc)
                                )
                        if rc==0: self._takeOutGarbage(roid, txn)
                        else: referenceCount_put(roid, pack(">i", rc), txn)
                    roid=c.get(db.DB_NEXT_DUP)
        finally: c.close()
        if self._len > 0: self._len=self._len-1

    def pack(self, t, referencesf):
        # Collect cyclic garbage that reference counting missed, with a
        # mark-and-sweep from the root oid.  The pack time t is unused:
        # this storage keeps no history, so there is nothing
        # time-dependent to discard.
        self._lock_acquire()
        try:
            try:
                txn = self._env.txn_begin()
                # mark phase: rindex collects every oid reachable from
                # the root ('\0'*8).
                rindex={}
                referenced=rindex.has_key
                rootl=['\0\0\0\0\0\0\0\0']
                # mark referenced objects
                while rootl:
                    oid=rootl.pop()
                    if referenced(oid): continue
                    p = self._opickle[oid]
                    referencesf(p, rootl)
                    rindex[oid] = None
                # sweep unreferenced objects
                for oid in self._index.keys():
                    if not referenced(oid):
                        self._takeOutGarbage(oid, txn)
            except:
                txn.abort()
                raise
            else:
                txn.commit()
        finally:
            self._lock_release()
--9ozcRBB/Df--