[Zodb-checkins] SVN: ZODB/branches/ctheune-blobsupport/s -
different cleanups
Christian Theune
ct at gocept.com
Mon Mar 21 23:16:04 EST 2005
Log message for revision 29637:
- different cleanups
- merged from head
- added configuration methods to configure a blobfilestorage
- made stuff work ;)
Changed:
U ZODB/branches/ctheune-blobsupport/setup.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt
U ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
U ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
A ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py
A ZODB/branches/ctheune-blobsupport/src/ZODB/tests/multidb.txt
U ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py
U ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
U ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py
U ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py
A ZODB/branches/ctheune-blobsupport/src/zope/proxy/
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/SETUP.cfg
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/__init__.py
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/_zope_proxy_proxy.c
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/interfaces.py
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/proxy.h
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/tests/__init__.py
_U ZODB/branches/ctheune-blobsupport/src/zope/proxy/tests/test_proxy.py
-=-
Modified: ZODB/branches/ctheune-blobsupport/setup.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/setup.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/setup.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -128,21 +128,37 @@
sources= ['src/zope/interface/_zope_interface_coptimizations.c']
)
-exts += [cPersistence, cPickleCache, TimeStamp, winlock, cZopeInterface]
+cZopeProxy = Extension(
+ name = 'zope.proxy._zope_proxy_proxy',
+ sources= ['src/zope/proxy/_zope_proxy_proxy.c']
+ )
+exts += [cPersistence,
+ cPickleCache,
+ TimeStamp,
+ winlock,
+ cZopeInterface,
+ cZopeProxy,
+ ]
+
# The ZODB.zodb4 code is not being packaged, because it is only
# need to convert early versions of Zope3 databases to ZODB3.
packages = ["BTrees", "BTrees.tests",
"ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests",
- "ZODB", "ZODB.FileStorage", "ZODB.Blobs",
+ "ZODB", "ZODB.FileStorage", "ZODB.Blobs", "ZODB.Blobs.tests",
"ZODB.tests",
"Persistence", "Persistence.tests",
"persistent", "persistent.tests",
"transaction", "transaction.tests",
"ThreadedAsync",
"zdaemon", "zdaemon.tests",
- "zope", "zope.interface", "zope.testing",
+
+ "zope",
+ "zope.interface", "zope.interface.tests",
+ "zope.proxy", "zope.proxy.tests",
+ "zope.testing",
+
"ZopeUndo", "ZopeUndo.tests",
"ZConfig", "ZConfig.tests",
"ZConfig.components",
@@ -187,6 +203,7 @@
"ZODB/tests",
"zdaemon",
"zdaemon/tests",
+ "zope/interface", "zope/interface/tests",
]:
dir = convert_path(dir)
inputdir = os.path.join("src", dir)
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -252,6 +252,12 @@
pass
def tpc_finish(self, transaction, f=None):
+ # It's important that the storage calls the function we pass
+ # while it still has its lock. We don't want another thread
+ # to be able to read any updated data until we've had a chance
+ # to send an invalidation message to all of the other
+ # connections!
+
self._lock_acquire()
try:
if transaction is not self._transaction:
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -1,5 +1,6 @@
import os
+import tempfile
from zope.interface import implements
@@ -8,33 +9,22 @@
from ZODB import utils
from persistent import Persistent
-class TempFileHandler(object):
- """Handles holding a tempfile around.
+try:
+ from ZPublisher.Iterators import IStreamIterator
+except ImportError:
+ IStreamIterator = None
- The tempfile is unlinked when the tempfilehandler is GCed.
- """
-
- def __init__(self, directory, mode)
- self.handle, self.filename = tempfile.mkstemp(dir=directory,
- text=mode)
-
- def __del__(self):
- self.handle
- os.unlink(self.filename)
-
class Blob(Persistent):
implements(IBlob)
- def __init__(self):
- self._p_blob_readers = 0
- self._p_blob_writers = 0
- self._p_blob_uncommitted = None
- self._p_blob_data = None
+ _p_blob_readers = 0
+ _p_blob_writers = 0
+ _p_blob_uncommitted = None
+ _p_blob_data = None
def open(self, mode):
"""Returns a file(-like) object for handling the blob data."""
-
if mode == "r":
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
@@ -43,17 +33,17 @@
raise BlobError, "Already opened for writing."
self._p_blob_readers += 1
- return BlobTempFile(self._current_filename(), "rb", self)
+ return BlobFile(self._current_filename(), "rb", self)
if mode == "w":
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
- self._p_blob_uncommitted = self._get_uncommitted_filename()
+ self._p_blob_uncommitted = utils.mktemp()
self._p_blob_writers += 1
- return BlobTempFile(self._p_blob_uncommitted, "wb", self)
+ return BlobFile(self._p_blob_uncommitted, "wb", self)
if mode =="a":
if self._current_filename() is None:
@@ -62,15 +52,15 @@
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
- if not self._p_blob_uncommitted:
+ if self._p_blob_uncommitted is None:
# Create a new working copy
- self._p_blob_uncommitted = self._get_uncommitted_filename()
- uncommitted = BlobTempFile(self._p_blob_uncommitted, "wb", self)
+ self._p_blob_uncommitted = utils.mktmp()
+ uncommitted = BlobFile(self._p_blob_uncommitted, "wb", self)
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else:
# Re-use existing working copy
- uncommitted = BlobTempFile(self._p_blob_uncommitted, "ab", self)
+ uncommitted = BlobFile(self._p_blob_uncommitted, "ab", self)
self._p_blob_writers +=1
return uncommitted
@@ -80,28 +70,29 @@
def _current_filename(self):
return self._p_blob_uncommitted or self._p_blob_data
- def _get_uncommitted_filename(self):
- return os.tempnam()
+class BlobFile(file):
-class BlobFileBase:
-
# XXX those files should be created in the same partition as
# the storage later puts them to avoid copying them ...
+ if IStreamIterator is not None:
+ __implements__ = (IStreamIterator,)
+
def __init__(self, name, mode, blob):
- file.__init__(self, name, mode)
+ super(BlobFile, self).__init__(name, mode)
self.blob = blob
+ self.streamsize = 1<<16
def write(self, data):
- file.write(self, data)
+ super(BlobFile, self).write(data)
self.blob._p_changed = 1
def writelines(self, lines):
- file.writelines(self, lines)
+ super(BlobFile, self).writelines(lines)
self.blob._p_changed = 1
def truncate(self, size):
- file.truncate(self, size)
+ super(BlobFile, self).truncate(size)
self.blob._p_changed = 1
def close(self):
@@ -110,15 +101,20 @@
self.blob._p_blob_writers -= 1
else:
self.blob._p_blob_readers -= 1
- file.close(self)
+ super(BlobFile, self).close()
-class BlobFile(BlobFileBase, file):
- pass
+ def next(self):
+ data = self.read(self.streamsize)
+ if not data:
+ self.blob._p_blob_readers -= 1
+ raise StopIteration
+ return data
-class BlobTempFile(BlobFileBase, NamedTempFile)
- pass
+ def __len__(self):
+ cur_pos = self.tell()
+ self.seek(0, 2)
+ size = self.tell()
+ self.seek(cur_pos, 0)
+ return size
-def copy_file(old, new):
- for chunk in old.read(4096):
- new.write(chunk)
- new.seek(0)
+
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -12,41 +12,85 @@
#
##############################################################################
+import os
+
from zope.interface import implements
-from zope.proxy import ProxyBase
+from zope.proxy import ProxyBase, getProxiedObject
-from ZODB.interfaces import \
- IStorageAdapter, IUndoableStorage, IVersioningStorage, IBlobStorage
+from ZODB import utils
+from ZODB.Blobs.interfaces import IBlobStorage, IBlob
class BlobStorage(ProxyBase):
"""A storage to support blobs."""
implements(IBlobStorage)
- __slots__ = ('base_directory',)
+ __slots__ = ('base_directory', 'dirty_oids')
- def __init__(self, base_directory, storage):
+ def __new__(self, base_directory, storage):
+ return ProxyBase.__new__(self, storage)
+
+ def __init__(self, base_directory, storage):
+ # TODO Complain if storage is ClientStorage
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
+ self.dirty_oids = []
- def storeBlob(oid, serial, data, blob, version, transaction):
+ def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
"""Stores data that has a BLOB attached."""
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
+ serial = self.store(oid, oldserial, data, version, transaction)
+ assert isinstance(serial, str) # XXX in theory serials could be
+ # something else
self._lock_acquire()
try:
- #
+ targetname = self._getCleanFilename(oid, serial)
+ try:
+ os.rename(blobfilename, targetname)
+ except OSError:
+ target = file(targetname, "wb")
+ source = file(blobfilename, "rb")
+ utils.cp(blobfile, target)
+ target.close()
+ source.close()
+ os.unlink(blobfilename)
-
+ # XXX if oid already in there, something is really hosed.
+ # The underlying storage should have complained anyway
+ self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return self._tid
+ def _getDirtyFilename(self, oid):
+ """Generate an intermediate filename for two-phase commit.
+ XXX Not used right now due to conceptual flux. Please keep it around
+ anyway.
+ """
+ return self._getCleanFilename(oid, "store")
+ def _getCleanFilename(self, oid, tid):
+ return "%s/%s-%s.blob" % \
+ (self.base_directory,
+ utils.oid_repr(oid),
+ utils.tid_repr(tid),
+ )
+ def _finish(self, tid, u, d, e):
+ ProxyBase._finish(self, tid, u, d, e)
+ self.dirty_blobs = []
- def loadBlob(oid, serial, version, blob):
- """Loads the BLOB data for 'oid' into the given blob object.
+ def _abort(self):
+ ProxyBase._abort(self)
+
+ # Throw away the stuff we'd had committed
+ while self.dirty_blobs:
+ oid, serial = self.dirty_blobs.pop()
+ os.unlink(self._getCleanFilename(oid))
+
+ def loadBlob(self, oid, serial, version):
+ """Return the filename where the blob file can be found.
"""
+ return self._getCleanFilename(oid, serial)
+
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt 2005-03-22 04:16:03 UTC (rev 29637)
@@ -1,2 +1,4 @@
- Blob instances should clean up temporary files after committing
+
+- Support database import/export
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -13,3 +13,20 @@
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
+
+class IBlobStorage(Interface):
+ """A storage supporting BLOBs."""
+
+ def storeBlob(oid, oldserial, data, blob, version, transaction):
+ """Stores data that has a BLOB attached."""
+
+ def loadBlob(oid, serial, version):
+ """Return the filename of the Blob data responding to this OID and
+ serial.
+
+ Returns a filename or None if no Blob data is connected with this OID.
+ """
+
+ def getBlobDirectory():
+ """
+ """
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt 2005-03-22 04:16:03 UTC (rev 29637)
@@ -23,15 +23,18 @@
>>> blob = Blob()
>>> data = blob.open("w")
>>> data.write("I'm a happy Blob.")
+ >>> data.close()
We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
+ >>> from ZODB.Blobs.BlobStorage import BlobStorage
+ >>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
- >>> database = DB(storage)
+ >>> database = DB(blob_storage)
Putting a Blob into a Connection works like every other object:
@@ -40,12 +43,11 @@
>>> root['myblob'] = blob
>>> import transaction
>>> transaction.commit()
- >>> connection.close()
Getting stuff out of there works similar:
- >>> connection = database.open()
- >>> root = connection.root()
+ >>> connection2 = database.open()
+ >>> root = connection2.root()
>>> blob2 = root['myblob']
>>> IBlob.isImplementedBy(blob2)
True
@@ -56,17 +58,18 @@
>>> no_blob_storage = MappingStorage()
>>> database2 = DB(no_blob_storage)
- >>> connection = database.open()
- >>> root = connection.root()
- >>> root['myblob'] = blob
- >>> transaction.commit()
+ >>> connection3 = database2.open()
+ >>> root = connection3.root()
+ >>> root['myblob'] = Blob()
+ >>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- POSException.Unsupported: Storing Blobs is not supported.
+ Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage instance at ...> is not supported.
While we are testing this, we don't need the storage directory and databases anymore:
- >>> import os
- >>> os.unlink(blob_dir)
+ >>> import shutil
+ >>> shutil.rmtree(blob_dir)
+ >>> transaction.abort()
>>> database.close()
>>> database2.close()
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -15,4 +15,4 @@
from zope.testing.doctestunit import DocFileSuite
def test_suite():
- return DocFileSuite("../README.txt")
+ return DocFileSuite("../Blob.txt", "connection.txt")
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -23,21 +23,25 @@
from persistent import PickleCache
+# interfaces
+from persistent.interfaces import IPersistentDataManager
+from ZODB.interfaces import IConnection
+from ZODB.Blobs.interfaces import IBlob, IBlobStorage
+from transaction.interfaces import IDataManager
+from zope.interface import implements
+
import transaction
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \
- ConnectionStateError
+ ConnectionStateError, Unsupported
from ZODB.TmpStore import TmpStore
-from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
-from ZODB.interfaces import IConnection
-from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import u64, oid_repr, z64, positive_id, \
+ DEPRECATED_ARGUMENT, deprecated36
-from zope.interface import implements
-
global_reset_counter = 0
def resetCaches():
@@ -54,127 +58,19 @@
global_reset_counter += 1
class Connection(ExportImport, object):
- """Connection to ZODB for loading and storing objects.
+ """Connection to ZODB for loading and storing objects."""
- The Connection object serves as a data manager. The root() method
- on a Connection returns the root object for the database. This
- object and all objects reachable from it are associated with the
- Connection that loaded them. When a transaction commits, it uses
- the Connection to store modified objects.
+ implements(IConnection, IDataManager, IPersistentDataManager)
- Typical use of ZODB is for each thread to have its own
- Connection and that no thread should have more than one Connection
- to the same database. A thread is associated with a Connection by
- loading objects from that Connection. Objects loaded by one
- thread should not be used by another thread.
-
- A Connection can be associated with a single version when it is
- created. By default, a Connection is not associated with a
- version; it uses non-version data.
-
- Each Connection provides an isolated, consistent view of the
- database, by managing independent copies of objects in the
- database. At transaction boundaries, these copies are updated to
- reflect the current state of the database.
-
- You should not instantiate this class directly; instead call the
- open() method of a DB instance.
-
- In many applications, root() is the only method of the Connection
- that you will need to use.
-
- Synchronization
- ---------------
-
- A Connection instance is not thread-safe. It is designed to
- support a thread model where each thread has its own transaction.
- If an application has more than one thread that uses the
- connection or the transaction the connection is registered with,
- the application should provide locking.
-
- The Connection manages movement of objects in and out of object
- storage.
-
- TODO: We should document an intended API for using a Connection via
- multiple threads.
-
- TODO: We should explain that the Connection has a cache and that
- multiple calls to get() will return a reference to the same
- object, provided that one of the earlier objects is still
- referenced. Object identity is preserved within a connection, but
- not across connections.
-
- TODO: Mention the database pool.
-
- A database connection always presents a consistent view of the
- objects in the database, although it may not always present the
- most current revision of any particular object. Modifications
- made by concurrent transactions are not visible until the next
- transaction boundary (abort or commit).
-
- Two options affect consistency. By default, the mvcc and synch
- options are enabled by default.
-
- If you pass mvcc=True to db.open(), the Connection will never read
- non-current revisions of an object. Instead it will raise a
- ReadConflictError to indicate that the current revision is
- unavailable because it was written after the current transaction
- began.
-
- The logic for handling modifications assumes that the thread that
- opened a Connection (called db.open()) is the thread that will use
- the Connection. If this is not true, you should pass synch=False
- to db.open(). When the synch option is disabled, some transaction
- boundaries will be missed by the Connection; in particular, if a
- transaction does not involve any modifications to objects loaded
- from the Connection and synch is disabled, the Connection will
- miss the transaction boundary. Two examples of this behavior are
- db.undo() and read-only transactions.
-
-
- :Groups:
-
- - `User Methods`: root, get, add, close, db, sync, isReadOnly,
- cacheGC, cacheFullSweep, cacheMinimize, getVersion,
- modifiedInVersion
- - `Experimental Methods`: setLocalTransaction, getTransaction,
- onCloseCallbacks
- - `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
- tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
- abort_sub
- - `Database Invalidation Methods`: invalidate, _setDB
- - `IPersistentDataManager Methods`: setstate, register,
- setklassstate
- - `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
- getTransferCounts
-
- """
- implements(IConnection)
-
_tmp = None
_code_timestamp = 0
+ # ZODB.IConnection
+
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
- """Create a new Connection.
-
- A Connection instance should by instantiated by the DB
- instance that it is connected to.
-
- :Parameters:
- - `version`: the "version" that all changes will be made
- in, defaults to no version.
- - `cache_size`: the target size of the in-memory object
- cache, measured in objects.
- - `cache_deactivate_after`: deprecated, ignored
- - `mvcc`: boolean indicating whether MVCC is enabled
- - `txn_mgr`: transaction manager to use. None means
- used the default transaction manager.
- - `synch`: boolean indicating whether Connection should
- register for afterCompletion() calls.
- """
-
+ """Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = ()
@@ -220,7 +116,7 @@
# from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated.
- # It sucks that we have to hold the lock to read _invalidated.
+ # It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In
@@ -253,79 +149,20 @@
# to pass to _importDuringCommit().
self._import = None
- def getTransaction(self):
- """Get the current transaction for this connection.
+ self.connections = None
- :deprecated:
+ def get_connection(self, database_name):
+ """Return a Connection for the named database."""
+ connection = self.connections.get(database_name)
+ if connection is None:
+ new_con = self._db.databases[database_name].open()
+ self.connections.update(new_con.connections)
+ new_con.connections = self.connections
+ connection = new_con
+ return connection
- The transaction manager's get method works the same as this
- method. You can pass a transaction manager (TM) to DB.open()
- to control which TM the Connection uses.
- """
- deprecated36("getTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.")
- return self._txn_mgr.get()
-
- def setLocalTransaction(self):
- """Use a transaction bound to the connection rather than the thread.
-
- :deprecated:
-
- Returns the transaction manager used by the connection. You
- can pass a transaction manager (TM) to DB.open() to control
- which TM the Connection uses.
- """
- deprecated36("setLocalTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.")
- if self._txn_mgr is transaction.manager:
- if self._synch:
- self._txn_mgr.unregisterSynch(self)
- self._txn_mgr = transaction.TransactionManager()
- if self._synch:
- self._txn_mgr.registerSynch(self)
- return self._txn_mgr
-
- def _cache_items(self):
- # find all items on the lru list
- items = self._cache.lru_items()
- # fine everything. some on the lru list, some not
- everything = self._cache.cache_data
- # remove those items that are on the lru list
- for k,v in items:
- del everything[k]
- # return a list of [ghosts....not recently used.....recently used]
- return everything.items() + items
-
- def __repr__(self):
- if self._version:
- ver = ' (in version %s)' % `self._version`
- else:
- ver = ''
- return '<Connection at %08x%s>' % (positive_id(self), ver)
-
def get(self, oid):
- """Return the persistent object with oid 'oid'.
-
- If the object was not in the cache and the object's class is
- ghostable, then a ghost will be returned. If the object is
- already in the cache, a reference to the cached object will be
- returned.
-
- Applications seldom need to call this method, because objects
- are loaded transparently during attribute lookup.
-
- :return: persistent object corresponding to `oid`
-
- :Parameters:
- - `oid`: an object id
-
- :Exceptions:
- - `KeyError`: if oid does not exist. It is possible that an
- object does not exist as of the current transaction, but
- existed in the past. It may even exist again in the
- future, if the transaction that removed it is undone.
- - `ConnectionStateError`: if the connection is closed.
- """
+ """Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
@@ -347,33 +184,8 @@
self._cache[oid] = obj
return obj
- # deprecate this method?
- __getitem__ = get
-
def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid.
-
- A persistent object is normally added to the database and
- assigned an oid when it becomes reachable to an object already in
- the database. In some cases, it is useful to create a new
- object and use its oid (_p_oid) in a single transaction.
-
- This method assigns a new oid regardless of whether the object
- is reachable.
-
- The object is added when the transaction commits. The object
- must implement the IPersistent interface and must not
- already be associated with a Connection.
-
- :Parameters:
- - `obj`: a Persistent object
-
- :Exceptions:
- - `TypeError`: if obj is not a persistent object.
- - `InvalidObjectReference`: if obj is already associated
- with another connection.
- - `ConnectionStateError`: if the connection is closed.
- """
+ """Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
@@ -397,72 +209,11 @@
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
- # If two connections use the same storage, give them a
- # consistent order using id(). This is unique for the
- # lifetime of a connection, which is good enough.
- return "%s:%s" % (self._sortKey(), id(self))
+ """Return a consistent sort key for this connection."""
+ return "%s:%s" % (self._storage.sortKey(), id(self))
- def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
- """Register odb, the DB that this Connection uses.
-
- This method is called by the DB every time a Connection
- is opened. Any invalidations received while the Connection
- was closed will be processed.
-
- If the global module function resetCaches() was called, the
- cache will be cleared.
-
- :Parameters:
- - `odb`: database that owns the Connection
- - `mvcc`: boolean indicating whether MVCC is enabled
- - `txn_mgr`: transaction manager to use. None means
- used the default transaction manager.
- - `synch`: boolean indicating whether Connection should
- register for afterCompletion() calls.
- """
-
- # TODO: Why do we go to all the trouble of setting _db and
- # other attributes on open and clearing them on close?
- # A Connection is only ever associated with a single DB
- # and Storage.
-
- self._db = odb
- self._storage = odb._storage
- self._sortKey = odb._storage.sortKey
- self.new_oid = odb._storage.new_oid
- self._opened = time()
- if synch is not None:
- self._synch = synch
- if mvcc is not None:
- self._mvcc = mvcc
- self._txn_mgr = txn_mgr or transaction.manager
- if self._reset_counter != global_reset_counter:
- # New code is in place. Start a new cache.
- self._resetCache()
- else:
- self._flush_invalidations()
- if self._synch:
- self._txn_mgr.registerSynch(self)
- self._reader = ConnectionObjectReader(self, self._cache,
- self._db.classFactory)
-
- def _resetCache(self):
- """Creates a new cache, discarding the old one.
-
- See the docstring for the resetCaches() function.
- """
- self._reset_counter = global_reset_counter
- self._invalidated.clear()
- cache_size = self._cache.cache_size
- self._cache = cache = PickleCache(self, cache_size)
-
def abort(self, transaction):
- """Abort modifications to registered objects.
-
- This tells the cache to invalidate changed objects. _p_jar
- and _p_oid are deleted from new objects.
- """
-
+ """Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
@@ -475,70 +226,22 @@
self._tpc_cleanup()
- # Should there be a way to call incrgc directly?
- # Perhaps "full sweep" should do that?
+ # TODO: we should test what happens when cacheGC is called mid-transaction.
- # TODO: we should test what happens when these methods are called
- # mid-transaction.
-
- def cacheFullSweep(self, dt=None):
- deprecated36("cacheFullSweep is deprecated. "
- "Use cacheMinimize instead.")
- if dt is None:
- self._cache.full_sweep()
- else:
- self._cache.full_sweep(dt)
-
- def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
- """Deactivate all unmodified objects in the cache.
-
- Call _p_deactivate() on each cached object, attempting to turn
- it into a ghost. It is possible for individual objects to
- remain active.
-
- :Parameters:
- - `dt`: ignored. It is provided only for backwards compatibility.
- """
- if dt is not DEPRECATED_ARGUMENT:
- deprecated36("cacheMinimize() dt= is ignored.")
- self._cache.minimize()
-
def cacheGC(self):
- """Reduce cache size to target size.
-
- Call _p_deactivate() on cached objects until the cache size
- falls under the target size.
- """
+ """Reduce cache size to target size."""
self._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
- """Register a callable, f, to be called by close().
-
- The callable, f, will be called at most once, the next time
- the Connection is closed.
-
- :Parameters:
- - `f`: object that will be called on `close`
- """
+ """Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f)
def close(self):
- """Close the Connection.
-
- A closed Connection should not be used by client code. It
- can't load or store objects. Objects in the cache are not
- freed, because Connections are re-used and the cache are
- expected to be useful to the next client.
-
- When the Connection is closed, all callbacks registered by
- onCloseCallback() are invoked and the cache is scanned for
- old objects.
- """
-
+ """Close the Connection."""
if not self._needs_to_join:
# We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to "
@@ -575,7 +278,10 @@
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
+ # transaction.interfaces.IDataManager
+
def commit(self, transaction):
+ """Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
@@ -636,8 +342,20 @@
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
- s = self._storage.store(oid, serial, p, self._version, transaction)
+ if IBlob.providedBy(obj):
+ if not IBlobStorage.providedBy(self._storage):
+ raise Unsupported(
+ "Storing Blobs in %s is not supported." %
+ repr(self._storage))
+ s = self._storage.storeBlob(oid, serial, p,
+ obj._p_blob_uncommitted,
+ self._version, transaction)
+ else:
+ s = self._storage.store(oid, serial, p, self._version,
+ transaction)
self._store_count += 1
+
+
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
@@ -652,9 +370,17 @@
raise
self._handle_serial(s, oid)
+
+ if IBlob.providedBy(obj):
+ # We need to update internals of the blobs here
+ obj._p_blob_uncommitted = None
+ obj._p_blob_data = \
+ self._storage.loadBlob(oid, obj._p_serial,
+ self._version )
def commit_sub(self, t):
- """Commit all work done in all subtransactions for this transaction."""
+ """Commit all changes made in subtransactions and begin 2-phase commit
+ """
if self._tmp is None:
return
src = self._storage
@@ -671,11 +397,16 @@
for oid in oids:
data, serial = src.load(oid, src)
- s = self._storage.store(oid, serial, data, self._version, t)
+ blobfile = src.loadBlob(oid, serial, self._version)
+ if blobfile is not None:
+ s = self._storage.storeBlob(oid, serial, data, blobfile,
+ self._version, t)
+ else:
+ s = self._storage.store(oid, serial, data, self._version, t)
self._handle_serial(s, oid, change=False)
def abort_sub(self, t):
- """Abort work done in all subtransactions for this transaction."""
+ """Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
@@ -686,7 +417,7 @@
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
- """Dissown any objects newly saved in an uncommitted transaction."""
+ """Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
@@ -698,34 +429,42 @@
del o._p_jar
del o._p_oid
+ # The next two methods are callbacks for transaction synchronization.
+
+ def beforeCompletion(self, txn):
+ # We don't do anything before a commit starts.
+ pass
+
+ def afterCompletion(self, txn):
+ self._flush_invalidations()
+
+ def _flush_invalidations(self):
+ self._inv_lock.acquire()
+ try:
+ self._cache.invalidate(self._invalidated)
+ self._invalidated.clear()
+ self._txn_time = None
+ finally:
+ self._inv_lock.release()
+ # Now is a good time to collect some garbage
+ self._cache.incrgc()
+
+ def root(self):
+ """Return the database root object."""
+ return self.get(z64)
+
def db(self):
+ """Returns a handle to the database this connection belongs to."""
return self._db
- def getVersion(self):
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
- return self._version
-
def isReadOnly(self):
+ """Returns True if the storage for this connection is read only."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids.
-
- When the next transaction boundary is reached, objects will be
- invalidated. If any of the invalidated objects is accessed by
- the current transaction, the revision written before C{tid}
- will be used.
-
- The DB calls this method, even when the Connection is closed.
-
- :Parameters:
- - `tid`: the storage-level id of the transaction that committed
- - `oids`: oids is a set of oids, represented as a dict with oids
- as keys.
- """
+ """Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try:
if self._txn_time is None:
@@ -734,72 +473,149 @@
finally:
self._inv_lock.release()
- # The next two methods are callbacks for transaction synchronization.
+ # IDataManager
- def beforeCompletion(self, txn):
- # We don't do anything before a commit starts.
- pass
+ def tpc_begin(self, transaction, sub=False):
+ """Begin commit of a transaction, starting the two-phase commit."""
+ self._modified = []
- def afterCompletion(self, txn):
- self._flush_invalidations()
+ # _creating is a list of oids of new objects, which is used to
+ # remove them from the cache if a transaction aborts.
+ self._creating = []
+ if sub and self._tmp is None:
+ # Sub-transaction!
+ self._tmp = self._storage
+ self._storage = TmpStore(self._version, self._storage)
- def _flush_invalidations(self):
- self._inv_lock.acquire()
- try:
- self._cache.invalidate(self._invalidated)
- self._invalidated.clear()
- self._txn_time = None
- finally:
- self._inv_lock.release()
- # Now is a good time to collect some garbage
- self._cache.incrgc()
+ self._storage.tpc_begin(transaction)
- def modifiedInVersion(self, oid):
+ def tpc_vote(self, transaction):
+ """Verify that a data manager can commit the transaction."""
try:
- return self._db.modifiedInVersion(oid)
- except KeyError:
- return self._version
+ vote = self._storage.tpc_vote
+ except AttributeError:
+ return
+ s = vote(transaction)
+ self._handle_serial(s)
- def register(self, obj):
- """Register obj with the current transaction manager.
+ def _handle_serial(self, store_return, oid=None, change=1):
+ """Handle the returns from store() and tpc_vote() calls."""
- A subclass could override this method to customize the default
- policy of one transaction manager for each thread.
+ # These calls can return different types depending on whether
+ # ZEO is used. ZEO uses asynchronous returns that may be
+ # returned in batches by the ClientStorage. ZEO1 can also
+ # return an exception object and expect that the Connection
+ # will raise the exception.
- obj must be an object loaded from this Connection.
- """
- assert obj._p_jar is self
- if obj._p_oid is None:
- # There is some old Zope code that assigns _p_jar
- # directly. That is no longer allowed, but we need to
- # provide support for old code that still does it.
+ # When commit_sub() exceutes a store, there is no need to
+ # update the _p_changed flag, because the subtransaction
+ # tpc_vote() calls already did this. The change=1 argument
+ # exists to allow commit_sub() to avoid setting the flag
+ # again.
- # The actual complaint here is that an object without
- # an oid is being registered. I can't think of any way to
- # achieve that without assignment to _p_jar. If there is
- # a way, this will be a very confusing warning.
- deprecated36("Assigning to _p_jar is deprecated, and will be "
- "changed to raise an exception.")
- elif obj._p_oid in self._added:
- # It was registered before it was added to _added.
+ # When conflict resolution occurs, the object state held by
+ # the connection does not match what is written to the
+ # database. Invalidate the object here to guarantee that
+ # the new state is read the next time the object is used.
+
+ if not store_return:
return
- self._register(obj)
+ if isinstance(store_return, str):
+ assert oid is not None
+ self._handle_one_serial(oid, store_return, change)
+ else:
+ for oid, serial in store_return:
+ self._handle_one_serial(oid, serial, change)
- def _register(self, obj=None):
- if obj is not None:
- self._registered_objects.append(obj)
- if self._needs_to_join:
- self._txn_mgr.get().join(self)
- self._needs_to_join = False
+ def _handle_one_serial(self, oid, serial, change):
+ if not isinstance(serial, str):
+ raise serial
+ obj = self._cache.get(oid, None)
+ if obj is None:
+ return
+ if serial == ResolvedSerial:
+ del obj._p_changed # transition from changed to ghost
+ else:
+ if change:
+ obj._p_changed = 0 # transition from changed to up-to-date
+ obj._p_serial = serial
- def root(self):
- """Return the database root object.
+ def tpc_finish(self, transaction):
+ """Indicate confirmation that the transaction is done."""
+ if self._tmp is not None:
+ # Commiting a subtransaction!
+ # There is no need to invalidate anything.
+ self._storage.tpc_finish(transaction)
+ self._storage._creating[:0]=self._creating
+ del self._creating[:]
+ else:
+ def callback(tid):
+ d = {}
+ for oid in self._modified:
+ d[oid] = 1
+ self._db.invalidate(tid, d, self)
+ self._storage.tpc_finish(transaction, callback)
+ self._tpc_cleanup()
- The root is a persistent.mapping.PersistentMapping.
- """
- return self.get(z64)
+ def tpc_abort(self, transaction):
+ """Abort a transaction."""
+ if self._import:
+ self._import = None
+ self._storage.tpc_abort(transaction)
+ self._cache.invalidate(self._modified)
+ self._invalidate_creating()
+ while self._added:
+ oid, obj = self._added.popitem()
+ del obj._p_oid
+ del obj._p_jar
+ self._tpc_cleanup()
+ def _tpc_cleanup(self):
+ """Performs cleanup operations to support tpc_finish and tpc_abort."""
+ self._conflicts.clear()
+ if not self._synch:
+ self._flush_invalidations()
+ self._needs_to_join = True
+ self._registered_objects = []
+
+ def sync(self):
+ """Manually update the view on the database."""
+ self._txn_mgr.get().abort()
+ sync = getattr(self._storage, 'sync', 0)
+ if sync:
+ sync()
+ self._flush_invalidations()
+
+ def getDebugInfo(self):
+ """Returns a tuple with different items for debugging the
+ connection.
+ """
+ return self._debug_info
+
+ def setDebugInfo(self, *args):
+ """Add the given items to the debug information of this connection."""
+ self._debug_info = self._debug_info + args
+
+ def getTransferCounts(self, clear=False):
+ """Returns the number of objects loaded and stored."""
+ res = self._load_count, self._store_count
+ if clear:
+ self._load_count = 0
+ self._store_count = 0
+ return res
+
+ ##############################################
+ # persistent.interfaces.IPersistentDatamanager
+
+ def oldstate(self, obj, tid):
+ """Return copy of 'obj' that was written by transaction 'tid'."""
+ assert obj._p_jar is self
+ p = self._storage.loadSerial(obj._p_oid, tid)
+ return self._reader.getState(p)
+
def setstate(self, obj):
+ """Turns the ghost 'obj' into a real object by loading it's from the
+ database."""
oid = obj._p_oid
if self._storage is None:
@@ -845,12 +661,15 @@
self._load_before_or_conflict(obj)
return
- p, serial = self._storage.load(obj._p_oid, self._version)
+ oid = obj._p_oid
+
+ p, serial = self._storage.load(oid, self._version)
self._load_count += 1
+
self._inv_lock.acquire()
try:
- invalid = obj._p_oid in self._invalidated
+ invalid = oid in self._invalidated
finally:
self._inv_lock.release()
@@ -866,9 +685,13 @@
self._reader.setGhostState(obj, p)
obj._p_serial = serial
+ # Blob support
+ if IBlob.providedBy(obj):
+ obj._p_blob_data = \
+ self._storage.loadBlob(oid, serial, self._version)
+
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
-
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
@@ -917,27 +740,137 @@
self._register(obj)
raise ReadConflictError(object=obj)
- def oldstate(self, obj, tid):
- """Return copy of obj that was written by tid.
+ def register(self, obj):
+ """Register obj with the current transaction manager.
- The returned object does not have the typical metadata
- (_p_jar, _p_oid, _p_serial) set. I'm not sure how references
- to other peristent objects are handled.
+ A subclass could override this method to customize the default
+ policy of one transaction manager for each thread.
- :return: a persistent object
+ obj must be an object loaded from this Connection.
+ """
+ assert obj._p_jar is self
+ if obj._p_oid is None:
+ # There is some old Zope code that assigns _p_jar
+ # directly. That is no longer allowed, but we need to
+ # provide support for old code that still does it.
- :Parameters:
- - `obj`: a persistent object from this Connection.
- - `tid`: id of a transaction that wrote an earlier revision.
+ # The actual complaint here is that an object without
+ # an oid is being registered. I can't think of any way to
+ # achieve that without assignment to _p_jar. If there is
+ # a way, this will be a very confusing warning.
+ deprecated36("Assigning to _p_jar is deprecated, and will be "
+ "changed to raise an exception.")
+ elif obj._p_oid in self._added:
+ # It was registered before it was added to _added.
+ return
+ self._register(obj)
- :Exceptions:
- - `KeyError`: if tid does not exist or if tid deleted a revision
- of obj.
+ def _register(self, obj=None):
+ if obj is not None:
+ self._registered_objects.append(obj)
+ if self._needs_to_join:
+ self._txn_mgr.get().join(self)
+ self._needs_to_join = False
+
+ # PROTECTED stuff (used by e.g. ZODB.DB.DB)
+
+ def _cache_items(self):
+ # find all items on the lru list
+ items = self._cache.lru_items()
+ # fine everything. some on the lru list, some not
+ everything = self._cache.cache_data
+ # remove those items that are on the lru list
+ for k,v in items:
+ del everything[k]
+ # return a list of [ghosts....not recently used.....recently used]
+ return everything.items() + items
+
+ def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
+ """Register odb, the DB that this Connection uses.
+
+ This method is called by the DB every time a Connection
+ is opened. Any invalidations received while the Connection
+ was closed will be processed.
+
+ If the global module function resetCaches() was called, the
+ cache will be cleared.
+
+ Parameters:
+ odb: database that owns the Connection
+ mvcc: boolean indicating whether MVCC is enabled
+ txn_mgr: transaction manager to use. None means
+ used the default transaction manager.
+ synch: boolean indicating whether Connection should
+ register for afterCompletion() calls.
"""
- assert obj._p_jar is self
- p = self._storage.loadSerial(obj._p_oid, tid)
- return self._reader.getState(p)
+ # TODO: Why do we go to all the trouble of setting _db and
+ # other attributes on open and clearing them on close?
+ # A Connection is only ever associated with a single DB
+ # and Storage.
+
+ self._db = odb
+ self._storage = odb._storage
+ self.new_oid = odb._storage.new_oid
+ self._opened = time()
+ if synch is not None:
+ self._synch = synch
+ if mvcc is not None:
+ self._mvcc = mvcc
+ self._txn_mgr = txn_mgr or transaction.manager
+ if self._reset_counter != global_reset_counter:
+ # New code is in place. Start a new cache.
+ self._resetCache()
+ else:
+ self._flush_invalidations()
+ if self._synch:
+ self._txn_mgr.registerSynch(self)
+ self._reader = ConnectionObjectReader(self, self._cache,
+ self._db.classFactory)
+
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
+ def _resetCache(self):
+ """Creates a new cache, discarding the old one.
+
+ See the docstring for the resetCaches() function.
+ """
+ self._reset_counter = global_reset_counter
+ self._invalidated.clear()
+ cache_size = self._cache.cache_size
+ self._cache = cache = PickleCache(self, cache_size)
+
+ # Python protocol
+
+ def __repr__(self):
+ if self._version:
+ ver = ' (in version %s)' % `self._version`
+ else:
+ ver = ''
+ return '<Connection at %08x%s>' % (positive_id(self), ver)
+
+ # DEPRECATION candidates
+
+ __getitem__ = get
+
+ def modifiedInVersion(self, oid):
+ """Returns the version the object with the given oid was modified in.
+
+ If it wasn't modified in a version, the current version of this
+ connection is returned.
+ """
+ try:
+ return self._db.modifiedInVersion(oid)
+ except KeyError:
+ return self.getVersion()
+
+ def getVersion(self):
+ """Returns the version this connection is attached to."""
+ if self._storage is None:
+ raise ConnectionStateError("The database connection is closed")
+ return self._version
+
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
@@ -959,141 +892,60 @@
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
- def tpc_begin(self, transaction, sub=False):
- self._modified = []
+ def exchange(self, old, new):
+ # called by a ZClasses method that isn't executed by the test suite
+ oid = old._p_oid
+ new._p_oid = oid
+ new._p_jar = self
+ new._p_changed = 1
+ self._register(new)
+ self._cache[oid] = new
- # _creating is a list of oids of new objects, which is used to
- # remove them from the cache if a transaction aborts.
- self._creating = []
- if sub and self._tmp is None:
- # Sub-transaction!
- self._tmp = self._storage
- self._storage = TmpStore(self._version, self._storage)
+ # DEPRECATED methods
- self._storage.tpc_begin(transaction)
+ def getTransaction(self):
+ """Get the current transaction for this connection.
- def tpc_vote(self, transaction):
- try:
- vote = self._storage.tpc_vote
- except AttributeError:
- return
- s = vote(transaction)
- self._handle_serial(s)
+ :deprecated:
- def _handle_serial(self, store_return, oid=None, change=1):
- """Handle the returns from store() and tpc_vote() calls."""
+ The transaction manager's get method works the same as this
+ method. You can pass a transaction manager (TM) to DB.open()
+ to control which TM the Connection uses.
+ """
+ deprecated36("getTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
+ return self._txn_mgr.get()
- # These calls can return different types depending on whether
- # ZEO is used. ZEO uses asynchronous returns that may be
- # returned in batches by the ClientStorage. ZEO1 can also
- # return an exception object and expect that the Connection
- # will raise the exception.
+ def setLocalTransaction(self):
+ """Use a transaction bound to the connection rather than the thread.
- # When commit_sub() exceutes a store, there is no need to
- # update the _p_changed flag, because the subtransaction
- # tpc_vote() calls already did this. The change=1 argument
- # exists to allow commit_sub() to avoid setting the flag
- # again.
+ :deprecated:
- # When conflict resolution occurs, the object state held by
- # the connection does not match what is written to the
- # database. Invalidate the object here to guarantee that
- # the new state is read the next time the object is used.
+ Returns the transaction manager used by the connection. You
+ can pass a transaction manager (TM) to DB.open() to control
+ which TM the Connection uses.
+ """
+ deprecated36("setLocalTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
+ if self._txn_mgr is transaction.manager:
+ if self._synch:
+ self._txn_mgr.unregisterSynch(self)
+ self._txn_mgr = transaction.TransactionManager()
+ if self._synch:
+ self._txn_mgr.registerSynch(self)
+ return self._txn_mgr
- if not store_return:
- return
- if isinstance(store_return, str):
- assert oid is not None
- self._handle_one_serial(oid, store_return, change)
+ def cacheFullSweep(self, dt=None):
+ deprecated36("cacheFullSweep is deprecated. "
+ "Use cacheMinimize instead.")
+ if dt is None:
+ self._cache.full_sweep()
else:
- for oid, serial in store_return:
- self._handle_one_serial(oid, serial, change)
+ self._cache.full_sweep(dt)
- def _handle_one_serial(self, oid, serial, change):
- if not isinstance(serial, str):
- raise serial
- obj = self._cache.get(oid, None)
- if obj is None:
- return
- if serial == ResolvedSerial:
- del obj._p_changed # transition from changed to ghost
- else:
- if change:
- obj._p_changed = 0 # transition from changed to up-to-date
- obj._p_serial = serial
+ def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
+ """Deactivate all unmodified objects in the cache."""
+ if dt is not DEPRECATED_ARGUMENT:
+ deprecated36("cacheMinimize() dt= is ignored.")
+ self._cache.minimize()
- def tpc_finish(self, transaction):
- # It's important that the storage calls the function we pass
- # while it still has its lock. We don't want another thread
- # to be able to read any updated data until we've had a chance
- # to send an invalidation message to all of the other
- # connections!
-
- if self._tmp is not None:
- # Commiting a subtransaction!
- # There is no need to invalidate anything.
- self._storage.tpc_finish(transaction)
- self._storage._creating[:0]=self._creating
- del self._creating[:]
- else:
- def callback(tid):
- d = {}
- for oid in self._modified:
- d[oid] = 1
- self._db.invalidate(tid, d, self)
- self._storage.tpc_finish(transaction, callback)
- self._tpc_cleanup()
-
- def tpc_abort(self, transaction):
- if self._import:
- self._import = None
- self._storage.tpc_abort(transaction)
- self._cache.invalidate(self._modified)
- self._invalidate_creating()
- while self._added:
- oid, obj = self._added.popitem()
- del obj._p_oid
- del obj._p_jar
- self._tpc_cleanup()
-
- # Common cleanup actions after tpc_finish/tpc_abort.
- def _tpc_cleanup(self):
- self._conflicts.clear()
- if not self._synch:
- self._flush_invalidations()
- self._needs_to_join = True
- self._registered_objects = []
-
-
- def sync(self):
- self._txn_mgr.get().abort()
- sync = getattr(self._storage, 'sync', 0)
- if sync:
- sync()
- self._flush_invalidations()
-
- def getDebugInfo(self):
- return self._debug_info
-
- def setDebugInfo(self, *args):
- self._debug_info = self._debug_info + args
-
- def getTransferCounts(self, clear=False):
- """Returns the number of objects loaded and stored.
-
- If clear is True, reset the counters.
- """
- res = self._load_count, self._store_count
- if clear:
- self._load_count = 0
- self._store_count = 0
- return res
-
- def exchange(self, old, new):
- # called by a ZClasses method that isn't executed by the test suite
- oid = old._p_oid
- new._p_oid = oid
- new._p_jar = self
- new._p_changed = 1
- self._register(new)
- self._cache[oid] = new
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -27,6 +27,9 @@
from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from zope.interface import implements
+from ZODB.interfaces import IDatabase
+
import transaction
logger = logging.getLogger('ZODB.DB')
@@ -178,6 +181,7 @@
setCacheDeactivateAfter,
getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter
"""
+ implements(IDatabase)
klass = Connection # Class to use for connections
_activity_monitor = None
@@ -188,6 +192,8 @@
cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3,
version_cache_size=100,
+ database_name='unnamed',
+ databases=None,
version_cache_deactivate_after=DEPRECATED_ARGUMENT,
):
"""Create an object database.
@@ -248,6 +254,16 @@
storage.tpc_vote(t)
storage.tpc_finish(t)
+ # Multi-database setup.
+ if databases is None:
+ databases = {}
+ self.databases = databases
+ self.database_name = database_name
+ if database_name in databases:
+ raise ValueError("database_name %r already in databases" %
+ database_name)
+ databases[database_name] = self
+
# Pass through methods:
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions']:
@@ -565,7 +581,7 @@
def get_info(c):
# `result`, `time` and `version` are lexically inherited.
o = c._opened
- d = c._debug_info
+ d = c.getDebugInfo()
if d:
if len(d) == 1:
d = d[0]
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -547,6 +547,7 @@
self._lock_release()
def load(self, oid, version):
+ """Return pickle data and serial number."""
self._lock_acquire()
try:
pos = self._lookup_pos(oid)
@@ -629,7 +630,7 @@
finally:
self._lock_release()
- def store(self, oid, serial, data, version, transaction):
+ def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
@@ -652,12 +653,12 @@
pnv = h.pnv
cached_tid = h.tid
- if serial != cached_tid:
+ if oldserial != cached_tid:
rdata = self.tryToResolveConflict(oid, cached_tid,
- serial, data)
+ oldserial, data)
if rdata is None:
raise POSException.ConflictError(
- oid=oid, serials=(cached_tid, serial), data=data)
+ oid=oid, serials=(cached_tid, oldserial), data=data)
else:
data = rdata
@@ -687,7 +688,7 @@
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
- if old and serial != cached_tid:
+ if old and oldserial != cached_tid:
return ConflictResolution.ResolvedSerial
else:
return self._tid
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -68,16 +68,16 @@
#
# - 8-byte data length
#
-# ? 8-byte position of non-version data
+# ? 8-byte position of non-version data record
# (if version length > 0)
#
# ? 8-byte position of previous record in this version
# (if version length > 0)
#
-# ? version string
+# ? version string
# (if version length > 0)
#
-# ? data
+# ? data
# (data length > 0)
#
# ? 8-byte position of data record containing data
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -12,8 +12,11 @@
#
##############################################################################
+from zope.interface import implements
+
+from ZODB.Blobs.interfaces import IBlobStorage
from ZODB import POSException
-from ZODB.utils import p64, u64, z64
+from ZODB.utils import p64, u64, z64, cp
import tempfile
@@ -22,6 +25,8 @@
_bver = ''
+ implements(IBlobStorage)
+
def __init__(self, base_version, storage):
self._transaction = None
self._storage = storage
@@ -37,6 +42,8 @@
self._tindex = {}
self._creating = []
+ self.blob_files = {}
+
def close(self):
self._file.close()
@@ -61,6 +68,9 @@
serial = h[:8]
return self._file.read(size), serial
+ def sortKey(self):
+ return self._storage.sortKey()
+
# TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid):
@@ -119,5 +129,27 @@
def versionEmpty(self, version):
# TODO: what is this supposed to do?
+ # NOTE: This appears to implement the opposite of what it should.
if version == self._bver:
return len(self._index)
+
+ # Blob support
+
+ def loadBlob(self, oid, serial, version):
+ return self.blob_files.get(oid)
+
+ def storeBlob(self, oid, oldserial, data, blobfile, version, transaction):
+ result = self.store(oid, oldserial, data, version, transaction)
+
+ target = file(self.generateBlobFile(oid), "w")
+ src = file(blobfile, "r")
+ cp(src, target)
+
+ return result
+
+ def generateBlobFile(self, oid):
+ if not self.blob_files.has_key(oid):
+ handle, name = tempfile.mkstemp()
+ handle.close()
+ self.blob_files[oid] = name
+ return self.blob_files[oid]
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml 2005-03-22 04:16:03 UTC (rev 29637)
@@ -158,4 +158,15 @@
<key name="version-cache-size" datatype="integer" default="100"/>
</sectiontype>
+ <sectiontype name="blobfilestorage" datatype=".BlobFileStorage"
+ implements="ZODB.storage" extends="filestorage">
+ <key name="blob-dir" required="yes">
+ <description>
+ Path name to the blob storage directory.
+ </description>
+ </key>
+ </sectiontype>
+
+
+
</component>
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/config.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/config.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -132,6 +132,15 @@
read_only=self.config.read_only,
quota=self.config.quota)
+class BlobFileStorage(FileStorage):
+
+ def open(self):
+ from ZODB.Blobs.BlobStorage import BlobStorage
+ base_storage = FileStorage.open(self)
+ return BlobStorage(self.config.blob_dir, base_storage)
+
+
+
class ZEOClient(BaseConfig):
def open(self):
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -16,14 +16,122 @@
$Id$
"""
-import zope.interface
+from zope.interface import Interface, Attribute
-class IConnection(zope.interface.Interface):
- """ZODB connection.
+class IConnection(Interface):
+ """Connection to ZODB for loading and storing objects.
- TODO: This interface is incomplete.
+ The Connection object serves as a data manager. The root() method
+ on a Connection returns the root object for the database. This
+ object and all objects reachable from it are associated with the
+ Connection that loaded them. When a transaction commits, it uses
+ the Connection to store modified objects.
+
+ Typical use of ZODB is for each thread to have its own
+ Connection and that no thread should have more than one Connection
+ to the same database. A thread is associated with a Connection by
+ loading objects from that Connection. Objects loaded by one
+ thread should not be used by another thread.
+
+ A Connection can be associated with a single version when it is
+ created. By default, a Connection is not associated with a
+ version; it uses non-version data.
+
+ Each Connection provides an isolated, consistent view of the
+ database, by managing independent copies of objects in the
+ database. At transaction boundaries, these copies are updated to
+ reflect the current state of the database.
+
+ You should not instantiate this class directly; instead call the
+ open() method of a DB instance.
+
+ In many applications, root() is the only method of the Connection
+ that you will need to use.
+
+ Synchronization
+ ---------------
+
+ A Connection instance is not thread-safe. It is designed to
+ support a thread model where each thread has its own transaction.
+ If an application has more than one thread that uses the
+ connection or the transaction the connection is registered with,
+ the application should provide locking.
+
+ The Connection manages movement of objects in and out of object
+ storage.
+
+ TODO: We should document an intended API for using a Connection via
+ multiple threads.
+
+ TODO: We should explain that the Connection has a cache and that
+ multiple calls to get() will return a reference to the same
+ object, provided that one of the earlier objects is still
+ referenced. Object identity is preserved within a connection, but
+ not across connections.
+
+ TODO: Mention the database pool.
+
+ A database connection always presents a consistent view of the
+ objects in the database, although it may not always present the
+ most current revision of any particular object. Modifications
+ made by concurrent transactions are not visible until the next
+ transaction boundary (abort or commit).
+
+ Two options affect consistency. By default, the mvcc and synch
+ options are enabled by default.
+
+ If you pass mvcc=True to db.open(), the Connection will never read
+ non-current revisions of an object. Instead it will raise a
+ ReadConflictError to indicate that the current revision is
+ unavailable because it was written after the current transaction
+ began.
+
+ The logic for handling modifications assumes that the thread that
+ opened a Connection (called db.open()) is the thread that will use
+ the Connection. If this is not true, you should pass synch=False
+ to db.open(). When the synch option is disabled, some transaction
+ boundaries will be missed by the Connection; in particular, if a
+ transaction does not involve any modifications to objects loaded
+ from the Connection and synch is disabled, the Connection will
+ miss the transaction boundary. Two examples of this behavior are
+ db.undo() and read-only transactions.
+
+ Groups of methods:
+
+ User Methods:
+ root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
+ cacheMinimize, getVersion, modifiedInVersion
+
+ Experimental Methods:
+ onCloseCallbacks
+
+ Database Invalidation Methods:
+ invalidate
+
+ Other Methods: exchange, getDebugInfo, setDebugInfo,
+ getTransferCounts
"""
+ def __init__(version='', cache_size=400,
+ cache_deactivate_after=None, mvcc=True, txn_mgr=None,
+ synch=True):
+ """Create a new Connection.
+
+ A Connection instance should by instantiated by the DB
+ instance that it is connected to.
+
+ Parameters:
+ version: the "version" that all changes will be made in, defaults
+ to no version.
+ cache_size: the target size of the in-memory object cache, measured
+ in objects.
+ mvcc: boolean indicating whether MVCC is enabled
+ txn_mgr: transaction manager to use. None means used the default
+ transaction manager.
+ synch: boolean indicating whether Connection should register for
+ afterCompletion() calls.
+ """
+
def add(ob):
"""Add a new object 'obj' to the database and assign it an oid.
@@ -38,15 +146,330 @@
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
+
+ Parameters:
+ obj: a Persistent object
+
+ Raises TypeError if obj is not a persistent object.
+
+ Raises InvalidObjectReference if obj is already associated with another
+ connection.
+
+ Raises ConnectionStateError if the connection is closed.
"""
-class IBlobStorage(zope.interface.Interface):
- """A storage supporting BLOBs."""
+ def get(oid):
+ """Return the persistent object with oid 'oid'.
- def storeBlob(oid, serial, data, blob, version, transaction):
- """Stores data that has a BLOB attached."""
+ If the object was not in the cache and the object's class is
+ ghostable, then a ghost will be returned. If the object is
+ already in the cache, a reference to the cached object will be
+ returned.
- def loadBlob(oid, serial, version, blob):
- """Loads the BLOB data for 'oid' into the given blob object.
+ Applications seldom need to call this method, because objects
+ are loaded transparently during attribute lookup.
+
+ Parameters:
+ oid: an object id
+
+ Raises KeyError if oid does not exist.
+
+ It is possible that an object does not exist as of the current
+ transaction, but existed in the past. It may even exist again in
+ the future, if the transaction that removed it is undone.
+
+ Raises ConnectionStateError if the connection is closed.
"""
+ def cacheMinimize():
+ """Deactivate all unmodified objects in the cache.
+
+ Call _p_deactivate() on each cached object, attempting to turn
+ it into a ghost. It is possible for individual objects to
+ remain active.
+ """
+
+ def cacheGC():
+ """Reduce cache size to target size.
+
+ Call _p_deactivate() on cached objects until the cache size
+ falls under the target size.
+ """
+
+ def onCloseCallback(f):
+ """Register a callable, f, to be called by close().
+
+ f will be called with no arguments before the Connection is closed.
+
+ Parameters:
+ f: method that will be called on `close`
+ """
+
+ def close():
+ """Close the Connection.
+
+ When the Connection is closed, all callbacks registered by
+ onCloseCallback() are invoked and the cache is garbage collected.
+
+ A closed Connection should not be used by client code. It can't load
+ or store objects. Objects in the cache are not freed, because
+ Connections are re-used and the cache is expected to be useful to the
+ next client.
+ """
+
+ def db():
+ """Returns a handle to the database this connection belongs to."""
+
+ def isReadOnly():
+ """Returns True if the storage for this connection is read only."""
+
+ def invalidate(tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids.
+
+ When the next transaction boundary is reached, objects will be
+ invalidated. If any of the invalidated objects are accessed by the
+ current transaction, the revision written before Connection.tid will be
+ used.
+
+ The DB calls this method, even when the Connection is closed.
+
+ Parameters:
+ tid: the storage-level id of the transaction that committed
+ oids: oids is a set of oids, represented as a dict with oids as keys.
+ """
+
+ def root():
+ """Return the database root object.
+
+ The root is a persistent.mapping.PersistentMapping.
+ """
+
+ def getVersion():
+ """Returns the version this connection is attached to."""
+
+ # Multi-database support.
+
+ connections = Attribute("""\
+ A mapping from database name to a Connection to that database.
+
+ In multi-database use, the Connections of all members of a database
+ collection share the same .connections object.
+
+ In single-database use, of course this mapping contains a single
+ entry.
+ """)
+
+ # TODO: should this accept all the arguments one may pass to DB.open()?
+ def get_connection(database_name):
+ """Return a Connection for the named database.
+
+ This is intended to be called from an open Connection associated with
+ a multi-database. In that case, database_name must be the name of a
+ database within the database collection (probably the name of a
+ different database than is associated with the calling Connection
+ instance, but it's fine to use the name of the calling Connection
+ object's database). A Connection for the named database is
+ returned. If no connection to that database is already open, a new
+ Connection is opened. So long as the multi-database remains open,
+ passing the same name to get_connection() multiple times returns the
+ same Connection object each time.
+ """
+
+ def sync():
+ """Manually update the view on the database.
+
+ This includes aborting the current transaction, getting a fresh and
+ consistent view of the data (synchronizing with the storage if possible)
+ and call cacheGC() for this connection.
+
+ This method was especially useful in ZODB 3.2 to better support
+ read-only connections that were affected by a couple of problems.
+ """
+
+ # Debug information
+
+ def getDebugInfo():
+ """Returns a tuple with different items for debugging the connection.
+
+ Debug information can be added to a connection by using setDebugInfo.
+ """
+
+ def setDebugInfo(*items):
+ """Add the given items to the debug information of this connection."""
+
+ def getTransferCounts(clear=False):
+ """Returns the number of objects loaded and stored.
+
+ If clear is True, reset the counters.
+ """
+
+class IDatabase(Interface):
+ """ZODB DB.
+
+ TODO: This interface is incomplete.
+ """
+
+ def __init__(storage,
+ pool_size=7,
+ cache_size=400,
+ version_pool_size=3,
+ version_cache_size=100,
+ database_name='unnamed',
+ databases=None,
+ ):
+ """Create an object database.
+
+ storage: the storage used by the database, e.g. FileStorage
+ pool_size: expected maximum number of open connections
+ cache_size: target size of Connection object cache, in number of
+ objects
+ version_pool_size: expected maximum number of connections (per
+ version)
+ version_cache_size: target size of Connection object cache for
+ version connections, in number of objects
+ database_name: when using a multi-database, the name of this DB
+ within the database group. It's a (detected) error if databases
+ is specified too and database_name is already a key in it.
+ This becomes the value of the DB's database_name attribute.
+ databases: when using a multi-database, a mapping to use as the
+ binding of this DB's .databases attribute. It's intended
+ that the second and following DB's added to a multi-database
+ pass the .databases attribute set on the first DB added to the
+ collection.
+ """
+
+ databases = Attribute("""\
+ A mapping from database name to DB (database) object.
+
+ In multi-database use, all DB members of a database collection share
+ the same .databases object.
+
+ In single-database use, of course this mapping contains a single
+ entry.
+ """)
+
+class IStorage(Interface):
+ """A storage is responsible for storing and retrieving data of objects.
+ """
+
+ def load(oid, version):
+ """XXX"""
+
+ def close():
+ """XXX"""
+
+ def cleanup():
+ """XXX"""
+
+ def lastSerial():
+ """XXX"""
+
+ def lastTransaction():
+ """XXX"""
+
+ def lastTid(oid):
+ """Return last serialno committed for object oid."""
+
+ def loadSerial(oid, serial):
+ """XXX"""
+
+ def loadBefore(oid, tid):
+ """XXX"""
+
+ def iterator(start=None, stop=None):
+ """XXX"""
+
+ def sortKey():
+ """XXX"""
+
+ def getName():
+ """XXX"""
+
+ def getSize():
+ """XXX"""
+
+ def history(oid, version, length=1, filter=None):
+ """XXX"""
+
+ def new_oid(last=None):
+ """XXX"""
+
+ def set_max_oid(possible_new_max_oid):
+ """XXX"""
+
+ def registerDB(db, limit):
+ """XXX"""
+
+ def isReadOnly():
+ """XXX"""
+
+ def supportsUndo():
+ """XXX"""
+
+ def supportsVersions():
+ """XXX"""
+
+ def tpc_abort(transaction):
+ """XXX"""
+
+ def tpc_begin(transaction):
+ """XXX"""
+
+ def tpc_vote(transaction):
+ """XXX"""
+
+ def tpc_finish(transaction, f=None):
+ """XXX"""
+
+ def getSerial(oid):
+ """XXX"""
+
+ def loadSerial(oid, serial):
+ """XXX"""
+
+ def loadBefore(oid, tid):
+ """XXX"""
+
+ def getExtensionMethods():
+ """XXX"""
+
+ def copyTransactionsFrom():
+ """XXX"""
+
+ def store(oid, oldserial, data, version, transaction):
+ """
+
+ may return the new serial or not
+ """
+
+class IUndoableStorage(IStorage):
+
+ def undo(transaction_id, txn):
+ """XXX"""
+
+ def undoInfo():
+ """XXX"""
+
+ def undoLog(first, last, filter=None):
+ """XXX"""
+
+ def pack(t, referencesf):
+ """XXX"""
+
+class IVersioningStorage(IStorage):
+
+ def abortVersion(src, transaction):
+ """XXX"""
+
+ def commitVersion(src, dest, transaction):
+ """XXX"""
+
+ def modifiedInVersion(oid):
+ """XXX"""
+
+ def versionEmpty(version):
+ """XXX"""
+
+ def versions(max=None):
+ """XXX"""
+
Added: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -0,0 +1,121 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Support for testing logging code
+
+If you want to test that your code generates proper log output, you
+can create and install a handler that collects output:
+
+ >>> handler = InstalledHandler('foo.bar')
+
+The handler is installed into loggers for all of the names passed. In
+addition, the logger level is set to 1, which means, log
+everything. If you want to log less than everything, you can provide a
+level keyword argument. The level setting effects only the named
+loggers.
+
+Then, any log output is collected in the handler:
+
+ >>> logging.getLogger('foo.bar').exception('eek')
+ >>> logging.getLogger('foo.bar').info('blah blah')
+
+ >>> for record in handler.records:
+ ... print record.name, record.levelname
+ ... print ' ', record.getMessage()
+ foo.bar ERROR
+ eek
+ foo.bar INFO
+ blah blah
+
+A similar effect can be gotten by just printing the handler:
+
+ >>> print handler
+ foo.bar ERROR
+ eek
+ foo.bar INFO
+ blah blah
+
+After checking the log output, you need to uninstall the handler:
+
+ >>> handler.uninstall()
+
+At which point, the handler won't get any more log output.
+Let's clear the handler:
+
+ >>> handler.clear()
+ >>> handler.records
+ []
+
+And then log something:
+
+ >>> logging.getLogger('foo.bar').info('blah')
+
+and, sure enough, we still have no output:
+
+ >>> handler.records
+ []
+
+$Id: loggingsupport.py 28349 2004-11-06 00:10:32Z tim_one $
+"""
+
+import logging
+
+class Handler(logging.Handler):
+
+ def __init__(self, *names, **kw):
+ logging.Handler.__init__(self)
+ self.names = names
+ self.records = []
+ self.setLoggerLevel(**kw)
+
+ def setLoggerLevel(self, level=1):
+ self.level = level
+ self.oldlevels = {}
+
+ def emit(self, record):
+ self.records.append(record)
+
+ def clear(self):
+ del self.records[:]
+
+ def install(self):
+ for name in self.names:
+ logger = logging.getLogger(name)
+ self.oldlevels[name] = logger.level
+ logger.setLevel(self.level)
+ logger.addHandler(self)
+
+ def uninstall(self):
+ for name in self.names:
+ logger = logging.getLogger(name)
+ logger.setLevel(self.oldlevels[name])
+ logger.removeHandler(self)
+
+ def __str__(self):
+ return '\n'.join(
+ [("%s %s\n %s" %
+ (record.name, record.levelname,
+ '\n'.join([line
+ for line in record.getMessage().split('\n')
+ if line.strip()])
+ )
+ )
+ for record in self.records]
+ )
+
+
+class InstalledHandler(Handler):
+
+ def __init__(self, *names):
+ Handler.__init__(self, *names)
+ self.install()
Copied: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/multidb.txt (from rev 29627, ZODB/trunk/src/ZODB/tests/multidb.txt)
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -647,6 +647,8 @@
self._storage = StubStorage()
classFactory = None
+ database_name = 'stubdatabase'
+ databases = {'stubdatabase': database_name}
def invalidate(self, transaction, dict_with_oid_keys, connection):
pass
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -15,4 +15,6 @@
from zope.testing.doctestunit import DocFileSuite
def test_suite():
- return DocFileSuite("dbopen.txt")
+ return DocFileSuite("dbopen.txt",
+ "multidb.txt",
+ )
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -21,6 +21,8 @@
from cStringIO import StringIO
import weakref
import warnings
+from tempfile import mkstemp
+import os
from persistent.TimeStamp import TimeStamp
@@ -305,3 +307,10 @@
# We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values()
+
+
+def mktemp():
+ """Create a temp file, known by name, in a semi-secure manner."""
+ handle, filename = mkstemp()
+ os.close(handle)
+ return filename
Modified: ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -257,18 +257,35 @@
def setstate(object):
"""Load the state for the given object.
- The object should be in the ghost state.
- The object's state will be set and the object will end up
- in the saved state.
+ The object should be in the ghost state. The object's state will be
+ set and the object will end up in the saved state.
The object must provide the IPersistent interface.
"""
+ def oldstate(obj, tid):
+ """Return copy of 'obj' that was written by transaction 'tid'.
+
+ The returned object does not have the typical metadata (_p_jar, _p_oid,
+ _p_serial) set. I'm not sure how references to other peristent objects
+ are handled.
+
+ Parameters
+ obj: a persistent object from this Connection.
+ tid: id of a transaction that wrote an earlier revision.
+
+ Raises KeyError if tid does not exist or if tid deleted a revision of
+ obj.
+ """
+
def register(object):
"""Register an IPersistent with the current transaction.
This method must be called when the object transitions to
the changed state.
+
+ A subclass could override this method to customize the default
+ policy of one transaction manager for each thread.
"""
def mtime(object):
Modified: ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py 2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py 2005-03-22 04:16:03 UTC (rev 29637)
@@ -18,104 +18,7 @@
import zope.interface
-class IResourceManager(zope.interface.Interface):
- """Objects that manage resources transactionally.
-
- These objects may manage data for other objects, or they may manage
- non-object storages, such as relational databases.
-
- IDataManagerOriginal is the interface currently provided by ZODB
- database connections, but the intent is to move to the newer
- IDataManager.
- """
-
- # Two-phase commit protocol. These methods are called by the
- # ITransaction object associated with the transaction being
- # committed.
-
- def tpc_begin(transaction):
- """Begin two-phase commit, to save data changes.
-
- An implementation should do as much work as possible without
- making changes permanent. Changes should be made permanent
- when tpc_finish is called (or aborted if tpc_abort is called).
- The work can be divided between tpc_begin() and tpc_vote(), and
- the intent is that tpc_vote() be as fast as possible (to minimize
- the period of uncertainty).
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
- """
-
- def tpc_vote(transaction):
- """Verify that a resource manager can commit the transaction.
-
- This is the last chance for a resource manager to vote 'no'. A
- resource manager votes 'no' by raising an exception.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
- """
-
- def tpc_finish(transaction):
- """Indicate confirmation that the transaction is done.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- This should never fail. If this raises an exception, the
- database is not expected to maintain consistency; it's a
- serious error.
- """
-
- def tpc_abort(transaction):
- """Abort a transaction.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- All changes made by the current transaction are aborted. Note
- that this includes all changes stored in any savepoints that may
- be associated with the current transaction.
-
- tpc_abort() can be called at any time, either in or out of the
- two-phase commit.
-
- This should never fail.
- """
-
- # The savepoint/rollback API.
-
- def savepoint(transaction):
- """Save partial transaction changes.
-
- There are two purposes:
-
- 1) To allow discarding partial changes without discarding all
- dhanges.
-
- 2) To checkpoint changes to disk that would otherwise live in
- memory for the duration of the transaction.
-
- Returns an object implementing ISavePoint2 that can be used
- to discard changes made since the savepoint was captured.
-
- An implementation that doesn't support savepoints should implement
- this method by returning a savepoint object that raises an
- exception when its rollback method is called. The savepoint method
- shouldn't raise an error. This way, transactions that create
- savepoints can proceed as long as an attempt is never made to roll
- back a savepoint.
- """
-
- def discard(transaction):
- """Discard changes within the transaction since the last savepoint.
-
- That means changes made since the last savepoint if one exists, or
- since the start of the transaction.
- """
-
-class IDataManagerOriginal(zope.interface.Interface):
+class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
@@ -155,7 +58,7 @@
has been called; this is only used when the transaction is
being committed.
- This call also implied the beginning of 2-phase commit.
+ This call also implies the beginning of 2-phase commit.
"""
# Two-phase commit protocol. These methods are called by the
@@ -180,10 +83,12 @@
"""
-
def tpc_abort(transaction):
"""Abort a transaction.
+ This is called by a transaction manager to end a two-phase commit on
+ the data manager.
+
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
@@ -202,6 +107,11 @@
database is not expected to maintain consistency; it's a
serious error.
+ It's important that the storage calls the passed function
+ while it still has its lock. We don't want another thread
+ to be able to read any updated data until we've had a chance
+ to send an invalidation message to all of the other
+ connections!
"""
def tpc_vote(transaction):
@@ -214,126 +124,47 @@
transaction being committed.
"""
- def commit(object, transaction):
- """CCCommit changes to an object
+ def commit(transaction):
+ """Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
- """
- def abort(object, transaction):
- """Abort changes to an object
-
- Only changes made since the last transaction or
- sub-transaction boundary are discarded.
-
- This method may be called either:
-
- o Outside of two-phase commit, or
-
- o In the first phase of two-phase commit
-
+ This includes conflict detection and handling. If no conflicts or
+ errors occur it saves the objects in the storage.
"""
- def sortKey():
- """
- Return a key to use for ordering registered DataManagers
-
- ZODB uses a global sort order to prevent deadlock when it commits
- transactions involving multiple resource managers. The resource
- manager must define a sortKey() method that provides a global ordering
- for resource managers.
- """
-
-class IDataManager(zope.interface.Interface):
- """Data management interface for storing objects transactionally.
-
- ZODB database connections currently provides the older
- IDataManagerOriginal interface, but the intent is to move to this newer
- IDataManager interface.
-
- Our hope is that this interface will evolve and become the standard
- interface. There are some issues to be resolved first, like:
-
- - Probably want separate abort methods for use in and out of
- two-phase commit.
-
- - The savepoint api may need some more thought.
-
- """
-
- def prepare(transaction):
- """Perform the first phase of a 2-phase commit
-
- The data manager prepares for commit any changes to be made
- persistent. A normal return from this method indicated that
- the data manager is ready to commit the changes.
-
- The data manager must raise an exception if it is not prepared
- to commit the transaction after executing prepare().
-
- The transaction must match that used for preceeding
- savepoints, if any.
- """
-
- # This is equivalent to zodb3's tpc_begin, commit, and
- # tpc_vote combined.
-
def abort(transaction):
- """Abort changes made by transaction
+ """Abort a transaction and forget all changes.
- This may be called before two-phase commit or in the second
- phase of two-phase commit.
+ Abort must be called outside of a two-phase commit.
- The transaction must match that used for preceeding
- savepoints, if any.
-
+ Abort is called by the transaction manager to abort transactions
+ that are not yet in a two-phase commit.
"""
- # This is equivalent to *both* zodb3's abort and tpc_abort
- # calls. This should probably be split into 2 methods.
-
- def commit(transaction):
- """Finish two-phase commit
-
- The prepare method must be called, with the same transaction,
- before calling commit.
-
- """
-
- # This is equivalent to zodb3's tpc_finish
-
- def savepoint(transaction):
- """Do tentative commit of changes to this point.
-
- Should return an object implementing IRollback that can be used
- to rollback to the savepoint.
-
- Note that (unlike zodb3) this doesn't use a 2-phase commit
- protocol. If this call fails, or if a rollback call on the
- result fails, the (containing) transaction should be
- aborted. Aborting the containing transaction is *not* the
- responsibility of the data manager, however.
-
- An implementation that doesn't support savepoints should
- implement this method by returning a rollback implementation
- that always raises an error when it's rollback method is
- called. The savepoing method shouldn't raise an error. This
- way, transactions that create savepoints can proceed as long
- as an attempt is never made to roll back a savepoint.
-
- """
-
def sortKey():
- """
- Return a key to use for ordering registered DataManagers
+ """Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
+ # XXX: Alternate version:
+ #"""Return a consistent sort key for this connection.
+ #
+ #This allows ordering multiple connections that use the same storage in
+ #a consistent manner. This is unique for the lifetime of a connection,
+ #which is good enough to avoid ZEO deadlocks.
+ #"""
+ def beforeCompletion(transaction):
+ """Hook that is called by the transaction before completing a commit"""
+
+ def afterCompletion(transaction):
+ """Hook that is called by the transaction after completing a commit"""
+
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
@@ -414,35 +245,7 @@
# Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently?
-class ISavePoint(zope.interface.Interface):
- """ISavePoint objects represent partial transaction changes.
- Sequences of savepoint objects are associated with transactions,
- and with IResourceManagers.
- """
-
- def rollback():
- """Discard changes made after this savepoint.
-
- This includes discarding (call the discard method on) all
- subsequent savepoints.
- """
-
- def discard():
- """Discard changes saved by this savepoint.
-
- That means changes made since the immediately preceding
- savepoint if one exists, or since the start of the transaction,
- until this savepoint.
-
- Once a savepoint has been discarded, it's an error to attempt
- to rollback or discard it again.
- """
-
- next_savepoint = zope.interface.Attribute(
- """The next savepoint (later in time), or None if self is the
- most recent savepoint.""")
-
class IRollback(zope.interface.Interface):
def rollback():
@@ -457,3 +260,4 @@
- The transaction has ended.
"""
+
Copied: ZODB/branches/ctheune-blobsupport/src/zope/proxy (from rev 29627, ZODB/trunk/src/zope/proxy)
Property changes on: ZODB/branches/ctheune-blobsupport/src/zope/proxy
___________________________________________________________________
Name: svn:ignore
+ *so
More information about the Zodb-checkins
mailing list