[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - zodbtables.py:1.2
connection.py:1.7 consts.py:1.2 db.py:1.6 interfaces.py:1.2
scanner.py:1.3 serializers.py:1.4 storage.py:1.9 utils.py:1.3
gateways.py:NONE oidencoder.py:NONE
Shane Hathaway
shane at zope.com
Mon Feb 2 10:07:53 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv26672/lib/apelib/zodb3
Modified Files:
connection.py consts.py db.py interfaces.py scanner.py
serializers.py storage.py utils.py
Added Files:
zodbtables.py
Removed Files:
gateways.py oidencoder.py
Log Message:
Moved ape-0_8-branch to the HEAD.
From CHANGES.txt:
- Major restructuring to reduce the number of concepts in
Ape. Keychains and keys have been replaced with simple string OIDs.
There is now a flat namespace of mappers instead of a tree. Only
one classifier and one OID generator are used in any object
database.
- The ZODB root object is now stored on the filesystem.
=== Products/Ape/lib/apelib/zodb3/zodbtables.py 1.1 => 1.2 ===
--- /dev/null Mon Feb 2 10:07:53 2004
+++ Products/Ape/lib/apelib/zodb3/zodbtables.py Mon Feb 2 10:07:22 2004
@@ -0,0 +1,418 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZODB tables with support for basic relational operations.
+
+$Id$
+"""
+
+from ZODB import Persistent
+from BTrees.IIBTree import IITreeSet, intersection
+from BTrees.IOBTree import IOBTree
+from BTrees.OIBTree import OIBTree
+from BTrees.OOBTree import OOBTree
+from Record import Record
+
+
class DuplicateError(Exception):
    """Signals an attempt to store a duplicate data record.

    Table raises this when a primary key is already in use.
    """
+
+
class Column:
    """Describes a single column of a table schema."""

    def __init__(self, name, primary, indexed):
        # name is a string; primary and indexed are boolean flags.
        self.indexed = indexed
        self.primary = primary
        self.name = name

    def __repr__(self):
        class_name = self.__class__.__name__
        return "<%s(%s)>" % (class_name, self.name)
+
+
class TableSchema:
    """Ordered collection of uniquely named columns."""

    # Column names that addColumn() refuses.  Table record classes use
    # 'rid' for the record id stored at tuple position 0.
    reserved_names = ('rid',)

    def __init__(self):
        self.columns = []       # Column objects, in the order they were added
        self.column_names = {}  # {column name -> 1}, for duplicate detection

    def addColumn(self, name, primary=0, indexed=0):
        """Appends a column to the schema.

        name is the column name; primary and indexed are boolean flags
        passed through to the Column.  Raises ValueError if the name is
        reserved or has already been added.
        """
        if name in self.reserved_names:
            raise ValueError("Column name %s is reserved" % repr(name))
        if name in self.column_names:
            raise ValueError("Column %s already exists" % repr(name))
        self.column_names[name] = 1
        self.columns.append(Column(name, primary, indexed))

    def getColumns(self):
        """Returns the columns as a tuple, in the order they were added.
        """
        return tuple(self.columns)

    def __repr__(self):
        names = [c.name for c in self.columns]
        return "<%s(%s)>" % (self.__class__.__name__, ', '.join(names))
+
+
class TableRecordMixin:
    """Mixin that gives record objects a readable repr.

    Relies on the concrete class providing __record_schema__, a
    mapping of field name to tuple position.
    """

    def __repr__(self):
        # Pair each rendered field with its position so that sorting
        # puts the fields back into schema order.
        ordered = [
            (position, "%s=%s" % (name, repr(getattr(self, name))))
            for name, position in self.__record_schema__.items()
        ]
        ordered.sort()
        params = [text for position, text in ordered]
        return "<%s(%s)>" % (self.__class__.__name__, ', '.join(params))
+
+
class Table(Persistent):
    """Simple, generic relational table.

    Each row is stored as a tuple keyed by an integer record id (rid).
    Position 0 of the tuple holds the rid; the schema's columns occupy
    positions 1..n in schema order.  Filters and change sets are
    mapping-like objects in which a value of None means "not
    specified", so None can neither be searched for nor assigned
    through update().
    """
    schema = None
    _v_record_class = None

    def __init__(self, schema=None):
        """Builds an empty table.

        schema is a TableSchema.  When it is omitted, the class-level
        'schema' attribute is used instead.  Raises ValueError if no
        schema is available either way.
        """
        if schema is not None:
            self.schema = schema
        # Read the schema back through self so that a class-level
        # schema is honored when no argument was passed.
        schema = self.schema
        if schema is None:
            raise ValueError("A table schema is required")
        columns = schema.getColumns()
        self.col_info = []         # [(tuple position, column),]
        self.positions = {}        # {column name -> tuple position}
        position = 0
        for column in columns:
            # Leave space for the record ID at position 0.
            position += 1
            self.col_info.append((position, column))
            self.positions[column.name] = position
        self.proto_record = [None] * (len(columns) + 1)
        self.next_rid = 1
        self.clear()


    def clear(self):
        """Removes all rows and resets every index.

        The record id counter (next_rid) is not reset.
        """
        self.data = IOBTree()  # {rid -> record as tuple}
        self.indexes = {}      # {index_name -> OOBTree({value -> IITreeSet})}
        self.primary_index = OIBTree()  # {primary key -> rid}
        for position, column in self.col_info:
            if column.indexed:
                self.indexes[column.name] = OOBTree()


    def tuplify(self, params):
        """Accepts a mapping-like object and returns a tuple.

        The tuple has one slot per column plus slot 0 for the record
        id.  Slots for names missing from params are None.  A name
        that is not a column of this table raises KeyError.
        """
        record = self.proto_record[:]
        positions = self.positions
        if hasattr(params, '__record_schema__'):
            # Record objects: take the field names from the record
            # schema and read the values by subscript.
            for name in params.__record_schema__.keys():
                position = positions[name]
                record[position] = params[name]
        else:
            for name, value in params.items():
                position = positions[name]
                record[position] = value
        return tuple(record)


    def insert(self, params):
        """Adds one row and returns the number of rows inserted (1).

        Raises ValueError if a primary key column has no value and
        DuplicateError if the primary key is already in use.
        """
        record = self.tuplify(params)

        # Determine the primary key.
        primary_key = []
        for position, column in self.col_info:
            if column.primary:
                if record[position] is None:
                    raise ValueError(
                        "No value provided for primary key column %s"
                        % repr(column.name))
                primary_key.append(record[position])
        if primary_key:
            primary_key = tuple(primary_key)
            if self.primary_index.has_key(primary_key):
                raise DuplicateError(
                    "Primary key %s in use" % repr(primary_key))

        # Add a record.
        rid = self.next_rid
        self.next_rid += 1   # XXX Hotspot!
        record = (rid,) + record[1:]
        self.data[rid] = record
        if primary_key:
            self.primary_index[primary_key] = rid

        # Add to indexes.
        for position, column in self.col_info:
            value = record[position]
            if value is not None:
                index = self.indexes.get(column.name)
                if index is not None:
                    rid_set = index.get(value)
                    if rid_set is None:
                        rid_set = IITreeSet()
                        index[value] = rid_set
                    rid_set.insert(rid)

        # Return the number of rows inserted.
        return 1


    def delete(self, filter):
        """Deletes the rows matching filter; returns how many were removed.

        A filter that specifies no values deletes every row.
        """
        rids = self._select_rids(self.tuplify(filter))
        if rids is None:
            # Zap everything
            count = len(self.data)
            self.clear()
            return count
        elif not rids:
            # No rows selected
            return 0

        rids = tuple(rids)  # Make sure rids is a static sequence
        for rid in rids:
            old_r = self.data[rid]
            assert old_r[0] == rid
            primary_key = []
            for position, column in self.col_info:
                old_value = old_r[position]
                if old_value is not None:
                    if column.primary:
                        primary_key.append(old_value)
                    # Remove from indexes.
                    index = self.indexes.get(column.name)
                    if index is not None:
                        if index.has_key(old_value):
                            # Remove an index entry.
                            rid_set = index[old_value]
                            rid_set.remove(rid)
                            if not rid_set:
                                del index[old_value]

            if primary_key:
                # Remove a primary key.
                primary_key = tuple(primary_key)
                assert self.primary_index[primary_key] == rid
                del self.primary_index[primary_key]

            # Remove the data.
            del self.data[rid]

        return len(rids)


    def update(self, filter, changes):
        """Applies changes to the rows matching filter.

        Only the columns given a non-None value in changes are
        modified.  A filter that specifies no values selects every
        row.  Returns the number of rows affected.  Raises
        DuplicateError, before anything is modified, if the update
        would leave two rows with the same primary key.
        """
        rids = self._select_rids(self.tuplify(filter))
        if rids is None:
            rids = self.data.keys()
        elif not rids:
            # Nothing needs to be updated.
            return 0
        count = len(rids)

        # Identify changes.
        old_data = {}    # rid -> old tuple
        new_data = {}    # rid -> new tuple
        old_to_new = {}  # old primary key -> new primary key
        new_to_rid = {}  # new primary key -> rid

        record = self.tuplify(changes)
        for rid in rids:
            old_r = self.data[rid]
            old_data[rid] = old_r
            new_r = list(old_r)
            # new_r and old_r contain record tuples.
            for position, column in self.col_info:
                if record[position] is not None:
                    new_r[position] = record[position]
            new_data[rid] = tuple(new_r)
            # Hmm.  The code below allows an update to change the primary
            # key.  It might be better to prevent primary key columns from
            # being changed by an update() call.
            opk = []
            npk = []
            for position, column in self.col_info:
                if column.primary:
                    opk.append(old_r[position])
                    npk.append(new_r[position])
            if opk != npk:
                opk = tuple(opk)
                npk = tuple(npk)
                if npk in new_to_rid:
                    # Two rows of this same update would end up with
                    # the same primary key.
                    raise DuplicateError(
                        "Primary key %s in use" % repr(npk))
                old_to_new[opk] = npk
                new_to_rid[npk] = rid

        # Look for primary key conflicts.  A primary key conflict can
        # occur when changing a record to a different primary key and
        # the new primary key is already in use.
        for pk in old_to_new.values():
            if (self.primary_index.has_key(pk)
                    and pk not in old_to_new):
                raise DuplicateError("Primary key %s in use" % repr(pk))

        # Update the data.
        self.data.update(new_data)

        # Remove old primary key indexes and insert new primary key indexes.
        for pk in old_to_new.keys():
            del self.primary_index[pk]
        self.primary_index.update(new_to_rid)

        # Update indexes.
        for rid, old_r in old_data.items():
            new_r = new_data[rid]
            for position, column in self.col_info:
                index = self.indexes.get(column.name)
                if index is not None:
                    # Compare against the resulting row, not the change
                    # tuple: the change tuple holds None for every
                    # column that is not being changed, which would
                    # wrongly drop the row from that column's index.
                    new_value = new_r[position]
                    old_value = old_r[position]
                    if new_value != old_value:
                        if old_value is not None and index.has_key(old_value):
                            # Remove an index entry.
                            rid_set = index[old_value]
                            rid_set.remove(rid)
                            if not rid_set:
                                del index[old_value]
                        if new_value is not None:
                            # Add an index entry.
                            rid_set = index.get(new_value)
                            if rid_set is None:
                                rid_set = IITreeSet()
                                index[new_value] = rid_set
                            rid_set.insert(rid)

        # Return the number of rows affected.
        return count


    def getRecordClass(self):
        """Returns the Record subclass used for rows of this table.

        The class is built on first use and cached on the instance in
        _v_record_class.  Its record schema maps 'rid' to position 0
        and each column name to its tuple position.
        """
        klass = self._v_record_class
        if klass is None:
            schema = {'rid': 0}
            for position, column in self.col_info:
                schema[column.name] = position
            class TableRecord(TableRecordMixin, Record):
                __record_schema__ = schema
            self._v_record_class = klass = TableRecord
        return klass


    def select(self, filter):
        """Returns a list of record objects for the rows matching filter.

        A filter that specifies no values selects every row.
        """
        rids = self._select_rids(self.tuplify(filter))
        if rids is None:
            # All
            klass = self.getRecordClass()
            return [klass(rec) for rec in self.data.values()]
        elif rids:
            # Some
            klass = self.getRecordClass()
            data = self.data
            return [klass(data[rid]) for rid in rids]
        else:
            # None
            return []


    def _select_rids(self, query):
        """Searches the table for matches, returning record ids.

        query is a tuple as produced by tuplify().
        Returns a sequence of record ids, or None for all records.
        """
        primary_key = []
        params = 0          # The number of parameters specified
        primary_params = 0  # The number of primary params specified
        for position, column in self.col_info:
            value = query[position]
            if value is not None:
                params += 1
                if column.primary:
                    primary_params += 1
                    if primary_key is not None:
                        primary_key.append(value)
            elif column.primary:
                # Didn't fully specify the primary key.
                # Can't search by primary key.
                primary_key = None

        if not params:
            # No query.  Select all.
            return None

        # First strategy: try to satisfy the request by consulting
        # the primary key index.
        if primary_key:
            # The primary key is complete.  The result set will have
            # either zero rows or one row.
            primary_key = tuple(primary_key)
            rid = self.primary_index.get(primary_key)
            if rid is None:
                return ()
            # Possibly filter out the single item.
            if params > primary_params:
                cand = self.data[rid]
                for position, column in self.col_info:
                    if query[position] is not None:
                        if cand[position] != query[position]:
                            # Not a match.
                            return ()
            return (rid,)

        # Second strategy: try to satisfy the request by intersecting
        # indexes.
        rids = None
        iteration_filters = []
        for position, column in self.col_info:
            value = query[position]
            if value is not None:
                index = self.indexes.get(column.name)
                if index is None:
                    # No index on this column; check it row by row below.
                    iteration_filters.append((position, value))
                else:
                    rid_set = index.get(value)
                    if rid_set is None:
                        # No rows satisfy this criterion.
                        return ()
                    if rids is None:
                        rids = rid_set
                    else:
                        rids = intersection(rids, rid_set)
                    if not rids:
                        # No rows satisfy all criteria.
                        return ()
        if rids is not None:
            rids = rids.keys()

        if not iteration_filters:
            # Indexes did all the work.  No need to search each record.
            return rids

        # Fallback strategy: Eliminate items one by one.
        if rids is None:
            # Use the whole data set.
            candidates = self.data.values()
        else:
            # Use the specified records.
            candidates = [self.data[rid] for rid in rids]

        rids = []
        append = rids.append
        for cand in candidates:
            for position, value in iteration_filters:
                if cand[position] != value:
                    # Not a match.
                    break
            else:
                # A match.
                append(cand[0])
        return rids

    def __repr__(self):
        return "<%s(schema=%s)>" % (self.__class__.__name__, repr(self.schema))
=== Products/Ape/lib/apelib/zodb3/connection.py 1.6 => 1.7 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.6 Wed Jul 30 17:33:12 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py Mon Feb 2 10:07:22 2004
@@ -23,7 +23,8 @@
from cPickle import Unpickler, Pickler
from Acquisition import aq_base
-from ZODB import Persistent
+from Persistence import Persistent
+from ZODB.Transaction import Transaction
from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \
StorageError
@@ -33,7 +34,7 @@
from consts import HASH0, DEBUG
from apelib.core.io import ObjectSystemIO, ClassifiedState
-from apelib.core.interfaces import IKeyedObjectSystem
+from apelib.core.interfaces import IObjectDatabase, LoadError
class ApeConnection (Connection):
@@ -49,28 +50,57 @@
_osio = None
_scan_ctl = None
- __implements__ = (IKeyedObjectSystem,
+ __implements__ = (IObjectDatabase,
getattr(Connection, '__implements__', ()))
def _setDB(self, odb):
Connection._setDB(self, odb)
- if odb._scan_ctl is not None:
+ pool_ctl = odb.pool_scan_ctl
+ if pool_ctl is not None:
ctl = self._scan_ctl
if ctl is None:
- self._scan_ctl = ctl = odb._scan_ctl.newConnection()
- if ctl.ready():
+ self._scan_ctl = ctl = pool_ctl.newConnection()
+ if ctl.elapsed():
+ # Let the scanner know which OIDs matter.
ctl.setOIDs(self._cache.cache_data.keys())
+ # If it's time, scan on behalf of the whole pool.
+ if pool_ctl.elapsed():
+ pool_ctl.scan()
# If there were any invalidations, process them now.
if self._invalidated:
self._flush_invalidations()
+ def _prepareRoot(self):
+ osio = self.getObjectSystemIO()
+ oid = osio.conf.oid_gen.root_oid
+ try:
+ self[oid]
+ except (KeyError, LoadError):
+ # Create the root object.
+ from Persistence import PersistentMapping
+ root = PersistentMapping()
+ root._p_jar = self
+ root._p_changed = 1
+ root._p_oid = oid
+ t = Transaction()
+ t.note('Initial database creation')
+ self.tpc_begin(t)
+ self.commit(root, t)
+ self.tpc_vote(t)
+ self.tpc_finish(t)
+
+ def root(self):
+ osio = self.getObjectSystemIO()
+ oid = osio.conf.oid_gen.root_oid
+ return self[oid]
+
def getObjectSystemIO(self):
osio = self._osio
if osio is None:
- root_mapper = self._db._mapper_resource.access(self)
- osio = ObjectSystemIO(root_mapper, self)
+ conf = self._db._conf_resource.access(self)
+ osio = ObjectSystemIO(conf, self)
self._osio = osio
return osio
@@ -82,7 +112,7 @@
finally:
if db is not None and self._osio is not None:
self._osio = None
- db._mapper_resource.release(self)
+ db._conf_resource.release(self)
def __getitem__(self, oid, tt=type(())):
@@ -103,17 +133,18 @@
raise "Could not load oid %s, pickled data in traceback info may\
contain clues" % (oid)
osio = self.getObjectSystemIO()
- object = osio.newObject(classified_state)
- assert object is not None
+ obj = osio.newObject(classified_state)
+ assert obj is not None
- object._p_oid=oid
- object._p_jar=self
- object._p_changed=None
-
- self._cache[oid] = object
- if oid=='\0\0\0\0\0\0\0\0':
- self._root_=object # keep a ref
- return object
+ obj._p_oid=oid
+ obj._p_jar=self
+ obj._p_changed=None
+
+ self._cache[oid] = obj
+
+ if oid == osio.conf.oid_gen.root_oid:
+ self._root_=obj # keep a ref
+ return obj
def _persistent_load(self, oid, hints=None):
@@ -125,17 +156,17 @@
return obj
if hints:
- mapper_names = hints.get('mapper_names')
- if mapper_names is not None:
- classified_state = ClassifiedState(None, None, mapper_names)
+ mapper_name = hints.get('mapper_name')
+ if mapper_name is not None:
+ classified_state = ClassifiedState(None, None, mapper_name)
osio = self.getObjectSystemIO()
- object = osio.newObject(classified_state)
- if object is not None:
- object._p_oid=oid
- object._p_jar=self
- object._p_changed=None
- self._cache[oid] = object
- return object
+ obj = osio.newObject(classified_state)
+ if obj is not None:
+ obj._p_oid=oid
+ obj._p_jar=self
+ obj._p_changed=None
+ self._cache[oid] = obj
+ return obj
# We don't have enough info for fast loading. Load the whole object.
return self[oid]
@@ -147,8 +178,8 @@
self._begun = 1
- def commit(self, object, transaction):
- if object is self:
+ def commit(self, obj, transaction):
+ if obj is self:
self.mayBegin(transaction)
# We registered ourself. Execute a commit action, if any.
if self._Connection__onCommitActions is not None:
@@ -156,8 +187,8 @@
self._Connection__onCommitActions.pop(0)
apply(getattr(self, method_name), (transaction,) + args, kw)
return
- oid=object._p_oid
- assert oid != 'unmanaged', repr(object)
+ oid=obj._p_oid
+ assert oid != 'unmanaged', repr(obj)
#invalid=self._invalidated.get
invalid = self._invalid
@@ -165,20 +196,20 @@
if modified is None:
modified = self._invalidating
- if oid is None or object._p_jar is not self:
+ if oid is None or obj._p_jar is not self:
# new object
oid = self.new_oid()
- object._p_jar=self
- object._p_oid=oid
+ obj._p_jar=self
+ obj._p_oid=oid
self._creating.append(oid)
- elif object._p_changed:
+ elif obj._p_changed:
if (
- (invalid(oid) and not hasattr(object, '_p_resolveConflict'))
+ (invalid(oid) and not hasattr(obj, '_p_resolveConflict'))
or
invalid(None)
):
- raise ConflictError(object=object)
+ raise ConflictError(object=obj)
modified.append(oid)
else:
@@ -187,7 +218,7 @@
self.mayBegin(transaction)
- stack=[object]
+ stack=[obj]
file=StringIO()
seek=file.seek
@@ -205,11 +236,11 @@
version=self._version
while stack:
- object=stack[-1]
+ obj=stack[-1]
del stack[-1]
- oid=object._p_oid
- assert oid != 'unmanaged', repr(object)
- serial = self.getSerial(object)
+ oid=obj._p_oid
+ assert oid != 'unmanaged', repr(obj)
+ serial = self.getSerial(obj)
if serial == HASH0:
# new object
self._creating.append(oid)
@@ -219,29 +250,27 @@
# for the first object on the stack.
if (
(invalid(oid) and
- not hasattr(object, '_p_resolveConflict'))
+ not hasattr(obj, '_p_resolveConflict'))
or
invalid(None)
):
- raise ConflictError(object=object)
+ raise ConflictError(object=obj)
modified.append(oid)
# SDH: hook in the serializer.
- # state=object.__getstate__()
- oid_encoder = self._db._oid_encoder
- keychain = oid_encoder.decode(oid)
+ # state=obj.__getstate__()
osio = self.getObjectSystemIO()
- event, classified_state = osio.serialize(keychain, object)
- ext_refs = event.getExternalRefs()
+ event, classified_state = osio.serialize(oid, obj)
+ ext_refs = event.external
if ext_refs:
- for (ext_keychain, ext_ref) in ext_refs:
+ for (ext_oid, ext_ref) in ext_refs:
if self.getSerial(ext_ref) == HASH0:
- ext_oid = oid_encoder.encode(ext_keychain)
+ # New object
if ext_ref._p_jar is not None:
if ext_ref._p_jar is not self:
raise InvalidObjectReference, (
"Can't refer from %s in %s to %s in %s"
- % (repr(object), repr(self), repr(ext_ref),
+ % (repr(obj), repr(self), repr(ext_ref),
repr(ext_ref._p_jar)))
else:
ext_ref._p_jar = self
@@ -252,9 +281,8 @@
ext_ref._p_oid = ext_oid
stack.append(ext_ref)
- unmanaged = event.getUnmanagedPersistentObjects()
- if unmanaged:
- self.handleUnmanaged(object, unmanaged)
+ if event.upos:
+ self.handleUnmanaged(obj, event.upos)
seek(0)
clear_memo()
@@ -266,7 +294,7 @@
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
- try: cache[oid] = object
+ try: cache[oid] = obj
except ValueError:
# "Cannot re-register an object under a different
# oid". This can happen when the user is working on
@@ -274,20 +302,20 @@
# was used recently. Try to fix it by minimizing
# the cache and trying again.
cache.minimize()
- cache[oid] = object
+ cache[oid] = obj
except:
- if aq_base(object) is not object:
+ if aq_base(obj) is not obj:
# Yuck, someone tried to store a wrapper. Try to
# cache it unwrapped.
- cache[oid] = aq_base(object)
+ cache[oid] = aq_base(obj)
else:
raise
self._handle_serial(s, oid)
- def setstate(self, object):
- oid=object._p_oid
+ def setstate(self, obj):
+ oid=obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
@@ -312,9 +340,9 @@
#invalid = self._invalidated.get
invalid = self._invalid
if invalid(oid) or invalid(None):
- if not hasattr(object.__class__, '_p_independent'):
+ if not hasattr(obj.__class__, '_p_independent'):
get_transaction().register(self)
- raise ReadConflictError(object=object)
+ raise ReadConflictError(object=obj)
invalid=1
else:
invalid=0
@@ -331,23 +359,21 @@
# else:
# d=object.__dict__
# for k,v in state.items(): d[k]=v
- keychain = self._db._oid_encoder.decode(oid)
osio = self.getObjectSystemIO()
- event = osio.deserialize(keychain, object, classified_state)
+ event = osio.deserialize(oid, obj, classified_state)
- unmanaged = event.getUnmanagedPersistentObjects()
- if unmanaged:
- self.handleUnmanaged(object, unmanaged)
+ if event.upos:
+ self.handleUnmanaged(obj, event.upos)
- self.setSerial(object, serial)
+ self.setSerial(obj, serial)
if invalid:
- if object._p_independent():
+ if obj._p_independent():
try: del self._invalidated[oid]
except KeyError: pass
else:
get_transaction().register(self)
- raise ConflictError(object=object)
+ raise ConflictError(object=obj)
except ConflictError:
raise
@@ -357,12 +383,12 @@
raise
- def register(self, object):
+ def register(self, obj):
"""Register an object with the appropriate transaction manager.
"""
- assert object._p_jar is self
- if object._p_oid is not None:
- get_transaction().register(object)
+ assert obj._p_jar is self
+ if obj._p_oid is not None:
+ get_transaction().register(obj)
# else someone is trying to trick ZODB into registering an
# object with no OID. OFS.Image.File._read_data() does this.
# Since ApeConnection really needs meaningful OIDs, just ignore
@@ -377,39 +403,37 @@
return '<%s at %08x%s>' % (self.__class__.__name__, id(self), ver)
- def handleUnmanaged(self, object, unmanaged):
+ def handleUnmanaged(self, obj, unmanaged):
+ # Add an event handler to unmanaged subobjects.
+ # The event handler calls self.register() when it changes.
for o in unmanaged:
if isinstance(o, Persistent):
if o._p_jar is None:
o._p_oid = 'unmanaged'
- o._p_jar = UnmanagedJar(self, object._p_oid)
+ o._p_jar = UnmanagedJar(self, obj._p_oid)
else:
- # Turn off the "changed" flag
o._p_changed = 0
- # IKeyedObjectSystem implementation
+ # IObjectDatabase implementation
- def getObject(self, keychain, hints=None):
- oid = self._db._oid_encoder.encode(keychain)
- return self._persistent_load(oid, hints)
+ getObject = _persistent_load
- def identifyObject(self, obj):
- oid = obj._p_oid
+ def identify(self, obj):
+ try:
+ oid = obj._p_oid
+ except AttributeError:
+ raise TypeError("%s does not subclass Persistent" % repr(obj))
if oid is None:
return None
if obj._p_jar is not self:
raise InvalidObjectReference, (
"Can't refer to %s, located in %s, from %s"
% (repr(obj), repr(obj._p_jar), repr(self)))
- return self._db._oid_encoder.decode(oid)
+ return oid
- def newKey(self):
- oid = self.new_oid()
- keychain = self._db._oid_encoder.decode(oid)
- return keychain[-1]
-
- loadStub = getObject # Deprecated
+ def new_oid(self):
+ return self._storage.new_oid()
def getClass(self, module, name):
@@ -426,11 +450,9 @@
if ob._p_changed is not None:
p, serial = self._storage.load(oid, self._version)
if serial != self.getSerial(ob):
- keychain = self._db._oid_encoder.decode(oid)
raise StorageError(
- "Inconsistent serial for keychain %s" % repr(keychain))
+ "Inconsistent serial for oid %s" % repr(oid))
-
def exportFile(self, oid, file=None):
raise NotImplementedError, 'ZEXP Export not implemented'
@@ -446,7 +468,7 @@
# a _serials dictionary.
_serials = None
- SERIAL_CLEANUP_THRESHOLD = 1000
+ serial_cleanup_threshold = 1000
def getSerial(self, ob):
oid = ob._p_oid
@@ -468,10 +490,10 @@
self._serials = serials
if not serials.has_key(oid):
# When the number of recorded serials exceeds the number of
- # cache entries by SERIAL_CLEANUP_THRESHOLD, prune the serials
+ # cache entries by serial_cleanup_threshold, prune the serials
# dictionary.
if (len(serials) >= len(self._cache) +
- self.SERIAL_CLEANUP_THRESHOLD):
+ self.serial_cleanup_threshold):
# clean up
cache_get = self._cache.get
for oid in serials.keys():
=== Products/Ape/lib/apelib/zodb3/consts.py 1.1 => 1.2 ===
--- Products/Ape/lib/apelib/zodb3/consts.py:1.1 Wed Apr 9 23:09:58 2003
+++ Products/Ape/lib/apelib/zodb3/consts.py Mon Feb 2 10:07:22 2004
@@ -23,6 +23,5 @@
else:
DEBUG = 0
-ROOT_OID = '\0' * 8
HASH0 = '\0' * 8
HASH1 = '\0' * 7 + '\001'
=== Products/Ape/lib/apelib/zodb3/db.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.5 Thu Aug 14 16:22:36 2003
+++ Products/Ape/lib/apelib/zodb3/db.py Mon Feb 2 10:07:22 2004
@@ -16,20 +16,20 @@
$Id$
"""
-from ZODB.DB import DB, Transaction, cPickle, cStringIO, allocate_lock
+import cPickle
+import cStringIO
-from apelib.core.interfaces import IMapper
-from apelib.core.exceptions import ConfigurationError
+from ZODB.DB import DB, Transaction, allocate_lock
+from apelib.core.interfaces import ConfigurationError
from connection import ApeConnection
from storage import ApeStorage
-from oidencoder import OIDEncoder
from resource import StaticResource
-from interfaces import IResourceAccess, IOIDEncoder
+from interfaces import IResourceAccess
-def callMapperFactory(factory, kw):
- """Returns (mapper, tpc_conns) given the name of a factory and arguments.
+def callConfFactory(factory, kw):
+ """Returns (conf, conns) given the name of a factory and arguments.
"""
pos = factory.rfind('.')
if pos < 0:
@@ -50,9 +50,8 @@
# SDH: some extra args.
def __init__(self, storage,
- mapper_resource=None,
+ conf_resource=None,
factory=None,
- oid_encoder=None,
scan_interval=10,
pool_size=7,
cache_size=400,
@@ -64,25 +63,25 @@
):
"""Create an object database.
"""
- if mapper_resource is None:
+ if conf_resource is None:
if factory is not None:
- # Use a mapper factory
- mapper, connections = callMapperFactory(factory, kw)
- assert IMapper.isImplementedBy(mapper)
- mapper_resource = StaticResource(mapper)
+ # Use a configuration factory
+ conf, connections = callConfFactory(factory, kw)
+ conf_resource = StaticResource(conf)
else:
if kw:
raise ConfigurationError('Extra keyword args: %s' % kw)
if isinstance(storage, ApeStorage):
- # Use the mapper from the storage
- mapper_resource = storage.getMapperResource()
+ # Use the configuration from the storage
+ conf_resource = storage.getConfResource()
else:
- raise ConfigurationError('No mapper or factory specified')
+ raise ConfigurationError(
+ 'No configuration or factory specified')
else:
- # mapper_resource was specified
+ # conf_resource was specified
if kw:
raise ConfigurationError('Extra keyword args: %s' % kw)
- assert IResourceAccess.isImplementedBy(mapper_resource)
+ assert IResourceAccess.isImplementedBy(conf_resource)
assert factory is None
# Allocate locks:
@@ -107,19 +106,15 @@
storage.registerDB(self, None)
if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None
- if oid_encoder is None:
- oid_encoder = OIDEncoder()
- else:
- assert IOIDEncoder.isImplementedBy(oid_encoder)
- self._oid_encoder = oid_encoder
- self._mapper_resource = mapper_resource
+ self._conf_resource = conf_resource
scan_interval = int(scan_interval)
if scan_interval > 0:
- from scanner import ScanControl
- ctl = ScanControl(db=self, scan_interval=scan_interval)
- self._scan_ctl = ctl
- ctl.scanner.setStorage(storage)
- storage.setScanner(ctl.scanner)
+ from scanner import PoolScanControl, Scanner
+ pool_ctl = PoolScanControl(storage, db=self, scan_interval=scan_interval)
+ self.pool_scan_ctl = pool_ctl
+ scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
else:
self._scan_ctl = None
@@ -131,4 +126,11 @@
if hasattr(storage, 'undoInfo'):
self.undoInfo=storage.undoInfo
+
+ # Create the root object if it doesn't exist
+ c = self.open()
+ try:
+ c._prepareRoot()
+ finally:
+ c.close()
=== Products/Ape/lib/apelib/zodb3/interfaces.py 1.1 => 1.2 ===
--- Products/Ape/lib/apelib/zodb3/interfaces.py:1.1 Wed Apr 9 23:09:58 2003
+++ Products/Ape/lib/apelib/zodb3/interfaces.py Mon Feb 2 10:07:22 2004
@@ -19,15 +19,6 @@
from Interface import Interface
-class IOIDEncoder (Interface):
-
- def decode(oid):
- "Returns a keychain (a tuple) given an OID"
-
- def encode(keychain):
- "Returns an OID (a string) given a keychain"
-
-
class IResourceAccess (Interface):
"""Provides access to a resource that may need periodic updates.
"""
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.2 => 1.3 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.2 Wed Jul 30 17:33:12 2003
+++ Products/Ape/lib/apelib/zodb3/scanner.py Mon Feb 2 10:07:22 2004
@@ -27,23 +27,36 @@
# FUTURE_TIMEOUT defines how long to keep source information regarding
# OIDs that might be used soon.
-FUTURE_TIMEOUT = 10 * 60
+future_timeout = 10 * 60
-class ScanControl:
+class PoolScanControl:
+ """Scanning for a pool of connections.
- def __init__(self, db=None, scan_interval=10):
+ A PoolScanControl instance is an attribute of an ApeDB instance. The
+ actual scanning is delegated to a Scanner instance attached to an
+ ApeStorage. The delegation theoretically permits scanning to
+ occur on a ZEO server while the PoolScanControl instances run on
+ separate ZEO clients.
+
+ Assigns scanner-specific identities to database connections for
+ the purpose of tracking which OIDs are still in use.
+ """
+
+ def __init__(self, storage, db=None, scan_interval=10):
+ self.storage = storage
self.db = db
self.next_conn_id = 1
self.conn_oids = IOBTree() # IOBTree({ conn_id -> OOSet([oid]) } })
self.oids = OOSet() # OOSet([oid])
- self.scanner = Scanner()
self.lock = allocate_lock()
self.scan_interval = scan_interval
self.next_scan = time() + scan_interval
def newConnection(self):
+ """Returns a ConnectionScanControl to attach to a new connection.
+ """
self.lock.acquire()
try:
conn_id = self.next_conn_id
@@ -54,6 +67,8 @@
def setConnectionOIDs(self, conn_id, oids):
+ """Records the OIDs a connection is using and periodically scans.
+ """
changed = 0
new_oids = OOSet()
self.lock.acquire()
@@ -71,48 +86,73 @@
finally:
self.lock.release()
if changed:
- self.scanner.setOIDs(new_oids)
- self.mayScan()
+ self.storage.scanner.setOIDs(new_oids)
- def mayScan(self):
+ def elapsed(self):
+ """Returns true if the scan interval has elapsed.
+ """
now = time()
if now >= self.next_scan:
self.next_scan = now + self.scan_interval
- LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
- inv = self.scanner.scan()
- self.scanner.pruneFuture()
- LOG('Ape', DEBUG,
- 'Finished scanning. %d objects changed.' % len(inv))
- if inv:
- d = {}
- for oid in inv:
- d[oid] = 1
- if self.db is not None:
- self.db.invalidate(d)
- else:
- LOG('Ape', DEBUG, "No database set, so can't invalidate!")
+ return 1
+ return 0
+
+
+ def scan(self):
+ """Runs a scan and sends invalidation messages to the database.
+ """
+ LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
+ scanner = self.storage.scanner
+ inv = scanner.scan(prune)
+ scanner.pruneFuture()
+ LOG('Ape', DEBUG,
+ 'Finished scanning. %d objects changed.' % len(inv))
+ if inv:
+ # Some objects changed and the caches need to be invalidated.
+ d = {}
+ for oid in inv:
+ d[oid] = 1
+ if self.db is not None:
+ self.db.invalidate(d)
+ else:
+ LOG('Ape', DEBUG, "No database set, so can't invalidate!")
class ConnectionScanControl:
+ """Scanning for a database connection (an ApeConnection.)
- def __init__(self, ctl, conn_id):
- self.ctl = ctl
+ Delegates to a PoolScanControl, which in turn delegates to a Scanner.
+ """
+
+ def __init__(self, pool_ctl, conn_id):
+ self.pool_ctl = pool_ctl
self.conn_id = conn_id
self.next_update = 0
- def ready(self):
+ def elapsed(self):
+ """Returns true if the connection-specific scan interval has elapsed.
+
+ The interval prevents connections from calling setOIDs() with
+ excessive frequency.
+ """
now = time()
if now >= self.next_update:
- self.next_update = now + self.ctl.scan_interval
+ self.next_update = now + self.pool_ctl.scan_interval
return 1
return 0
def setOIDs(self, oids):
- self.ctl.setConnectionOIDs(self.conn_id, oids)
+ """Records the OIDs this connection is using.
+ """
+ self.pool_ctl.setConnectionOIDs(self.conn_id, oids)
class Scanner:
+ """Scanning for an ApeStorage.
+
+ Uses gateways to scan for changes.
+ """
def __init__(self):
self.current = OOBTree() # OOBTree({ oid -> {source->state} })
@@ -121,11 +161,12 @@
self.lock = allocate_lock()
self.storage = None
- def setStorage(self, s):
- # This is needed for calling storage.getSources().
- self.storage = s
-
def setOIDs(self, oids):
+ """Sets the list of OIDs to scan.
+
+ Gathers source information about new OIDs and discards
+ source information for OIDs no longer in use.
+ """
new_sources = {} # { oid -> sourcedict }
self.lock.acquire()
try:
@@ -149,7 +190,7 @@
LOG('Ape', DEBUG, 'Getting sources for %d oids.'
% len(new_sources))
for oid in new_sources.keys():
- new_sources[oid] = self.storage.getSources(oid)
+ new_sources[oid] = self.storage.getPollSources(oid)
else:
LOG('Ape', DEBUG, "Can't get sources for %d oids. "
"Assuming no sources!" % len(new_sources))
@@ -169,22 +210,29 @@
self.lock.release()
- def setSources(self, oid, sources):
+ def afterLoad(self, oid, sources):
+ """Called by the storage after an object is loaded.
+ """
if sources is None:
sources = {}
self.lock.acquire()
try:
- if self.current.has_key(oid):
- # This OID is known to be in use.
- self.current[oid] = sources
- else:
- # This OID might be useful soon.
+ if not self.current.has_key(oid):
+ # This object is being loaded for the first time.
+ # Make a record of its current state immediately
+ # so that the next scan can pick up changes.
self.future[oid] = (sources, time())
+ # else we already have info about this object, and now
+ # isn't a good time to update self.current since that
+ # would prevent changes from being detected at a time when
+ # it's possible to send invalidation messages.
finally:
self.lock.release()
- def setUncommittedSources(self, tid, oid, sources):
+ def afterStore(self, oid, tid, sources):
+ """Called by the storage after an object is stored (but not committed.)
+ """
self.lock.acquire()
try:
t = self.uncommitted.setdefault(tid, {})
@@ -194,6 +242,8 @@
def scan(self):
+ """Scan sources, returning the OIDs of changed objects.
+ """
to_scan = {} # { repo -> { source -> state } }
to_invalidate = {} # { oid -> 1 }
self.lock.acquire() # lock because oid_states might be self.current.
@@ -206,7 +256,7 @@
self.lock.release()
changes = {}
for repo, d in to_scan.items():
- c = repo.freshen(d)
+ c = repo.poll(d)
if c:
changes.update(c)
if changes:
@@ -225,11 +275,13 @@
def pruneFuture(self):
+ """Prunes the cache of future source information.
+ """
if self.future:
self.lock.acquire()
try:
# OIDs older than some timeout will probably never be loaded.
- cutoff = time() - FUTURE_TIMEOUT
+ cutoff = time() - future_timeout
for oid, (sources, atime) in self.future.items():
if atime < cutoff:
del self.future[oid]
@@ -240,6 +292,8 @@
def afterCommit(self, tid):
+ """Commits information recorded by afterStore().
+ """
self.lock.acquire()
try:
if not self.uncommitted.has_key(tid):
@@ -251,23 +305,35 @@
# Update the sources with new states for the committed OIDs.
to_scan = {} # { repo -> { source -> state } }
for oid, sources in t.items():
- for source, state in sources.items():
- repo, location = source
- to_scan.setdefault(repo, {})[source] = state
+ if sources:
+ for source, state in sources.items():
+ repo, location = source
+ to_scan.setdefault(repo, {})[source] = state
changes = {}
for repo, d in to_scan.items():
- c = repo.freshen(d)
+ c = repo.poll(d)
if c:
changes.update(c)
- for oid, sources in t.items():
- new_sources = {}
- for source, state in sources.items():
- state = changes.get(source, state)
- new_sources[source] = state
- self.setSources(oid, new_sources)
+ self.lock.acquire()
+ try:
+ now = time()
+ for oid, sources in t.items():
+ new_sources = {}
+ if sources:
+ for source, state in sources.items():
+ state = changes.get(source, state)
+ new_sources[source] = state
+ if self.current.has_key(oid):
+ self.current[oid] = new_sources
+ else:
+ self.future[oid] = (new_sources, now)
+ finally:
+ self.lock.release()
def afterAbort(self, tid):
+ """Discards information recorded by afterStore().
+ """
self.lock.acquire()
try:
if self.uncommitted.has_key(tid):
=== Products/Ape/lib/apelib/zodb3/serializers.py 1.3 => 1.4 ===
--- Products/Ape/lib/apelib/zodb3/serializers.py:1.3 Tue Sep 16 17:00:07 2003
+++ Products/Ape/lib/apelib/zodb3/serializers.py Mon Feb 2 10:07:22 2004
@@ -18,7 +18,7 @@
import os
from cStringIO import StringIO
-from cPickle import Pickler, Unpickler, UnpickleableError
+from cPickle import Pickler, Unpickler, UnpickleableError, loads, dumps
import time
from types import DictType
@@ -28,99 +28,78 @@
from apelib.core.interfaces \
import ISerializer, IFullSerializationEvent, IFullDeserializationEvent
from apelib.core.events import SerializationEvent, DeserializationEvent
-from apelib.core.exceptions import SerializationError
+from apelib.core.interfaces import SerializationError
from apelib.core.schemas import RowSequenceSchema, FieldSchema
-class BasicPersistentMapping:
- """Basic PersistentMapping (de)serializer
+class StringToPersistentPM:
+ """String-to-Persistent PersistentMapping (de)serializer
- This version assumes the PM maps string keys to object references.
+ Requires that the PM maps string keys to first-class persistent
+ objects.
"""
__implements__ = ISerializer
schema = RowSequenceSchema()
schema.addField('key', 'string', 1)
- schema.addField('keychain', 'keychain')
+ schema.addField('oid', 'string')
- def getSchema(self):
- return self.schema
-
- def canSerialize(self, object):
- return isinstance(object, PersistentMapping)
+ def canSerialize(self, obj):
+ return isinstance(obj, PersistentMapping)
- def serialize(self, obj, event):
- assert self.canSerialize(obj)
+ def serialize(self, event):
+ assert self.canSerialize(event.obj)
res = []
- for key, value in obj.items():
- keychain = event.identifyObject(value)
- if keychain is None:
- keychain = event.makeKeychain(key, 1)
- event.notifySerializedRef(key, value, 0, keychain)
- res.append((key, keychain))
- event.ignoreAttribute('data')
- event.ignoreAttribute('_container')
+ for key, value in event.obj.items():
+ oid = event.obj_db.identify(value)
+ if oid is None:
+ oid = event.conf.oid_gen.new_oid(event, key, True)
+ event.referenced(key, value, False, oid)
+ res.append((key, oid))
+ event.ignore(('data', '_container'))
return res
- def deserialize(self, obj, event, state):
- assert self.canSerialize(obj)
+ def deserialize(self, event, state):
+ assert self.canSerialize(event.obj)
data = {}
- for (key, keychain) in state:
- value = event.dereference(key, keychain)
+ for (key, oid) in state:
+ value = event.resolve(key, oid)
data[key] = value
- obj.__init__(data)
+ event.obj.__init__(data)
-class FixedPersistentMapping:
- """Unchanging persistent mapping.
+class StringToPicklePM:
+ """String-to-Pickle PersistentMapping (de)serializer
- Generally used for a ZODB root object.
+ Requires that the PM maps string keys to second-class persistent
+ objects.
"""
-
__implements__ = ISerializer
- def __init__(self):
- # map: { name -> (keychain, mapper) }
- self.map = {}
-
- def add(self, name, keychain, mapper_names=None):
- self.map[name] = (keychain, mapper_names)
-
- def getSchema(self):
- return None # No storage
-
- def canSerialize(self, object):
- return isinstance(object, PersistentMapping)
-
- def serialize(self, object, event):
- names = object.keys()
- names.sort()
- expected = self.map.keys()
- expected.sort()
- assert names == expected, '%s != %s' % (names, expected)
-
- for name in names:
- keychain, mapper_names = self.map[name]
- subob = object[name]
- event.notifySerializedRef(name, subob, 0, keychain)
+ schema = RowSequenceSchema()
+ schema.addField('key', 'string', 1)
+ schema.addField('value', 'string')
- # One of the two will work. ;-)
- event.ignoreAttribute('data')
- event.ignoreAttribute('_container')
+ def canSerialize(self, obj):
+ return isinstance(obj, PersistentMapping)
+ def serialize(self, event):
+ assert self.canSerialize(event.obj)
+ res = []
+ for key, value in event.obj.items():
+ res.append((key, dumps(value)))
+ event.serialized(key, value, False)
+ event.ignore(('data', '_container'))
+ return res
- def deserialize(self, object, event, state):
- assert state is None
+ def deserialize(self, event, state):
+ assert self.canSerialize(event.obj)
data = {}
- for name, (keychain, mapper_names) in self.map.items():
- subob = event.dereference(name, keychain,
- {'mapper_names': mapper_names})
- data[name] = subob
- # The PersistentMapping doesn't have its data or _container
- # attribute yet, and we don't know what its name should be
- # since PersistentMapping's internal structure is not fixed.
- # So call the PersistentMapping's constructor.
- object.__init__(data)
+ for (key, p) in state:
+ value = loads(p)
+ data[key] = value
+ event.deserialized(key, value)
+ event.obj.__init__(data)
class RollCall:
@@ -129,30 +108,28 @@
Designed for debugging purposes.
"""
__implements__ = ISerializer
+ schema = None # No storage
- def getSchema(self):
- return None # No storage
-
- def canSerialize(self, object):
+ def canSerialize(self, obj):
return 1
- def serialize(self, object, event):
+ def serialize(self, event):
assert IFullSerializationEvent.isImplementedBy(event)
attrs = event.getSerializedAttributeNames()
attrs_map = {}
for attr in attrs:
attrs_map[attr] = 1
missed = []
- for k in object.__dict__.keys():
+ for k in event.obj.__dict__.keys():
if not k.startswith('_v_') and not attrs_map.has_key(k):
missed.append(repr(k))
if missed:
raise SerializationError(
- 'Attribute(s) %s of object at %s not serialized' %
- (', '.join(missed), repr(event.getKeychain())))
+ 'Attribute(s) %s of object %s, oid=%s, not serialized' %
+ (', '.join(missed), repr(event.obj), repr(event.oid)))
return None
- def deserialize(self, object, event, state):
+ def deserialize(self, event, state):
assert state is None
@@ -163,26 +140,23 @@
schema = FieldSchema('data', 'string')
- def getSchema(self):
- return self.schema
-
- def canSerialize(self, object):
+ def canSerialize(self, obj):
try:
- return isinstance(object, Persistent)
+ return isinstance(obj, Persistent)
except TypeError:
# XXX Python 2.1 thinks Persistent is not a class
return 0
- def serialize(self, object, event):
+ def serialize(self, event):
assert IFullSerializationEvent.isImplementedBy(event)
- assert isinstance(object, Persistent)
+ assert isinstance(event.obj, Persistent)
# Allow pickling of cyclic references to the object.
- event.notifySerialized('self', object, 0)
+ event.serialized('self', event.obj, False)
# Ignore previously serialized attributes
- state = object.__dict__.copy()
+ state = event.obj.__dict__.copy()
for key in state.keys():
if key.startswith('_v_'):
del state[key]
@@ -197,9 +171,9 @@
p = Pickler(outfile)
unmanaged = []
- def persistent_id(ob, getInternalRef=event.getInternalRef,
+ def persistent_id(ob, identifyInternal=event.identifyInternal,
unmanaged=unmanaged):
- ref = getInternalRef(ob)
+ ref = identifyInternal(ob)
if ref is None:
if hasattr(ob, '_p_oid'):
# Persistent objects that end up in the remainder
@@ -237,8 +211,8 @@
raise RuntimeError(
'Unable to pickle the %s attribute, %s, '
'of %s at %s. %s.' % (
- repr(attrname), repr(attrvalue), repr(object),
- repr(event.getKeychain()), str(exc)))
+ repr(attrname), repr(attrvalue), repr(event.obj),
+ repr(event.oid), str(exc)))
else:
# Couldn't help.
raise
@@ -246,42 +220,43 @@
p.persistent_id = lambda ob: None # Stop recording references
p.dump(unmanaged)
s = outfile.getvalue()
- event.addUnmanagedPersistentObjects(unmanaged)
+ event.upos.extend(unmanaged)
return s
- def deserialize(self, object, event, state):
+ def deserialize(self, event, state):
assert IFullDeserializationEvent.isImplementedBy(event)
- assert isinstance(object, Persistent)
+ assert isinstance(event.obj, Persistent)
- # Set up to recover cyclic references to the object.
- event.notifyDeserialized('self', object)
+ # Set up to resolve cyclic references to the object.
+ event.deserialized('self', event.obj)
if state:
infile = StringIO(state)
u = Unpickler(infile)
- u.persistent_load = event.loadInternalRef
+ u.persistent_load = event.resolveInternal
s = u.load()
- object.__dict__.update(s)
+ event.obj.__dict__.update(s)
try:
unmanaged = u.load()
except EOFError:
# old pickle with no list of unmanaged objects
pass
else:
- event.addUnmanagedPersistentObjects(unmanaged)
+ event.upos.extend(unmanaged)
class ModTimeAttribute:
- """Sets the _p_mtime attribute."""
+ """Sets the _p_mtime attribute.
+
+ XXX Due to a ZODB limitation, this class has to set the _p_mtime
+ by setting _p_serial.
+ """
__implements__ = ISerializer
schema = FieldSchema('mtime', 'int')
- def getSchema(self):
- return self.schema
-
def canSerialize(self, obj):
try:
return isinstance(obj, Persistent)
@@ -295,14 +270,14 @@
args = time.gmtime(t)[:5] + (t%60,)
obj._p_serial = repr(TimeStamp(*args))
- def serialize(self, obj, event):
+ def serialize(self, event):
now = long(time.time())
- if obj._p_changed:
+ if event.obj._p_changed:
# Indicate that this object just changed. Note that the time
# is a guess.
- self.setTime(obj, now)
+ self.setTime(event.obj, now)
return now
- def deserialize(self, obj, event, state):
- self.setTime(obj, state)
+ def deserialize(self, event, state):
+ self.setTime(event.obj, state)
=== Products/Ape/lib/apelib/zodb3/storage.py 1.8 => 1.9 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.8 Tue Sep 16 17:00:07 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py Mon Feb 2 10:07:22 2004
@@ -11,7 +11,7 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Storage implementation that loads/stores using a mapper.
+"""Storage implementation that loads/stores using Ape mappers.
$Id$
"""
@@ -23,30 +23,21 @@
from ZODB import POSException, BaseStorage
from apelib.core.io import GatewayIO
-from apelib.core.exceptions import NoStateFoundError, ConfigurationError
from consts import HASH0, HASH1, DEBUG
-from oidencoder import OIDEncoder
-from interfaces import IResourceAccess, IOIDEncoder
+from interfaces import IResourceAccess
class ApeStorage(BaseStorage.BaseStorage):
- def __init__(self, mapper_resource, connections,
- oid_encoder=None, name='', clear_all=0):
+ def __init__(self, conf_resource, connections, name='', clear_all=0):
"""Initializes an ApeStorage.
- mapper_resource is a resource for loading the mapper.
+ conf_resource is a resource for loading the IMapperConfiguration.
connections is a mapping that maps names to ITPCConnections.
- oid_encoder is an IOIDEncoder.
"""
- assert IResourceAccess.isImplementedBy(mapper_resource)
- self._mapper_resource = mapper_resource
- if oid_encoder is None:
- oid_encoder = OIDEncoder()
- else:
- assert IOIDEncoder.isImplementedBy(oid_encoder)
- self._oid_encoder = oid_encoder
- gwio = GatewayIO(mapper_resource.access(self), connections)
+ assert IResourceAccess.isImplementedBy(conf_resource)
+ self._conf_resource = conf_resource
+ gwio = GatewayIO(conf_resource.access(self), connections)
self._gwio = gwio
self._conn_list = gwio.getConnectionList()
gwio.openConnections()
@@ -60,12 +51,9 @@
if not name:
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
- self._scanner = None
+ self.scanner = None
BaseStorage.BaseStorage.__init__(self, name)
- def setScanner(self, s):
- self._scanner = s
-
def __len__(self):
return 1
@@ -76,9 +64,6 @@
def sortKey(self):
return self._sort_key
- def getMapperResource(self):
- return self._mapper_resource
-
def initDatabases(self, clear_all=0):
self._gwio.initDatabases(clear_all=clear_all)
@@ -102,9 +87,8 @@
raise POSException.Unsupported, "Versions aren't supported"
self._lock_acquire()
try:
- self._mapper_resource.access(self) # Update mapper
- keychain = self._oid_encoder.decode(oid)
- event, classified_state, hash_value = self._gwio.load(keychain)
+ self._conf_resource.access(self) # Update configuration
+ event, classified_state, hash_value = self._gwio.load(oid)
file = StringIO()
p = Pickler(file)
p.dump(classified_state)
@@ -112,10 +96,9 @@
h = self.hash64(hash_value)
if DEBUG:
print 'loaded', `oid`, `h`
- if self._scanner is not None:
- gw = event.getMapper().getGateway()
- sources = gw.getSources(event)
- self._scanner.setSources(oid, sources)
+ if self.scanner is not None:
+ sources = event.mapper.gateway.getPollSources(event)
+ self.scanner.afterLoad(oid, sources)
return data, h
finally:
self._lock_release()
@@ -129,47 +112,37 @@
self._lock_acquire()
try:
- self._mapper_resource.access(self) # Update mapper
- keychain = self._oid_encoder.decode(oid)
+ self._conf_resource.access(self) # Update configuration
# First detect conflicts.
# The "h64" argument, if its value is not 0,
# was previously generated by hash64().
if DEBUG:
print 'storing', `oid`, `h64`
- if h64 != HASH0:
+ if h64 == HASH0:
+ # Writing a new object.
+ is_new = True
+ else:
# Overwriting an old object. Use the hash to verify
# that the new data was derived from the old data.
- event, old_cs, old_hash = self._gwio.load(keychain)
+ is_new = False
+ event, old_cs, old_hash = self._gwio.load(oid)
old_h64 = self.hash64(old_hash)
if h64 != old_h64:
raise POSException.ConflictError(
"Storing %s based on old data. %s != %s" % (
- repr(keychain),
+ repr(oid),
repr(h64), repr(old_h64)))
- else:
- # A new object. Attempts to load should lead to
- # NoStateFoundError or a hash of None, otherwise
- # there's a conflict.
- try:
- event, cs, old_hash = self._gwio.load(keychain)
- except NoStateFoundError:
- pass
- else:
- if old_hash is not None:
- raise POSException.ConflictError(
- "%s already exists" % repr(keychain))
# Now unpickle and store the data.
file = StringIO(data)
u = Unpickler(file)
classified_state = u.load()
- event, new_hash = self._gwio.store(keychain, classified_state)
+ event, new_hash = self._gwio.store(oid, classified_state, is_new)
new_h64 = self.hash64(new_hash)
- if self._scanner is not None:
- gw = event.getMapper().getGateway()
- sources = gw.getSources(event)
- self._scanner.setUncommittedSources(self._serial, oid, sources)
+ if self.scanner is not None:
+ sources = event.mapper.gateway.getPollSources(event)
+ self.scanner.afterStore(oid, self._serial, sources)
finally:
self._lock_release()
@@ -177,17 +150,15 @@
print 'stored', `oid`, `h64`, `new_h64`
return new_h64
- def getSources(self, oid):
- keychain = self._oid_encoder.decode(oid)
+ def getPollSources(self, oid):
self._lock_acquire()
try:
- return self._gwio.getSources(keychain)
+ return self._gwio.getPollSources(oid)
finally:
self._lock_release()
def new_oid(self):
- keychain = self._gwio.newKeychain()
- return self._oid_encoder.encode(keychain)
+ return self._gwio.new_oid()
def lastTransaction(self):
return self._ltid
@@ -198,8 +169,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
- if self._scanner is not None:
- self._scanner.afterAbort(self._serial)
+ if self.scanner is not None:
+ self.scanner.afterAbort(self._serial)
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -209,8 +180,8 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
- if self._scanner is not None:
- self._scanner.afterCommit(self._serial)
+ if self.scanner is not None:
+ self.scanner.afterCommit(self._serial)
def _vote(self):
for c in self._conn_list:
@@ -227,5 +198,5 @@
def close(self):
for c in self._conn_list:
c.close()
- self._mapper_resource.release(self)
+ self._conf_resource.release(self)
=== Products/Ape/lib/apelib/zodb3/utils.py 1.2 => 1.3 ===
--- Products/Ape/lib/apelib/zodb3/utils.py:1.2 Tue Sep 16 16:56:42 2003
+++ Products/Ape/lib/apelib/zodb3/utils.py Mon Feb 2 10:07:22 2004
@@ -21,7 +21,7 @@
from types import StringType
-def copyOf(object):
+def copyOf(source):
"""Copies a ZODB object, loading subobjects as needed.
Re-ghostifies objects along the way to save memory.
@@ -57,7 +57,7 @@
stream = StringIO()
p = Pickler(stream, 1)
p.persistent_id = persistent_id
- p.dump(object)
+ p.dump(source)
if former_ghosts:
for g in former_ghosts:
del g._p_changed
=== Removed File Products/Ape/lib/apelib/zodb3/gateways.py ===
=== Removed File Products/Ape/lib/apelib/zodb3/oidencoder.py ===
More information about the Zope-CVS
mailing list