[Zodb-checkins] CVS: StandaloneZODB/ZEO - ClientStub.py:1.2 Exceptions.py:1.2 ServerStub.py:1.2 TransactionBuffer.py:1.2 zrpc2.py:1.2 ClientCache.py:1.19 ClientStorage.py:1.36 StorageServer.py:1.33 smac.py:1.12 start.py:1.27
Jeremy Hylton
jeremy@zope.com
Fri, 11 Jan 2002 14:33:20 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv30669
Modified Files:
ClientCache.py ClientStorage.py StorageServer.py smac.py
start.py
Added Files:
ClientStub.py Exceptions.py ServerStub.py TransactionBuffer.py
zrpc2.py
Log Message:
Changes from the ZEO-ZRPC-Dev branch merge.
=== StandaloneZODB/ZEO/ClientStub.py 1.1 => 1.2 ===
+
+class ClientStorage:
+ """Stub that forwards calls to a remote ClientStorage through `rpc`.
+
+ Every method sends its message with rpc.callAsync() and returns
+ None; no result from the remote side is passed back to the caller.
+ """
+
+ def __init__(self, rpc):
+ # rpc must provide callAsync(method_name, *args)
+ self.rpc = rpc
+
+ def beginVerify(self):
+ # note: the message name sent is 'begin', not 'beginVerify'
+ self.rpc.callAsync('begin')
+
+ # XXX what's the difference between these two?
+
+ def invalidate(self, args):
+ self.rpc.callAsync('invalidate', args)
+
+ def Invalidate(self, args):
+ self.rpc.callAsync('Invalidate', args)
+
+ def endVerify(self):
+ # note: the message name sent is 'end', not 'endVerify'
+ self.rpc.callAsync('end')
+
+ def serialno(self, arg):
+ self.rpc.callAsync('serialno', arg)
+
+ def info(self, arg):
+ self.rpc.callAsync('info', arg)
=== StandaloneZODB/ZEO/Exceptions.py 1.1 => 1.2 ===
+
+class Disconnected(Exception):
+ """Exception raised when a ZEO client is disconnected from the
+ ZEO server.
+
+ The smac.py connection class raises it from message_output() once
+ the connection has been closed.
+ """
=== StandaloneZODB/ZEO/ServerStub.py 1.1 => 1.2 ===
+
+class StorageServer:
+ """Stub that forwards storage calls to a remote server through `rpc`.
+
+ Methods built on rpc.call() return whatever that call returns
+ (register() and pack() discard the result and return None).
+ Methods built on rpc.callAsync() return None.
+
+ Optional arguments left at None are omitted from the message rather
+ than sent as None -- presumably so that the remote method's own
+ default applies.
+ """
+
+ def __init__(self, rpc):
+ # rpc must provide call(name, *args) and callAsync(name, *args)
+ self.rpc = rpc
+
+ def register(self, storage_name, read_only):
+ self.rpc.call('register', storage_name, read_only)
+
+ def get_info(self):
+ return self.rpc.call('get_info')
+
+ def get_size_info(self):
+ return self.rpc.call('get_size_info')
+
+ def beginZeoVerify(self):
+ self.rpc.callAsync('beginZeoVerify')
+
+ def zeoVerify(self, oid, s, sv):
+ self.rpc.callAsync('zeoVerify', oid, s, sv)
+
+ def endZeoVerify(self):
+ self.rpc.callAsync('endZeoVerify')
+
+ def new_oids(self, n=None):
+ if n is None:
+ return self.rpc.call('new_oids')
+ else:
+ return self.rpc.call('new_oids', n)
+
+ def pack(self, t, wait=None):
+ if wait is None:
+ self.rpc.call('pack', t)
+ else:
+ self.rpc.call('pack', t, wait)
+
+ def zeoLoad(self, oid):
+ return self.rpc.call('zeoLoad', oid)
+
+ def storea(self, oid, serial, data, version, id):
+ self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+ def tpc_begin(self, id, user, descr, ext):
+ return self.rpc.call('tpc_begin', id, user, descr, ext)
+
+ def vote(self, trans_id):
+ return self.rpc.call('vote', trans_id)
+
+ def tpc_finish(self, id):
+ return self.rpc.call('tpc_finish', id)
+
+ def tpc_abort(self, id):
+ self.rpc.callAsync('tpc_abort', id)
+
+ def abortVersion(self, src, id):
+ return self.rpc.call('abortVersion', src, id)
+
+ def commitVersion(self, src, dest, id):
+ return self.rpc.call('commitVersion', src, dest, id)
+
+ def history(self, oid, version, length=None):
+ # Send length only when the caller supplied one, matching the
+ # other optional-argument methods in this class. (The test was
+ # previously inverted: a supplied length was dropped and an
+ # omitted one was sent as None.)
+ if length is None:
+ return self.rpc.call('history', oid, version)
+ else:
+ return self.rpc.call('history', oid, version, length)
+
+ def load(self, oid, version):
+ return self.rpc.call('load', oid, version)
+
+ def loadSerial(self, oid, serial):
+ return self.rpc.call('loadSerial', oid, serial)
+
+ def modifiedInVersion(self, oid):
+ return self.rpc.call('modifiedInVersion', oid)
+
+ def new_oid(self, last=None):
+ if last is None:
+ return self.rpc.call('new_oid')
+ else:
+ return self.rpc.call('new_oid', last)
+
+ def store(self, oid, serial, data, version, trans):
+ return self.rpc.call('store', oid, serial, data, version, trans)
+
+ def transactionalUndo(self, trans_id, trans):
+ return self.rpc.call('transactionalUndo', trans_id, trans)
+
+ def undo(self, trans_id):
+ return self.rpc.call('undo', trans_id)
+
+ def undoLog(self, first, last):
+ # XXX filter not allowed across RPC
+ return self.rpc.call('undoLog', first, last)
+
+ def undoInfo(self, first, last, spec):
+ return self.rpc.call('undoInfo', first, last, spec)
+
+ def versionEmpty(self, vers):
+ return self.rpc.call('versionEmpty', vers)
+
+ def versions(self, max=None):
+ if max is None:
+ return self.rpc.call('versions')
+ else:
+ return self.rpc.call('versions', max)
+
+
=== StandaloneZODB/ZEO/TransactionBuffer.py 1.1 => 1.2 ===
+
+A transaction may generate enough data that it is not practical to
+always hold pending updates in memory. Instead, a TransactionBuffer
+is used to store the data until a commit or abort.
+"""
+
+# XXX Figure out what a sensible storage format is
+
+# XXX A faster implementation might store trans data in memory until
+# it reaches a certain size.
+
+import tempfile
+import cPickle
+
+class TransactionBuffer:
+ """Temporary-file buffer of (oid, version, data) records.
+
+ Records are pickled to a temporary file by store() and invalidate()
+ and read back in order with begin_iterate() followed by repeated
+ next() calls. `count` is the number of records not yet read back;
+ `size` is a running size estimate reported by get_size().
+ """
+
+ def __init__(self):
+ self.file = tempfile.TemporaryFile()
+ self.count = 0
+ self.size = 0
+ # It's safe to use a fast pickler because the only objects
+ # stored are builtin types -- strings or None.
+ self.pickler = cPickle.Pickler(self.file, 1)
+ self.pickler.fast = 1
+
+ def store(self, oid, version, data):
+ """Store oid, version, data for later retrieval"""
+ self.pickler.dump((oid, version, data))
+ self.count += 1
+ # Estimate per-record cache size
+ # NOTE(review): the constants 27 + 12 and 4 presumably mirror the
+ # client cache's record overhead -- confirm against ClientCache.
+ self.size = self.size + len(data) + (27 + 12)
+ if version:
+ self.size = self.size + len(version) + 4
+
+ def invalidate(self, oid, version):
+ # Stored as a record whose data is None; it is counted in
+ # self.count but adds nothing to the size estimate.
+ self.pickler.dump((oid, version, None))
+ self.count += 1
+
+ def clear(self):
+ """Mark the buffer as empty"""
+ # The file is rewound, not truncated; self.count alone decides
+ # how many records next() will read back.
+ self.file.seek(0)
+ self.count = 0
+ self.size = 0
+
+ # XXX unchecked constraints:
+ # 1. can't call store() after begin_iterate()
+ # 2. must call clear() after iteration finishes
+
+ def begin_iterate(self):
+ """Move the file pointer in advance of iteration"""
+ self.file.flush()
+ self.file.seek(0)
+ self.unpickler = cPickle.Unpickler(self.file)
+
+ def next(self):
+ """Return next tuple of data or None if EOF"""
+ if self.count == 0:
+ # The unpickler attribute is removed here, so calling next()
+ # again without a new begin_iterate() fails on this del.
+ del self.unpickler
+ return None
+ oid_ver_data = self.unpickler.load()
+ self.count -= 1
+ return oid_ver_data
+
+ def get_size(self):
+ """Return size of data stored in buffer (just a hint)."""
+
+ return self.size
=== StandaloneZODB/ZEO/zrpc2.py 1.1 => 1.2 === (615/715 lines abridged)
+
+The basic protocol is as follows: each message is
+a pickled tuple containing: msgid, flags, method, args
+
+msgid is an integer.
+flags is an integer.
+ The only currently defined flag is ASYNC (0x1), which means
+ the client does not expect a reply.
+method is a string specifying the method to invoke.
+ For a reply, the method is ".reply".
+args is a tuple of the arguments to pass to method.
+
+XXX need to specify a version number that describes the protocol.
+allow for future revision.
+
+XXX support multiple outstanding calls
+
+XXX factor out common pattern of deciding what protocol to use based
+on whether address is tuple or string
+"""
+
+import asyncore
+import errno
+import cPickle
+import os
+import select
+import socket
+import sys
+import threading
+import thread
+import time
+import traceback
+import types
+
+from cStringIO import StringIO
+
+from ZODB import POSException
+from ZEO import smac, trigger
+from Exceptions import Disconnected
+import zLOG
+import ThreadedAsync
+from Exceptions import Disconnected
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+_label = "zrpc:%s" % os.getpid()
+
+def new_label():
+ global _label
[-=- -=- -=- 615 lines omitted -=- -=- -=-]
+
+ def readable(self):
+ # Always answers 1 (true).
+ # NOTE(review): presumably the asyncore readable() hook -- the
+ # enclosing class header is not visible here; confirm.
+ return 1
+
+ def handle_accept(self):
+ # Accept one incoming connection, wrap it with self.factory and
+ # remember the resulting object in self.clients.
+ try:
+ sock, addr = self.accept()
+ except socket.error, msg:
+ # a failed accept is logged and otherwise ignored
+ log("accepted failed: %s" % msg)
+ return
+ c = self.factory(sock, addr, self.obj)
+ log("connect from %s: %s" % (repr(addr), c))
+ self.clients.append(c)
+
+class Handler:
+ """Base class used to handle RPC caller discovery
+
+ The caller is kept in a name-mangled private attribute. No default
+ is defined in this class, so get_caller() raises AttributeError
+ unless set_caller() or clear_caller() has been called first.
+ """
+
+ def set_caller(self, addr):
+ self.__caller = addr
+
+ def get_caller(self):
+ return self.__caller
+
+ def clear_caller(self):
+ # resets the caller to None rather than deleting the attribute
+ self.__caller = None
+
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+ """Helper for message unpickler
+
+ Import `module` and return its attribute `name`, but only when the
+ object is considered safe: it either has a true __no_side_effects__
+ attribute or is a classic class derived from Exception.
+
+ Raises ZRPCError if the import fails, if the module has no such
+ attribute, or if the object is not considered safe.
+ """
+ try:
+ # A non-empty fromlist (_silly) makes __import__ return the named
+ # module itself rather than the top-level package.
+ m = __import__(module, _globals, _globals, _silly)
+ except ImportError, msg:
+ raise ZRPCError("import error %s: %s" % (module, msg))
+
+ try:
+ r = getattr(m, name)
+ except AttributeError:
+ raise ZRPCError("module %s has no global %s" % (module, name))
+
+ # objects that explicitly declare themselves safe are allowed
+ safe = getattr(r, '__no_side_effects__', 0)
+ if safe:
+ return r
+
+ # exception classes are allowed
+ if type(r) == types.ClassType and issubclass(r, Exception):
+ return r
+
+ # anything else is rejected
+ raise ZRPCError("Unsafe global: %s.%s" % (module, name))
+
=== StandaloneZODB/ZEO/ClientCache.py 1.18 => 1.19 ===
from struct import pack, unpack
from thread import allocate_lock
-import zLOG
-magic='ZEC0'
+import sys
+import zLOG
-def LOG(msg, level=zLOG.BLATHER):
+def log(msg, level=zLOG.INFO):
zLOG.LOG("ZEC", level, msg)
+magic='ZEC0'
+
class ClientCache:
def __init__(self, storage='', size=20000000, client=None, var=None):
@@ -211,16 +213,14 @@
f[0].write(magic)
current=0
+ log("cache opened. current = %s" % current)
+
self._limit=size/2
self._current=current
- def close(self):
- try:
- self._f[self._current].close()
- except (os.error, ValueError):
- pass
-
def open(self):
+ # XXX open is overloaded to perform two tasks for
+ # optimization reasons
self._acquire()
try:
self._index=index={}
@@ -235,6 +235,19 @@
return serial.items()
finally: self._release()
+ def close(self):
+ for f in self._f:
+ if f is not None:
+ f.close()
+
+ def verify(self, verifyFunc):
+ """Call the verifyFunc on every object in the cache.
+
+ verifyFunc(oid, serialno, version)
+ """
+ for oid, (s, vs) in self.open():
+ verifyFunc(oid, s, vs)
+
def invalidate(self, oid, version):
self._acquire()
try:
@@ -373,8 +386,6 @@
self._f[current]=open(self._p[current],'w+b')
else:
# Temporary cache file:
- if self._f[current] is not None:
- self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
@@ -383,55 +394,57 @@
def store(self, oid, p, s, version, pv, sv):
self._acquire()
- try: self._store(oid, p, s, version, pv, sv)
- finally: self._release()
+ try:
+ self._store(oid, p, s, version, pv, sv)
+ finally:
+ self._release()
def _store(self, oid, p, s, version, pv, sv):
if not s:
- p=''
- s='\0\0\0\0\0\0\0\0'
- tlen=31+len(p)
+ p = ''
+ s = '\0\0\0\0\0\0\0\0'
+ tlen = 31 + len(p)
if version:
- tlen=tlen+len(version)+12+len(pv)
- vlen=len(version)
+ tlen = tlen + len(version) + 12 + len(pv)
+ vlen = len(version)
else:
- vlen=0
+ vlen = 0
- pos=self._pos
- current=self._current
- f=self._f[current]
- f.seek(pos)
- stlen=pack(">I",tlen)
- write=f.write
- write(oid+'v'+stlen+pack(">HI", vlen, len(p))+s)
- if p: write(p)
+ stlen = pack(">I", tlen)
+ # accumulate various data to write into a list
+ l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
+ if p:
+ l.append(p)
if version:
- write(version)
- write(pack(">I", len(pv)))
- write(pv)
- write(sv)
-
- write(stlen)
+ l.extend([version,
+ pack(">I", len(pv)),
+ pv, sv])
+ l.append(stlen)
+ f = self._f[self._current]
+ f.seek(self._pos)
+ f.write("".join(l))
- if current: self._index[oid]=-pos
- else: self._index[oid]=pos
+ if self._current:
+ self._index[oid] = - self._pos
+ else:
+ self._index[oid] = self._pos
- self._pos=pos+tlen
+ self._pos += tlen
def read_index(index, serial, f, current):
- LOG("read_index(%s)" % f.name)
seek=f.seek
read=f.read
pos=4
+ seek(0,2)
+ size=f.tell()
while 1:
- seek(pos)
+ f.seek(pos)
h=read(27)
-
+
if len(h)==27 and h[8] in 'vni':
tlen, vlen, dlen = unpack(">iHi", h[9:19])
- else:
- break
+ else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
break
@@ -466,15 +479,3 @@
except: pass
return pos
-
-def main(files):
- for file in files:
- print file
- index = {}
- serial = {}
- read_index(index, serial, open(file), 0)
- print index.keys()
-
-if __name__ == "__main__":
- import sys
- main(sys.argv[1:])
=== StandaloneZODB/ZEO/ClientStorage.py 1.35 => 1.36 === (868/968 lines abridged)
##############################################################################
"""Network ZODB storage client
-"""
+XXX support multiple outstanding requests up until the vote?
+XXX is_connected() vis ClientDisconnected error
+"""
__version__='$Revision$'[11:-2]
-import struct, time, os, socket, string, Sync, zrpc, ClientCache
-import tempfile, Invalidator, ExtensionClass, thread
-import ThreadedAsync
-
-now=time.time
+import cPickle
+import os
+import socket
+import string
+import struct
+import sys
+import tempfile
+import thread
+import threading
+import time
+from types import TupleType, StringType
from struct import pack, unpack
-from ZODB import POSException, BaseStorage
+
+import ExtensionClass, Sync, ThreadLock
+import ClientCache
+import zrpc2
+import ServerStub
+from TransactionBuffer import TransactionBuffer
+
+from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
+from zLOG import LOG, PROBLEM, INFO, BLATHER
+from Exceptions import Disconnected
-try: from ZODB.ConflictResolution import ResolvedSerial
-except: ResolvedSerial='rs'
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+ LOG(subsys, type, msg)
-TupleType=type(())
+try:
+ from ZODB.ConflictResolution import ResolvedSerial
+except ImportError:
+ ResolvedSerial = 'rs'
[-=- -=- -=- 868 lines omitted -=- -=- -=-]
- _w.append(t)
- return t
+ return self._server.versions(max)
+
+ # below are methods invoked by the StorageServer
+
+ def serialno(self, arg):
+ # Invoked by the StorageServer: queue arg on self._serials.
+ self._serials.append(arg)
+
+ def info(self, dict):
+ # Invoked by the StorageServer: merge dict into self._info.
+ self._info.update(dict)
+
+ def begin(self):
+ # Invoked by the StorageServer: set up a temporary file and a
+ # pickler that invalidate() writes to and end() reads back.
+ self._tfile = tempfile.TemporaryFile()
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
+ def invalidate(self, args):
+ # Invoked by the StorageServer: append args to the pickle stream.
+ # end() unpacks each record as an (oid, version) pair.
+ # Does nothing while self._pickler is None.
+ if self._pickler is None:
+ return
+ self._pickler.dump(args)
+
+ def end(self):
+ # Invoked by the StorageServer: replay every buffered
+ # (oid, version) record against the cache and the database.
+ # Does nothing while self._pickler is None.
+ if self._pickler is None:
+ return
+ # (0,0) is the end-of-stream sentinel; the loop below stops at
+ # the first record whose oid is false.
+ self._pickler.dump((0,0))
+## self._pickler.dump = None
+ self._tfile.seek(0)
+ unpick = cPickle.Unpickler(self._tfile)
+ # the unpickler still holds the file; only our reference is dropped
+ self._tfile = None
+
+ while 1:
+ oid, version = unpick.load()
+ if not oid:
+ break
+ self._cache.invalidate(oid, version=version)
+ self._db.invalidate(oid, version=version)
+
+ def Invalidate(self, args):
+ # Invoked by the StorageServer: invalidate each (oid, version)
+ # pair in args right away, first in the cache and then in the
+ # database. An AttributeError from the database call is logged
+ # at PROBLEM level instead of being propagated.
+ # XXX _db could be None
+ for oid, version in args:
+ self._cache.invalidate(oid, version=version)
+ try:
+ self._db.invalidate(oid, version=version)
+ except AttributeError, msg:
+ log2(PROBLEM,
+ "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+ repr(version),
+ msg))
+
=== StandaloneZODB/ZEO/StorageServer.py 1.32 => 1.33 === (750/850 lines abridged)
+##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
@@ -59,7 +59,7 @@
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
-#
+#
#
# Disclaimer
#
@@ -82,527 +82,394 @@
# attributions are listed in the accompanying credits file.
#
##############################################################################
+"""Network ZODB storage server
-__version__ = "$Revision$"[11:-2]
+This server acts as a front-end for one or more real storages, like
+file storage or Berkeley storage.
-import asyncore, socket, string, sys, os
-from smac import SizedMessageAsyncConnection
-from ZODB import POSException
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
+"""
+
+import asyncore
import cPickle
-from cPickle import Unpickler
-from ZODB.POSException import TransactionError, UndoError, VersionCommitError
-from ZODB.Transaction import Transaction
-import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
+import os
+import sys
+import threading
+import types
+
+import ClientStub
+import zrpc2
+import zLOG
+
+from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
+from ZODB.POSException import StorageError, StorageTransactionError, \
+ TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
[-=- -=- -=- 750 lines omitted -=- -=- -=-]
- try:
- port='', int(port)
- except:
- pass
-
- d = {'1': ZODB.FileStorage.FileStorage(name)}
- StorageServer(port, d)
- asyncwrap.loop()
+ self.server.invalidate(self, self.__storage_id,
+ self.__invalidated,
+ self.get_size_info())
+
+ if not self._handle_waiting():
+ self._transaction = None
+ self.__invalidated = []
+
+ def tpc_abort(self, id):
+ # Abort the current transaction on the underlying storage.
+ # Returns without doing anything when self._check_tid(id) is false.
+ if not self._check_tid(id):
+ return
+ # r is not used afterwards
+ r = self.__storage.tpc_abort(self._transaction)
+ assert self.__storage._transaction is None
+
+ # Let a queued transaction start; local state is reset unless
+ # _handle_waiting() reports a true value.
+ if not self._handle_waiting():
+ self._transaction = None
+ self.__invalidated = []
+
+ def _restart_delayed_transaction(self, delay, trans):
+ # Start a transaction that had been queued: adopt trans, begin it
+ # on the storage, reset the invalidation list, then answer the
+ # pending request with delay.reply(None).
+ self._transaction = trans
+ self.__storage.tpc_begin(trans)
+ self.__invalidated = []
+ assert self._transaction.id == self.__storage._transaction.id
+ delay.reply(None)
+
+ def _handle_waiting(self):
+ # If the storage has queued (delay, proxy, trans) entries, pop the
+ # oldest and restart it on its proxy. Returns 1 when that proxy
+ # is this object; otherwise falls off the end and returns None.
+ if self.__storage.__waiting:
+ delay, proxy, trans = self.__storage.__waiting.pop(0)
+ proxy._restart_delayed_transaction(delay, trans)
+ if self is proxy:
+ return 1
+
+ def new_oids(self, n=100):
+ """Return a sequence of n new oids, where n defaults to 100"""
+ # a negative request is treated as a request for one oid;
+ # n == 0 yields an empty list
+ if n < 0:
+ n = 1
+ return [self.__storage.new_oid() for i in range(n)]
+
+def fixup_storage(storage):
+ """Give storage a no-op tpc_vote if it does not define one.
+
+ The replacement accepts any positional arguments and returns None.
+ """
+ # backwards compatibility hack
+ if not hasattr(storage,'tpc_vote'):
+ storage.tpc_vote = lambda *args: None
=== StandaloneZODB/ZEO/smac.py 1.11 => 1.12 ===
__version__ = "$Revision$"[11:-2]
-import asyncore, string, struct, zLOG, sys, Acquisition
+import asyncore, struct
+from Exceptions import Disconnected
+from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
+from types import StringType
+
import socket, errno
-from zLOG import LOG, TRACE, ERROR, INFO
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -109,81 +112,101 @@
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
-class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+ __super_init = asyncore.dispatcher.__init__
+ __super_close = asyncore.dispatcher.close
+
+ __closed = 1 # Marker indicating that we're closed
- __append=None # Marker indicating that we're closed
+ socket = None # to outwit Sam's getattr
- socket=None # to outwit Sam's getattr
+ READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
- SizedMessageAsyncConnection.inheritedAttribute(
- '__init__')(self, sock, map)
- self.addr=addr
+ self.__super_init(sock, map)
+ self.addr = addr
if debug is not None:
- self._debug=debug
+ self._debug = debug
elif not hasattr(self, '_debug'):
- self._debug=__debug__ and 'smac'
- self.__state=None
- self.__inp=None
- self.__inpl=0
- self.__l=4
- self.__output=output=[]
- self.__append=output.append
- self.__pop=output.pop
-
- def handle_read(self,
- join=string.join, StringType=type(''), _type=type,
- _None=None):
-
+ self._debug = __debug__ and 'smac'
+ self.__state = None
+ self.__inp = None # None, a single String, or a list
+ self.__input_len = 0
+ self.__msg_size = 4
+ self.__output = []
+ self.__closed = None
+
+ # XXX avoid expensive getattr calls?
+ def __nonzero__(self):
+ return 1
+
+ def handle_read(self):
+ # Use a single __inp buffer and integer indexes to make this
+ # fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
- if not d: return
+ if not d:
+ return
- inp=self.__inp
- if inp is _None:
- inp=d
- elif _type(inp) is StringType:
- inp=[inp,d]
+ input_len = self.__input_len + len(d)
+ msg_size = self.__msg_size
+ state = self.__state
+
+ inp = self.__inp
+ if msg_size > input_len:
+ if inp is None:
+ self.__inp = d
+ elif type(self.__inp) is StringType:
+ self.__inp = [self.__inp, d]
+ else:
+ self.__inp.append(d)
+ self.__input_len = input_len
+ return # keep waiting for more input
+
+ # load all previous input and d into single string inp
+ if isinstance(inp, StringType):
+ inp = inp + d
+ elif inp is None:
+ inp = d
else:
inp.append(d)
+ inp = "".join(inp)
- inpl=self.__inpl+len(d)
- l=self.__l
-
- while 1:
-
- if l <= inpl:
- # Woo hoo, we have enough data
- if _type(inp) is not StringType: inp=join(inp,'')
- d=inp[:l]
- inp=inp[l:]
- inpl=inpl-l
- if self.__state is _None:
- # waiting for message
- l=struct.unpack(">i",d)[0]
- self.__state=1
- else:
- l=4
- self.__state=_None
- self.message_input(d)
+ offset = 0
+ while (offset + msg_size) <= input_len:
+ msg = inp[offset:offset + msg_size]
+ offset = offset + msg_size
+ if state is None:
+ # waiting for message
+ msg_size = struct.unpack(">i", msg)[0]
+ state = 1
else:
- break # not enough data
-
- self.__l=l
- self.__inp=inp
- self.__inpl=inpl
+ msg_size = 4
+ state = None
+ self.message_input(msg)
+
+ self.__state = state
+ self.__msg_size = msg_size
+ self.__inp = inp[offset:]
+ self.__input_len = input_len - offset
- def readable(self): return 1
- def writable(self): return not not self.__output
+ def readable(self):
+ return 1
+
+ def writable(self):
+ if len(self.__output) == 0:
+ return 0
+ else:
+ return 1
def handle_write(self):
- output=self.__output
+ output = self.__output
while output:
- v=output[0]
+ v = output[0]
try:
n=self.send(v)
except socket.error, err:
@@ -191,42 +214,33 @@
break # we couldn't write anything
raise
if n < len(v):
- output[0]=v[n:]
+ output[0] = v[n:]
break # we can't write any more
else:
del output[0]
- #break # waaa
-
def handle_close(self):
self.close()
- def message_output(self, message,
- pack=struct.pack, len=len):
- if self._debug:
- if len(message) > 40: m=message[:40]+' ...'
- else: m=message
- LOG(self._debug, TRACE, 'message_output %s' % `m`)
-
- append=self.__append
- if append is None:
- raise Disconnected("This action is temporarily unavailable.<p>")
-
- append(pack(">i",len(message))+message)
-
- def log_info(self, message, type='info'):
- if type=='error': type=ERROR
- else: type=INFO
- LOG('ZEO', type, message)
+ def message_output(self, message):
+ if __debug__:
+ if self._debug:
+ if len(message) > 40:
+ m = message[:40]+' ...'
+ else:
+ m = message
+ LOG(self._debug, TRACE, 'message_output %s' % `m`)
- log=log_info
+ if self.__closed is not None:
+ raise Disconnected, (
+ "This action is temporarily unavailable."
+ "<p>"
+ )
+ # do two separate appends to avoid copying the message string
+ self.__output.append(struct.pack(">i", len(message)))
+ self.__output.append(message)
def close(self):
- if self.__append is not None:
- self.__append=None
- SizedMessageAsyncConnection.inheritedAttribute('close')(self)
-
-class Disconnected(Exception):
- """The client has become disconnected from the server
- """
-
+ if self.__closed is None:
+ self.__closed = 1
+ self.__super_close()
=== StandaloneZODB/ZEO/start.py 1.26 => 1.27 ===
import sys, os, getopt, string
+import StorageServer
+import asyncore
+
def directory(p, n=1):
d=p
while n:
@@ -115,9 +118,11 @@
def main(argv):
me=argv[0]
- sys.path[:]==filter(None, sys.path)
sys.path.insert(0, directory(me, 2))
+ # XXX hack for profiling support
+ global unix, storages, zeo_pid, asyncore
+
args=[]
last=''
for a in argv[1:]:
@@ -130,25 +135,13 @@
args.append(a)
last=a
- if os.environ.has_key('INSTANCE_HOME'):
- INSTANCE_HOME=os.environ['INSTANCE_HOME']
- elif os.path.isdir(os.path.join(directory(me, 4),'var')):
- INSTANCE_HOME=directory(me, 4)
- else:
- INSTANCE_HOME=os.getcwd()
-
- if os.path.isdir(os.path.join(INSTANCE_HOME, 'var')):
- var=os.path.join(INSTANCE_HOME, 'var')
- else:
- var=INSTANCE_HOME
+ INSTANCE_HOME=os.environ.get('INSTANCE_HOME', directory(me, 4))
zeo_pid=os.environ.get('ZEO_SERVER_PID',
- os.path.join(var, 'ZEO_SERVER.pid')
+ os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid')
)
- opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
-
- fs=os.path.join(var, 'Data.fs')
+ fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs')
usage="""%s [options] [filename]
@@ -156,17 +149,14 @@
-D -- Run in debug mode
- -d -- Generate detailed debug logging without running
- in the foreground.
-
-U -- Unix-domain socket file to listen on
-u username or uid number
The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited
- resouces. The only works under Unix, and if the storage
- server is started by root.
+ resouces. The only works under Unix, and if ZServer is
+ started by root.
-p port -- port to listen on
@@ -189,23 +179,42 @@
attr_name -- This is the name to which the storage object
is assigned in the module.
+ -P file -- Run under profile and dump output to file. Implies the
+ -s flag.
+
if no file name is specified, then %s is used.
""" % (me, fs)
+ try:
+ opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:')
+ except getopt.error, msg:
+ print usage
+ print msg
+ sys.exit(1)
+
port=None
- debug=detailed=0
+ debug=0
host=''
unix=None
Z=1
UID='nobody'
+ prof = None
for o, v in opts:
if o=='-p': port=string.atoi(v)
elif o=='-h': host=v
elif o=='-U': unix=v
elif o=='-u': UID=v
elif o=='-D': debug=1
- elif o=='-d': detailed=1
elif o=='-s': Z=0
+ elif o=='-P': prof = v
+
+ if prof:
+ Z = 0
+
+ try:
+ from ZServer.medusa import asyncore
+ sys.modules['asyncore']=asyncore
+ except: pass
if port is None and unix is None:
print usage
@@ -219,10 +228,9 @@
sys.exit(1)
fs=args[0]
+ __builtins__.__debug__=debug
if debug: os.environ['Z_DEBUG_MODE']='1'
- if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
-
from zLOG import LOG, INFO, ERROR
# Try to set uid to "-u" -provided uid.
@@ -263,71 +271,54 @@
import zdaemon
zdaemon.run(sys.argv, '')
- try:
-
- import ZEO.StorageServer, asyncore
-
- storages={}
- for o, v in opts:
- if o=='-S':
- n, m = string.split(v,'=')
- if string.find(m,':'):
- # we got an attribute name
- m, a = string.split(m,':')
- else:
- # attribute name must be same as storage name
- a=n
- storages[n]=get_storage(m,a)
-
- if not storages:
- import ZODB.FileStorage
- storages['1']=ZODB.FileStorage.FileStorage(fs)
-
- # Try to set up a signal handler
- try:
- import signal
-
- signal.signal(signal.SIGTERM,
- lambda sig, frame, s=storages: shutdown(s)
- )
- signal.signal(signal.SIGINT,
- lambda sig, frame, s=storages: shutdown(s, 0)
- )
- try: signal.signal(signal.SIGHUP, rotate_logs_handler)
- except: pass
-
- except: pass
-
- items=storages.items()
- items.sort()
- for kv in items:
- LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
-
- if not unix: unix=host, port
-
- ZEO.StorageServer.StorageServer(unix, storages)
-
- try: ppid, pid = os.getppid(), os.getpid()
- except: pass # getpid not supported
- else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
-
- except:
- # Log startup exception and tell zdaemon not to restart us.
- info=sys.exc_info()
- try:
- import zLOG
- zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
- error=info)
- except:
- pass
-
- import traceback
- apply(traceback.print_exception, info)
-
- sys.exit(0)
+ storages={}
+ for o, v in opts:
+ if o=='-S':
+ n, m = string.split(v,'=')
+ if string.find(m,':'):
+ # we got an attribute name
+ m, a = string.split(m,':')
+ else:
+ # attribute name must be same as storage name
+ a=n
+ storages[n]=get_storage(m,a)
+
+ if not storages:
+ import ZODB.FileStorage
+ storages['1']=ZODB.FileStorage.FileStorage(fs)
- asyncore.loop()
+ # Try to set up a signal handler
+ try:
+ import signal
+ signal.signal(signal.SIGTERM,
+ lambda sig, frame, s=storages: shutdown(s)
+ )
+ signal.signal(signal.SIGINT,
+ lambda sig, frame, s=storages: shutdown(s, 0)
+ )
+ signal.signal(signal.SIGHUP, rotate_logs_handler)
+
+ finally: pass
+
+ items=storages.items()
+ items.sort()
+ for kv in items:
+ LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
+
+ if not unix: unix=host, port
+
+ if prof:
+ cmds = \
+ "StorageServer.StorageServer(unix, storages);" \
+ 'open(zeo_pid,"w").write("%s %s" % (os.getppid(), os.getpid()));' \
+ "asyncore.loop()"
+ import profile
+ profile.run(cmds, prof)
+ else:
+ StorageServer.StorageServer(unix, storages)
+ open(zeo_pid,'w').write("%s %s" % (os.getppid(), os.getpid()))
+ asyncore.loop()
def rotate_logs():
import zLOG
@@ -335,10 +326,7 @@
zLOG.log_write.reinitialize()
else:
# Hm, lets at least try to take care of the stupid logger:
- if hasattr(zLOG, '_set_stupid_dest'):
- zLOG._set_stupid_dest(None)
- else:
- zLOG._stupid_dest = None
+ zLOG._stupid_dest=None
def rotate_logs_handler(signum, frame):
rotate_logs()
@@ -359,7 +347,7 @@
for storage in storages.values():
try: storage.close()
- except: pass
+ finally: pass
try:
from zLOG import LOG, INFO