[Zodb-checkins] CVS: ZEO/ZEO - ClientCache.py:1.18 ClientStorage.py:1.35 Invalidator.py:1.6 StorageServer.py:1.30 start.py:1.26 trigger.py:1.3 zrpc.py:1.20

Jeremy Hylton jeremy@zope.com
Fri, 2 Nov 2001 15:54:18 -0500


Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv14443

Modified Files:
	ClientCache.py ClientStorage.py Invalidator.py 
	StorageServer.py start.py trigger.py zrpc.py 
Log Message:
Merge changes from zeo-1_0-branch to the trunk.

The trunk now has the same code as ZEO 1.0b5 plus a few minor changes.


=== ZEO/ZEO/ClientCache.py 1.17 => 1.18 ===
         try:
             self._f[self._current].close()
-        except OSError:
+        except (os.error, ValueError):
             pass
 
     def open(self):
@@ -373,6 +373,8 @@
                     self._f[current]=open(self._p[current],'w+b')
                 else:
                     # Temporary cache file:
+                    if self._f[current] is not None:
+                        self._f[current].close()
                     self._f[current] = tempfile.TemporaryFile(suffix='.zec')
                 self._f[current].write(magic)
                 self._pos=pos=4


=== ZEO/ZEO/ClientStorage.py 1.34 => 1.35 ===
 """Network ZODB storage client
 """
+
 __version__='$Revision$'[11:-2]
 
 import struct, time, os, socket, string, Sync, zrpc, ClientCache
@@ -168,17 +169,16 @@
         # Among other things, we know that our data methods won't get
         # called until after this call.
 
-        invalidator=Invalidator.Invalidator(
-            db.invalidate,
-            self._cache.invalidate)
+        self.invalidator = Invalidator.Invalidator(db.invalidate,
+                                                   self._cache.invalidate)
 
         def out_of_band_hook(
             code, args,
             get_hook={
-                'b': (invalidator.begin, 0),
-                'i': (invalidator.invalidate, 1),
-                'e': (invalidator.end, 0),
-                'I': (invalidator.Invalidate, 1),
+                'b': (self.invalidator.begin, 0),
+                'i': (self.invalidator.invalidate, 1),
+                'e': (self.invalidator.end, 0),
+                'I': (self.invalidator.Invalidate, 1),
                 'U': (self._commit_lock_release, 0),
                 's': (self._serials.append, 1),
                 'S': (self._info.update, 1),
@@ -307,8 +307,18 @@
         try:
             LOG("ClientStorage", INFO, "close")
             self._call.closeIntensionally()
+            try:
+                self._tfile.close()
+            except os.error:
+                # On Windows, this can fail if it is called more than
+                # once, because it tries to delete the file each
+                # time.
+                pass
             self._cache.close()
-            self.closed = 1 
+            if self.invalidator is not None:
+                self.invalidator.close()
+                self.invalidator = None
+            self.closed = 1
         finally: self._lock_release()
         
     def commitVersion(self, src, dest, transaction):
@@ -317,7 +327,6 @@
         self._lock_acquire()
         try:
             oids=self._call('commitVersion', src, dest, self._serial)
-            invalidate=self._cache.invalidate
             if dest:
                 vlen = pack(">H", len(src))
                 # just invalidate our version data
@@ -436,12 +445,17 @@
         
         finally: self._lock_release()
             
-        
-
-    def supportsUndo(self): return self._info['supportsUndo']
-    def supportsVersions(self): return self._info['supportsVersions']
+    def supportsUndo(self):
+        return self._info['supportsUndo']
+    
+    def supportsVersions(self):
+        return self._info['supportsVersions']
+    
     def supportsTransactionalUndo(self):
-        return self._info['supportsTransactionalUndo']
+        try:
+            return self._info['supportsTransactionalUndo']
+        except KeyError:
+            return 0
         
     def tpc_abort(self, transaction):
         self._lock_acquire()
@@ -522,7 +536,6 @@
             seek=tfile.seek
             read=tfile.read
             cache=self._cache
-            update=cache.update
             size=tfile.tell()
             cache.checkSize(size)
             seek(0)
@@ -543,9 +556,9 @@
                             "temporary file."
                             )
                     if s==ResolvedSerial:
-                        cache.invalidate(oid, v)
+                        self._cache.invalidate(oid, v)
                     else:
-                        update(oid, s, v, p)
+                        self._cache.update(oid, s, v, p)
                     i=i+15+vlen+dlen
                 elif opcode == "i":
                     oid=read(8)
@@ -578,7 +591,8 @@
         try:
             oids=self._call('undo', transaction_id)
             cinvalidate=self._cache.invalidate
-            for oid in oids: cinvalidate(oid,'')                
+            for oid in oids:
+                cinvalidate(oid,'')                
             return oids
         finally: self._lock_release()
 


=== ZEO/ZEO/Invalidator.py 1.5 => 1.6 ===
         self.cinvalidate=cinvalidate
 
+    def close(self):
+        self.dinvalidate = None
+        self.cinvalidate = None
+
     def begin(self):
         self._tfile=tempfile.TemporaryFile()
         pickler=cPickle.Pickler(self._tfile, 1)


=== ZEO/ZEO/StorageServer.py 1.29 => 1.30 ===
 max_blather=120
 def blather(*args):
-    m=string.join(map(str,args))
-    if len(m) > max_blather: m=m[:max_blather]+' ...'
+    accum = []
+    total_len = 0
+    for arg in args:
+        if not isinstance(arg, StringType):
+            arg = str(arg)
+        accum.append(arg)
+        total_len = total_len + len(arg)
+        if total_len >= max_blather:
+            break
+    m = string.join(accum)
+    if len(m) > max_blather: m = m[:max_blather] + ' ...'
     LOG('ZEO Server', TRACE, m)
 
 
@@ -121,11 +130,13 @@
     def __init__(self, connection, storages):
         
         self.__storages=storages
-        for n, s in storages.items(): init_storage(s)
+        for n, s in storages.items():
+            init_storage(s)
 
         self.__connections={}
         self.__get_connections=self.__connections.get
 
+        self._pack_trigger = trigger.trigger()
         asyncore.dispatcher.__init__(self)
 
         if type(connection) is type(''):
@@ -258,7 +269,12 @@
     def message_input(self, message,
                       dump=dump, Unpickler=Unpickler, StringIO=StringIO,
                       None=None):
-        if __debug__: blather('message_input', id(self), `message`)
+        if __debug__:
+            if len(message) > max_blather:
+                tmp = `message[:max_blather]`
+            else:
+                tmp = `message`
+            blather('message_input', id(self), tmp)
 
         if self.__storage is None:
             # This is the first communication from the client
@@ -276,7 +292,9 @@
             args=unpickler.load()
             
             name, args = args[0], args[1:]
-            if __debug__: blather('call %s: %s%s' % (id(self), name, `args`))
+            if __debug__:
+                apply(blather,
+                      ("call", id(self), ":", name,) + args)
                 
             if not storage_method(name):
                 raise 'Invalid Method Name', name
@@ -294,7 +312,8 @@
             self.return_error(sys.exc_info()[0], sys.exc_info()[1])
             return
 
-        if __debug__: blather("%s R: %s" % (id(self), `r`))
+        if __debug__:
+            blather("%s R: %s" % (id(self), `r`))
             
         r=dump(r,1)            
         self.message_output('R'+r)
@@ -303,7 +322,8 @@
         if type(err_value) is not type(self):
             err_value = err_type, err_value
 
-        if __debug__: blather("%s E: %s" % (id(self), `err_value`))
+        if __debug__:
+            blather("%s E: %s" % (id(self), `err_value`))
                     
         try: r=dump(err_value, 1)
         except:
@@ -396,11 +416,12 @@
                 error=sys.exc_info())
             if wait:
                 self.return_error(sys.exc_info()[0], sys.exc_info()[1])
-                self._pack_trigger.pull_trigger()
+                self.__server._pack_trigger.pull_trigger()
         else:
             if wait:
                 self.message_output('RN.')
-                self._pack_trigger.pull_trigger()
+                self.__server._pack_trigger.pull_trigger()
+
             else:
                 # Broadcast new size statistics
                 self.__server.invalidate(0, self.__storage_id, (),
@@ -582,6 +603,8 @@
         port='', int(port)
     except:
         pass
-    
-    StorageServer(port, ZODB.FileStorage.FileStorage(name))
+
+    d = {'1': ZODB.FileStorage.FileStorage(name)}
+    StorageServer(port, d)
     asyncwrap.loop()
+


=== ZEO/ZEO/start.py 1.25 => 1.26 ===
     for storage in storages.values():
         try: storage.close()
-        finally: pass
+        except: pass
 
     try:
         from zLOG import LOG, INFO


=== ZEO/ZEO/trigger.py 1.2 => 1.3 ===
 
         def __init__ (self):
-            r, w = os.pipe()
+            r, w = self._fds = os.pipe()
             self.trigger = w
             asyncore.file_dispatcher.__init__ (self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
+
+        def __del__(self):
+            os.close(self._fds[0])
+            os.close(self._fds[1])
 
         def __repr__ (self):
             return '<select-trigger (pipe) at %x>' % id(self)


=== ZEO/ZEO/zrpc.py 1.19 => 1.20 ===
 
     def finishConnect(self, s):
-        if self.__haveMainLoop: map=None # use the main loop map
-        else: map = {} # provide a dummy map
+        if self.__haveMainLoop:
+            map = None # use the main loop map
+        else:
+            map = {} # provide a dummy map
         SizedMessageAsyncConnection.__init__(self, s, '', map)
 
     # we are our own socket map!
@@ -221,12 +223,21 @@
                 if c=='R':
                     if r=='RN.': return None # Common case!
                     return loads(r[1:])
+                
+                # If c == 'E', an error occurred on the server.  In
+                # this case, the return value is a pickled exception.
+                # Unpickle it and raise it on the client side.  The
+                # traceback for this exception ends at this method,
+                # but the real error occurred somewhere in the server
+                # code.  To diagnose the error, look for the real
+                # traceback in the server's zLOG output.
                 if c=='E':
                     try: r=loads(r[1:])
                     except:
                         raise UnUnPickleableError(r[1:])
-                    if type(r) is TupleType: raise r[0], r[1]
-                    raise r
+                    if type(r) is TupleType:
+                        raise r[0], r[1] # see server log for real traceback
+                    raise r 
                 oob=self._outOfBand
                 if oob is not None:
                     r=r[1:]
@@ -260,8 +271,10 @@
 
     def message_input(self, m):
         if self._debug:
-            md=`m`
-            if len(m) > 60: md=md[:60]+' ...'
+            if len(m) > 60:
+                md = repr(m[:60]) + ' ...'
+            else:
+                md = repr(m)
             LOG(self._debug, TRACE, 'message_input %s' % md)
 
         c=m[:1]
@@ -292,6 +305,7 @@
             self.__Wakeup(lambda self=self: self.close()) 
         else:
             self.close()
+            self._outOfBand = None
             self.__closed = 1
         
     def close(self):
@@ -299,6 +313,8 @@
         self.aq_parent.notifyDisconnected(self)
         # causes read call to raise last exception, which should be
         # the socket error that caused asyncore to close the socket.
-        self.__r='E'+dump(sys.exc_info()[:2], 1)
-        try: self.__lr()
-        except: pass
+        self.__r = 'E' + dump(sys.exc_info()[:2], 1)
+        try:
+            self.__lr()
+        except:
+            pass