##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""RPC stubs for interface exported by StorageServer."""

# DM 2004-12-10 -- see below
ZEO_MAX_RESPONSE_TIME = 60 # s, maybe controllable via an environment variable?

from time import time


##
# ZEO storage server.
# <p>
# Remote method calls can be synchronous or asynchronous.  If the call
# is synchronous, the client thread blocks until the call returns.  A
# single client can only have one synchronous request outstanding.  If
# several threads share a single client, threads other than the caller
# will block only if the attempt to make another synchronous call.
# An asynchronous call does not cause the client thread to block.  An
# exception raised by an asynchronous method is logged on the server,
# but is not returned to the client.

class StorageServer:

    """An RPC stub class for the interface exported by ClientStorage.

    This is the interface presented by the StorageServer to the
    ClientStorage; i.e. the ClientStorage calls these methods and they
    are executed in the StorageServer.

    See the StorageServer module for documentation on these methods,
    with the exception of _update(), which is documented here.
    """

    def __init__(self, rpc):
        """Constructor.

        The argument is a connection: an instance of the
        zrpc.connection.Connection class.
        """
        self.rpc = rpc
        # Wait until we know what version the other side is using.
        # DM 2004-12-10: this free running loop occasionally did
        #  not finish (reasons still unknown) and consumed an excessive
        #  amount of CPU time. We call "pending" with a timeout of
        #  30s (this is effective only for synchronous mode!)
        #  and in addition check that we finish within "ZEO_MAX_RESPONSE_TIME".
        #while rpc.peer_protocol_version is None:
            #rpc.pending()
        st = time()
        while rpc.peer_protocol_version is None:
            if time() - st > ZEO_MAX_RESPONSE_TIME:
                # the server took too long to report its version
                rpc.close()
                raise EnvironmentError("ZEO server's initial response exceeded %ss" % ZEO_MAX_RESPONSE_TIME)
            rpc.pending(30)
        if rpc.peer_protocol_version == 'Z200':
            self.lastTransaction = lambda: None
            self.getInvalidations = lambda tid: None
            self.getAuthProtocol = lambda: None

    def extensionMethod(self, name):
        return ExtensionMethodWrapper(self.rpc, name).call

    ##
    # Register current connection with a storage and a mode.
    # In effect, it is like an open call.
    # @param storage_name a string naming the storage.  This argument
    #        is primarily for backwards compatibility with servers
    #        that supported multiple storages.
    # @param read_only boolean
    # @exception ValueError unknown storage_name or already registered
    # @exception ReadOnlyError storage is read-only and a read-write
    #            connectio was requested

    def register(self, storage_name, read_only):
        self.rpc.call('register', storage_name, read_only)

    ##
    # Return dictionary of meta-data about the storage.
    # @defreturn dict

    def get_info(self):
        return self.rpc.call('get_info')

    ##
    # Check whether the server requires authentication.  Returns
    # the name of the protocol.
    # @defreturn string

    def getAuthProtocol(self):
        return self.rpc.call('getAuthProtocol')

    ##
    # Return id of the last committed transaction
    # @defreturn string

    def lastTransaction(self):
        # Not in protocol version 2.0.0; see __init__()
        return self.rpc.call('lastTransaction')

    ##
    # Return invalidations for all transactions after tid.
    # @param tid transaction id
    # @defreturn 2-tuple, (tid, list)
    # @return tuple containing the last committed transaction
    #         and a list of oids that were invalidated.  Returns
    #         None and an empty list if the server does not have
    #         the list of oids available.

    def getInvalidations(self, tid):
        # Not in protocol version 2.0.0; see __init__()
        return self.rpc.call('getInvalidations', tid)

    ##
    # Check whether serial numbers s and sv are current for oid.
    # If one or both of the serial numbers are not current, the
    # server will make an asynchronous invalidateVerify() call.
    # @param oid object id
    # @param s serial number on non-version data
    # @param sv serial number of version data or None
    # @defreturn async

    def zeoVerify(self, oid, s, sv):
        self.rpc.callAsync('zeoVerify', oid, s, sv)

    ##
    # Check whether current serial number is valid for oid and version.
    # If the serial number is not current, the server will make an
    # asynchronous invalidateVerify() call.
    # @param oid object id
    # @param version name of version for oid
    # @param serial client's current serial number
    # @defreturn async

    def verify(self, oid, version, serial):
        self.rpc.callAsync('verify', oid, version, serial)

    ##
    # Signal to the server that cache verification is done.
    # @defreturn async

    def endZeoVerify(self):
        self.rpc.callAsync('endZeoVerify')

    ##
    # Generate a new set of oids.
    # @param n number of new oids to return
    # @defreturn list
    # @return list of oids

    def new_oids(self, n=None):
        if n is None:
            return self.rpc.call('new_oids')
        else:
            return self.rpc.call('new_oids', n)

    ##
    # Pack the storage.
    # @param t pack time
    # @param wait optional, boolean.  If true, the call will not
    #             return until the pack is complete.

    def pack(self, t, wait=None):
        if wait is None:
            self.rpc.call('pack', t)
        else:
            self.rpc.call('pack', t, wait)

    ##
    # Return current data for oid.  Version data is returned if
    # present.
    # @param oid object id
    # @defreturn 5-tuple
    # @return 5-tuple, current non-version data, serial number,
    #         version name, version data, version data serial number
    # @exception KeyError if oid is not found

    def zeoLoad(self, oid):
        if _doLOG:
            s= time()
            r= self.rpc.call('zeoLoad', oid)
            dt= time() - s
            data= r[0]
            try: size= len(data)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'zeoLoad: oid=%s class=%s size=%s time=%.3fms' %
                (repr(oid),
                 _getClass(data),
                 size,
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('zeoLoad', oid)

    ##
    # Return current data for oid in version, the tid of the transaction that
    # wrote the most recent revision, and the name of the version for the
    # data returned.  Versions make this hard to understand; in particular,
    # the version string returned may not equal the version string passed
    # in, and that's "a feature" I don't understand.  Similarly, the tid
    # returned is the tid of the most recent revision of oid, and that may
    # not equal the tid of the transaction that wrote the data returned.
    # @param oid object id
    # @param version string, name of version
    # @defreturn 3-tuple
    # @return data, transaction id, version
    #         where version is the name of the version the data came
    #         from or "" for non-version data
    # @exception KeyError if oid is not found

    def loadEx(self, oid, version):
        if _doLOG:
            s= time()
            r= self.rpc.call('loadEx', oid, version)
            dt= time() - s
            data= r[0]
            try: size= len(data)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'loadEx: oid=%s class=%s size=%s time=%.3fms' %
                (repr(oid),
                 _getClass(data),
                 size,
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call("loadEx", oid, version)

    ##
    # Return non-current data along with transaction ids that identify
    # the lifetime of the specific revision.
    # @param oid object id
    # @param tid a transaction id that provides an upper bound on
    #            the lifetime of the revision.  That is, loadBefore
    #            returns the revision that was current before tid committed.
    # @defreturn 4-tuple
    # @return data, serial numbr, start transaction id, end transaction id

    def loadBefore(self, oid, tid):
        if _doLOG:
            s= time()
            r= self.rpc.call("loadBefore", oid, tid)
            dt= time() - s
            data= r[0]
            try: size= len(data)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'loadBefore: oid=%s class=%s size=%s time=%.3fms' %
                (repr(oid),
                 _getClass(data),
                 size,
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call("loadBefore", oid, tid)

    ##
    # Storage new revision of oid.
    # @param oid object id
    # @param serial serial number that this transaction read
    # @param data new data record for oid
    # @param version name of version or ""
    # @param id id of current transaction
    # @defreturn async

    def storea(self, oid, serial, data, version, id):
        if _doLOG:
            try: size= len(data)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'storea: oid=%s class=%s size=%s' %
                (repr(oid),
                 _getClass(data),
                 size,
                 )
                )
        self.rpc.callAsync('storea', oid, serial, data, version, id)

    ##
    # Start two-phase commit for a transaction
    # @param id id used by client to identify current transaction.  The
    #        only purpose of this argument is to distinguish among multiple
    #        threads using a single ClientStorage.
    # @param user name of user committing transaction (can be "")
    # @param description string containing transaction metadata (can be "")
    # @param ext dictionary of extended metadata (?)
    # @param tid optional explicit tid to pass to underlying storage
    # @param status optional status character, e.g "p" for pack
    # @defreturn async

    def tpc_begin(self, id, user, descr, ext, tid, status):
        if _doLOG:
            s= time()
            r= self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
            dt= time() - s
            LOG('zeoServerStub',INFO,
                'tpc_begin: tid=%s descr=%s time=%.3fms' %
                (repr(tid), repr(descr),
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)

    def vote(self, trans_id):
        if _doLOG:
            s= time()
            r= self.rpc.call('vote', trans_id)
            dt= time() - s
            LOG('zeoServerStub',INFO,
                'vote: tid=%s time=%.3fms' %
                (repr(trans_id),
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('vote', trans_id)

    def tpc_finish(self, id):
        if _doLOG:
            s= time()
            r= self.rpc.call('tpc_finish', id)
            dt= time() - s
            LOG('zeoServerStub',INFO,
                'tpc_finish: tid=%s time=%.3fms' %
                (repr(id),
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('tpc_finish', id)

    def tpc_abort(self, id):
        if _doLOG:
            LOG('zeoServerStub',INFO,
                'vote: tid= %s' %
                (repr(id),
                 )
                )
        self.rpc.callAsync('tpc_abort', id)

    def abortVersion(self, src, id):
        return self.rpc.call('abortVersion', src, id)

    def commitVersion(self, src, dest, id):
        return self.rpc.call('commitVersion', src, dest, id)

    def history(self, oid, version, length=None):
        if length is None:
            return self.rpc.call('history', oid, version)
        else:
            return self.rpc.call('history', oid, version, length)

    def load(self, oid, version):
        if _doLOG:
            s= time()
            r= self.rpc.call('load', oid, version)
            dt= time() - s
            data= r[0]
            try: size= len(data)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'load: oid=%s version=%s class=%s size=%s time=%.3fms' %
                (repr(oid), repr(version),
                 _getClass(data),
                 size,
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('load', oid, version)

    def getSerial(self, oid):
        return self.rpc.call('getSerial', oid)

    def loadSerial(self, oid, serial):
        if _doLOG:
            s= time()
            r= self.rpc.call('loadSerial', oid, serial)
            dt= time() - s
            try: size= len(r)
            except: size= 'unknown'
            LOG('zeoServerStub',INFO,
                'loadSerial: oid=%s serial=%s class=%s size=%s time=%.3fms' %
                (repr(oid), repr(serial),
                 _getClass(r),
                 size,
                 dt * 1000, # ms
                 )
                )
            return r
        return self.rpc.call('loadSerial', oid, serial)

    def modifiedInVersion(self, oid):
        return self.rpc.call('modifiedInVersion', oid)

    def new_oid(self):
        return self.rpc.call('new_oid')

    def store(self, oid, serial, data, version, trans):
        if _doLOG:
            s= time()
            r= self.rpc.call('store', oid, serial, data, version, trans)
            dt= time() - s
            LOG('zeoServerStub',INFO,
                'store: oid=%s class=%s size=%d' %
                (repr(oid),
                 _getClass(data),
                 len(data),
                 )
                )
            return r
        return self.rpc.call('store', oid, serial, data, version, trans)

    def undo(self, trans_id, trans):
        return self.rpc.call('undo', trans_id, trans)

    def undoLog(self, first, last):
        return self.rpc.call('undoLog', first, last)

    def undoInfo(self, first, last, spec):
        return self.rpc.call('undoInfo', first, last, spec)

    def versionEmpty(self, vers):
        return self.rpc.call('versionEmpty', vers)

    def versions(self, max=None):
        if max is None:
            return self.rpc.call('versions')
        else:
            return self.rpc.call('versions', max)

# DM: extended logging support
from os import environ
_doLOG= not not environ.get('ZEO_LOG_CLIENT')
if _doLOG:
    from zLOG import LOG, INFO
    from time import time
    from cPickle import loads
    from types import TupleType
    def _getClass(pickle, _tuple= TupleType):
        '''determine class of *pickle*.'''
        try:
            ci= loads(pickle)[0]
            if type(ci) is _tuple: ci= '.'.join(ci)
            return ci
        except: return '***exception***'
        

class ExtensionMethodWrapper:
    def __init__(self, rpc, name):
        self.rpc = rpc
        self.name = name

    def call(self, *a, **kwa):
        return self.rpc.call(self.name, *a, **kwa)
