[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.40
db.py:1.20 interfaces.py:1.20 _timestamp.c:NONE
component.xml:NONE config.py:NONE conflict.py:NONE
dbdump.py:NONE export.py:NONE lockfile.py:NONE
timestamp.py:NONE transact.py:NONE winlock.c:NONE
ztransaction.py:NONE
Fred L. Drake, Jr.
fred at zope.com
Fri Feb 20 11:58:01 EST 2004
Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv22507/src/zodb
Modified Files:
connection.py db.py interfaces.py
Removed Files:
_timestamp.c component.xml config.py conflict.py dbdump.py
export.py lockfile.py timestamp.py transact.py winlock.c
ztransaction.py
Log Message:
update to replace ZODB 4 with ZODB 3
=== Zope3/src/zodb/connection.py 1.39 => 1.40 ===
--- Zope3/src/zodb/connection.py:1.39 Wed Oct 15 08:00:19 2003
+++ Zope3/src/zodb/connection.py Fri Feb 20 11:56:58 2004
@@ -11,736 +11,5 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Database connection support
-The Connection object serves as a data manager. ZODB is organized so
-that each thread should have its own Connection. A thread acquires a
-Connection by calling the open() method on a database object.
-
-A Connection can be associated with a single version when it is
-created. By default, a Connection is not associated with a version.
-It uses non-version data.
-
-The root() method on a Connection returns the root object for the
-database. This object and all objects reachable from it are
-associated with the Connection that loaded them. When the objects are
-modified, the Connection is registered with the current transaction.
-
-Synchronization
-
-A Connection instance is not thread-safe. It is designed to support a
-thread model where each thread has its own transaction. If an
-application has more than one thread that uses the connection or the
-transaction the connection is registered with, the application should
-provide locking.
-
-$Id$
-"""
-
-import logging
-import struct
-import tempfile
-import threading
-from types import StringType
-
-from zope.interface import implements
-
-from zodb import interfaces
-from zodb.conflict import ResolvedSerial
-from zodb.export import ExportImport
-from zodb.interfaces import *
-from zodb.interfaces import _fmt_oid
-from zodb.serialize import ConnectionObjectReader, ObjectWriter
-from zodb.storage.base import splitrefs
-from zodb.utils import u64, Set
-
-from transaction import get_transaction
-from transaction.interfaces import IDataManager, IRollback, TransactionError
-from persistence.cache import Cache
-from persistence.interfaces import IPersistentDataManager
-
-
-class RegisteredMapping(dict):
- """Mapping used for Connection._registered.
-
- This mapping must support additions and clears during iteration over
- values.
- """
-
- def __init__(self, *args, **kw):
- dict.__init__(self, *args, **kw)
- self._added_keys = []
-
- def __setitem__(self, key, value):
- if key not in self:
- self._added_keys.append(key)
- dict.__setitem__(self, key, value)
-
- def setdefault(self, key, value=None):
- if key not in self:
- self._added_keys.append(key)
- dict.setdefault(self, key, value)
-
- def update(self, other):
- self._added_keys.extend([key for key in other if key not in self])
- dict.update(self, other)
-
- def iterAddedKeys(self):
- return iter(self._added_keys)
-
- def clearAddedKeys(self):
- del self._added_keys[:]
-
-
-class Connection(ExportImport, object):
- """Object managers for individual object space.
-
- An object space is a version of a collection of objects. In a
- multi-threaded application, each thread gets its own object
- space.
-
- The Connection manages movement of objects in and out of object
- storage.
- """
-
- implements(IAppConnection, IConnection, IPersistentDataManager,
- IDataManager)
-
- def __init__(self, db, storage, version='', cache_size=400):
- self._db = db
- self._storage = storage
- self._version = version
- self._cache = Cache(cache_size)
- self._reader = ConnectionObjectReader(self, self._cache)
- self._log = logging.getLogger("zodb")
- # a TmpStore object used by sub-transactions
- self._tmp = None
- # whether the connection is open or closed
- self._open = True
- # the connection's current txn
- self._txn = None
-
- # _invalidated queues invalidate messages delivered from the DB
- # _inv_lock prevents one thread from modifying the set while
- # another is processing invalidations. All the invalidations
- # from a single transaction should be applied atomically, so
- # the lock must be held when reading _invalidated.
-
- # XXX It sucks that we have to hold the lock to read
- # _invalidated. Normally, _invalidated is written by call
- # dict.update, which will execute atomically by virtue of the
- # GIL. But some storage might generate oids where hash or
- # compare invokes Python code. In that case, the GIL can't
- # save us.
- self._inv_lock = threading.Lock()
- self._invalidated = Set()
- self._committed = []
-
- # Bookkeeping for objects affected by the current transaction.
- # These sets are clear()ed at transaction boundaries.
-
- # XXX Is a Set safe? What if the objects are not hashable?
- self._registered = RegisteredMapping()
- self._modified = Set() # XXX is this the same as registered?
- self._created = Set()
- # _conflicts: set of objects that failed to load because
- # of read conflicts. We must track these explicitly
- # because they occur outside the two-phase commit and
- # we must not allow the transaction they occur in to commit.
- self._conflicts = Set()
-
- # new_oid is used by serialize
- self.newObjectId = self._storage.newObjectId
-
- def _get_transaction(self):
- # Return the transaction currently active.
- # If no transaction is active, call get_transaction().
- if self._txn is None:
- self._txn = get_transaction()
- return self._txn
-
- ######################################################################
- # IAppConnection defines the next three methods
- # root(), sync(), get()
-
- def root(self):
- return self.get(ZERO)
-
- def sync(self):
- if self._txn:
- self._txn.abort()
- sync = getattr(self._storage, 'sync', None)
- if sync is not None:
- sync()
- self._flush_invalidations()
-
- def get(self, oid):
- # assume that a cache cannot store None as a valid object
- object = self._cache.get(oid)
- if object is not None:
- return object
-
- p, serial = self._storage.load(oid, self._version)
- obj = self._reader.getGhost(p)
-
- obj._p_oid = oid
- obj._p_jar = self
- # When an object is created, it is put in the UPTODATE state.
- # We must explicitly deactivate it to turn it into a ghost.
- obj._p_deactivate()
- obj._p_serial = serial
-
- self._cache.set(oid, obj)
- if oid == ZERO:
- # Keep a reference to the root so that the pickle cache
- # won't evict it. XXX Not sure if this is necessary. If
- # the cache is LRU, it should know best if the root is needed.
- self._root = obj
- return obj
-
- ######################################################################
- # IPersistentDataManager requires the next three methods:
- # setstate(), register(), mtime()
-
- def setstate(self, obj):
- oid = obj._p_oid
-
- if not self._open:
- msg = "Attempt to load object on closed connection: %r" % oid
- self._log.warn(msg)
- raise POSError(msg)
-
- try:
- # Avoid reading data from a transaction that committed
- # after the current transaction started, as that might
- # lead to mixing of cached data from earlier transactions
- # and new inconsistent data.
- #
- # Wait for check until after data is loaded from storage
- # to avoid time-of-check to time-of-use race.
- p, serial = self._storage.load(oid, self._version)
- invalid = self._is_invalidated(obj)
- self._reader.setGhostState(obj, p)
- obj._p_serial = serial
- if invalid:
- self._handle_independent(obj)
- except ConflictError:
- raise
- except:
- self._log.exception("Couldn't load state for %r", oid)
- raise
- else:
- # Add the object to the cache active list
- self._cache.activate(oid)
-
- def _is_invalidated(self, obj):
- # Helper method for setstate() covers three cases:
- # returns false if obj is valid
- # returns true if obj was invalidated, but is independent
- # otherwise, raises ConflictError for invalidated objects
- self._inv_lock.acquire()
- try:
- if obj._p_oid in self._invalidated:
- ind = getattr(obj, "_p_independent", None)
- if ind is not None:
- # Defer _p_independent() call until state is loaded.
- return True
- else:
- self._get_transaction().join(self)
- self._conflicts.add(obj._p_oid)
- raise ReadConflictError(object=obj)
- else:
- return False
- finally:
- self._inv_lock.release()
-
- def _handle_independent(self, obj):
- # Helper method for setstate() handles possibly independent objects
- # Call _p_independent(), if it returns True, setstate() wins.
- # Otherwise, raise a ConflictError.
-
- if obj._p_independent():
- self._inv_lock.acquire()
- try:
- try:
- self._invalidated.remove(obj._p_oid)
- except KeyError:
- pass
- finally:
- self._inv_lock.release()
- else:
- self._get_transaction().join(self)
- raise ReadConflictError(object=obj)
-
- def register(self, obj):
- assert obj._p_jar is self and obj._p_oid is not None
- self._log.debug("register oid=%s" % _fmt_oid(obj._p_oid))
- if not self._registered:
- self._get_transaction().join(self)
- self._registered[obj._p_oid] = obj
-
- def mtime(self, obj):
- # required by the IPersistentDataManager interface, but unimplemented
- return None
-
- ######################################################################
- # IConnection requires the next six methods:
- # getVersion(), reset(), cacheGC(), invalidate(), close(), add()
-
- def getVersion(self):
- return self._version
-
- def reset(self, version=""):
- self._log.debug("connection reset")
- if version != self._version:
- # XXX I think it's necessary to clear the cache here, because
- # the objects in the cache don't know that they were in a
- # version.
- self._cache.clear()
- self._version = version
- self._inv_lock.acquire()
- try:
- self._cache.invalidate(self._invalidated)
- self._invalidated.clear()
- finally:
- self._inv_lock.release()
- self._open = True
-
- def cacheGC(self):
- self._cache.shrink()
-
- def invalidate(self, oids):
- self._inv_lock.acquire()
- try:
- self._invalidated.update(oids)
- finally:
- self._inv_lock.release()
-
- def close(self):
- if self._txn is not None:
- msg = "connection closed while transaction active"
- self._log.warn(msg)
- raise TransactionError(msg)
- self._log.debug("connection closed")
- self._open = False
- self._cache.shrink()
- # Return the connection to the pool.
- self._db._closeConnection(self)
-
- def add(self, obj):
- marker = object()
- oid = getattr(obj, "_p_oid", marker)
- if oid is marker:
- raise TypeError("cannot add a non-persistent object %r "
- "to a connection" % (obj, ))
- if obj._p_jar is not None and obj._p_jar is not self:
- raise InvalidObjectReference(obj, obj._p_jar)
- if obj._p_jar is None:
- # Setting _p_changed has a side-effect of adding obj to
- # _p_jar._registered, so it must be set after _p_jar.
- obj._p_oid = self.newObjectId()
- obj._p_jar = self
- obj._p_changed = True
- self._created.add(obj._p_oid)
-
- # There is an 'invariant' that objects in the cache can be
- # made into ghosts because they have _p_jar and _p_oid.
- # We are breaking this invariant, but that's OK because
- # it's not used anywhere. The right solution is to have a
- # separate cache of objects outside that invariant.
- self._cache.set(obj._p_oid, obj)
-
-
- ######################################################################
- # transaction.interfaces.IDataManager requires the next four methods
- # prepare(), abort(), commit(), savepoint()
-
- def prepare(self, txn):
- if self._conflicts:
- # XXX should raise all of the conflicting oids, but
- # haven't gotten around to changing the exception
- # to store them.
- oid = list(self._conflicts)[0]
- raise ReadConflictError(oid)
- self._modified.clear()
- self._created.clear()
- if self._tmp is not None:
- # commit_sub() will call tpc_begin() on the real storage
- self._commit_sub(txn)
- else:
- self._storage.tpcBegin(txn)
-
- self._registered.clearAddedKeys()
- for obj in self._registered.values():
- self._objcommit(obj, txn)
- for oid in self._registered.iterAddedKeys():
- # _registered can have new items added to it during _objcommit,
- # but it cannot have any existing ones removed
- obj = self._registered[oid]
- self._objcommit(obj, txn)
-
- s = self._storage.tpcVote(txn)
- self._handle_serial(s)
- return True
-
- def abort(self, txn):
- if self._tmp is not None:
- self._abort_sub()
- self._storage.tpcAbort(txn)
-
- if self._registered:
- self._cache.invalidate(list(self._registered))
- self._registered.clear()
- self._invalidate_created(self._created)
- self._cache.invalidate(self._modified)
- self._txn = None
- self._flush_invalidations()
- self._created.clear()
- self._modified.clear()
- self._conflicts.clear()
-
- def commit(self, txn):
- # It's important that the storage call the function we pass
- # (self._invalidate_modified) while it still has its lock.
- # We don't want another thread to be able to read any
- # updated data until we've had a chance to send an
- # invalidation message to all of the other connections!
-
- # If another thread could read the newly committed data
- # before the invalidation is delivered, the connection would
- # not be able to detect a read conflict.
- self._storage.tpcFinish(txn, self._invalidate_modified)
- self._txn = None
- self._conflicts.clear()
- self._flush_invalidations()
- self._registered.clear()
- self._created.clear()
- self._modified.clear()
-
- def savepoint(self, txn):
- if self._tmp is None:
- tmp = TmpStore(self._db, self._storage, self._version)
- self._tmp = self._storage
- self._storage = tmp
- self._modified = Set()
- self._created = Set()
- self._storage.tpcBegin(txn)
-
- self._registered.clearAddedKeys()
- for obj in self._registered.values():
- self._objcommit(obj, txn)
- for oid in self._registered.iterAddedKeys():
- # _registered can have new items added to it during _objcommit,
- # but it cannot have any existing ones removed
- obj = self._registered[oid]
- self._objcommit(obj, txn)
- self.importHook(txn) # hook for ExportImport
-
- # The tpcFinish() of TmpStore returns an UndoInfo object.
- undo = self._storage.tpcFinish(txn)
- self._cache.shrink()
- self._storage._created = self._created
- self._created = Set()
- return Rollback(self, undo)
-
- def _invalidate_created(self, created):
- # Dis-own new objects from uncommitted transaction.
- for oid in created:
- o = self._cache.get(oid)
- if o is not None:
- del o._p_jar
- del o._p_oid
- self._cache.remove(oid)
-
- def _invalidate_modified(self):
- self._db.invalidate(self._modified, self, self._version)
-
- def _flush_invalidations(self):
- self._inv_lock.acquire()
- try:
- self._cache.invalidate(self._invalidated)
- self._invalidated.clear()
- finally:
- self._inv_lock.release()
- # Now is a good time to collect some garbage
- self._cache.shrink()
-
- def _handle_serial(self, store_return, oid=None, change=1):
- """Handle the returns from store() and tpc_vote() calls."""
-
- # These calls can return different types depending on whether
- # ZEO is used. ZEO uses asynchronous returns that may be
- # returned in batches by the ClientStorage. ZEO1 can also
- # return an exception object and expect that the Connection
- # will raise the exception.
-
- # When commit_sub() executes a store, there is no need to
- # update the _p_changed flag, because the subtransaction
- # tpc_vote() calls already did this. The change=1 argument
- # exists to allow commit_sub() to avoid setting the flag
- # again.
-
- # When conflict resolution occurs, the object state held by
- # the connection does not match what is written to the
- # database. Invalidate the object here to guarantee that
- # the new state is read the next time the object is used.
-
- if not store_return:
- return
- if isinstance(store_return, StringType):
- assert oid is not None
- self._handle_one_serial(oid, store_return, change)
- else:
- for oid, serial in store_return:
- self._handle_one_serial(oid, serial, change)
-
- def _handle_one_serial(self, oid, serial, change=1):
- if not isinstance(serial, StringType):
- raise serial
- obj = self._cache.get(oid, None)
- if obj is None:
- return
- if serial == ResolvedSerial:
- obj._p_deactivate(force=True)
- else:
- if change:
- obj._p_changed = False
- obj._p_serial = serial
-
- def _objcommit(self, obj, transaction):
- oid = obj._p_oid
- self._log.debug("commit object %s", _fmt_oid(oid))
-
- if obj._p_changed:
- self._modified.add(oid)
- else:
- # The object reverted to the up-to-date state after
- # registering.
- self._log.info("object not modified %s", _fmt_oid(oid))
- return
-
- writer = ObjectWriter(self)
- try:
- for o in writer.newObjects(obj):
- self._commit_store(writer, o, transaction)
- finally:
- writer.close()
-
- def _commit_store(self, writer, obj, transaction):
- oid = obj._p_oid
- serial = getattr(obj, '_p_serial', None)
- if serial is None:
- self._created.add(oid)
- else:
- # Make a quick check against the invalidated set, because
- # pickling is expensive. Catching a conflict here will
- # be much faster than catching it in the store call.
- self._inv_lock.acquire()
- try:
- if (oid in self._invalidated and
- getattr(obj, '_p_resolveConflict', None) is None):
- raise ConflictError(oid=oid)
- finally:
- self._inv_lock.release()
- # XXX persistent classes don't register themselves
- # when they are modified, so we call add again here
- # to be sure they are invalidated.
- self._modified.add(oid)
-
- data, refs = writer.getState(obj)
- s = self._storage.store(oid, serial, data, refs, self._version,
- transaction)
- # Put the object in the cache before handling the
- # response, just in case the response contains the
- # serial number for a newly created object
- self._cache.set(oid, obj)
- self._handle_serial(s, oid)
-
- def _commit_sub(self, txn):
- # Commit all work done in subtransactions.
- assert self._tmp is not None
-
- tmp = self._storage
- self._storage = self._tmp
- self._tmp = None
-
- self._storage.tpcBegin(txn)
-
- # Copy invalidating and creating info from temporary storage:
- self._modified |= Set(tmp._index)
- self._created |= tmp._created
-
- for oid in tmp._index:
- data, refs, serial = tmp.loadrefs(oid, tmp._bver)
- s = self._storage.store(oid, serial, data, refs,
- self._version, txn)
- self._handle_serial(s, oid, change=False)
- tmp.close()
-
- def _abort_sub(self):
- # Abort work done in subtransactions.
- assert self._tmp is not None
-
- tmp = self._storage
- self._storage = self._tmp
- self._tmp = None
-
- self._cache.invalidate(tmp._index)
- self._invalidate_created(tmp._created)
- tmp.close()
-
-class Rollback:
- """Rollback changes associated with savepoint"""
-
- # In order to rollback changes for a savepoint(), we must remove
- # the logged changes from the TmpStore and invalidate any object
- # that has been changed since the rolledback transaction started.
-
- # XXX Should it be possible to rollback() to the same savepoint
- # more than once? (Yes.)
-
- implements(IRollback)
-
- def __init__(self, conn, tmp_undo):
- self._conn = conn
- self._tmp_undo = tmp_undo # undo info from the storage
-
- def rollback(self):
- if not self._tmp_undo.current(self._conn._storage):
- msg = "savepoint has already been committed"
- raise interfaces.RollbackError(msg)
- self._tmp_undo.rollback()
- self._conn._cache.invalidate(self._conn._modified)
-
-class UndoInfo:
- """A helper class for rollback.
-
- The class stores the state necessary for rolling back to a
- particular time.
- """
-
- def __init__(self, store, pos, index):
- self._store = store
- self._pos = pos
- self._index = index
-
- def current(self, cur_store):
- """Return true if the UndoInfo is for cur_store."""
- return self._store is cur_store
-
- def rollback(self):
- self._store.rollback(self._pos, self._index)
-
-
-class TmpStore:
- """A storage to support savepoints."""
-
- _bver = ''
-
- # The header format is oid, serial, nrefs, len(data). Following
- # the header are the refs and the data, where the size of refs is
- # nrefs * 8.
-
- _fmt = ">8s8sQI"
- _header_size = 28
-
- def __init__(self, db, storage, base_version):
- self._db = db
- self._storage = storage
- self._transaction = None
- if base_version:
- self._bver = base_version
- self._file = tempfile.TemporaryFile()
- # _pos: current file position
- # _tpos: file position at last commit point
- self._pos = self._tpos = 0
- # _index: map oid to pos of last committed version
- self._index = {}
- # _tindex: map oid to pos for new updates
- self._tindex = {}
- self._created = Set()
- self._db = None
-
- def close(self):
- self._file.close()
-
- def load(self, oid, version):
- # XXX I don't think the version handling is correct here.
- pos = self._index.get(oid, None)
- if pos is None:
- return self._storage.load(oid, self._bver)
- data, refs, serial = self.loadrefs(oid, version)
- return data, serial
-
- def loadrefs(self, oid, version):
- # A version of load that returns data, refs, and serial.
- pos = self._index.get(oid)
- # We only call loadrefs() for objects in the TmpStore.
- assert pos is not None
- self._file.seek(pos)
- buf = self._file.read(self._header_size)
- oid, serial, nrefs, size = struct.unpack(self._fmt, buf)
- refs = self._file.read(nrefs * 8)
- data = self._file.read(size)
- return data, splitrefs(refs), serial
-
- def newObjectId(self):
- return self._storage.newObjectId()
-
- def store(self, oid, serial, data, refs, version, transaction):
- if transaction is not self._transaction:
- raise interfaces.StorageTransactionError(self, transaction)
- self._file.seek(self._pos)
- if serial is None:
- serial = ZERO
- buf = struct.pack(self._fmt, oid, serial, len(refs), len(data))
- self._file.write(buf)
- self._file.write("".join(refs))
- self._file.write(data)
- self._tindex[oid] = self._pos
- self._pos += len(refs) * 8 + len(data) + self._header_size
- return serial
-
- def tpcAbort(self, transaction):
- if transaction is not self._transaction:
- return
- self._tindex.clear()
- self._transaction = None
- self._pos = self._tpos
-
- def tpcBegin(self, transaction):
- if self._transaction is transaction:
- return
- self._transaction = transaction
- self._tindex.clear() # Just to be sure!
- self._pos = self._tpos
-
- def tpcVote(self, transaction):
- pass
-
- def tpcFinish(self, transaction, f=None):
- if transaction is not self._transaction:
- return
- if f is not None:
- f()
- undo = UndoInfo(self, self._tpos, self._index.copy())
- self._index.update(self._tindex)
- self._tindex.clear()
- self._tpos = self._pos
- return undo
-
- def undoLog(self, first, last, filter=None):
- return ()
-
- def versionEmpty(self, version):
- # XXX what is this supposed to do?
- if version == self._bver:
- return len(self._index)
-
- def rollback(self, pos, index):
- if not (pos <= self._tpos <= self._pos):
- msg = "transaction rolled back to early point"
- raise interfaces.RollbackError(msg)
- self._tpos = self._pos = pos
- self._index = index
- self._tindex.clear()
+from ZODB.Connection import Connection
=== Zope3/src/zodb/db.py 1.19 => 1.20 ===
--- Zope3/src/zodb/db.py:1.19 Fri Jun 20 16:46:09 2003
+++ Zope3/src/zodb/db.py Fri Feb 20 11:56:58 2004
@@ -16,400 +16,4 @@
$Id$
"""
-__metaclass__ = type
-
-import sys
-from threading import Lock
-from time import time
-import logging
-
-from zope.interface import implements
-
-from zodb.storage.interfaces import *
-from zodb.connection import Connection
-from zodb.serialize import getDBRoot
-from zodb.ztransaction import Transaction
-from zodb.interfaces import ZERO
-from zodb.utils import Set
-
-from transaction import get_transaction
-from transaction.interfaces import IDataManager
-
-class DB:
- """The Object Database
-
- The Object database coordinates access to and interaction of one
- or more connections, which manage object spaces. Most of the actual work
- of managing objects is done by the connections.
- """
-
- # the database version number, a 4-byte string
- version = "DB01"
-
- def __init__(self, storage, pool_size=7, cache_size=400):
- """Create an object database.
-
- The storage for the object database must be passed in.
- Optional arguments are:
-
- pool_size -- The size of the pool of object spaces.
- """
-
- self.log = logging.getLogger("zodb")
-
- # The lock protects access to the pool data structures.
- # Store the lock acquire and release methods as methods
- # of the instance.
- l = Lock()
- self._a = l.acquire
- self._r = l.release
-
- # Setup connection pools and cache info
- # _pool is currently available (closed) connections
- # _allocated is all connections, open and closed
- # _temps is temporary connections
- self._pool = []
- self._allocated = []
- self._temps = []
- self._pool_lock = Lock()
- self._pool_lock.acquire()
- self._pool_size = pool_size
-
- self._cache_size = cache_size
-
- # Setup storage
- self._storage = storage
- self._checkVersion()
- storage.registerDB(self)
- try:
- storage.load(ZERO, "")
- except KeyError:
- # Create the database's root in the storage if it doesn't exist
- t = Transaction(description="initial database creation")
- storage.tpcBegin(t)
- # Because this is the initial root object, we know it can't have
- # any references, so include a longer comment than it would take
- # to unpack getDBRoot()'s return value.
- storage.store(ZERO, None, getDBRoot()[0], [], '', t)
- storage.tpcVote(t)
- storage.tpcFinish(t)
-
- # Pass through methods:
- if IUndoStorage.isImplementedBy(storage):
- self.undoInfo = storage.undoInfo
- if IVersionStorage.isImplementedBy(storage):
- for m in ['versionEmpty', 'versions', 'modifiedInVersion',
- 'versionEmpty']:
- setattr(self, m, getattr(storage, m))
-
- def _checkVersion(self):
- # Make sure the database version that created the storage is
- # compatible with this version of the database. If the storage
- # doesn't have a database version, it's brand-new so set it.
- ver = self._storage.getVersion()
- if ver is None:
- self._storage.setVersion(self.version)
- elif ver != self.version:
- raise StorageVersionError(self.version, ver)
-
- def _closeConnection(self, connection):
- """Return a connection to the pool"""
- self._a()
- try:
- self._pool.append(connection)
- if len(self._pool) == 1:
- # Pool now usable again, unlock it.
- self._pool_lock.release()
- finally:
- self._r()
-
- def _connectionMap(self, f):
- self._a()
- try:
- map(f, self._allocated)
-
- # XXX I don't understand what this code is trying to do
- if self._temps:
- for cc in self._temps:
- if sys.getrefcount(cc) > 3:
- f(cc)
- self._temps = []
- finally:
- self._r()
-
- def abortVersion(self, version):
- AbortVersion(self, version)
-
- def close(self):
- # XXX Jim observes that database close typically occurs when
- # the app server is shutting down. If an errant thread is
- # still running, it may not be possible to stop it. Thus,
- # the error on connection.close() may be counter-productive.
- for c in self._allocated:
- c.close()
- del self._allocated[:]
- del self._pool[:]
- self._storage.close()
-
- def commitVersion(self, source, destination=''):
- CommitVersion(self, source, destination)
-
- def getCacheSize(self):
- return self._cache_size
-
- def getName(self):
- return self._storage.getName()
-
- def getPoolSize(self):
- return self._pool_size
-
- def invalidate(self, oids, connection=None, version=''):
- """Invalidate references to a given oid.
-
- This is used to indicate that one of the connections has committed a
- change to the object. The connection committing the change should be
- passed in to prevent useless (but harmless) messages to the
- connection.
- """
- if connection is not None:
- assert version == connection._version
- version = connection._version
-
- self.log.debug("invalidate %s" % oids)
-
- # Notify connections
- for cc in self._allocated:
- if cc is not connection:
- self.invalidateConnection(cc, oids, version)
-
- if self._temps:
- # t accumulates all the connections that aren't closed.
- t = []
- for cc in self._temps:
- if cc is not connection:
- self.invalidateConnection(cc, oids, version,
- t.append)
- self._temps = t
-
- def invalidateConnection(self, conn, oids, version, alive=None):
- """Send invalidation message to conn for oid on version.
-
- If the modification occurred on a version, an invalidation is
- sent only if the version of the mod matches the version of the
- connection.
-
- This function also handles garbage collection of connections
- that aren't used anymore. If the optional argument alive is
- defined, it is a function that is called for all connections
- that aren't garbage collected.
- """
-
- # XXX use weakrefs instead of refcounts?
- if sys.getrefcount(conn) <= 3:
- conn.close()
- else:
- if alive is not None:
- alive(conn)
- if not version or conn.getVersion() == version:
- conn.invalidate(oids)
-
- def open(self, version='', transaction=None, temporary=0, force=None,
- waitflag=1):
- """Return a object space (AKA connection) to work in
-
- The optional version argument can be used to specify that a
- version connection is desired.
-
- The optional transaction argument can be provided to cause the
- connection to be automatically closed when a transaction is
- terminated. In addition, connections per transaction are
- reused, if possible.
-
- Note that the connection pool is managed as a stack, to increase the
- likelihood that the connection's stack will include useful objects.
- """
- self._a()
- try:
-
- if transaction is not None:
- connections=transaction._connections
- if connections:
- v = connections.get(version)
- if not (v is None or temporary):
- return v
- else:
- transaction._connections = connections = {}
- transaction = transaction._connections
-
- if temporary:
- # This is a temporary connection.
- # We won't bother with the pools. This will be
- # a one-use connection.
- c = Connection(self, self._storage, version,
- cache_size=self._cache_size)
- self._temps.append(c)
- if transaction is not None:
- transaction[id(c)] = c
- return c
-
- # Pool locks are tricky. Basically, the lock needs to be
- # set whenever the pool becomes empty so that threads are
- # forced to wait until the pool gets a connection in it.
- # The lock is acquired when the (empty) pool is
- # created. The lock is acquired just prior to removing
- # the last connection from the pool and released just after adding
- # a connection to an empty pool.
-
- if not self._pool:
- if self._pool_size > len(self._allocated) or force:
- # If the number allocated is less than the pool
- # size, then we've never reached the limit.
- # Allocate a connection and return without
- # touching the lock.
- c = Connection(self, self._storage, version,
- cache_size=self._cache_size)
- self._allocated.append(c)
- return c
- else:
- # If the number allocated is larger than the pool
- # size, then we have to wait for another thread to
- # close its connection.
- if waitflag:
- self.log.debug("waiting for pool lock")
- self._r()
- self._pool_lock.acquire()
- self._a()
- self.log.debug("acquired pool lock")
- if len(self._pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- self._pool_lock.release()
- else:
- self.log.debug("open failed because pool is empty")
- return
- elif len(self._pool) == 1:
- # Taking last one, lock the pool
-
- # Note that another thread might grab the lock before
- # us, so we might actually block, however, when we get
- # the lock back, there *will* be a connection in the
- # pool.
-
- self._r()
- self._pool_lock.acquire()
- self._a()
- if len(self._pool) > 1:
- # Note that the pool size will normally be 1 here,
- # but it could be higher due to a race condition.
- self._pool_lock.release()
-
- # XXX Could look for a connection with the right version
- c = self._pool.pop()
- c.reset(version)
- for other_conn in self._pool:
- other_conn.cacheGC()
-
- if transaction is not None:
- transaction[version] = c
- return c
-
- finally: self._r()
-
- def pack(self, t=None, days=0):
- if t is None:
- t = time()
- t -= days * 86400
- try:
- self._storage.pack(t)
- except:
- self.log.exception("packing")
- raise
-
- def setCacheSize(self, v):
- self._cache_size = v
- for c in self._pool:
- c._cache.cache_size = v
-
- def setPoolSize(self, v):
- self._pool_size = v
-
- def undo(self, id):
- TransactionalUndo(self, id)
-
-class SimpleDataManager:
-
- implements(IDataManager)
-
- def __init__(self, db):
- self._db = db
- self._storage = db._storage
- get_transaction().join(self)
-
- def prepare(self, txn):
- self._storage.tpcBegin(txn)
- try:
- self._prepare(txn)
- self._storage.tpcVote(txn)
- except StorageError, err:
- logging.getLogger("zodb").info("Error during prepare: %s", err)
- return False
- else:
- return True
-
- def abort(self, txn):
- self._storage.tpcAbort(txn)
-
- def commit(self, txn):
- self._storage.tpcFinish(txn)
-
- def _prepare(self, txn):
- # Hook for clients to perform action during 2PC
- pass
-
-class CommitVersion(SimpleDataManager):
- """An object that will see to version commit."""
-
- def __init__(self, db, version, dest=''):
- super(CommitVersion, self).__init__(db)
- self._version = version
- self._dest = dest
-
- def _prepare(self, txn):
- self._oids = Set(self._storage.commitVersion(self._version, self._dest,
- txn))
-
- def commit(self, txn):
- super(CommitVersion, self).commit(txn)
- self._db.invalidate(self._oids, version=self._dest)
- if self._dest:
- # the code above just invalidated the dest version.
- # now we need to invalidate the source!
- self._db.invalidate(self._oids, version=self._version)
-
-class AbortVersion(SimpleDataManager):
- """An object that will see to version abortion."""
-
- def __init__(self, db, version):
- super(AbortVersion, self).__init__(db)
- self._version = version
-
- def _prepare(self, txn):
- self._oids = Set(self._storage.abortVersion(self._version, txn))
-
- def commit(self, txn):
- super(AbortVersion, self).commit(txn)
- self._db.invalidate(self._oids, version=self._version)
-
-class TransactionalUndo(SimpleDataManager):
- """An object that will see to transactional undo."""
-
- def __init__(self, db, tid):
- super(TransactionalUndo, self).__init__(db)
- self._tid = tid
-
- def _prepare(self, txn):
- self._oids = Set(self._storage.undo(self._tid, txn))
-
- def commit(self, txn):
- super(TransactionalUndo, self).commit(txn)
- self._db.invalidate(self._oids)
+from ZODB.DB import DB
=== Zope3/src/zodb/interfaces.py 1.19 => 1.20 ===
--- Zope3/src/zodb/interfaces.py:1.19 Wed Oct 8 04:20:28 2003
+++ Zope3/src/zodb/interfaces.py Fri Feb 20 11:56:58 2004
@@ -26,362 +26,4 @@
$Id$
"""
-import zodb.utils
-from zope.interface import Interface, Attribute
-
-from transaction.interfaces import ITransaction as _ITransaction
-from transaction.interfaces \
- import TransactionError, RollbackError, ConflictError as _ConflictError
-
-__all__ = [
- # Constants
- 'ZERO', 'MAXTID',
- # Exceptions
- 'POSError',
- 'POSKeyError',
- 'ConflictError',
- 'ReadConflictError',
- 'DanglingReferenceError',
- 'VersionError',
- 'VersionCommitError',
- 'VersionLockError',
- 'UndoError',
- 'MultipleUndoErrors',
- 'ExportError',
- 'Unsupported',
- 'InvalidObjectReference',
- # Interfaces
- 'IAppConnection',
- 'IConnection',
- 'ITransaction',
- 'ITransactionAttrs',
- ]
-
-ZERO = '\0'*8
-MAXTID = '\377'*8
-
-def _fmt_oid(oid):
- return "%016x" % zodb.utils.u64(oid)
-
-def _fmt_undo(oid, reason):
- s = reason and (": %s" % reason) or ""
- return "Undo error %s%s" % (_fmt_oid(oid), s)
-
-class POSError(StandardError):
- """Persistent object system error."""
-
-class POSKeyError(KeyError, POSError):
- """Key not found in database."""
-
- def __str__(self):
- return _fmt_oid(self.args[0])
-
-class ConflictError(_ConflictError):
- """Two transactions tried to modify the same object at once.
-
- This transaction should be resubmitted.
-
- Instance attributes:
- oid : string
- the OID (8-byte packed string) of the object in conflict
- class_name : string
- the fully-qualified name of that object's class
- message : string
- a human-readable explanation of the error
- serials : (string, string)
- a pair of 8-byte packed strings; these are the serial numbers
- related to conflict. The first is the revision of object that
- is in conflict, the second is the revision that the current
- transaction read when it started.
-
- The caller should pass either object or oid as a keyword argument,
- but not both of them. If object is passed, it should be a
- persistent object with an _p_oid attribute.
- """
-
- def __init__(self, message=None, object=None, oid=None, serials=None):
- if message is None:
- self.message = "database conflict error"
- else:
- self.message = message
-
- if object is None:
- self.oid = None
- self.class_name = None
- else:
- self.oid = object._p_oid
- klass = object.__class__
- self.class_name = klass.__module__ + "." + klass.__name__
-
- if oid is not None:
- assert self.oid is None
- self.oid = oid
-
- self.serials = serials
-
- def __str__(self):
- extras = []
- if self.oid:
- extras.append("oid %s" % _fmt_oid(self.oid))
- if self.class_name:
- extras.append("class %s" % self.class_name)
- if self.serials:
- extras.append("serial was %s, now %s" %
- tuple(map(_fmt_oid, self.serials)))
- if extras:
- return "%s (%s)" % (self.message, ", ".join(extras))
- else:
- return self.message
-
- def get_oid(self):
- return self.oid
-
- def get_class_name(self):
- return self.class_name
-
- def get_old_serial(self):
- return self.serials[0]
-
- def get_new_serial(self):
- return self.serials[1]
-
- def get_serials(self):
- return self.serials
-
-class ReadConflictError(ConflictError):
- """Conflict detected when object was loaded.
-
- An attempt was made to read an object that has changed in another
- transaction (eg. another thread or process).
- """
- def __init__(self, message=None, object=None, serials=None):
- if message is None:
- message = "database read conflict error"
- ConflictError.__init__(self, message=message, object=object,
- serials=serials)
-
-class DanglingReferenceError(TransactionError):
- """An object has a persistent reference to a missing object.
-
- If an object is stored and it has a reference to another object
- that does not exist (for example, it was deleted by pack), this
- exception may be raised. Whether a storage supports this feature
- is a quality of implementation issue.
-
- Instance attributes:
- referer: oid of the object being written
- missing: referenced oid that does not have a corresponding object
- """
-
- def __init__(self, Aoid, Boid):
- self.referer = Aoid
- self.missing = Boid
-
- def __str__(self):
- return "from %s to %s" % (_fmt_oid(self.referer),
- _fmt_oid(self.missing))
-
-class VersionError(POSError):
- """An error in handling versions occurred."""
-
-class VersionCommitError(VersionError):
- """An invalid combination of versions was used in a version commit."""
-
-class VersionLockError(VersionError, TransactionError):
- """Can't modify an object that is modified in unsaved version."""
-
- def __init__(self, oid, version):
- self.oid = oid
- self.version = version
-
- def __str__(self):
- return "%s locked in version %r" % (_fmt_oid(self.oid),
- self.version)
-
-class UndoError(POSError):
- """An attempt was made to undo a non-undoable transaction."""
-
- def __init__(self, oid, reason=None):
- self._oid = oid
- self._reason = reason
-
- def __str__(self):
- return _fmt_undo(self._oid, self._reason)
-
-class MultipleUndoErrors(UndoError):
- """Several undo errors occurred during a single transaction."""
-
- def __init__(self, errs):
- # provide an oid and reason for clients that only look at that
- UndoError.__init__(self, *errs[0])
- self._errs = errs
-
- def __str__(self):
- return "\n".join([_fmt_undo(*pair) for pair in self._errs])
-
-class ExportError(POSError):
- """An export file doesn't have the right format."""
-
-class Unsupported(POSError):
- """A feature that is unsupported by the storage was used."""
-
-class InvalidObjectReference(POSError):
- """An object contains an invalid reference to another object.
-
- A reference is invalid if it refers to an object managed
- by a different database connection.
-
- Attributes:
- obj is the invalid object
- jar is the manager that attempted to store it.
-
- obj._p_jar != jar
- """
-
- def __init__(self, obj, jar):
- self.obj = obj
- self.jar = jar
-
- def __str__(self):
- return "Invalid reference to object %s." % _fmt_oid(self.obj._p_jar)
-
-class IAppDatabase(Interface):
- """Interface exported by database to applications.
-
- The database contains a graph of objects reachable from the
- distinguished root object. The root object is a mapping object
- that can contain arbitrary application data.
-
- There is only rudimentary support for using more than one database
- in a single application. The persistent state of an object in one
- database can not contain a direct reference to an object in
- another database.
- """
-
- def open(version="", transaction=None, temporary=False, force=False,
- waitflag=True):
- # XXX Most of these arguments should eventually go away
- """Open a new database connection."""
-
- def abortVersion(version):
- """Abort the locked database version named version."""
-
- def commitVersion(source, dest=""):
- """Commit changes from locked database version source to dest.
-
- The default value of dest means commit the changes to the
- default version.
- """
-
- def pack(time):
- """Pack database to time."""
-
- def undo(txnid):
- """Undo changes caused by transaction txnid."""
-
-class IAppConnection(Interface):
- """Interface exported by database connection to applications.
-
- Each database connection provides an independent copy of the
- persistent object space. ZODB supports multiple threads by
- providing each thread with a separate connection.
-
- Connections are synchronized through database commits and explicit
- sync() calls. Changes to the object space are only made visible
- when a transaction commits. When a connection commits its
- changes, they become visible to other connections. Changes made
- by other connections also become visible at this time.
- """
-
- def root():
- """Return the root of the database."""
-
- def sync():
- """Process pending invalidations.
-
- If there is a current transaction, it will be aborted.
- """
-
- def get(oid):
- """Return object for `oid`.
-
- The object may be a ghost.
- """
-
-class IDatabase(Interface):
- """Interface between the database and its connections."""
-
- def invalidate(oids, conn=None, version=""):
- pass
-
- def _closeConnection(conn):
- pass
-
-
-class IConnection(Interface):
- """Interface required of Connection by ZODB DB.
-
- The Connection also implements IPersistentDataManager.
- """
-
- def invalidate(oids):
- """Invalidate a set of oids modified by a single transaction.
-
- This marks the oids as invalid, but doesn't actually
- invalidate them. The object data will be actually invalidated
- at certain transaction boundaries.
- """
-
- def reset(version=""):
- """Reset connection to use specified version."""
-
- def getVersion():
- """Return the version that connection is using."""
-
- def close():
- pass
-
- def cacheGC():
- pass
-
- def add(obj):
- """Add a persistent object to this connection.
-
- Essentially, set _p_jar and assign _p_oid on the object.
-
- Raises a TypeError if obj is not persistent. Does nothing if
- obj is already added to this connection.
- """
-
-class ITransaction(_ITransaction):
- """Extends base ITransaction with metadata.
-
- Client code should use this interface to set attributes.
- """
-
- def note(text):
- """Add the text to the transaction description
-
- If the previous description isn't empty, a blank line is
- added before the new text.
- """
-
- def setUser(user_name):
- """Set the transaction user name."""
-
- def setExtendedInfo(name, value):
- """Set extended information."""
-
-class ITransactionAttrs(_ITransaction):
- # XXX The following attributes are used by storages, so they are part
- # of the interface. But I'd rather not have user code explicitly
- # use the attributes.
-
- user = Attribute("The user as set by setUser()")
-
- description = Attribute("A description as set by note()")
-
- _extension = Attribute(
- """Extended info as set by setExtendedInfo()
-
- Should be None or a dictionary.""")
+from ZODB.POSException import *
=== Removed File Zope3/src/zodb/_timestamp.c ===
=== Removed File Zope3/src/zodb/component.xml ===
=== Removed File Zope3/src/zodb/config.py ===
=== Removed File Zope3/src/zodb/conflict.py ===
=== Removed File Zope3/src/zodb/dbdump.py ===
=== Removed File Zope3/src/zodb/export.py ===
=== Removed File Zope3/src/zodb/lockfile.py ===
=== Removed File Zope3/src/zodb/timestamp.py ===
=== Removed File Zope3/src/zodb/transact.py ===
=== Removed File Zope3/src/zodb/winlock.c ===
=== Removed File Zope3/src/zodb/ztransaction.py ===
More information about the Zope3-Checkins
mailing list