[Zodb-checkins] CVS: ZEO/ZEO - ClientStub.py:1.4 CommitLog.py:1.2 Exceptions.py:1.4 ICache.py:1.2 ServerStub.py:1.4 TransactionBuffer.py:1.4 ClientCache.py:1.23 ClientStorage.py:1.41 StorageServer.py:1.37 __init__.py:1.9 smac.py:1.17 start.py:1.33 trigger.py:1.6
Jeremy Hylton
jeremy@zope.com
Tue, 11 Jun 2002 09:43:37 -0400
Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5548/ZEO
Modified Files:
ClientCache.py ClientStorage.py StorageServer.py __init__.py
smac.py start.py trigger.py
Added Files:
ClientStub.py CommitLog.py Exceptions.py ICache.py
ServerStub.py TransactionBuffer.py
Log Message:
Merge ZEO2-branch to trunk.
=== ZEO/ZEO/ClientStub.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Stub for interface exported by ClientStorage"""
+
+class ClientStorage:
+ def __init__(self, rpc):
+ self.rpc = rpc
+
+ def beginVerify(self):
+ self.rpc.callAsync('begin')
+
+ # XXX must rename the two invalidate messages. I can never
+ # remember which is which
+
+ def invalidate(self, args):
+ self.rpc.callAsync('invalidate', args)
+
+ def Invalidate(self, args):
+ self.rpc.callAsync('Invalidate', args)
+
+ def endVerify(self):
+ self.rpc.callAsync('end')
+
+ def serialnos(self, arg):
+ self.rpc.callAsync('serialnos', arg)
+
+ def info(self, arg):
+ self.rpc.callAsync('info', arg)
=== ZEO/ZEO/CommitLog.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Log a transaction's commit info during two-phase commit.
+
+A storage server allows multiple clients to commit transactions, but
+must serialize them as the actually execute at the server. The
+concurrent commits are achieved by logging actions up until the
+tpc_vote(). At that point, the entire transaction is committed on the
+real storage.
+"""
+import cPickle
+import tempfile
+
+class CommitLog:
+
+ def __init__(self):
+ self.file = tempfile.TemporaryFile(suffix=".log")
+ self.pickler = cPickle.Pickler(self.file, 1)
+ self.pickler.fast = 1
+ self.stores = 0
+ self.read = 0
+
+ def tpc_begin(self, t, tid, status):
+ self.t = t
+ self.tid = tid
+ self.status = status
+
+ def store(self, oid, serial, data, version):
+ self.pickler.dump((oid, serial, data, version))
+ self.stores += 1
+
+ def get_loader(self):
+ self.read = 1
+ self.file.seek(0)
+ return self.stores, cPickle.Unpickler(self.file)
+
=== ZEO/ZEO/Exceptions.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Exceptions for ZEO."""
+
+class Disconnected(Exception):
+ """Exception raised when a ZEO client is disconnected from the
+ ZEO server."""
=== ZEO/ZEO/ICache.py 1.1 => 1.2 ===
+ from Interface import Base
+except ImportError:
+ class Base:
+ # a dummy interface for use when Zope's is unavailable
+ pass
+
+class ICache(Base):
+ """ZEO client cache.
+
+ __init__(storage, size, client, var)
+
+ All arguments optional.
+
+ storage -- name of storage
+ size -- max size of cache in bytes
+ client -- a string; if specified, cache is persistent.
+ var -- var directory to store cache files in
+ """
+
+ def open():
+ """Returns a sequence of object info tuples.
+
+ An object info tuple is a pair containing an object id and a
+ pair of serialnos, a non-version serialno and a version serialno:
+ oid, (serial, ver_serial)
+
+ This method builds an index of the cache and returns a
+ sequence used for cache validation.
+ """
+
+ def close():
+ """Closes the cache."""
+
+ def verify(func):
+ """Call func on every object in cache.
+
+ func is called with three arguments
+ func(oid, serial, ver_serial)
+ """
+
+ def invalidate(oid, version):
+ """Remove object from cache."""
+
+ def load(oid, version):
+ """Load object from cache.
+
+ Return None if object not in cache.
+ Return data, serialno if object is in cache.
+ """
+
+ def store(oid, p, s, version, pv, sv):
+ """Store a new object in the cache."""
+
+ def update(oid, serial, version, data):
+ """Update an object already in the cache.
+
+ XXX This method is called to update objects that were modified by
+ a transaction. It's likely that it is already in the cache,
+ and it may be possible for the implementation to operate more
+ efficiently.
+ """
+
+ def modifiedInVersion(oid):
+ """Return the version an object is modified in.
+
+ '' signifies the trunk.
+ Returns None if the object is not in the cache.
+ """
+
+ def checkSize(size):
+ """Check if adding size bytes would exceed cache limit.
+
+ This method is often called just before store or update. The
+ size is a hint about the amount of data that is about to be
+ stored. The cache may want to evict some data to make space.
+ """
+
+
+
+
+
+
=== ZEO/ZEO/ServerStub.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Stub for interface exposed by StorageServer"""
+
+class StorageServer:
+
+ def __init__(self, rpc):
+ self.rpc = rpc
+
+ def register(self, storage_name, read_only):
+ self.rpc.call('register', storage_name, read_only)
+
+ def get_info(self):
+ return self.rpc.call('get_info')
+
+ def get_size_info(self):
+ return self.rpc.call('get_size_info')
+
+ def beginZeoVerify(self):
+ self.rpc.callAsync('beginZeoVerify')
+
+ def zeoVerify(self, oid, s, sv):
+ self.rpc.callAsync('zeoVerify', oid, s, sv)
+
+ def endZeoVerify(self):
+ self.rpc.callAsync('endZeoVerify')
+
+ def new_oids(self, n=None):
+ if n is None:
+ return self.rpc.call('new_oids')
+ else:
+ return self.rpc.call('new_oids', n)
+
+ def pack(self, t, wait=None):
+ if wait is None:
+ self.rpc.call('pack', t)
+ else:
+ self.rpc.call('pack', t, wait)
+
+ def zeoLoad(self, oid):
+ return self.rpc.call('zeoLoad', oid)
+
+ def storea(self, oid, serial, data, version, id):
+ self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+ def tpc_begin(self, id, user, descr, ext, tid, status):
+ return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
+
+ def vote(self, trans_id):
+ return self.rpc.call('vote', trans_id)
+
+ def tpc_finish(self, id):
+ return self.rpc.call('tpc_finish', id)
+
+ def tpc_abort(self, id):
+ self.rpc.callAsync('tpc_abort', id)
+
+ def abortVersion(self, src, id):
+ return self.rpc.call('abortVersion', src, id)
+
+ def commitVersion(self, src, dest, id):
+ return self.rpc.call('commitVersion', src, dest, id)
+
+ def history(self, oid, version, length=None):
+ if length is not None:
+ return self.rpc.call('history', oid, version)
+ else:
+ return self.rpc.call('history', oid, version, length)
+
+ def load(self, oid, version):
+ return self.rpc.call('load', oid, version)
+
+ def loadSerial(self, oid, serial):
+ return self.rpc.call('loadSerial', oid, serial)
+
+ def modifiedInVersion(self, oid):
+ return self.rpc.call('modifiedInVersion', oid)
+
+ def new_oid(self, last=None):
+ if last is None:
+ return self.rpc.call('new_oid')
+ else:
+ return self.rpc.call('new_oid', last)
+
+ def store(self, oid, serial, data, version, trans):
+ return self.rpc.call('store', oid, serial, data, version, trans)
+
+ def transactionalUndo(self, trans_id, trans):
+ return self.rpc.call('transactionalUndo', trans_id, trans)
+
+ def undo(self, trans_id):
+ return self.rpc.call('undo', trans_id)
+
+ def undoLog(self, first, last):
+ # XXX filter not allowed across RPC
+ return self.rpc.call('undoLog', first, last)
+
+ def undoInfo(self, first, last, spec):
+ return self.rpc.call('undoInfo', first, last, spec)
+
+ def versionEmpty(self, vers):
+ return self.rpc.call('versionEmpty', vers)
+
+ def versions(self, max=None):
+ if max is None:
+ return self.rpc.call('versions')
+ else:
+ return self.rpc.call('versions', max)
=== ZEO/ZEO/TransactionBuffer.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A TransactionBuffer store transaction updates until commit or abort.
+
+A transaction may generate enough data that it is not practical to
+always hold pending updates in memory. Instead, a TransactionBuffer
+is used to store the data until a commit or abort.
+"""
+
+# A faster implementation might store trans data in memory until it
+# reaches a certain size.
+
+import tempfile
+import cPickle
+
+class TransactionBuffer:
+
+ def __init__(self):
+ self.file = tempfile.TemporaryFile(suffix=".tbuf")
+ self.count = 0
+ self.size = 0
+ # It's safe to use a fast pickler because the only objects
+ # stored are builtin types -- strings or None.
+ self.pickler = cPickle.Pickler(self.file, 1)
+ self.pickler.fast = 1
+
+ def close(self):
+ try:
+ self.file.close()
+ except OSError:
+ pass
+
+
+ def store(self, oid, version, data):
+ """Store oid, version, data for later retrieval"""
+ self.pickler.dump((oid, version, data))
+ self.count += 1
+ # Estimate per-record cache size
+ self.size = self.size + len(data) + (27 + 12)
+ if version:
+ self.size = self.size + len(version) + 4
+
+ def invalidate(self, oid, version):
+ self.pickler.dump((oid, version, None))
+ self.count += 1
+
+ def clear(self):
+ """Mark the buffer as empty"""
+ self.file.seek(0)
+ self.count = 0
+ self.size = 0
+
+ # unchecked constraints:
+ # 1. can't call store() after begin_iterate()
+ # 2. must call clear() after iteration finishes
+
+ def begin_iterate(self):
+ """Move the file pointer in advance of iteration"""
+ self.file.flush()
+ self.file.seek(0)
+ self.unpickler = cPickle.Unpickler(self.file)
+
+ def next(self):
+ """Return next tuple of data or None if EOF"""
+ if self.count == 0:
+ del self.unpickler
+ return None
+ oid_ver_data = self.unpickler.load()
+ self.count -= 1
+ return oid_ver_data
+
+ def get_size(self):
+ """Return size of data stored in buffer (just a hint)."""
+
+ return self.size
=== ZEO/ZEO/ClientCache.py 1.22 => 1.23 === (447/547 lines abridged)
##############################################################################
"""Implement a client cache
-
+
The cache is managed as two files, var/c0.zec and var/c1.zec.
Each cache file is a sequence of records of the form:
@@ -75,143 +75,181 @@
__version__ = "$Revision$"[11:-2]
-import os, tempfile
+import os
+import sys
+import tempfile
from struct import pack, unpack
from thread import allocate_lock
-import zLOG
-magic='ZEC0'
+import zLOG
+from ZEO.ICache import ICache
-def LOG(msg, level=zLOG.BLATHER):
+def log(msg, level=zLOG.INFO):
zLOG.LOG("ZEC", level, msg)
+magic='ZEC0'
+
class ClientCache:
+ __implements__ = ICache
+
def __init__(self, storage='', size=20000000, client=None, var=None):
# Allocate locks:
- l=allocate_lock()
- self._acquire=l.acquire
- self._release=l.release
+ L = allocate_lock()
+ self._acquire = L.acquire
+ self._release = L.release
if client:
# Create a persistent cache
if var is None:
- try: var=CLIENT_HOME
+ try:
+ var = CLIENT_HOME
except:
[-=- -=- -=- 447 lines omitted -=- -=- -=-]
- else: vs=None
+ vs = read(8)
+ if read(4) != h[9:13]:
+ break
+ else:
+ vs = None
if h[8] in 'vn':
- if current: index[oid]=-pos
- else: index[oid]=pos
- serial[oid]=h[-8:], vs
+ if current:
+ index[oid] = -pos
+ else:
+ index[oid] = pos
+ serial[oid] = h[-8:], vs
else:
if serial.has_key(oid):
# We have a record for this oid, but it was invalidated!
del serial[oid]
del index[oid]
-
-
- pos=pos+tlen
+
+
+ pos = pos + tlen
f.seek(pos)
- try: f.truncate()
- except: pass
-
- return pos
+ try:
+ f.truncate()
+ except:
+ pass
-def main(files):
- for file in files:
- print file
- index = {}
- serial = {}
- read_index(index, serial, open(file), 0)
- print index.keys()
-
-if __name__ == "__main__":
- import sys
- main(sys.argv[1:])
+ return pos
=== ZEO/ZEO/ClientStorage.py 1.40 => 1.41 === (866/966 lines abridged)
__version__='$Revision$'[11:-2]
-import struct, time, os, socket, string
-import tempfile, thread
-from struct import pack, unpack
-from types import TupleType
+import cPickle
+import os
+import tempfile
+import threading
+import time
+
+from ZEO import ClientCache, ServerStub
+from ZEO.TransactionBuffer import TransactionBuffer
+from ZEO.Exceptions import Disconnected
+from ZEO.zrpc.client import ConnectionManager
-import Invalidator, ExtensionClass
-import ThreadedAsync, Sync, zrpc, ClientCache
-
-from ZODB import POSException, BaseStorage
+from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
+from zLOG import LOG, PROBLEM, INFO, BLATHER
-from ZEO.logger import zLogger
-
-log = zLogger("ZEO Client")
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+ LOG(subsys, type, msg)
try:
from ZODB.ConflictResolution import ResolvedSerial
-except:
- ResolvedSerial='rs'
+except ImportError:
+ ResolvedSerial = 'rs'
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
class UnrecognizedResult(ClientStorageError):
- """A server call returned an unrecognized result
- """
+ """A server call returned an unrecognized result"""
-class ClientDisconnected(ClientStorageError):
- """The database storage is disconnected from the storage.
- """
+class ClientDisconnected(ClientStorageError, Disconnected):
[-=- -=- -=- 866 lines omitted -=- -=- -=-]
- _w.append(t)
- return t
+ return self._server.versions(max)
+
+ # below are methods invoked by the StorageServer
+
+ def serialnos(self, args):
+ self._serials.extend(args)
+
+ def info(self, dict):
+ self._info.update(dict)
+
+ def begin(self):
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
+ def invalidate(self, args):
+ # Queue an invalidate for the end the transaction
+ if self._pickler is None:
+ return
+ self._pickler.dump(args)
+
+ def end(self):
+ if self._pickler is None:
+ return
+ self._pickler.dump((0,0))
+ self._tfile.seek(0)
+ unpick = cPickle.Unpickler(self._tfile)
+ f = self._tfile
+ self._tfile = None
+
+ while 1:
+ oid, version = unpick.load()
+ if not oid:
+ break
+ self._cache.invalidate(oid, version=version)
+ self._db.invalidate(oid, version=version)
+ f.close()
+
+ def Invalidate(self, args):
+ for oid, version in args:
+ self._cache.invalidate(oid, version=version)
+ try:
+ self._db.invalidate(oid, version=version)
+ except AttributeError, msg:
+ log2(PROBLEM,
+ "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+ repr(version),
+ msg))
=== ZEO/ZEO/StorageServer.py 1.36 => 1.37 === (1035/1135 lines abridged)
#
##############################################################################
+"""Network ZODB storage server
-__version__ = "$Revision$"[11:-2]
+This server acts as a front-end for one or more real storages, like
+file storage or Berkeley storage.
-import asyncore, socket, string, sys, os
-import cPickle
-from cPickle import Unpickler
-from cStringIO import StringIO
-from thread import start_new_thread
-import time
-from types import StringType
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
+"""
-from ZODB import POSException
-from ZODB.POSException import TransactionError, UndoError, VersionCommitError
-from ZODB.Transaction import Transaction
+import asyncore
+import cPickle
+import os
+import sys
+import threading
+
+from ZEO import ClientStub
+from ZEO.CommitLog import CommitLog
+from ZEO.zrpc.server import Dispatcher
+from ZEO.zrpc.connection import ManagedServerConnection, Delay
+
+import zLOG
+from ZODB.POSException import StorageError, StorageTransactionError, \
+ TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
-from ZODB.utils import U64
-
-from ZEO import trigger
-from ZEO import asyncwrap
-from ZEO.smac import Disconnected, SizedMessageAsyncConnection
-from ZEO.logger import zLogger, format_msg
-
-class StorageServerError(POSException.StorageError):
- pass
+from ZODB.Transaction import Transaction
+from ZODB.TmpStore import TmpStore
# We create a special fast pickler! This allows us
[-=- -=- -=- 1035 lines omitted -=- -=- -=-]
+ self.log = CommitLog()
+ self.invalidated = []
+
+ # Store information about the call that blocks
+ self.name = None
+ self.args = None
+
+ def tpc_begin(self, txn, tid, status):
+ self.txn = txn
+ self.tid = tid
+ self.status = status
+
+ def store(self, oid, serial, data, version):
+ self.log.store(oid, serial, data, version)
+
+ def tpc_abort(self):
+ pass # just forget about this strategy
+
+ def tpc_finish(self):
+ raise RuntimeError, "Logic error. This method must not be called."
+
+ def tpc_vote(self):
+ self.name = "tpc_vote"
+ self.args = ()
+ return self.block()
+
+ def commitVersion(self, src, dest):
+ self.name = "commitVersion"
+ self.args = src, dest
+ return self.block()
+
+ def abortVersion(self, src):
+ self.name = "abortVersion"
+ self.args = src,
+ return self.block()
+
+ def transactionalUndo(self, trans_id):
+ self.name = "transactionalUndo"
+ self.args = trans_id,
+ return self.block()
+
+ def restart(self, new_strategy):
+ # called by the storage when the storage is available
+ new_strategy.tpc_begin(self.txn, self.tid, self.status)
+ loads, loader = self.log.get_loader()
+ for i in range(loads):
+ oid, serial, data, version = loader.load()
+ new_strategy.store(oid, serial, data, version)
+ meth = getattr(new_strategy, self.name)
+ return meth(*self.args)
=== ZEO/ZEO/__init__.py 1.8 => 1.9 ===
#
##############################################################################
-
-import fap
=== ZEO/ZEO/smac.py 1.16 => 1.17 ===
__version__ = "$Revision$"[11:-2]
-import asyncore, string, struct, zLOG, sys, Acquisition
+import asyncore, struct
+from Exceptions import Disconnected
+from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
+from types import StringType
+
import socket, errno
-from logger import zLogger
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -38,81 +41,103 @@
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
-class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+ __super_init = asyncore.dispatcher.__init__
+ __super_close = asyncore.dispatcher.close
+
+ __closed = 1 # Marker indicating that we're closed
- __append=None # Marker indicating that we're closed
+ socket = None # to outwit Sam's getattr
- socket=None # to outwit Sam's getattr
+ READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
- SizedMessageAsyncConnection.inheritedAttribute(
- '__init__')(self, sock, map)
- self.addr=addr
- if debug is None and __debug__:
- self._debug = zLogger("smac")
- else:
+ self.addr = addr
+ if debug is not None:
self._debug = debug
- self.__state=None
- self.__inp=None
- self.__inpl=0
- self.__l=4
- self.__output=output=[]
- self.__append=output.append
- self.__pop=output.pop
-
- def handle_read(self,
- join=string.join, StringType=type(''), _type=type,
- _None=None):
-
+ elif not hasattr(self, '_debug'):
+ self._debug = __debug__ and 'smac'
+ self.__state = None
+ self.__inp = None # None, a single String, or a list
+ self.__input_len = 0
+ self.__msg_size = 4
+ self.__output = []
+ self.__closed = None
+ self.__super_init(sock, map)
+
+ # XXX avoid expensive getattr calls? Can't remember exactly what
+ # this comment was supposed to mean, but it has something to do
+ # with the way asyncore uses getattr and uses if sock:
+ def __nonzero__(self):
+ return 1
+
+ def handle_read(self):
+ # Use a single __inp buffer and integer indexes to make this
+ # fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
- if not d: return
+ if not d:
+ return
- inp=self.__inp
- if inp is _None:
- inp=d
- elif _type(inp) is StringType:
- inp=[inp,d]
+ input_len = self.__input_len + len(d)
+ msg_size = self.__msg_size
+ state = self.__state
+
+ inp = self.__inp
+ if msg_size > input_len:
+ if inp is None:
+ self.__inp = d
+ elif type(self.__inp) is StringType:
+ self.__inp = [self.__inp, d]
+ else:
+ self.__inp.append(d)
+ self.__input_len = input_len
+ return # keep waiting for more input
+
+ # load all previous input and d into single string inp
+ if isinstance(inp, StringType):
+ inp = inp + d
+ elif inp is None:
+ inp = d
else:
inp.append(d)
+ inp = "".join(inp)
- inpl=self.__inpl+len(d)
- l=self.__l
-
- while 1:
-
- if l <= inpl:
- # Woo hoo, we have enough data
- if _type(inp) is not StringType: inp=join(inp,'')
- d=inp[:l]
- inp=inp[l:]
- inpl=inpl-l
- if self.__state is _None:
- # waiting for message
- l=struct.unpack(">i",d)[0]
- self.__state=1
- else:
- l=4
- self.__state=_None
- self.message_input(d)
+ offset = 0
+ while (offset + msg_size) <= input_len:
+ msg = inp[offset:offset + msg_size]
+ offset = offset + msg_size
+ if state is None:
+ # waiting for message
+ msg_size = struct.unpack(">i", msg)[0]
+ state = 1
else:
- break # not enough data
-
- self.__l=l
- self.__inp=inp
- self.__inpl=inpl
-
- def readable(self): return 1
- def writable(self): return not not self.__output
+ msg_size = 4
+ state = None
+ self.message_input(msg)
+
+ self.__state = state
+ self.__msg_size = msg_size
+ self.__inp = inp[offset:]
+ self.__input_len = input_len - offset
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ if len(self.__output) == 0:
+ return 0
+ else:
+ return 1
def handle_write(self):
- output=self.__output
+ output = self.__output
while output:
- v=output[0]
+ v = output[0]
try:
n=self.send(v)
except socket.error, err:
@@ -120,37 +145,33 @@
break # we couldn't write anything
raise
if n < len(v):
- output[0]=v[n:]
+ output[0] = v[n:]
break # we can't write any more
else:
del output[0]
- #break # waaa
-
def handle_close(self):
self.close()
- def message_output(self, message,
- pack=struct.pack, len=len):
- if self._debug is not None:
- if len(message) > 40:
- m = message[:40]+' ...'
- else:
- m = message
- self._debug.trace('message_output %s' % `m`)
+ def message_output(self, message):
+ if __debug__:
+ if self._debug:
+ if len(message) > 40:
+ m = message[:40]+' ...'
+ else:
+ m = message
+ LOG(self._debug, TRACE, 'message_output %s' % `m`)
- append=self.__append
- if append is None:
- raise Disconnected("This action is temporarily unavailable.<p>")
-
- append(pack(">i",len(message))+message)
+ if self.__closed is not None:
+ raise Disconnected, (
+ "This action is temporarily unavailable."
+ "<p>"
+ )
+ # do two separate appends to avoid copying the message string
+ self.__output.append(struct.pack(">i", len(message)))
+ self.__output.append(message)
def close(self):
- if self.__append is not None:
- self.__append=None
- SizedMessageAsyncConnection.inheritedAttribute('close')(self)
-
-class Disconnected(Exception):
- """The client has become disconnected from the server
- """
-
+ if self.__closed is None:
+ self.__closed = 1
+ self.__super_close()
=== ZEO/ZEO/start.py 1.32 => 1.33 ===
#
##############################################################################
-
"""Start the server storage.
"""
@@ -19,13 +18,16 @@
import sys, os, getopt, string
+import StorageServer
+import asyncore
+
def directory(p, n=1):
d=p
while n:
d=os.path.split(d)[0]
if not d or d=='.': d=os.getcwd()
n=n-1
-
+
return d
def get_storage(m, n, cache={}):
@@ -44,9 +46,11 @@
def main(argv):
me=argv[0]
- sys.path[:]==filter(None, sys.path)
sys.path.insert(0, directory(me, 2))
+ # XXX hack for profiling support
+ global unix, storages, zeo_pid, asyncore
+
args=[]
last=''
for a in argv[1:]:
@@ -77,23 +81,22 @@
fs = os.path.join(var, 'Data.fs')
- usage = """%s [options] [filename]
+ usage="""%s [options] [filename]
where options are:
-D -- Run in debug mode
- -d -- Generate detailed debug logging without running
- in the foreground.
+ -d -- Set STUPD_LOG_SEVERITY to -300
-U -- Unix-domain socket file to listen on
-
+
-u username or uid number
The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited
- resouces. The only works under Unix, and if the storage
- server is started by root.
+ resouces. The only works under Unix, and if ZServer is
+ started by root.
-p port -- port to listen on
@@ -116,30 +119,47 @@
attr_name -- This is the name to which the storage object
is assigned in the module.
+ -P file -- Run under profile and dump output to file. Implies the
+ -s flag.
+
if no file name is specified, then %s is used.
""" % (me, fs)
try:
- opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
- except getopt.error, err:
- print err
+ opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:d')
+ except getopt.error, msg:
print usage
+ print msg
sys.exit(1)
- port=None
- debug=detailed=0
- host=''
- unix=None
- Z=1
- UID='nobody'
+ port = None
+ debug = 0
+ host = ''
+ unix =None
+ Z = 1
+ UID = 'nobody'
+ prof = None
+ detailed = 0
for o, v in opts:
- if o=='-p': port=string.atoi(v)
- elif o=='-h': host=v
- elif o=='-U': unix=v
- elif o=='-u': UID=v
- elif o=='-D': debug=1
- elif o=='-d': detailed=1
- elif o=='-s': Z=0
+ if o=='-p':
+ port = int(v)
+ elif o=='-h':
+ host = v
+ elif o=='-U':
+ unix = v
+ elif o=='-u':
+ UID = v
+ elif o=='-D':
+ debug = 1
+ elif o=='-d':
+ detailed = 1
+ elif o=='-s':
+ Z = 0
+ elif o=='-P':
+ prof = v
+
+ if prof:
+ Z = 0
if port is None and unix is None:
print usage
@@ -153,14 +173,16 @@
sys.exit(1)
fs=args[0]
- if debug: os.environ['Z_DEBUG_MODE']='1'
-
- if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
+ __builtins__.__debug__=debug
+ if debug:
+ os.environ['Z_DEBUG_MODE'] = '1'
+ if detailed:
+ os.environ['STUPID_LOG_SEVERITY'] = '-300'
from zLOG import LOG, INFO, ERROR
# Try to set uid to "-u" -provided uid.
- # Try to set gid to "-u" user's primary group.
+ # Try to set gid to "-u" user's primary group.
# This will only work if this script is run by root.
try:
import pwd
@@ -175,7 +197,7 @@
uid = pwd.getpwuid(UID)[2]
gid = pwd.getpwuid(UID)[3]
else:
- raise KeyError
+ raise KeyError
try:
if gid is not None:
try:
@@ -200,7 +222,7 @@
try:
import ZEO.StorageServer, asyncore
-
+
storages={}
for o, v in opts:
if o=='-S':
@@ -243,15 +265,15 @@
if not unix: unix=host, port
- ZEO.StorageServer.StorageServer(unix, storages)
-
+ StorageServer.StorageServer(unix, storages)
+
try:
ppid, pid = os.getppid(), os.getpid()
except:
pass # getpid not supported
else:
open(zeo_pid,'w').write("%s %s" % (ppid, pid))
-
+
except:
# Log startup exception and tell zdaemon not to restart us.
info = sys.exc_info()
@@ -269,7 +291,6 @@
asyncore.loop()
-
def rotate_logs():
import zLOG
if hasattr(zLOG.log_write, 'reinitialize'):
@@ -292,29 +313,21 @@
# unnecessary, since we now use so_reuseaddr.
for ignored in 1,2:
for socket in asyncore.socket_map.values():
- try:
- socket.close()
- except:
- pass
+ try: socket.close()
+ except: pass
for storage in storages.values():
- try:
- storage.close()
- except:
- pass
+ try: storage.close()
+ finally: pass
try:
from zLOG import LOG, INFO
LOG('ZEO Server', INFO,
"Shutting down (%s)" % (die and "shutdown" or "restart")
)
- except:
- pass
-
- if die:
- sys.exit(0)
- else:
- sys.exit(1)
+ except: pass
+
+ if die: sys.exit(0)
+ else: sys.exit(1)
-if __name__ == '__main__':
- main(sys.argv)
+if __name__=='__main__': main(sys.argv)
=== ZEO/ZEO/trigger.py 1.5 => 1.6 ===
#
##############################################################################
-
-# This module is a simplified version of the select_trigger module
-# from Sam Rushing's Medusa server.
-
import asyncore
-import errno
+
import os
import socket
import string
import thread
-
+
if os.name == 'posix':
- class trigger(asyncore.file_dispatcher):
+ class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
@@ -56,46 +52,50 @@
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
- def __init__(self):
+ def __init__ (self):
r, w = self._fds = os.pipe()
self.trigger = w
- asyncore.file_dispatcher.__init__(self, r)
+ asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
+ self._closed = None
- def __del__(self):
- os.close(self._fds[0])
- os.close(self._fds[1])
+ # Override the asyncore close() method, because it seems that
+ # it would only close the r file descriptor and not w. The
+ # constructor calls file_dispactcher.__init__ and passes r,
+ # which would get stored in a file_wrapper and get closed by
+ # the default close. But that would leave w open...
+
+ def close(self):
+ if self._closed is None:
+ self._closed = 1
+ self.del_channel()
+ for fd in self._fds:
+ os.close(fd)
- def __repr__(self):
- return '<select-trigger(pipe) at %x>' % id(self)
+ def __repr__ (self):
+ return '<select-trigger (pipe) at %x>' % id(self)
- def readable(self):
+ def readable (self):
return 1
- def writable(self):
+ def writable (self):
return 0
- def handle_connect(self):
+ def handle_connect (self):
pass
- def pull_trigger(self, thunk=None):
- # print 'PULL_TRIGGER: ', len(self.thunks)
+ def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
- self.thunks.append(thunk)
+ self.thunks.append (thunk)
finally:
self.lock.release()
- os.write(self.trigger, 'x')
+ os.write (self.trigger, 'x')
- def handle_read(self):
- try:
- self.recv(8192)
- except os.error, err:
- if err[0] == errno.EAGAIN: # resource temporarily unavailable
- return
- raise
+ def handle_read (self):
+ self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
@@ -104,7 +104,7 @@
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
- '(%s:%s %s)' % (t, v, tbinfo))
+ ' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
@@ -116,13 +116,13 @@
# win32-safe version
- class trigger(asyncore.dispatcher):
+ class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
- def __init__(self):
- a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ def __init__ (self):
+ a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -139,51 +139,46 @@
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
-
- a.listen(1)
- w.setblocking(0)
+
+ a.listen (1)
+ w.setblocking (0)
try:
- w.connect(self.address)
+ w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
- w.setblocking(1)
+ w.setblocking (1)
self.trigger = w
- asyncore.dispatcher.__init__(self, r)
+ asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
- def __repr__(self):
+ def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
- def readable(self):
+ def readable (self):
return 1
- def writable(self):
+ def writable (self):
return 0
- def handle_connect(self):
+ def handle_connect (self):
pass
- def pull_trigger(self, thunk=None):
+ def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
- self.thunks.append(thunk)
+ self.thunks.append (thunk)
finally:
self.lock.release()
- self.trigger.send('x')
+ self.trigger.send ('x')
- def handle_read(self):
- try:
- self.recv(8192)
- except os.error, err:
- if err[0] == errno.EAGAIN: # resource temporarily unavailable
- return
- raise
+ def handle_read (self):
+ self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks: