[Zope3-checkins] CVS: Zope3/src/zodb/zeo - __init__.py:1.2 cache.py:1.2 client.py:1.2 commitlog.py:1.2 interfaces.py:1.2 runsvr.py:1.2 server.py:1.2 simul.py:1.2 stats.py:1.2 stubs.py:1.2 tbuf.py:1.2 threadedasync.py:1.2 utils.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:13:55 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/zeo
Added Files:
__init__.py cache.py client.py commitlog.py interfaces.py
runsvr.py server.py simul.py stats.py stubs.py tbuf.py
threadedasync.py utils.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zodb/zeo/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/__init__.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,24 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""ZEO -- Zope Enterprise Objects.
+
+See the file README.txt in this directory for an overview.
+
+ZEO's home on the web is
+
+ http://www.zope.org/Products/ZEO/
+
+"""
+
+version = "2.0+"
=== Zope3/src/zodb/zeo/cache.py 1.1 => 1.2 === (580/680 lines abridged)
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/cache.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,677 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+# XXX TO DO
+# use two indices rather than the sign bit of the index??????
+# add a shared routine to read + verify a record???
+# redesign header to include vdlen???
+# rewrite the cache using a different algorithm???
+
+"""Implement a client cache
+
+The cache is managed as two files.
+
+The cache can be persistent (meaning it survives a process restart)
+or temporary. It is persistent if the client argument is not None.
+
+Persistent cache files live in the var directory and are named
+'c<storage>-<client>-<digit>.zec' where <storage> is the storage
+argument (default '1'), <client> is the client argument, and <digit> is
+0 or 1. Temporary cache files are unnamed files in the standard
+temporary directory as determined by the tempfile module.
+
+The ClientStorage overrides the client name default to the value of
+the environment variable ZEO_CLIENT, if it exists.
+
+Each cache file has a 4-byte magic number followed by a sequence of
+records of the form:
+
+ offset in record: name -- description
+
+ 0: oid -- 8-byte object id
+
+ 8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
+ ('n' means only the non-version data in the record is valid)
+
+ 9: tlen -- 4-byte (unsigned) record length
[-=- -=- -=- 580 lines omitted -=- -=- -=-]
+ if vlen+dlen+43+vdlen != tlen:
+ self.rilog("inconsistent lengths", pos, fileindex)
+ break
+ seek(vdlen, 1)
+ vs = read(8)
+ if read(4) != h[9:13]:
+ self.rilog("inconsistent tlen", pos, fileindex)
+ break
+ else:
+ if h[8] in 'vn' and vlen == 0:
+ if dlen+31 != tlen:
+ self.rilog("inconsistent nv lengths", pos, fileindex)
+ seek(dlen, 1)
+ if read(4) != h[9:13]:
+ self.rilog("inconsistent nv tlen", pos, fileindex)
+ break
+ vs = None
+
+ if h[8] in 'vn':
+ if fileindex:
+ index[oid] = -pos
+ else:
+ index[oid] = pos
+ serial[oid] = h[-8:], vs
+ else:
+ if serial.has_key(oid):
+ # We have a record for this oid, but it was invalidated!
+ del serial[oid]
+ del index[oid]
+
+
+ pos = pos + tlen
+ count += 1
+
+ f.seek(pos)
+ try:
+ f.truncate()
+ except:
+ pass
+
+ if count:
+ self.log("read_index: cache file %d has %d records and %d bytes",
+ fileindex, count, pos)
+
+ return pos
+
def rilog(self, msg, pos, fileindex):
    """Log a read_index diagnostic through self.log.

    msg -- short description of the problem
    pos -- position in the cache file the message refers to
    fileindex -- number of the cache file being read
    """
    template = "read_index: %s at position %d in cache file %d"
    self.log(template, msg, pos, fileindex)
=== Zope3/src/zodb/zeo/client.py 1.1 => 1.2 === (739/839 lines abridged)
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/client.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,836 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""The ClientStorage class and the exceptions that it may raise.
+
+Public contents of this module:
+
+Disconnected -- exception base class
+ClientStorage -- the main class, implementing the Storage API
+ClientStorageError -- exception raised by ClientStorage
+UnrecognizedResult -- exception raised by ClientStorage
+ClientDisconnected -- exception raised by ClientStorage
+"""
+
+# XXX TO DO
+# get rid of beginVerify, set up _tfile in verify_cache
+# set self._storage = stub later, in endVerify
+# if wait is given, wait until verify is complete
+
+import cPickle
+import os
+import socket
+import tempfile
+import threading
+import time
+import types
+import logging
+
+from zodb.zeo import cache, stubs
+from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.tbuf import TransactionBuffer
+from zodb.zeo.zrpc.client import ConnectionManager
+
+from zodb import interfaces
+from zodb.timestamp import TimeStamp
+
+try:
[-=- -=- -=- 739 lines omitted -=- -=- -=-]
+ # Queue an invalidate for the end of the verification procedure.
+ if self._pickler is None:
+ # XXX This should never happen
+ return
+ self._pickler.dump(args)
+
def endVerify(self):
    """Server callback to signal end of cache validation.

    Reads the queued (oid, version) records back from self._tfile and
    invalidates each one in both self._cache and self._db.  Does
    nothing when self._pickler is None.  The temporary file is always
    closed, even if one of the invalidations raises.
    """
    if self._pickler is None:
        return
    # A record with a false oid terminates the replay loop below.
    self._pickler.dump((0,0))
    self._tfile.seek(0)
    unpick = cPickle.Unpickler(self._tfile)
    f = self._tfile
    self._tfile = None

    try:
        while 1:
            oid, version = unpick.load()
            if not oid:
                break
            self._cache.invalidate(oid, version=version)
            self._db.invalidate(oid, version=version)
    finally:
        # Do not leak the temporary file when an invalidation fails.
        f.close()
+
+ def invalidateTrans(self, args):
+ """Server callback to invalidate a list of (oid, version) pairs.
+
+ This is called as the result of a transaction.
+ """
+ for oid, version in args:
+ self._cache.invalidate(oid, version=version)
+ try:
+ self._db.invalidate(oid, version=version)
+ except AttributeError, msg:
+ self.logger.error("Invalidate(%r, %r) failed for _db: %s",
+ oid, version, msg)
+
+ # XXX In Zope3, there's no reason to stick to the ZEO 2 protocol!
+
+ # Unfortunately, the ZEO 2 wire protocol uses different names for
+ # several of the callback methods invoked by the StorageServer.
+ # We can't change the wire protocol at this point because that
+ # would require synchronized updates of clients and servers and we
+ # don't want that. So here we alias the old names to their new
+ # implementations.
+
+ begin = beginVerify
+ invalidate = invalidateVerify
+ end = endVerify
+ Invalidate = invalidateTrans
=== Zope3/src/zodb/zeo/commitlog.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/commitlog.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,41 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Log a transaction's commit info during two-phase commit.
+
+A storage server allows multiple clients to commit transactions, but
+must serialize them as they actually execute at the server. The
+concurrent commits are achieved by logging actions up until the
+tpc_vote(). At that point, the entire transaction is committed on the
+real storage.
+"""
+import cPickle
+import tempfile
+
class CommitLog:
    """Pickle-based log of the store() calls made during a commit.

    Records are written to an anonymous temporary file and can be read
    back, in order, through the unpickler returned by get_loader().
    """

    def __init__(self):
        # Bookkeeping: number of records written, and whether a loader
        # has been handed out yet.
        self.read = 0
        self.stores = 0
        log_file = tempfile.TemporaryFile(suffix=".log")
        pickler = cPickle.Pickler(log_file, 1)
        pickler.fast = 1
        self.file = log_file
        self.pickler = pickler

    def store(self, oid, serial, data, version):
        """Append one (oid, serial, data, version) record to the log."""
        record = (oid, serial, data, version)
        self.pickler.dump(record)
        self.stores = self.stores + 1

    def get_loader(self):
        """Rewind the log and return (record count, unpickler)."""
        self.read = 1
        self.file.seek(0)
        loader = cPickle.Unpickler(self.file)
        return self.stores, loader
=== Zope3/src/zodb/zeo/interfaces.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/interfaces.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZEO interfaces and exceptions.
+
+$Id$
+"""
+
+from zope.interface import Interface
+
class Disconnected(Exception):
    """Exception signalling that a client is disconnected from a server."""
+
class ICache(Interface):
    """Interface of the ZEO client cache.

    Constructor signature: __init__(storage, size, client, var)

    Every argument is optional.

    storage -- name of storage
    size -- max size of cache in bytes
    client -- a string; if specified, cache is persistent.
    var -- var directory to store cache files in
    """

    def open():
        """Build the cache index and return a sequence of object info tuples.

        Each object info tuple pairs an object id with a pair of
        serialnos (the non-version serialno and the version serialno):
            oid, (serial, ver_serial)

        The returned sequence is used for cache validation.
        """

    def close():
        """Close the cache."""

    def verify(func):
        """Invoke func for every object in the cache.

        func receives three arguments:
            func(oid, serial, ver_serial)
        """

    def invalidate(oid, version):
        """Remove an object from the cache."""

    def load(oid, version):
        """Load an object from the cache.

        Returns (data, serialno) when the object is cached, and None
        when it is not.
        """

    def store(oid, p, s, version, pv, sv):
        """Store a new object in the cache."""

    def update(oid, serial, version, data):
        """Update an object that is already in the cache.

        XXX Called to update objects that were modified by a
        transaction.  The object is likely to be in the cache already,
        so an implementation may be able to operate more efficiently.
        """

    def modifiedInVersion(oid):
        """Return the version in which an object is modified.

        '' signifies the trunk.
        None is returned when the object is not in the cache.
        """

    def checkSize(size):
        """Check whether adding size bytes would exceed the cache limit.

        Often called just before store or update.  size is a hint about
        the amount of data about to be stored; the cache may want to
        evict some data to make space.
        """
=== Zope3/src/zodb/zeo/runsvr.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/runsvr.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,403 @@
+#! /usr/bin/env python
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Start the ZEO storage server.
+
+Usage: %s [-C URL] [-a ADDRESS] [-f FILENAME] [-h]
+
+Options:
+-C/--configuration URL -- configuration file or URL
+-a/--address ADDRESS -- server address of the form PORT, HOST:PORT, or PATH
+ (a PATH must contain at least one "/")
+-f/--filename FILENAME -- filename for FileStorage
+-h/--help -- print this usage message and exit
+
+Unless -C is specified, -a and -f are required.
+"""
+
+# The code here is designed to be reused by other, similar servers.
+# For the foreseeable future, it must work under Python 2.1 as well as
+# 2.2 and above.
+
+# XXX The option parsing infrastructure could be shared with zdaemon.py
+
+import os
+import sys
+import getopt
+import signal
+import socket
+import logging
+
+import ZConfig
+
+
class Options:

    """A class to parse and hold the command line options.

    Options are represented by various attributes (zeoport etc.).
    Positional arguments are represented by the args attribute.

    This also has a public usage() method that can be used to report
    errors related to the command line.
    """

    configuration = None
    rootconf = None

    args = []

    def __init__(self, args=None, progname=None, doc=None):
        """Constructor.

        Optional arguments:

        args     -- the command line arguments, less the program name
                    (default is sys.argv[1:] at the time of call)

        progname -- the program name (default sys.argv[0])

        doc      -- usage message (default, __main__.__doc__)

        Parses the arguments with getopt, passes every recognized
        option to handle_option() and finally calls check_options().
        A getopt error is reported through usage(), which exits.
        """

        if args is None:
            args = sys.argv[1:]
        if progname is None:
            progname = sys.argv[0]
        self.progname = progname
        if doc is None:
            import __main__
            doc = __main__.__doc__
        if doc and not doc.endswith("\n"):
            doc += "\n"
        self.doc = doc
        # getopt wants long option names WITHOUT the leading "--".
        # Strip it here so that subclasses which spell their names with
        # the dashes keep working.
        long_options = []
        for name in self._long_options:
            if name[:2] == "--":
                name = name[2:]
            long_options.append(name)
        try:
            self.options, self.args = getopt.getopt(args,
                                                    self._short_options,
                                                    long_options)
        except getopt.error:
            # Fetch the exception through sys.exc_info() instead of
            # version-specific "except" binding syntax.
            self.usage(str(sys.exc_info()[1]))
        for opt, arg in self.options:
            self.handle_option(opt, arg)
        self.check_options()

    # Default set of options.  Subclasses should override.
    _short_options = "C:h"
    _long_options = ["configuration=", "help"]

    def handle_option(self, opt, arg):
        """Handle one option.  Subclasses should override.

        This sets the various instance variables overriding the defaults.

        When -h is detected, print the module docstring to stdout and exit(0).
        """
        if opt in ("-C", "--configuration"):
            self.set_configuration(arg)
        if opt in ("-h", "--help"):
            self.help()

    def set_configuration(self, arg):
        """Remember arg as the configuration file or URL to load."""
        self.configuration = arg

    def check_options(self):
        """Check options.  Subclasses may override.

        This can be used to ensure certain options are set, etc.
        """
        self.load_configuration()

    def load_configuration(self):
        """Load self.configuration with ZConfig into self.rootconf.

        Does nothing when a root configuration is already present or
        no configuration was specified.
        """
        if self.rootconf or not self.configuration:
            return
        self.rootconf = ZConfig.load(self.configuration)

    def help(self):
        """Print a long help message (self.doc) to stdout and exit(0).

        Occurrences of "%s" in self.doc are replaced by self.progname.
        """
        # A missing usage text is printed as an empty message.
        doc = self.doc or ""
        # replace() is a no-op when "%s" is absent, and it also covers
        # a "%s" at the very start of the text.
        doc = doc.replace("%s", self.progname)
        sys.stdout.write(doc + "\n")
        sys.exit(0)

    def usage(self, msg):
        """Print a brief error message to stderr and exit(2)."""
        sys.stderr.write("Error: %s\n" % str(msg))
        sys.stderr.write("For help, use %s -h\n" % self.progname)
        sys.exit(2)
+
+
class ZEOOptions(Options):

    """Command line and configuration options of the ZEO storage server.

    Adds -a/--address and -f/--filename to the base options, and fills
    in the server address and the storages from the configuration
    when they were not given on the command line.
    """

    hostname = None # A subclass may set this
    hostconf = None # <Host> section
    zeoconf = None # <ZEO> section
    logconf = None # <Log> section

    family = None # set by -a; AF_UNIX or AF_INET
    address = None # set by -a; string or (host, port)
    storages = None # set by -f

    _short_options = "a:C:f:h"
    # getopt wants long option names without the leading "--".
    _long_options = [
        "address=",
        "configuration=",
        "filename=",
        "help",
        ]

    def handle_option(self, opt, arg):
        """Handle -a and -f; pass every other option to the base class."""
        # Alphabetical order please!
        if opt in ("-a", "--address"):
            if "/" in arg:
                # A value containing "/" is a Unix domain socket path.
                self.family = socket.AF_UNIX
                self.address = arg
            else:
                self.family = socket.AF_INET
                if ":" in arg:
                    host, port = arg.split(":", 1)
                else:
                    host = ""
                    port = arg
                try:
                    port = int(port)
                except (ValueError, TypeError, OverflowError):
                    # Only conversion failures are reported as a bad
                    # port; SystemExit and KeyboardInterrupt propagate.
                    self.usage("invalid port number: %r" % port)
                self.address = (host, port)
        elif opt in ("-f", "--filename"):
            from zodb.storage.file import FileStorage
            if not self.storages:
                self.storages = {}
            # Storages given with -f are named "1", "2", ... in order.
            key = str(1 + len(self.storages))
            self.storages[key] = (FileStorage, {"file_name": arg})
        else:
            # Pass it to the base class, for --help/-h
            Options.handle_option(self, opt, arg)

    def check_options(self):
        """Require at least one storage, an address and no positional args."""
        Options.check_options(self) # Calls load_configuration()
        if not self.storages:
            self.usage("no storages specified; use -f or -C")
        if self.family is None:
            self.usage("no server address specified; use -a or -C")
        if self.args:
            self.usage("positional arguments are not supported")

    def load_configuration(self):
        """Load the configuration and locate its Host, ZEO and Log sections."""
        Options.load_configuration(self) # Sets self.rootconf
        if not self.rootconf:
            return
        try:
            self.hostconf = self.rootconf.getSection("Host")
        except ZConfig.ConfigurationConflictingSectionError:
            # Several <Host> sections: select the one for this host name.
            if not self.hostname:
                self.hostname = socket.getfqdn()
            self.hostconf = self.rootconf.getSection("Host", self.hostname)
        if self.hostconf is None:
            # If no <Host> section exists, fall back to the root
            self.hostconf = self.rootconf
        self.zeoconf = self.hostconf.getSection("ZEO")
        if self.zeoconf is None:
            # If no <ZEO> section exists, fall back to the host (or root)
            self.zeoconf = self.hostconf
        self.logconf = self.hostconf.getSection("Log")

        # Now extract options from various configuration sections
        self.load_zeoconf()
        self.load_storages()

    def load_zeoconf(self):
        """Take the server address from the configuration unless -a was used."""
        # Get some option defaults from the configuration
        if self.family:
            # -a option overrides
            return
        port = self.zeoconf.getint("server-port")
        path = self.zeoconf.get("path")
        if port and path:
            self.usage(
                "Configuration contains conflicting ZEO information:\n"
                "Exactly one of 'path' and 'server-port' may be given.")
        if port:
            host = self.hostconf.get("hostname", "")
            self.family = socket.AF_INET
            self.address = (host, port)
        elif path:
            self.family = socket.AF_UNIX
            self.address = path

    def load_storages(self):
        """Take the storages from the configuration unless -f was used."""
        # Get the storage specifications
        if self.storages:
            # -f option overrides
            return
        storagesections = self.zeoconf.getChildSections("Storage")
        self.storages = {}
        from zodb.config import getStorageInfo
        for section in storagesections:
            name = section.name
            if not name:
                # Unnamed sections are numbered like -f storages.
                name = str(1 + len(self.storages))
            if self.storages.has_key(name):
                # (Actually, the parser doesn't allow this)
                self.usage("duplicate storage name %r" % name)
            self.storages[name] = getStorageInfo(section)
+
+
class ZEOServer:

    """Driver that opens the storages and runs a ZEO storage server.

    The options object (an instance of OptionsClass by default)
    supplies the server address and the storage specifications.
    """

    OptionsClass = ZEOOptions

    def __init__(self, options=None):
        """Use the given options, or parse the command line if None."""
        if options is None:
            options = self.OptionsClass()
        self.options = options

    def main(self):
        """Run the server; always close storages and clear the socket."""
        self.check_socket()
        self.clear_socket()
        try:
            self.open_storages()
            self.setup_signals()
            self.create_server()
            self.loop_forever()
        finally:
            self.close_storages()
            self.clear_socket()

    def check_socket(self):
        """Exit through options.usage() if the address already answers."""
        if self.can_connect(self.options.family, self.options.address):
            self.options.usage("address %s already in use" %
                               repr(self.options.address))

    def can_connect(self, family, address):
        """Return 1 if a stream connection to address succeeds, else 0.

        The probe socket is closed in either case.
        """
        s = socket.socket(family, socket.SOCK_STREAM)
        try:
            try:
                s.connect(address)
            except socket.error:
                return 0
            return 1
        finally:
            # Close the probe socket on failure as well as on success.
            s.close()

    def clear_socket(self):
        """Remove the socket file when the address is a string (a path)."""
        if isinstance(self.options.address, type("")):
            try:
                os.unlink(self.options.address)
            except os.error:
                pass

    def open_storages(self):
        """Instantiate every configured storage into self.storages."""
        self.storages = {}
        for name, (cls, args) in self.options.storages.items():
            logging.info("open storage %r: %s.%s(**%r)",
                         name, cls.__module__, cls.__name__, args)
            self.storages[name] = cls(**args)

    def setup_signals(self):
        """Set up signal handlers.

        The signal handler for SIGFOO is a method handle_sigfoo().
        If no handler method is defined for a signal, the signal
        action is not changed from its initial value.  The handler
        method is called without additional arguments.
        """
        if os.name != "posix":
            return
        if hasattr(signal, 'SIGXFSZ'):
            signal.signal(signal.SIGXFSZ, signal.SIG_IGN) # Special case
        init_signames()
        for sig, name in signames.items():
            method = getattr(self, "handle_" + name.lower(), None)
            if method is not None:
                # Bind method as a default argument so that each wrapper
                # keeps its own handler.
                def wrapper(sig_dummy, frame_dummy, method=method):
                    method()
                signal.signal(sig, wrapper)

    def create_server(self):
        """Create the StorageServer for the configured address."""
        from zodb.zeo.server import StorageServer
        self.server = StorageServer(self.options.address, self.storages)

    def loop_forever(self):
        """Run the threadedasync main loop."""
        from zodb.zeo import threadedasync
        threadedasync.loop()

    def handle_sigterm(self):
        """SIGTERM handler: log and exit."""
        logging.info("terminated by SIGTERM")
        sys.exit(0)

    def handle_sigint(self):
        """SIGINT handler: log and exit."""
        logging.info("terminated by SIGINT")
        sys.exit(0)

    def handle_sigusr2(self):
        """SIGUSR2 handler: log rotation is not implemented yet."""
        # XXX What to do here?
        logging.error("Don't know how to reinitialize log files yet")

    def close_storages(self):
        """Close every opened storage, logging failures and carrying on."""
        for name, storage in self.storages.items():
            logging.info("closing storage %r", name)
            try:
                storage.close()
            except: # Keep going
                logging.exception("failed to close storage %r", name)
+
+
+# Signal names
+
+signames = None
+
def signame(sig):
    """Return a symbolic name for a signal.

    Return "signal NNN" if there is no corresponding SIG name in the
    signal module.  The name table is built on first use.
    """
    if signames is None:
        init_signames()
    known = signames.get(sig)
    if known:
        return known
    return "signal %d" % sig
+
def init_signames():
    """(Re)build the global signames table from the signal module.

    Maps each signal number to its SIGxxx name; SIG_xxx constants and
    entries whose key has no startswith() method are skipped.
    """
    global signames
    table = signames = {}
    for key, value in signal.__dict__.items():
        prefix_test = getattr(key, "startswith", None)
        if prefix_test is None:
            continue
        if not prefix_test("SIG") or prefix_test("SIG_"):
            continue
        table[value] = key
+
+
+# Main program
+
def main(args=None):
    """Configure logging, parse the options and run the ZEO server.

    args -- command line arguments, passed on to ZEOOptions
    """

    # Initialize the logging module.
    # XXX This is a temporary hack.
    import logging.config
    logging.basicConfig()
    logging.root.setLevel(logging.CRITICAL)
    # If log.ini exists, use it
    ini_name = "log.ini"
    if os.path.exists(ini_name):
        logging.config.fileConfig(ini_name)

    ZEOServer(ZEOOptions(args)).main()
+
+if __name__ == "__main__":
+ main()
=== Zope3/src/zodb/zeo/server.py 1.1 => 1.2 === (690/790 lines abridged)
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/server.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,787 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""The StorageServer class and the exception that it may raise.
+
+This server acts as a front-end for one or more real storages, like
+file storage or Berkeley storage.
+
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
+"""
+
+import asyncore
+import cPickle
+import os
+import sys
+import threading
+import time
+import logging
+
+from zodb.zeo import stubs
+from zodb.zeo.commitlog import CommitLog
+from zodb.zeo.zrpc.server import Dispatcher
+from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+
+from transaction.txn import Transaction
+from zodb.interfaces import StorageError, StorageTransactionError
+from zodb.interfaces import TransactionError, ReadOnlyError
+
+class StorageServerError(StorageError):
+ """Error reported when an unpickleable exception is raised."""
+
+class StorageServer:
+
+ """The server side implementation of ZEO.
+
+ The StorageServer is the 'manager' for incoming connections. Each
[-=- -=- -=- 690 lines omitted -=- -=- -=-]
+ meth = getattr(new_strategy, self.name)
+ return meth(*self.args)
+
def abort(self, zeo_storage):
    """Drop the first _waiting entry that belongs to zeo_storage, if any."""
    # Entries of the _waiting list are (d, zeo_storage) pairs.
    queue = self.storage._waiting
    index = 0
    for entry in queue:
        delay, candidate = entry
        if candidate is zeo_storage:
            del queue[index]
            break
        index += 1
+
def run_in_thread(method, *args):
    """Start method(*args) in a SlowMethodThread and return its delay."""
    worker = SlowMethodThread(method, args)
    worker.start()
    return worker.delay
+
class SlowMethodThread(threading.Thread):
    """Thread to run potentially slow storage methods.

    Clients can use the delay attribute to access the MTDelay object
    used to send a zrpc response at the right time.
    """

    # Some storage methods can take a long time to complete.  If we
    # run these methods via a standard asyncore read handler, they
    # will block all other server activity until they complete.  To
    # avoid blocking, we spawn a separate thread, return an MTDelay()
    # object, and have the thread reply() when it finishes.

    def __init__(self, method, args):
        """Remember the callable and its positional arguments."""
        threading.Thread.__init__(self)
        self.delay = MTDelay()
        self._method = method
        self._args = args

    def run(self):
        """Call the method and hand its result or its error to the delay."""
        try:
            outcome = self._method(*self._args)
        except (SystemExit, KeyboardInterrupt):
            # Never swallow interpreter shutdown requests.
            raise
        except Exception:
            self.delay.error(sys.exc_info())
            return
        self.delay.reply(outcome)
+
+# Patch up class references
+StorageServer.ZEOStorageClass = ZEOStorage
+ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
+ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy
=== Zope3/src/zodb/zeo/simul.py 1.1 => 1.2 === (667/767 lines abridged)
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/simul.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,764 @@
+#! /usr/bin/env python
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Cache simulation.
+
+Usage: simul.py [-bflyz] [-X] [-s size] tracefile
+
+Use one of -b, -f, -l, -y or -z select the cache simulator:
+-b: buddy system allocator
+-f: simple free list allocator
+-l: idealized LRU (no allocator)
+-y: variation on the existing ZEO cache that copies to current file
+-z: existing ZEO cache (default)
+
+Options:
+-s size: cache size in MB (default 20 MB)
+-X: enable heuristic checking for misaligned records: oids > 2**32
+ will be rejected; this requires the tracefile to be seekable
+
+Note: the buddy system allocator rounds the cache size up to a power of 2
+"""
+
+import sys
+import time
+import getopt
+import struct
+
def usage(msg):
    """Print msg, followed by the module docstring, to stderr."""
    for text in (msg, __doc__):
        print >>sys.stderr, text
+
+def main():
+ # Parse options
+ MB = 1000*1000
+ cachelimit = 20*MB
+ simclass = ZEOCacheSimulation
[-=- -=- -=- 667 lines omitted -=- -=- -=-]
+ queue = []
+ T = 0
+ blocks = 0
+ while T < 5000:
+ while queue and queue[0][0] <= T:
+ time, node = heapq.heappop(queue)
+ assert time == T
+ ##print "free addr=%d, size=%d" % (node.addr, node.size)
+ cache.free(node)
+ blocks -= 1
+ size = random.randint(100, 2000)
+ lifetime = random.randint(1, 100)
+ node = cache.alloc(size)
+ if node is None:
+ print "out of mem"
+ cache.dump("T=%4d: %d blocks;" % (T, blocks))
+ break
+ else:
+ ##print "alloc addr=%d, size=%d" % (node.addr, node.size)
+ blocks += 1
+ heapq.heappush(queue, (T + lifetime, node))
+ T = T+1
+ if T % reportfreq == 0:
+ cache.dump("T=%4d: %d blocks;" % (T, blocks))
+
def hitrate(loads, hits):
    """Format hits as a percentage of loads, e.g. ' 50.0%'.

    A load count below 1 is treated as 1 so the division is defined.
    """
    denominator = max(1, loads)
    percent = 100.0 * hits / denominator
    return "%5.1f%%" % percent
+
def duration(secs):
    """Format a number of seconds as 'S', 'M:SS' or 'H:MM:SS'."""
    total_minutes, seconds = divmod(secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    if hours:
        template, fields = "%d:%02d:%02d", (hours, minutes, seconds)
    elif minutes:
        template, fields = "%d:%02d", (minutes, seconds)
    else:
        template, fields = "%d", (seconds,)
    return template % fields
+
def addcommas(n):
    """Return str(n) with a comma inserted before every group of three
    trailing characters; a leading minus sign is kept in front.
    """
    text = str(n)
    prefix = ''
    if text[0] == '-':
        prefix = '-'
        text = text[1:]
    # Collect three-character groups from the right-hand end.
    groups = []
    end = len(text)
    while end > 3:
        groups.insert(0, text[end - 3:end])
        end -= 3
    groups.insert(0, text[:end])
    return prefix + ','.join(groups)
+
+if __name__ == "__main__":
+ sys.exit(main())
=== Zope3/src/zodb/zeo/stats.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/stats.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,405 @@
+#! /usr/bin/env python
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Trace file statistics analyzer.
+
+Usage: stats.py [-h] [-i interval] [-q] [-s] [-S] [-v] [-X] tracefile
+-h: print histogram of object load frequencies
+-i: summarizing interval in minutes (default 15; max 60)
+-q: quiet; don't print summaries
+-s: print histogram of object sizes
+-S: don't print statistics
+-v: verbose; print each record
+-X: enable heuristic checking for misaligned records: oids > 2**32
+ will be rejected; this requires the tracefile to be seekable
+"""
+
+"""File format:
+
+Each record is 24 bytes, with the following layout. Numbers are
+big-endian integers.
+
+Offset Size Contents
+
+0 4 timestamp (seconds since 1/1/1970)
+4 3 data size, in 256-byte increments, rounded up
+7 1 code (see below)
+8 8 object id
+16 8 serial number
+
+The code at offset 7 packs three fields:
+
+Mask bits Contents
+
+0x80 1 set if there was a non-empty version string
+0x7e 6 function and outcome code
+0x01 1 current cache file (0 or 1)
+
+The function and outcome codes are documented in detail at the end of
+this file in the 'explain' dictionary. Note that the keys there (and
+also the arguments to _trace() in ClientStorage.py) are 'code & 0x7e',
+i.e. the low bit is always zero.
+"""
+
+import sys
+import time
+import getopt
+import struct
+
+def usage(msg):
+ print >>sys.stderr, msg
+ print >>sys.stderr, __doc__
+
+def main():
+ # Parse options
+ verbose = 0
+ quiet = 0
+ dostats = 1
+ print_size_histogram = 0
+ print_histogram = 0
+ interval = 900 # Every 15 minutes
+ heuristic = 0
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], "hi:qsSvX")
+ except getopt.error, msg:
+ usage(msg)
+ return 2
+ for o, a in opts:
+ if o == '-h':
+ print_histogram = 1
+ if o == "-i":
+ interval = int(60 * float(a))
+ if interval <= 0:
+ interval = 60
+ elif interval > 3600:
+ interval = 3600
+ if o == "-q":
+ quiet = 1
+ verbose = 0
+ if o == "-s":
+ print_size_histogram = 1
+ if o == "-S":
+ dostats = 0
+ if o == "-v":
+ verbose = 1
+ if o == '-X':
+ heuristic = 1
+ if len(args) != 1:
+ usage("exactly one file argument required")
+ return 2
+ filename = args[0]
+
+ # Open file
+ if filename.endswith(".gz"):
+ # Open gzipped file
+ try:
+ import gzip
+ except ImportError:
+ print >>sys.stderr, "can't read gzipped files (no module gzip)"
+ return 1
+ try:
+ f = gzip.open(filename, "rb")
+ except IOError, msg:
+ print >>sys.stderr, "can't open %s: %s" % (filename, msg)
+ return 1
+ elif filename == '-':
+ # Read from stdin
+ f = sys.stdin
+ else:
+ # Open regular file
+ try:
+ f = open(filename, "rb")
+ except IOError, msg:
+ print >>sys.stderr, "can't open %s: %s" % (filename, msg)
+ return 1
+
+ # Read file, gathering statistics, and printing each record if verbose
+ rt0 = time.time()
+ bycode = {}
+ records = 0
+ versions = 0
+ t0 = te = None
+ datarecords = 0
+ datasize = 0L
+ file0 = file1 = 0
+ oids = {}
+ bysize = {}
+ bysizew = {}
+ total_loads = 0
+ byinterval = {}
+ thisinterval = None
+ h0 = he = None
+ offset = 0
+ f_read = f.read
+ struct_unpack = struct.unpack
+ try:
+ while 1:
+ r = f_read(8)
+ if len(r) < 8:
+ break
+ offset += 8
+ ts, code = struct_unpack(">ii", r)
+ if ts == 0:
+ # Must be a misaligned record caused by a crash
+ if not quiet:
+ print "Skipping 8 bytes at offset", offset-8,
+ print repr(r)
+ continue
+ oid = f_read(8)
+ if len(oid) < 8:
+ break
+ if heuristic and oid[:4] != '\0\0\0\0':
+ # Heuristic for severe data corruption
+ print "Seeking back over bad oid at offset", offset,
+ print repr(r)
+ f.seek(-8, 1)
+ continue
+ offset += 8
+ serial = f_read(8)
+ if len(serial) < 8:
+ break
+ offset += 8
+ records += 1
+ if t0 is None:
+ t0 = ts
+ thisinterval = t0 / interval
+ h0 = he = ts
+ te = ts
+ if ts / interval != thisinterval:
+ if not quiet:
+ dumpbyinterval(byinterval, h0, he)
+ byinterval = {}
+ thisinterval = ts / interval
+ h0 = ts
+ he = ts
+ dlen, code = code & 0x7fffff00, code & 0xff
+ if dlen:
+ datarecords += 1
+ datasize += dlen
+ version = '-'
+ if code & 0x80:
+ version = 'V'
+ versions += 1
+ current = code & 1
+ if current:
+ file1 += 1
+ else:
+ file0 += 1
+ code = code & 0x7e
+ bycode[code] = bycode.get(code, 0) + 1
+ byinterval[code] = byinterval.get(code, 0) + 1
+ if dlen:
+ if code & 0x70 == 0x20: # All loads
+ bysize[dlen] = d = bysize.get(dlen) or {}
+ d[oid] = d.get(oid, 0) + 1
+ elif code == 0x3A: # Update
+ bysizew[dlen] = d = bysizew.get(dlen) or {}
+ d[oid] = d.get(oid, 0) + 1
+ if verbose:
+ print "%s %d %02x %016x %016x %1s %s" % (
+ time.ctime(ts)[4:-5],
+ current,
+ code,
+ u64(oid),
+ u64(serial),
+ version,
+ dlen and str(dlen) or "")
+ if code & 0x70 == 0x20:
+ oids[oid] = oids.get(oid, 0) + 1
+ total_loads += 1
+ if code in (0x00, 0x70):
+ if not quiet:
+ dumpbyinterval(byinterval, h0, he)
+ byinterval = {}
+ thisinterval = ts / interval
+ h0 = he = ts
+ if not quiet:
+ print time.ctime(ts)[4:-5],
+ if code == 0x00:
+ print '='*20, "Restart", '='*20
+ else:
+ print '-'*20, "Flip->%d" % current, '-'*20
+ except KeyboardInterrupt:
+ print "\nInterrupted. Stats so far:\n"
+
+ f.close()
+ rte = time.time()
+ if not quiet:
+ dumpbyinterval(byinterval, h0, he)
+
+ # Error if nothing was read
+ if not records:
+ print >>sys.stderr, "No records processed"
+ return 1
+
+ # Print statistics
+ if dostats:
+ print
+ print "Read %s records (%s bytes) in %.1f seconds" % (
+ addcommas(records), addcommas(records*24), rte-rt0)
+ print "Versions: %s records used a version" % addcommas(versions)
+ print "First time: %s" % time.ctime(t0)
+ print "Last time: %s" % time.ctime(te)
+ print "Duration: %s seconds" % addcommas(te-t0)
+ print "File stats: %s in file 0; %s in file 1" % (
+ addcommas(file0), addcommas(file1))
+ print "Data recs: %s (%.1f%%), average size %.1f KB" % (
+ addcommas(datarecords),
+ 100.0 * datarecords / records,
+ datasize / 1024.0 / datarecords)
+ print "Hit rate: %.1f%% (load hits / loads)" % hitrate(bycode)
+ print
+ codes = bycode.keys()
+ codes.sort()
+ print "%13s %4s %s" % ("Count", "Code", "Function (action)")
+ for code in codes:
+ print "%13s %02x %s" % (
+ addcommas(bycode.get(code, 0)),
+ code,
+ explain.get(code) or "*** unknown code ***")
+
+ # Print histogram
+ if print_histogram:
+ print
+ print "Histogram of object load frequency"
+ total = len(oids)
+ print "Unique oids: %s" % addcommas(total)
+ print "Total loads: %s" % addcommas(total_loads)
+ s = addcommas(total)
+ width = max(len(s), len("objects"))
+ fmt = "%5d %" + str(width) + "s %5.1f%% %5.1f%% %5.1f%%"
+ hdr = "%5s %" + str(width) + "s %6s %6s %6s"
+ print hdr % ("loads", "objects", "%obj", "%load", "%cum")
+ cum = 0.0
+ for binsize, count in histogram(oids):
+ obj_percent = 100.0 * count / total
+ load_percent = 100.0 * count * binsize / total_loads
+ cum += load_percent
+ print fmt % (binsize, addcommas(count),
+ obj_percent, load_percent, cum)
+
+ # Print size histogram
+ if print_size_histogram:
+ print
+ print "Histograms of object sizes"
+ print
+ dumpbysize(bysizew, "written", "writes")
+ dumpbysize(bysize, "loaded", "loads")
+
+def dumpbysize(bysize, how, how2):
+ print
+ print "Unique sizes %s: %s" % (how, addcommas(len(bysize)))
+ print "%10s %6s %6s" % ("size", "objs", how2)
+ sizes = bysize.keys()
+ sizes.sort()
+ for size in sizes:
+ loads = 0
+ for n in bysize[size].itervalues():
+ loads += n
+ print "%10s %6d %6d" % (addcommas(size),
+ len(bysize.get(size, "")),
+ loads)
+
+def dumpbyinterval(byinterval, h0, he):
+ loads = 0
+ hits = 0
+ for code in byinterval.keys():
+ if code & 0x70 == 0x20:
+ n = byinterval[code]
+ loads += n
+ if code in (0x2A, 0x2C, 0x2E):
+ hits += n
+ if not loads:
+ return
+ if loads:
+ hr = 100.0 * hits / loads
+ else:
+ hr = 0.0
+ print "%s-%s %10s loads, %10s hits,%5.1f%% hit rate" % (
+ time.ctime(h0)[4:-8], time.ctime(he)[14:-8],
+ addcommas(loads), addcommas(hits), hr)
+
+def hitrate(bycode):
+ loads = 0
+ hits = 0
+ for code in bycode.keys():
+ if code & 0x70 == 0x20:
+ n = bycode[code]
+ loads += n
+ if code in (0x2A, 0x2C, 0x2E):
+ hits += n
+ if loads:
+ return 100.0 * hits / loads
+ else:
+ return 0.0
+
+def histogram(d):
+ bins = {}
+ for v in d.itervalues():
+ bins[v] = bins.get(v, 0) + 1
+ L = bins.items()
+ L.sort()
+ return L
+
+def u64(v):
+ """Unpack an 8-byte string into a 64-bit long integer."""
+ return struct.unpack(">Q", v)[0]
+
+def addcommas(n):
+ sign, s = '', str(n)
+ if s[0] == '-':
+ sign, s = '-', s[1:]
+ i = len(s) - 3
+ while i > 0:
+ s = s[:i] + ',' + s[i:]
+ i -= 3
+ return sign + s
+
+explain = {
+ # The first hex digit shows the operation, the second the outcome.
+ # If the second digit is in "02468" then it is a 'miss'.
+ # If it is in "ACE" then it is a 'hit'.
+
+ 0x00: "_setup_trace (initialization)",
+
+ 0x10: "invalidate (miss)",
+ 0x1A: "invalidate (hit, version, writing 'n')",
+ 0x1C: "invalidate (hit, writing 'i')",
+
+ 0x20: "load (miss)",
+ 0x22: "load (miss, version, status 'n')",
+ 0x24: "load (miss, deleting index entry)",
+ 0x26: "load (miss, no non-version data)",
+ 0x28: "load (miss, version mismatch, no non-version data)",
+ 0x2A: "load (hit, returning non-version data)",
+ 0x2C: "load (hit, version mismatch, returning non-version data)",
+ 0x2E: "load (hit, returning version data)",
+
+ 0x3A: "update",
+
+ 0x40: "modifiedInVersion (miss)",
+ 0x4A: "modifiedInVersion (hit, return None, status 'n')",
+ 0x4C: "modifiedInVersion (hit, return '')",
+ 0x4E: "modifiedInVersion (hit, return version)",
+
+ 0x5A: "store (non-version data present)",
+ 0x5C: "store (only version data present)",
+
+ 0x6A: "_copytocurrent",
+
+ 0x70: "checkSize (cache flip)",
+ }
+
+if __name__ == "__main__":
+ sys.exit(main())
=== Zope3/src/zodb/zeo/stubs.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/stubs.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,200 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""RPC stubs for interface exported by ClientStorage."""
+
+class ClientStorage:
+
+ """An RPC stub class for the interface exported by ClientStorage.
+
+ This is the interface presented by ClientStorage to the
+ StorageServer; i.e. the StorageServer calls these methods and they
+ are executed in the ClientStorage.
+
+ See the ClientStorage class for documentation on these methods.
+
+ It is currently important that all methods here are asynchronous
+ (meaning they don't have a return value and the caller doesn't
+ wait for them to complete), *and* that none of them cause any
+ calls from the client to the storage. This is due to limitations
+ in the zrpc subpackage.
+
+ The on-the-wire names of some of the methods don't match the
+ Python method names. That's because the on-the-wire protocol was
+ fixed for ZEO 2 and we don't want to change it. There are some
+ aliases in ClientStorage.py to make up for this.
+ """
+
+ def __init__(self, rpc):
+ """Constructor.
+
+ The argument is a connection: an instance of the
+ zrpc.connection.Connection class.
+ """
+ self.rpc = rpc
+
+ def beginVerify(self):
+ self.rpc.callAsync('begin')
+
+ def invalidateVerify(self, args):
+ self.rpc.callAsync('invalidate', args)
+
+ def endVerify(self):
+ self.rpc.callAsync('end')
+
+ def invalidateTrans(self, args):
+ self.rpc.callAsync('Invalidate', args)
+
+ def serialnos(self, arg):
+ self.rpc.callAsync('serialnos', arg)
+
+ def info(self, arg):
+ self.rpc.callAsync('info', arg)
+
+
+"""RPC stubs for interface exported by StorageServer."""
+
+class StorageServer:
+
+ """An RPC stub class for the interface exported by StorageServer.
+
+ This is the interface presented by the StorageServer to the
+ ClientStorage; i.e. the ClientStorage calls these methods and they
+ are executed in the StorageServer.
+
+ See the StorageServer module for documentation on these methods,
+ with the exception of _update(), which is documented here.
+ """
+
+ def __init__(self, rpc):
+ """Constructor.
+
+ The argument is a connection: an instance of the
+ zrpc.connection.Connection class.
+ """
+ self.rpc = rpc
+
+ def extensionMethod(self, name):
+ return ExtensionMethodWrapper(self.rpc, name).call
+
+ def _update(self):
+ """Handle pending incoming messages.
+
+ This method is typically only used when no asyncore mainloop
+ is already active. It can cause arbitrary callbacks from the
+ server to the client to be handled.
+ """
+ self.rpc.pending()
+
+ def register(self, storage_name, read_only):
+ self.rpc.call('register', storage_name, read_only)
+
+ def get_info(self):
+ return self.rpc.call('get_info')
+
+ def beginZeoVerify(self):
+ self.rpc.callAsync('beginZeoVerify')
+
+ def zeoVerify(self, oid, s, sv):
+ self.rpc.callAsync('zeoVerify', oid, s, sv)
+
+ def endZeoVerify(self):
+ self.rpc.callAsync('endZeoVerify')
+
+ def new_oids(self, n=None):
+ if n is None:
+ return self.rpc.call('new_oids')
+ else:
+ return self.rpc.call('new_oids', n)
+
+ def pack(self, t, wait=None):
+ if wait is None:
+ self.rpc.call('pack', t)
+ else:
+ self.rpc.call('pack', t, wait)
+
+ def zeoLoad(self, oid):
+ return self.rpc.call('zeoLoad', oid)
+
+ def storea(self, oid, serial, data, version, id):
+ self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+ def tpc_begin(self, id, user, descr, ext, tid, status):
+ return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
+
+ def vote(self, trans_id):
+ return self.rpc.call('vote', trans_id)
+
+ def tpc_finish(self, id):
+ return self.rpc.call('tpc_finish', id)
+
+ def tpc_abort(self, id):
+ self.rpc.callAsync('tpc_abort', id)
+
+ def abortVersion(self, src, id):
+ return self.rpc.call('abortVersion', src, id)
+
+ def commitVersion(self, src, dest, id):
+ return self.rpc.call('commitVersion', src, dest, id)
+
+ def history(self, oid, version, length=None):
+ if length is None:
+ return self.rpc.call('history', oid, version)
+ else:
+ return self.rpc.call('history', oid, version, length)
+
+ def load(self, oid, version):
+ return self.rpc.call('load', oid, version)
+
+ def loadSerial(self, oid, serial):
+ return self.rpc.call('loadSerial', oid, serial)
+
+ def modifiedInVersion(self, oid):
+ return self.rpc.call('modifiedInVersion', oid)
+
+ def new_oid(self, last=None):
+ if last is None:
+ return self.rpc.call('new_oid')
+ else:
+ return self.rpc.call('new_oid', last)
+
+ def store(self, oid, serial, data, version, trans):
+ return self.rpc.call('store', oid, serial, data, version, trans)
+
+ def transactionalUndo(self, trans_id, trans):
+ return self.rpc.call('transactionalUndo', trans_id, trans)
+
+ def undo(self, trans_id):
+ return self.rpc.call('undo', trans_id)
+
+ def undoLog(self, first, last):
+ return self.rpc.call('undoLog', first, last)
+
+ def undoInfo(self, first, last, spec):
+ return self.rpc.call('undoInfo', first, last, spec)
+
+ def versionEmpty(self, vers):
+ return self.rpc.call('versionEmpty', vers)
+
+ def versions(self, max=None):
+ if max is None:
+ return self.rpc.call('versions')
+ else:
+ return self.rpc.call('versions', max)
+
+class ExtensionMethodWrapper:
+ def __init__(self, rpc, name):
+ self.rpc = rpc
+ self.name = name
+ def call(self, *a, **kwa):
+ return apply(self.rpc.call, (self.name,)+a, kwa)
=== Zope3/src/zodb/zeo/tbuf.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tbuf.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,156 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A TransactionBuffer stores transaction updates until commit or abort.
+
+A transaction may generate enough data that it is not practical to
+always hold pending updates in memory. Instead, a TransactionBuffer
+is used to store the data until a commit or abort.
+"""
+
+# A faster implementation might store trans data in memory until it
+# reaches a certain size.
+
+import cPickle
+import tempfile
+from threading import Lock
+
+class TransactionBuffer:
+
+ # Valid call sequences:
+ #
+ # ((store | invalidate)* begin_iterate next* clear)* close
+ #
+ # get_size can be called any time
+
+ # The TransactionBuffer is used by client storage to hold update
+ # data until the tpc_finish(). It is normally used by a single
+ # thread, because only one thread can be in the two-phase commit
+ # at one time.
+
+ # It is possible, however, for one thread to close the storage
+ # while another thread is in the two-phase commit. We must use
+ # a lock to guard against this race, because unpredictable things
+ # can happen in Python if one thread closes a file that another
+ # thread is reading. In a debug build, an assert() can fail.
+
+ # XXX If an operation is performed on a closed TransactionBuffer,
+ # it has no effect and does not raise an exception. The only time
+ # this should occur is when a ClientStorage is closed in one
+ # thread while another thread is in its tpc_finish(). It's not
+ # clear what should happen in this case. If the tpc_finish()
+ # completes without error, the Connection using it could have
+ # inconsistent data. This should have minimal effect, though,
+ # because the Connection is connected to a closed storage.
+
+ def __init__(self):
+ self.file = tempfile.TemporaryFile(suffix=".tbuf")
+ self.lock = Lock()
+ self.closed = 0
+ self.count = 0
+ self.size = 0
+ # It's safe to use a fast pickler because the only objects
+ # stored are builtin types -- strings or None.
+ self.pickler = cPickle.Pickler(self.file, 1)
+ self.pickler.fast = 1
+
+ def close(self):
+ self.lock.acquire()
+ try:
+ self.closed = 1
+ try:
+ self.file.close()
+ except OSError:
+ pass
+ finally:
+ self.lock.release()
+
+ def store(self, oid, version, data):
+ self.lock.acquire()
+ try:
+ self._store(oid, version, data)
+ finally:
+ self.lock.release()
+
+ def _store(self, oid, version, data):
+ """Store oid, version, data for later retrieval"""
+ if self.closed:
+ return
+ self.pickler.dump((oid, version, data))
+ self.count += 1
+ # Estimate per-record cache size
+ self.size = self.size + len(data) + 31
+ if version:
+ # Assume version data has same size as non-version data
+ self.size = self.size + len(version) + len(data) + 12
+
+ def invalidate(self, oid, version):
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.pickler.dump((oid, version, None))
+ self.count += 1
+ finally:
+ self.lock.release()
+
+ def clear(self):
+ """Mark the buffer as empty"""
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.file.seek(0)
+ self.count = 0
+ self.size = 0
+ finally:
+ self.lock.release()
+
+ # unchecked constraints:
+ # 1. can't call store() after begin_iterate()
+ # 2. must call clear() after iteration finishes
+
+ def begin_iterate(self):
+ """Move the file pointer in advance of iteration"""
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.file.flush()
+ self.file.seek(0)
+ self.unpickler = cPickle.Unpickler(self.file)
+ finally:
+ self.lock.release()
+
+ def next(self):
+ self.lock.acquire()
+ try:
+ return self._next()
+ finally:
+ self.lock.release()
+
+ def _next(self):
+ """Return next tuple of data or None if EOF"""
+ if self.closed:
+ return None
+ if self.count == 0:
+ del self.unpickler
+ return None
+ oid_ver_data = self.unpickler.load()
+ self.count -= 1
+ return oid_ver_data
+
+ def get_size(self):
+ """Return size of data stored in buffer (just a hint)."""
+
+ return self.size
=== Zope3/src/zodb/zeo/threadedasync.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/threadedasync.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,171 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Manage the asyncore mainloop in a multi-threaded app
+
+In a multi-threaded application, only a single thread runs the
+asyncore mainloop. This thread (the "mainloop thread") may not start
+the mainloop before another thread needs to perform an async action
+that requires it. As a result, other threads need to coordinate with
+the mainloop thread to find out whether the mainloop is running.
+
+This module implements a callback mechanism that allows other threads
+to be notified when the mainloop starts. A thread calls
+register_loop_callback() to register interest. When the mainloop
+thread calls loop(), each registered callback will be called with the
+socket map as its first argument.
+"""
+__version__ = '$Revision$'[11:-2]
+
+import asyncore
+import select
+import thread
+import time
+from errno import EINTR
+
+_loop_lock = thread.allocate_lock()
+_looping = None
+_loop_callbacks = []
+
+def register_loop_callback(callback, args=(), kw=None):
+ """Register callback function to be called when mainloop starts
+
+ The callable object callback will be invoked when the mainloop
+ starts. If the mainloop is currently running, the callback will
+ be invoked immediately.
+
+ The callback will be called with a single argument, the mainloop
+ socket map, unless the optional args or kw arguments are used.
+ args defines a tuple of extra arguments to pass after the socket
+ map. kw defines a dictionary of keyword arguments.
+ """
+ _loop_lock.acquire()
+ try:
+ if _looping is not None:
+ apply(callback, (_looping,) + args, kw or {})
+ else:
+ _loop_callbacks.append((callback, args, kw))
+ finally:
+ _loop_lock.release()
+
+def _start_loop(map):
+ _loop_lock.acquire()
+ try:
+ global _looping
+ _looping = map
+ while _loop_callbacks:
+ cb, args, kw = _loop_callbacks.pop()
+ apply(cb, (map,) + args, kw or {})
+ finally:
+ _loop_lock.release()
+
+def _stop_loop():
+ _loop_lock.acquire()
+ try:
+ global _looping
+ _looping = None
+ finally:
+ _loop_lock.release()
+
+def poll(timeout=0.0, map=None):
+ """A copy of asyncore.poll() with a bug fixed (see comment).
+
+ (asyncore.poll2() and .poll3() don't have this bug.)
+ """
+ if map is None:
+ map = asyncore.socket_map
+ if map:
+ r = []; w = []; e = []
+ for fd, obj in map.items():
+ if obj.readable():
+ r.append(fd)
+ if obj.writable():
+ w.append(fd)
+ if [] == r == w == e:
+ time.sleep(timeout)
+ else:
+ try:
+ r, w, e = select.select(r, w, e, timeout)
+ except select.error, err:
+ if err[0] != EINTR:
+ raise
+ else:
+ # This part is missing in asyncore before Python 2.3
+ return
+
+ for fd in r:
+ obj = map.get(fd)
+ if obj is not None:
+ try:
+ obj.handle_read_event()
+ except asyncore.ExitNow:
+ raise asyncore.ExitNow
+ except:
+ obj.handle_error()
+
+ for fd in w:
+ obj = map.get(fd)
+ if obj is not None:
+ try:
+ obj.handle_write_event()
+ except asyncore.ExitNow:
+ raise asyncore.ExitNow
+ except:
+ obj.handle_error()
+
+def loop(timeout=30.0, use_poll=0, map=None):
+ """Invoke asyncore mainloop
+
+ This function behaves like the regular asyncore.loop() function
+ except that it also triggers ThreadedAsync callback functions
+ before starting the loop.
+ """
+ if use_poll:
+ if hasattr(select, 'poll'):
+ poll_fun = asyncore.poll3
+ else:
+ poll_fun = asyncore.poll2
+ else:
+ poll_fun = poll
+
+ if map is None:
+ map = asyncore.socket_map
+
+ _start_loop(map)
+ while map:
+ poll_fun(timeout, map)
+ _stop_loop()
+
+
+# This module used to do something evil -- it rebound asyncore.loop to the
+# above loop() function. What was evil about this is that if you added some
+# debugging to asyncore.loop, you'd spend 6 hours debugging why your debugging
+# code wasn't called!
+#
+# Code should explicitly call ThreadedAsync.loop() instead of
+# asyncore.loop(). Most of ZODB has been fixed, but ripping this out may
+# break 3rd party code. So we'll issue a warning and let it continue -- for
+# now.
+
+##def deprecated_loop(*args, **kws):
+## import warnings
+## warnings.warn("""\
+##ThreadedAsync.loop() called through sneaky asyncore.loop() rebinding.
+##You should change your code to call ThreadedAsync.loop() explicitly.""",
+## DeprecationWarning)
+## loop(*args, **kws)
+
+##asyncore.loop = deprecated_loop
+
+# XXX Remove this once we've updated ZODB4 since they share this package
+asyncore.loop = loop
=== Zope3/src/zodb/zeo/utils.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/utils.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,56 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Utilities for setting up the server environment."""
+
+import os
+
+def parentdir(p, n=1):
+ """Return the parent of p, n levels up."""
+ d = p
+ while n:
+ d = os.path.split(d)[0]
+ if not d or d == '.':
+ d = os.getcwd()
+ n -= 1
+ return d
+
+class Environment:
+ """Determine location of the Data.fs & ZEO_SERVER.pid files.
+
+ Pass the argv[0] used to start ZEO to the constructor.
+
+ Use the zeo_pid and fs attributes to get the filenames.
+ """
+
+ def __init__(self, argv0):
+ v = os.environ.get("INSTANCE_HOME")
+ if v is None:
+ # looking for a Zope/var directory assuming that this code
+ # is installed in Zope/lib/python/ZEO
+ p = parentdir(argv0, 4)
+ if os.path.isdir(os.path.join(p, "var")):
+ v = p
+ else:
+ v = os.getcwd()
+ self.home = v
+ self.var = os.path.join(v, "var")
+ if not os.path.isdir(self.var):
+ self.var = self.home
+
+ pid = os.environ.get("ZEO_SERVER_PID")
+ if pid is None:
+ pid = os.path.join(self.var, "ZEO_SERVER.pid")
+
+ self.zeo_pid = pid
+ self.fs = os.path.join(self.var, "Data.fs")