[Zodb-checkins] CVS: ZEO/ZEO - ClientCache.py:1.18 ClientStorage.py:1.35 Invalidator.py:1.6 StorageServer.py:1.30 start.py:1.26 trigger.py:1.3 zrpc.py:1.20
Jeremy Hylton
jeremy@zope.com
Fri, 2 Nov 2001 15:54:18 -0500
Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv14443
Modified Files:
ClientCache.py ClientStorage.py Invalidator.py
StorageServer.py start.py trigger.py zrpc.py
Log Message:
Merge changes from zeo-1_0-branch to the trunk.
The trunk now has the same code as ZEO 1.0b5 plus a few minor changes.
=== ZEO/ZEO/ClientCache.py 1.17 => 1.18 ===
try:
self._f[self._current].close()
- except OSError:
+ except (os.error, ValueError):
pass
def open(self):
@@ -373,6 +373,8 @@
self._f[current]=open(self._p[current],'w+b')
else:
# Temporary cache file:
+ if self._f[current] is not None:
+ self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
=== ZEO/ZEO/ClientStorage.py 1.34 => 1.35 ===
"""Network ZODB storage client
"""
+
__version__='$Revision$'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache
@@ -168,17 +169,16 @@
# Among other things, we know that our data methods won't get
# called until after this call.
- invalidator=Invalidator.Invalidator(
- db.invalidate,
- self._cache.invalidate)
+ self.invalidator = Invalidator.Invalidator(db.invalidate,
+ self._cache.invalidate)
def out_of_band_hook(
code, args,
get_hook={
- 'b': (invalidator.begin, 0),
- 'i': (invalidator.invalidate, 1),
- 'e': (invalidator.end, 0),
- 'I': (invalidator.Invalidate, 1),
+ 'b': (self.invalidator.begin, 0),
+ 'i': (self.invalidator.invalidate, 1),
+ 'e': (self.invalidator.end, 0),
+ 'I': (self.invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
'S': (self._info.update, 1),
@@ -307,8 +307,18 @@
try:
LOG("ClientStorage", INFO, "close")
self._call.closeIntensionally()
+ try:
+ self._tfile.close()
+ except os.error:
+ # On Windows, this can fail if it is called more than
+ # once, because it tries to delete the file each
+ # time.
+ pass
self._cache.close()
- self.closed = 1
+ if self.invalidator is not None:
+ self.invalidator.close()
+ self.invalidator = None
+ self.closed = 1
finally: self._lock_release()
def commitVersion(self, src, dest, transaction):
@@ -317,7 +327,6 @@
self._lock_acquire()
try:
oids=self._call('commitVersion', src, dest, self._serial)
- invalidate=self._cache.invalidate
if dest:
vlen = pack(">H", len(src))
# just invalidate our version data
@@ -436,12 +445,17 @@
finally: self._lock_release()
-
-
- def supportsUndo(self): return self._info['supportsUndo']
- def supportsVersions(self): return self._info['supportsVersions']
+ def supportsUndo(self):
+ return self._info['supportsUndo']
+
+ def supportsVersions(self):
+ return self._info['supportsVersions']
+
def supportsTransactionalUndo(self):
- return self._info['supportsTransactionalUndo']
+ try:
+ return self._info['supportsTransactionalUndo']
+ except KeyError:
+ return 0
def tpc_abort(self, transaction):
self._lock_acquire()
@@ -522,7 +536,6 @@
seek=tfile.seek
read=tfile.read
cache=self._cache
- update=cache.update
size=tfile.tell()
cache.checkSize(size)
seek(0)
@@ -543,9 +556,9 @@
"temporary file."
)
if s==ResolvedSerial:
- cache.invalidate(oid, v)
+ self._cache.invalidate(oid, v)
else:
- update(oid, s, v, p)
+ self._cache.update(oid, s, v, p)
i=i+15+vlen+dlen
elif opcode == "i":
oid=read(8)
@@ -578,7 +591,8 @@
try:
oids=self._call('undo', transaction_id)
cinvalidate=self._cache.invalidate
- for oid in oids: cinvalidate(oid,'')
+ for oid in oids:
+ cinvalidate(oid,'')
return oids
finally: self._lock_release()
=== ZEO/ZEO/Invalidator.py 1.5 => 1.6 ===
self.cinvalidate=cinvalidate
+ def close(self):
+ self.dinvalidate = None
+ self.cinvalidate = None
+
def begin(self):
self._tfile=tempfile.TemporaryFile()
pickler=cPickle.Pickler(self._tfile, 1)
=== ZEO/ZEO/StorageServer.py 1.29 => 1.30 ===
max_blather=120
def blather(*args):
- m=string.join(map(str,args))
- if len(m) > max_blather: m=m[:max_blather]+' ...'
+ accum = []
+ total_len = 0
+ for arg in args:
+ if not isinstance(arg, StringType):
+ arg = str(arg)
+ accum.append(arg)
+ total_len = total_len + len(arg)
+ if total_len >= max_blather:
+ break
+ m = string.join(accum)
+ if len(m) > max_blather: m = m[:max_blather] + ' ...'
LOG('ZEO Server', TRACE, m)
@@ -121,11 +130,13 @@
def __init__(self, connection, storages):
self.__storages=storages
- for n, s in storages.items(): init_storage(s)
+ for n, s in storages.items():
+ init_storage(s)
self.__connections={}
self.__get_connections=self.__connections.get
+ self._pack_trigger = trigger.trigger()
asyncore.dispatcher.__init__(self)
if type(connection) is type(''):
@@ -258,7 +269,12 @@
def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
- if __debug__: blather('message_input', id(self), `message`)
+ if __debug__:
+ if len(message) > max_blather:
+ tmp = `message[:max_blather]`
+ else:
+ tmp = `message`
+ blather('message_input', id(self), tmp)
if self.__storage is None:
# This is the first communication from the client
@@ -276,7 +292,9 @@
args=unpickler.load()
name, args = args[0], args[1:]
- if __debug__: blather('call %s: %s%s' % (id(self), name, `args`))
+ if __debug__:
+ apply(blather,
+ ("call", id(self), ":", name,) + args)
if not storage_method(name):
raise 'Invalid Method Name', name
@@ -294,7 +312,8 @@
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
- if __debug__: blather("%s R: %s" % (id(self), `r`))
+ if __debug__:
+ blather("%s R: %s" % (id(self), `r`))
r=dump(r,1)
self.message_output('R'+r)
@@ -303,7 +322,8 @@
if type(err_value) is not type(self):
err_value = err_type, err_value
- if __debug__: blather("%s E: %s" % (id(self), `err_value`))
+ if __debug__:
+ blather("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1)
except:
@@ -396,11 +416,12 @@
error=sys.exc_info())
if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
- self._pack_trigger.pull_trigger()
+ self.__server._pack_trigger.pull_trigger()
else:
if wait:
self.message_output('RN.')
- self._pack_trigger.pull_trigger()
+ self.__server._pack_trigger.pull_trigger()
+
else:
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
@@ -582,6 +603,8 @@
port='', int(port)
except:
pass
-
- StorageServer(port, ZODB.FileStorage.FileStorage(name))
+
+ d = {'1': ZODB.FileStorage.FileStorage(name)}
+ StorageServer(port, d)
asyncwrap.loop()
+
=== ZEO/ZEO/start.py 1.25 => 1.26 ===
for storage in storages.values():
try: storage.close()
- finally: pass
+ except: pass
try:
from zLOG import LOG, INFO
=== ZEO/ZEO/trigger.py 1.2 => 1.3 ===
def __init__ (self):
- r, w = os.pipe()
+ r, w = self._fds = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
+
+ def __del__(self):
+ os.close(self._fds[0])
+ os.close(self._fds[1])
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
=== ZEO/ZEO/zrpc.py 1.19 => 1.20 ===
def finishConnect(self, s):
- if self.__haveMainLoop: map=None # use the main loop map
- else: map = {} # provide a dummy map
+ if self.__haveMainLoop:
+ map = None # use the main loop map
+ else:
+ map = {} # provide a dummy map
SizedMessageAsyncConnection.__init__(self, s, '', map)
# we are our own socket map!
@@ -221,12 +223,21 @@
if c=='R':
if r=='RN.': return None # Common case!
return loads(r[1:])
+
+ # If c == 'E', an error occurred on the server. In
+ # this case, the return value is a pickled exception.
+ # Unpickle it and raise it on the client side. The
+ # traceback for this exception ends at this method,
+ # but the real error occurred somewhere in the server
+ # code. To diagnose the error, look for the real
+ # traceback in the server's zLOG output.
if c=='E':
try: r=loads(r[1:])
except:
raise UnUnPickleableError(r[1:])
- if type(r) is TupleType: raise r[0], r[1]
- raise r
+ if type(r) is TupleType:
+ raise r[0], r[1] # see server log for real traceback
+ raise r
oob=self._outOfBand
if oob is not None:
r=r[1:]
@@ -260,8 +271,10 @@
def message_input(self, m):
if self._debug:
- md=`m`
- if len(m) > 60: md=md[:60]+' ...'
+ if len(m) > 60:
+ md = repr(m[:60]) + ' ...'
+ else:
+ md = repr(m)
LOG(self._debug, TRACE, 'message_input %s' % md)
c=m[:1]
@@ -292,6 +305,7 @@
self.__Wakeup(lambda self=self: self.close())
else:
self.close()
+ self._outOfBand = None
self.__closed = 1
def close(self):
@@ -299,6 +313,8 @@
self.aq_parent.notifyDisconnected(self)
# causes read call to raise last exception, which should be
# the socket error that caused asyncore to close the socket.
- self.__r='E'+dump(sys.exc_info()[:2], 1)
- try: self.__lr()
- except: pass
+ self.__r = 'E' + dump(sys.exc_info()[:2], 1)
+ try:
+ self.__lr()
+ except:
+ pass