[Zodb-checkins] CVS: ZODB3/ZEO - runzeo.py:1.15.6.2
StorageServer.py:1.98.4.1 CommitLog.py:1.4.72.1
ClientStorage.py:1.106.4.2 ClientCache.py:1.47.4.1
Jeremy Hylton
jeremy at zope.com
Mon Sep 15 14:03:41 EDT 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13599/ZEO
Modified Files:
Tag: Zope-2_7-branch
runzeo.py StorageServer.py CommitLog.py ClientStorage.py
ClientCache.py
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.
Please make all future changes on the Zope-2_7-branch instead.
The previous attempt used "cvs up -j ZODB3-3_2-branch", but it appeared
to pick up only a small fraction of the changes. This attempt is based on
copying a checkout of ZODB3-3_2-branch over the top of a checkout of
Zope-2_7-branch.
=== ZODB3/ZEO/runzeo.py 1.15.6.1 => 1.15.6.2 ===
--- ZODB3/ZEO/runzeo.py:1.15.6.1 Mon Jul 21 12:37:12 2003
+++ ZODB3/ZEO/runzeo.py Mon Sep 15 14:02:59 2003
@@ -196,7 +196,7 @@
transaction_timeout=self.options.transaction_timeout,
monitor_address=self.options.monitor_address,
auth_protocol=self.options.auth_protocol,
- auth_filename=self.options.auth_database, # XXX option spelling
+ auth_database=self.options.auth_database,
auth_realm=self.options.auth_realm)
def loop_forever(self):
=== ZODB3/ZEO/StorageServer.py 1.98 => 1.98.4.1 ===
--- ZODB3/ZEO/StorageServer.py:1.98 Fri Jun 13 15:50:05 2003
+++ ZODB3/ZEO/StorageServer.py Mon Sep 15 14:02:59 2003
@@ -82,6 +82,7 @@
self.read_only = read_only
self.locked = 0
self.verifying = 0
+ self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
@@ -367,6 +368,7 @@
self.txnlog = CommitLog()
self.tid = tid
self.status = status
+ self.store_failed = 0
self.stats.active_txns += 1
def tpc_finish(self, id):
@@ -401,9 +403,9 @@
self.timeout.end(self)
self.stats.lock_time = None
self.log("Transaction released storage lock")
- # _handle_waiting() can start another transaction (by
- # restarting a waiting one) so must be done last
- self._handle_waiting()
+ # _handle_waiting() can start another transaction (by
+ # restarting a waiting one) so must be done last
+ self._handle_waiting()
def _abort(self):
# called when a connection is closed unexpectedly
@@ -471,12 +473,14 @@
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
+ err = None
try:
newserial = self.storage.store(oid, serial, data, version,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
+ self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
if not isinstance(err, TransactionError):
@@ -503,9 +507,15 @@
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
+ return err is None
def _vote(self):
self.client.serialnos(self.serials)
+ # If a store call failed, then return to the client immediately.
+ # The serialnos() call will deliver an exception that will be
+ # handled by the client in its tpc_vote() method.
+ if self.store_failed:
+ return
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
@@ -554,11 +564,18 @@
def _restart(self, delay=None):
# Restart when the storage lock is available.
+ if self.txnlog.stores == 1:
+ template = "Preparing to commit transaction: %d object, %d bytes"
+ else:
+ template = "Preparing to commit transaction: %d objects, %d bytes"
+ self.log(template % (self.txnlog.stores, self.txnlog.size()),
+ level=zLOG.BLATHER)
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
- self._store(*loader.load())
+ if not self._store(*loader.load()):
+ break
resp = self._thunk()
if delay is not None:
delay.reply(resp)
@@ -612,7 +629,7 @@
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
- auth_filename=None,
+ auth_database=None,
auth_realm=None):
"""StorageServer constructor.
@@ -659,7 +676,7 @@
auth_protocol -- The name of the authentication protocol to use.
Examples are "digest" and "srp".
- auth_filename -- The name of the password database filename.
+ auth_database -- The name of the password database filename.
It should be in a format compatible with the authentication
protocol used; for instance, "sha" and "srp" require different
formats.
@@ -685,7 +702,7 @@
s._waiting = []
self.read_only = read_only
self.auth_protocol = auth_protocol
- self.auth_filename = auth_filename
+ self.auth_database = auth_database
self.auth_realm = auth_realm
self.database = None
if auth_protocol:
@@ -739,7 +756,7 @@
# storages, avoiding the need to bloat each with a new authenticator
# Database that would contain the same info, and also avoiding any
# possibly synchronization issues between them.
- self.database = db_class(self.auth_filename)
+ self.database = db_class(self.auth_database)
if self.database.realm != self.auth_realm:
raise ValueError("password database realm %r "
"does not match storage realm %r"
=== ZODB3/ZEO/CommitLog.py 1.4 => 1.4.72.1 ===
--- ZODB3/ZEO/CommitLog.py:1.4 Thu Aug 29 15:00:21 2002
+++ ZODB3/ZEO/CommitLog.py Mon Sep 15 14:02:59 2003
@@ -31,6 +31,9 @@
self.stores = 0
self.read = 0
+ def size(self):
+ return self.file.tell()
+
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
self.stores += 1
=== ZODB3/ZEO/ClientStorage.py 1.106.4.1 => 1.106.4.2 ===
--- ZODB3/ZEO/ClientStorage.py:1.106.4.1 Mon Jul 21 12:37:12 2003
+++ ZODB3/ZEO/ClientStorage.py Mon Sep 15 14:02:59 2003
@@ -56,7 +56,7 @@
the argument.
"""
t = time.time()
- t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
+ t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
if prev_ts is not None:
t = t.laterThan(prev_ts)
return t
@@ -78,7 +78,7 @@
MB = 1024**2
-class ClientStorage:
+class ClientStorage(object):
"""A Storage class that is a network client to a remote storage.
@@ -129,6 +129,7 @@
client -- A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
+ See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
@@ -232,6 +233,11 @@
self._username = username
self._password = password
self._realm = realm
+
+ # Flag tracking disconnections in the middle of a transaction. This
+ # is reset in tpc_begin() and set in notifyDisconnected().
+ self._midtxn_disconnect = 0
+
# _server_addr is used by sortKey()
self._server_addr = None
self._tfile = None
@@ -514,7 +520,7 @@
if self._server_addr is None:
raise ClientDisconnected
else:
- return self._server_addr
+ return '%s:%s' % (self._storage, self._server_addr)
def verify_cache(self, server):
"""Internal routine called to verify the cache.
@@ -583,6 +589,7 @@
self._connection = None
self._ready.clear()
self._server = disconnected_stub
+ self._midtxn_disconnect = 1
def __len__(self):
"""Return the size of the storage."""
@@ -821,6 +828,7 @@
if self._is_read_only:
raise POSException.ReadOnlyError()
self._tpc_cond.acquire()
+ self._midtxn_disconnect = 0
while self._transaction is not None:
# It is allowable for a client to call two tpc_begins in a
# row with the same transaction, and the second of these
@@ -891,6 +899,12 @@
return
self._load_lock.acquire()
try:
+ if self._midtxn_disconnect:
+ raise ClientDisconnected(
+ 'Calling tpc_finish() on a disconnected transaction')
+
+ tid = self._server.tpc_finish(self._serial)
+
self._lock.acquire() # for atomic processing of invalidations
try:
self._update_cache()
@@ -898,8 +912,6 @@
f()
finally:
self._lock.release()
-
- tid = self._server.tpc_finish(self._serial)
self._cache.setLastTid(tid)
r = self._check_serials()
=== ZODB3/ZEO/ClientCache.py 1.47 => 1.47.4.1 ===
--- ZODB3/ZEO/ClientCache.py:1.47 Mon Jun 16 14:27:51 2003
+++ ZODB3/ZEO/ClientCache.py Mon Sep 15 14:02:59 2003
@@ -367,8 +367,14 @@
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
+ # If the cache read we are copying has version info,
+ # we need to pass the header to copytocurrent().
+ if vlen:
+ vheader = read(vlen + 4)
+ else:
+ vheader = None
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
- oid, data)
+ oid, data, vheader)
return data, h[19:]
else:
self._trace(0x26, oid, version)
@@ -412,12 +418,13 @@
"""
if self._pos + tlen > self._limit:
return # Don't let this cause a cache flip
- assert len(header) == 27
+ assert len(header) == 27, len(header)
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
tlen = 31 + oidlen + dlen
vlen = 0
+ vheader = None
# (oidlen:2, reserved:6, status:1, tlen:4,
# vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
@@ -446,7 +453,8 @@
l.append(vdata)
l.append(vserial)
else:
- assert None is vheader is vdata is vserial
+ assert None is vheader is vdata is vserial, (
+ vlen, vheader, vdata, vserial)
l.append(header[9:13]) # copy of tlen
g = self._f[self._current]
g.seek(self._pos)
More information about the Zodb-checkins
mailing list