[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - connection.py:1.9
scanner.py:1.5 storage.py:1.11
Shane Hathaway
shane at zope.com
Sat Feb 28 15:06:59 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv30293/lib/apelib/zodb3
Modified Files:
connection.py scanner.py storage.py
Log Message:
Merged ape-fs-oid-branch.
Ape now uses arbitrary OIDs on the filesystem, rather than using paths
as OIDs. This solves problems with moving and replacing objects and
further unifies SQL and filesystem databases.
=== Products/Ape/lib/apelib/zodb3/connection.py 1.8 => 1.9 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.8 Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/connection.py Sat Feb 28 15:06:28 2004
@@ -139,6 +139,7 @@
obj._p_oid=oid
obj._p_jar=self
obj._p_changed=None
+ self.setSerial(obj, serial)
self._cache[oid] = obj
@@ -261,8 +262,10 @@
ext_refs = event.external
if ext_refs:
for (ext_oid, ext_ref) in ext_refs:
- if self.getSerial(ext_ref) == HASH0:
- # New object
+ assert ext_oid
+ assert ext_ref is not None
+ if self._cache.get(ext_oid, None) is not ext_ref:
+ # New object or a bad reference
if ext_ref._p_jar is not None:
if ext_ref._p_jar is not self:
raise InvalidObjectReference, (
@@ -293,14 +296,6 @@
# response, just in case the response contains the
# serial number for a newly created object
try: cache[oid] = obj
- except ValueError:
- # "Cannot re-register an object under a different
- # oid". This can happen when the user is working on
- # the filesystem and creates an object with an ID that
- # was used recently. Try to fix it by minimizing
- # the cache and trying again.
- cache.minimize()
- cache[oid] = obj
except:
if aq_base(obj) is not obj:
# Yuck, someone tried to store a wrapper. Try to
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.4 => 1.5 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.4 Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/scanner.py Sat Feb 28 15:06:28 2004
@@ -157,7 +157,6 @@
def __init__(self):
self.current = OOBTree() # OOBTree({ oid -> {source->state} })
self.future = {} # { oid -> ({source->state}, atime) }
- self.uncommitted = {} # { tid -> {oid->{source->state}} }
self.lock = allocate_lock()
self.storage = None
@@ -230,17 +229,6 @@
self.lock.release()
- def afterStore(self, oid, tid, sources):
- """Called by the storage after an object is stored (but not committed.)
- """
- self.lock.acquire()
- try:
- t = self.uncommitted.setdefault(tid, {})
- t[oid] = sources
- finally:
- self.lock.release()
-
-
def scan(self):
"""Scan sources, returning the OIDs of changed objects.
"""
@@ -291,53 +279,9 @@
'Future sources cache size: %d objects.' % len(self.future))
- def afterCommit(self, tid):
- """Commits information recorded by setUncommittedSources().
- """
- self.lock.acquire()
- try:
- if not self.uncommitted.has_key(tid):
- return
- t = self.uncommitted[tid]
- del self.uncommitted[tid]
- finally:
- self.lock.release()
- # Update the sources with new states for the committed OIDs.
- to_scan = {} # { repo -> { source -> state } }
- for oid, sources in t.items():
- if sources:
- for source, state in sources.items():
- repo, location = source
- to_scan.setdefault(repo, {})[source] = state
- changes = {}
- for repo, d in to_scan.items():
- c = repo.poll(d)
- if c:
- changes.update(c)
- self.lock.acquire()
- try:
- now = time()
- for oid, sources in t.items():
- new_sources = {}
- if sources:
- for source, state in sources.items():
- state = changes.get(source, state)
- new_sources[source] = state
- if self.current.has_key(oid):
- self.current[oid] = new_sources
- else:
- self.future[oid] = (new_sources, now)
- finally:
- self.lock.release()
-
-
- def afterAbort(self, tid):
- """Aborts information recorded by setUncommittedSources().
+ def afterCommit(self, oid, sources):
+ """Records changes to sources after commit..
"""
- self.lock.acquire()
- try:
- if self.uncommitted.has_key(tid):
- del self.uncommitted[tid]
- finally:
- self.lock.release()
-
+ self.current[oid] = sources
+ if self.future.has_key(oid):
+ del self.future[oid]
=== Products/Ape/lib/apelib/zodb3/storage.py 1.10 => 1.11 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.10 Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/storage.py Sat Feb 28 15:06:28 2004
@@ -52,6 +52,9 @@
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
self.scanner = None
+ self.changed = {} # {tid: {oid: 1}}
+ if DEBUG:
+ self._loaded_hashes = {} # {oid: hash}
BaseStorage.BaseStorage.__init__(self, name)
def __len__(self):
@@ -78,8 +81,6 @@
if h == HASH0:
# Avoid the special zero hash.
h = HASH1
- if DEBUG:
- print '64-bit hash of %r is %r' % (value, h)
return h
def load(self, oid, version):
@@ -96,7 +97,7 @@
data = file.getvalue()
h = self.hash64(hash_value)
if DEBUG:
- print 'loaded', `oid`, `h`
+ self._loaded_hashes[oid] = hash_value
if self.scanner is not None:
sources = event.mapper.gateway.getPollSources(event)
self.scanner.afterLoad(oid, sources)
@@ -118,8 +119,6 @@
# First detect conflicts.
# The "h64" argument, if its value is not 0,
# was previously generated by hash64().
- if DEBUG:
- print 'storing', `oid`, `h64`
if h64 == HASH0:
# Writing a new object.
is_new = True
@@ -130,9 +129,15 @@
event, old_c, old_state, old_hash = self._gwio.load(oid)
old_h64 = self.hash64(old_hash)
if h64 != old_h64:
+ h = None
+ if DEBUG:
+ h = self._loaded_hashes.get(oid)
+ if h is None:
+ h = h64
+ old_hash = old_h64
raise POSException.ConflictError(
"Storing %s based on old data. %s != %s." % (
- repr(oid), repr(h64), repr(old_h64)))
+ repr(oid), repr(h), repr(old_hash)))
# Now unpickle and store the data.
file = StringIO(data)
@@ -142,14 +147,16 @@
event, new_hash = self._gwio.store(
oid, classification, state, is_new)
new_h64 = self.hash64(new_hash)
- if self.scanner is not None:
- sources = event.mapper.gateway.getPollSources(event)
- self.scanner.afterStore(oid, self._serial, sources)
+
+ # Remember that this OID changed (for scanning)
+ t = self.changed.get(self._serial)
+ if t is None:
+ t = {}
+ self.changed[self._serial] = t
+ t[oid] = 1
finally:
self._lock_release()
- if DEBUG:
- print 'stored', `oid`, `h64`, `new_h64`
return new_h64
def getPollSources(self, oid):
@@ -171,8 +178,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
- if self.scanner is not None:
- self.scanner.afterAbort(self._serial)
+ if self.changed.has_key(self._serial):
+ del self.changed[self._serial]
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -182,8 +189,13 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
- if self.scanner is not None:
- self.scanner.afterCommit(self._serial)
+ if self.changed.has_key(self._serial):
+ oids = self.changed[self._serial]
+ del self.changed[self._serial]
+ if self.scanner:
+ for oid in oids:
+ sources = self._gwio.getPollSources(oid)
+ self.scanner.afterCommit(oid, sources)
def _vote(self):
for c in self._conn_list:
@@ -201,4 +213,3 @@
for c in self._conn_list:
c.close()
self.conf_resource.release(self)
-
More information about the Zope-CVS
mailing list