Top

kiteconnect.ticker module

ticker.py

Websocket implementation for kite ticker

:copyright: (c) 2017 by Zerodha Technology. :license: see LICENSE for details.

# -*- coding: utf-8 -*-
"""
    ticker.py

    Websocket implementation for kite ticker

    :copyright: (c) 2017 by Zerodha Technology.
    :license: see LICENSE for details.
"""
import six
import sys
import time
import json
import struct
import logging
import threading
from datetime import datetime
from twisted.internet import reactor, ssl
from twisted.python import log as twisted_log
from twisted.internet.protocol import ReconnectingClientFactory
from autobahn.twisted.websocket import WebSocketClientProtocol, \
    WebSocketClientFactory, connectWS

from .__version__ import __version__, __title__

log = logging.getLogger(__name__)


class KiteTickerClientProtocol(WebSocketClientProtocol):
    """Kite ticker autobahn WebSocket protocol."""

    PING_INTERVAL = 2.5
    KEEPALIVE_INTERVAL = 5

    _ping_message = ""
    _next_ping = None
    _next_pong_check = None
    _last_pong_time = None
    _last_ping_time = None

    def __init__(self, *args, **kwargs):
        """Initialize protocol with all options passed from factory."""
        super(KiteTickerClientProtocol, self).__init__(*args, **kwargs)

    # Overide method
    def onConnect(self, response):  # noqa
        """Called when WebSocket server connection was established"""
        self.factory.ws = self

        if self.factory.on_connect:
            self.factory.on_connect(self, response)

        # Reset reconnect on successful reconnect
        self.factory.resetDelay()

    # Overide method
    def onOpen(self):  # noqa
        """Called when the initial WebSocket opening handshake was completed."""
        # send ping
        self._loop_ping()
        # init last pong check after X seconds
        self._loop_pong_check()

        if self.factory.on_open:
            self.factory.on_open(self)

    # Overide method
    def onMessage(self, payload, is_binary):  # noqa
        """Called when text or binary message is received."""
        if self.factory.on_message:
            self.factory.on_message(self, payload, is_binary)

    # Overide method
    def onClose(self, was_clean, code, reason):  # noqa
        """Called when connection is closed."""
        if not was_clean:
            if self.factory.on_error:
                self.factory.on_error(self, code, reason)

        if self.factory.on_close:
            self.factory.on_close(self, code, reason)

        # Cancel next ping and timer
        self._last_ping_time = None
        self._last_pong_time = None

        if self._next_ping:
            self._next_ping.cancel()

        if self._next_pong_check:
            self._next_pong_check.cancel()

    def onPong(self, response):  # noqa
        """Called when pong message is received."""
        if self._last_pong_time and self.factory.debug:
            log.debug("last pong was {} seconds back.".format(time.time() - self._last_pong_time))

        self._last_pong_time = time.time()

        if self.factory.debug:
            log.debug("pong => {}".format(response))

    """
    Custom helper and exposed methods.
    """
    def _loop_ping(self): # noqa
        """Start a ping loop where it sends ping message every X seconds."""
        if self.factory.debug:
            log.debug("ping => {}".format(self._ping_message))
            if self._last_ping_time:
                log.debug("last ping was {} seconds back.".format(time.time() - self._last_ping_time))

        # Set current time as last ping time
        self._last_ping_time = time.time()
        # Send a ping message to server
        self.sendPing(self._ping_message)

        # Call self after X seconds
        self._next_ping = self.factory.reactor.callLater(self.PING_INTERVAL, self._loop_ping)

    def _loop_pong_check(self):
        """
        Timer sortof to check if connection is still there.

        Checks last pong message time and disconnects the existing connection to make sure it doesn't become a ghost connection.
        """
        if self._last_pong_time:
            # No pong message since long time, so init reconnect
            last_pong_diff = time.time() - self._last_pong_time
            if last_pong_diff > (2 * self.PING_INTERVAL):
                if self.factory.debug:
                    log.debug("Last pong was {} seconds ago. So dropping connection to reconnect.".format(
                        last_pong_diff))
                # drop existing connection to avoid ghost connection
                self.dropConnection(abort=True)

        # Call self after X seconds
        self._next_pong_check = self.factory.reactor.callLater(self.PING_INTERVAL, self._loop_pong_check)


class KiteTickerClientFactory(WebSocketClientFactory, ReconnectingClientFactory):
    """Autobahn WebSocket client factory to implement reconnection and custom callbacks."""

    protocol = KiteTickerClientProtocol
    maxDelay = 5
    maxRetries = 10

    _last_connection_time = None

    def __init__(self, *args, **kwargs):
        """Initialize with default callback method values."""
        self.debug = False
        self.ws = None
        self.on_open = None
        self.on_error = None
        self.on_close = None
        self.on_message = None
        self.on_connect = None
        self.on_reconnect = None
        self.on_noreconnect = None

        super(KiteTickerClientFactory, self).__init__(*args, **kwargs)

    def startedConnecting(self, connector):  # noqa
        """On connecting start or reconnection."""
        if not self._last_connection_time and self.debug:
            log.debug("Start WebSocket connection.")

        self._last_connection_time = time.time()

    def clientConnectionFailed(self, connector, reason):  # noqa
        """On connection failure (When connect request fails)"""
        if self.retries > 0:
            log.error("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay))))

            # on reconnect callback
            if self.on_reconnect:
                self.on_reconnect(self.retries)

        # Retry the connection
        self.retry(connector)
        self.send_noreconnect()

    def clientConnectionLost(self, connector, reason):  # noqa
        """On connection lost (When ongoing connection got disconnected)."""
        if self.retries > 0:
            # on reconnect callback
            if self.on_reconnect:
                self.on_reconnect(self.retries)

        # Retry the connection
        self.retry(connector)
        self.send_noreconnect()

    def send_noreconnect(self):
        """Callback `no_reconnect` if max retries are exhausted."""
        if self.maxRetries is not None and (self.retries > self.maxRetries):
            if self.debug:
                log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries))

            if self.on_noreconnect:
                self.on_noreconnect()


class KiteTicker(object):
    """
    The WebSocket client for connecting to Kite Connect's streaming quotes service.

    Getting started:
    ---------------
        #!python
        import logging
        from kiteconnect import KiteTicker

        logging.basicConfig(level=logging.DEBUG)

        # Initialise
        kws = KiteTicker("your_api_key", "your_access_token")

        def on_ticks(ws, ticks):
            # Callback to receive ticks.
            logging.debug("Ticks: {}".format(ticks))

        def on_connect(ws, response):
            # Callback on successful connect.
            # Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
            ws.subscribe([738561, 5633])

            # Set RELIANCE to tick in `full` mode.
            ws.set_mode(ws.MODE_FULL, [738561])

        def on_close(ws, code, reason):
            # On connection close stop the event loop.
            # Reconnection will not happen after executing `ws.stop()`
            ws.stop()

        # Assign the callbacks.
        kws.on_ticks = on_ticks
        kws.on_connect = on_connect
        kws.on_close = on_close

        # Infinite loop on the main thread. Nothing after this will run.
        # You have to use the pre-defined callbacks to manage subscriptions.
        kws.connect()

    Callbacks
    ---------
    In below examples `ws` is the currently initialised WebSocket object.

    - `on_ticks(ws, ticks)` -  Triggered when ticks are recevied.
        - `ticks` - List of `tick` object. Check below for sample structure.
    - `on_close(ws, code, reason)` -  Triggered when connection is closed.
        - `code` - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
        - `reason` - DOMString indicating the reason the server closed the connection
    - `on_error(ws, code, reason)` -  Triggered when connection is closed with an error.
        - `code` - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
        - `reason` - DOMString indicating the reason the server closed the connection
    - `on_connect` -  Triggered when connection is established successfully.
        - `response` - Response received from server on successful connection.
    - `on_message(ws, payload, is_binary)` -  Triggered when message is received from the server.
        - `payload` - Raw response from the server (either text or binary).
        - `is_binary` - Bool to check if response is binary type.
    - `on_reconnect(ws, attempts_count)` -  Triggered when auto reconnection is attempted.
        - `attempts_count` - Current reconnect attempt number.
    - `on_noreconnect(ws)` -  Triggered when number of auto reconnection attempts exceeds `reconnect_tries`.
    - `on_order_update(ws, data)` -  Triggered when there is an order update for the connected user.


    Tick structure (passed to the `on_ticks` callback)
    ---------------------------
        [{
            'instrument_token': 53490439,
            'mode': 'full',
            'volume': 12510,
            'last_price': 4084.0,
            'average_price': 4086.55,
            'last_quantity': 1,
            'buy_quantity': 2356
            'sell_quantity': 2440,
            'change': 0.46740467404674046,
            'last_trade_time': datetime.datetime(2018, 1, 15, 13, 16, 54),
            'timestamp': datetime.datetime(2018, 1, 15, 13, 16, 56),
            'oi': 21845,
            'oi_day_low': 0,
            'oi_day_high': 0,
            'ohlc': {
                'high': 4093.0,
                'close': 4065.0,
                'open': 4088.0,
                'low': 4080.0
            },
            'tradable': True,
            'depth': {
                'sell': [{
                    'price': 4085.0,
                    'orders': 1048576,
                    'quantity': 43
                }, {
                    'price': 4086.0,
                    'orders': 2752512,
                    'quantity': 134
                }, {
                    'price': 4087.0,
                    'orders': 1703936,
                    'quantity': 133
                }, {
                    'price': 4088.0,
                    'orders': 1376256,
                    'quantity': 70
                }, {
                    'price': 4089.0,
                    'orders': 1048576,
                    'quantity': 46
                }],
                'buy': [{
                    'price': 4084.0,
                    'orders': 589824,
                    'quantity': 53
                }, {
                    'price': 4083.0,
                    'orders': 1245184,
                    'quantity': 145
                }, {
                    'price': 4082.0,
                    'orders': 1114112,
                    'quantity': 63
                }, {
                    'price': 4081.0,
                    'orders': 1835008,
                    'quantity': 69
                }, {
                    'price': 4080.0,
                    'orders': 2752512,
                    'quantity': 89
                }]
            }
        },
        ...,
        ...]

    Auto reconnection
    -----------------

    Auto reconnection is enabled by default and it can be disabled by passing `reconnect` param while initialising `KiteTicker`.
    On a side note, reconnection mechanism cannot happen if event loop is terminated using `stop` method inide `on_close` callback.

    Auto reonnection mechanism is based on [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) algorithm in which
    next retry interval will be increased exponentially. `reconnect_max_delay` and `reconnect_max_tries` params can be used to tewak
    the alogrithm where `reconnect_max_delay` is the maximum delay after which subsequent reconnection interval will become constant and
    `reconnect_max_tries` is maximum number of retries before its quiting reconnection.

    For example if `reconnect_max_delay` is 60 seconds and `reconnect_max_tries` is 50 then the first reconnection interval starts from
    minimum interval which is 2 seconds and keep increasing up to 60 seconds after which it becomes constant and when reconnection attempt
    is reached upto 50 then it stops reconnecting.

    method `stop_retry` can be used to stop ongoing reconnect attempts and `on_reconnect` callback will be called with current reconnect
    attempt and `on_noreconnect` is called when reconnection attempts reaches max retries.
    """

    EXCHANGE_MAP = {
        "nse": 1,
        "nfo": 2,
        "cds": 3,
        "bse": 4,
        "bfo": 5,
        "bsecds": 6,
        "mcx": 7,
        "mcxsx": 8,
        "indices": 9
    }

    # Default connection timeout
    CONNECT_TIMEOUT = 30
    # Default Reconnect max delay.
    RECONNECT_MAX_DELAY = 60
    # Default reconnect attempts
    RECONNECT_MAX_TRIES = 50
    # Default root API endpoint. It's possible to
    # override this by passing the `root` parameter during initialisation.
    ROOT_URI = "wss://ws.kite.trade"

    # Available streaming modes.
    MODE_FULL = "full"
    MODE_QUOTE = "quote"
    MODE_LTP = "ltp"

    # Flag to set if its first connect
    _is_first_connect = True

    # Available actions.
    _message_code = 11
    _message_subscribe = "subscribe"
    _message_unsubscribe = "unsubscribe"
    _message_setmode = "mode"

    # Minimum delay which should be set between retries. User can't set less than this
    _minimum_reconnect_max_delay = 5
    # Maximum number or retries user can set
    _maximum_reconnect_max_tries = 300

    def __init__(self, api_key, access_token, debug=False, root=None,
                 reconnect=True, reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,
                 connect_timeout=CONNECT_TIMEOUT):
        """
        Initialise websocket client instance.

        - `api_key` is the API key issued to you
        - `access_token` is the token obtained after the login flow in
            exchange for the `request_token`. Pre-login, this will default to None,
            but once you have obtained it, you should
            persist it in a database or session to pass
            to the Kite Connect class initialisation for subsequent requests.
        - `root` is the websocket API end point root. Unless you explicitly
            want to send API requests to a non-default endpoint, this
            can be ignored.
        - `reconnect` is a boolean to enable WebSocket autreconnect in case of network failure/disconnection.
        - `reconnect_max_delay` in seconds is the maximum delay after which subsequent reconnection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
        - `reconnect_max_tries` is maximum number reconnection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
        - `connect_timeout` in seconds is the maximum interval after which connection is considered as timeout. Defaults to 30s.
        """
        self.root = root or self.ROOT_URI

        # Set max reconnect tries
        if reconnect_max_tries > self._maximum_reconnect_max_tries:
            log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format(
                val=self._maximum_reconnect_max_tries))
            self.reconnect_max_tries = self._maximum_reconnect_max_tries
        else:
            self.reconnect_max_tries = reconnect_max_tries

        # Set max reconnect delay
        if reconnect_max_delay < self._minimum_reconnect_max_delay:
            log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format(
                val=self._minimum_reconnect_max_delay))
            self.reconnect_max_delay = self._minimum_reconnect_max_delay
        else:
            self.reconnect_max_delay = reconnect_max_delay

        self.connect_timeout = connect_timeout

        self.socket_url = "{root}?api_key={api_key}"\
            "&access_token={access_token}".format(
                root=self.root,
                api_key=api_key,
                access_token=access_token
            )

        # Debug enables logs
        self.debug = debug

        # Placeholders for callbacks.
        self.on_ticks = None
        self.on_open = None
        self.on_close = None
        self.on_error = None
        self.on_connect = None
        self.on_message = None
        self.on_reconnect = None
        self.on_noreconnect = None

        # Text message updates
        self.on_order_update = None

        # List of current subscribed tokens
        self.subscribed_tokens = {}

    def _create_connection(self, url, **kwargs):
        """Create a WebSocket client connection."""
        self.factory = KiteTickerClientFactory(url, **kwargs)

        # Alias for current websocket connection
        self.ws = self.factory.ws

        self.factory.debug = self.debug

        # Register private callbacks
        self.factory.on_open = self._on_open
        self.factory.on_error = self._on_error
        self.factory.on_close = self._on_close
        self.factory.on_message = self._on_message
        self.factory.on_connect = self._on_connect
        self.factory.on_reconnect = self._on_reconnect
        self.factory.on_noreconnect = self._on_noreconnect

        self.factory.maxDelay = self.reconnect_max_delay
        self.factory.maxRetries = self.reconnect_max_tries

    def _user_agent(self):
        return (__title__ + "-python/").capitalize() + __version__

    def connect(self, threaded=False, disable_ssl_verification=False, proxy=None):
        """
        Establish a websocket connection.

        - `threaded` is a boolean indicating if the websocket client has to be run in threaded mode or not
        - `disable_ssl_verification` disables building ssl context
        - `proxy` is a dictionary with keys `host` and `port` which denotes the proxy settings
        """
        # Custom headers
        headers = {
            "X-Kite-Version": "3",  # For version 3
        }

        # Init WebSocket client factory
        self._create_connection(self.socket_url,
                                useragent=self._user_agent(),
                                proxy=proxy, headers=headers)

        # Set SSL context
        context_factory = None
        if self.factory.isSecure and not disable_ssl_verification:
            context_factory = ssl.ClientContextFactory()

        # Establish WebSocket connection to a server
        connectWS(self.factory, contextFactory=context_factory, timeout=self.connect_timeout)

        if self.debug:
            twisted_log.startLogging(sys.stdout)

        # Run in seperate thread of blocking
        opts = {}

        # Run when reactor is not running
        if not reactor.running:
            if threaded:
                # Signals are not allowed in non main thread by twisted so suppress it.
                opts["installSignalHandlers"] = False
                self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts)
                self.websocket_thread.daemon = True
                self.websocket_thread.start()
            else:
                reactor.run(**opts)

    def is_connected(self):
        """Check if WebSocket connection is established."""
        if self.ws and self.ws.state == self.ws.STATE_OPEN:
            return True
        else:
            return False

    def _close(self, code=None, reason=None):
        """Close the WebSocket connection."""
        if self.ws:
            self.ws.sendClose(code, reason)

    def close(self, code=None, reason=None):
        """Close the WebSocket connection."""
        self.stop_retry()
        self._close(code, reason)

    def stop(self):
        """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.
        Reconnection mechanism cannot happen past this method
        """
        reactor.stop()

    def stop_retry(self):
        """Stop auto retry when it is in progress."""
        if self.factory:
            self.factory.stopTrying()

    def subscribe(self, instrument_tokens):
        """
        Subscribe to a list of instrument_tokens.

        - `instrument_tokens` is list of instrument instrument_tokens to subscribe
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_subscribe, "v": instrument_tokens}))
            )

            for token in instrument_tokens:
                self.subscribed_tokens[token] = self.MODE_QUOTE

            return True
        except Exception as e:
            self._close(reason="Error while subscribe: {}".format(str(e)))
            raise

    def unsubscribe(self, instrument_tokens):
        """
        Unsubscribe the given list of instrument_tokens.

        - `instrument_tokens` is list of instrument_tokens to unsubscribe.
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_unsubscribe, "v": instrument_tokens}))
            )

            for token in instrument_tokens:
                try:
                    del(self.subscribed_tokens[token])
                except KeyError:
                    pass

            return True
        except Exception as e:
            self._close(reason="Error while unsubscribe: {}".format(str(e)))
            raise

    def set_mode(self, mode, instrument_tokens):
        """
        Set streaming mode for the given list of tokens.

        - `mode` is the mode to set. It can be one of the following class constants:
            MODE_LTP, MODE_QUOTE, or MODE_FULL.
        - `instrument_tokens` is list of instrument tokens on which the mode should be applied
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_setmode, "v": [mode, instrument_tokens]}))
            )

            # Update modes
            for token in instrument_tokens:
                self.subscribed_tokens[token] = mode

            return True
        except Exception as e:
            self._close(reason="Error while setting mode: {}".format(str(e)))
            raise

    def resubscribe(self):
        """Resubscribe to all current subscribed tokens."""
        modes = {}

        for token in self.subscribed_tokens:
            m = self.subscribed_tokens[token]

            if not modes.get(m):
                modes[m] = []

            modes[m].append(token)

        for mode in modes:
            if self.debug:
                log.debug("Resubscribe and set mode: {} - {}".format(mode, modes[mode]))

            self.subscribe(modes[mode])
            self.set_mode(mode, modes[mode])

    def _on_connect(self, ws, response):
        self.ws = ws
        if self.on_connect:
            self.on_connect(self, response)

    def _on_close(self, ws, code, reason):
        """Call `on_close` callback when connection is closed."""
        log.error("Connection closed: {} - {}".format(code, str(reason)))

        if self.on_close:
            self.on_close(self, code, reason)

    def _on_error(self, ws, code, reason):
        """Call `on_error` callback when connection throws an error."""
        log.error("Connection error: {} - {}".format(code, str(reason)))

        if self.on_error:
            self.on_error(self, code, reason)

    def _on_message(self, ws, payload, is_binary):
        """Call `on_message` callback when text message is received."""
        if self.on_message:
            self.on_message(self, payload, is_binary)

        # If the message is binary, parse it and send it to the callback.
        if self.on_ticks and is_binary and len(payload) > 4:
            self.on_ticks(self, self._parse_binary(payload))

        # Parse text messages
        if not is_binary:
            self._parse_text_message(payload)

    def _on_open(self, ws):
        # Resubscribe if its reconnect
        if not self._is_first_connect:
            self.resubscribe()

        # Set first connect to false once its connected first time
        self._is_first_connect = False

        if self.on_open:
            return self.on_open(self)

    def _on_reconnect(self, attempts_count):
        if self.on_reconnect:
            return self.on_reconnect(self, attempts_count)

    def _on_noreconnect(self):
        if self.on_noreconnect:
            return self.on_noreconnect(self)

    def _parse_text_message(self, payload):
        """Parse text message."""
        # Decode unicode data
        if not six.PY2 and type(payload) == bytes:
            payload = payload.decode("utf-8")

        try:
            data = json.loads(payload)
        except ValueError:
            return

        # Order update callback
        if self.on_order_update and data.get("type") == "order" and data.get("data"):
            self.on_order_update(self, data["data"])

        # Custom error with websocket error code 0
        if data.get("type") == "error":
            self._on_error(self, 0, data.get("data"))

    def _parse_binary(self, bin):
        """Parse binary data to a (list of) ticks structure."""
        packets = self._split_packets(bin)  # split data to individual ticks packet
        data = []

        for packet in packets:
            instrument_token = self._unpack_int(packet, 0, 4)
            segment = instrument_token & 0xff  # Retrive segment constant from instrument_token

            divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0

            # All indices are not tradable
            tradable = False if segment == self.EXCHANGE_MAP["indices"] else True

            # LTP packets
            if len(packet) == 8:
                data.append({
                    "tradable": tradable,
                    "mode": self.MODE_LTP,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor
                })
            # Indices quote and full mode
            elif len(packet) == 28 or len(packet) == 32:
                mode = self.MODE_QUOTE if len(packet) == 28 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "ohlc": {
                        "high": self._unpack_int(packet, 8, 12) / divisor,
                        "low": self._unpack_int(packet, 12, 16) / divisor,
                        "open": self._unpack_int(packet, 16, 20) / divisor,
                        "close": self._unpack_int(packet, 20, 24) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if(d["ohlc"]["close"] != 0):
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Full mode with timestamp
                if len(packet) == 32:
                    try:
                        timestamp = datetime.fromtimestamp(self._unpack_int(packet, 28, 32))
                    except Exception:
                        timestamp = None

                    d["timestamp"] = timestamp

                data.append(d)
            # Quote and full mode
            elif len(packet) == 44 or len(packet) == 184:
                mode = self.MODE_QUOTE if len(packet) == 44 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "last_quantity": self._unpack_int(packet, 8, 12),
                    "average_price": self._unpack_int(packet, 12, 16) / divisor,
                    "volume": self._unpack_int(packet, 16, 20),
                    "buy_quantity": self._unpack_int(packet, 20, 24),
                    "sell_quantity": self._unpack_int(packet, 24, 28),
                    "ohlc": {
                        "open": self._unpack_int(packet, 28, 32) / divisor,
                        "high": self._unpack_int(packet, 32, 36) / divisor,
                        "low": self._unpack_int(packet, 36, 40) / divisor,
                        "close": self._unpack_int(packet, 40, 44) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if(d["ohlc"]["close"] != 0):
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Parse full mode
                if len(packet) == 184:
                    try:
                        last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48))
                    except Exception:
                        last_trade_time = None

                    try:
                        timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64))
                    except Exception:
                        timestamp = None

                    d["last_trade_time"] = last_trade_time
                    d["oi"] = self._unpack_int(packet, 48, 52)
                    d["oi_day_high"] = self._unpack_int(packet, 52, 56)
                    d["oi_day_low"] = self._unpack_int(packet, 56, 60)
                    d["timestamp"] = timestamp

                    # Market depth entries.
                    depth = {
                        "buy": [],
                        "sell": []
                    }

                    # Compile the market depth lists.
                    for i, p in enumerate(range(64, len(packet), 12)):
                        depth["sell" if i >= 5 else "buy"].append({
                            "quantity": self._unpack_int(packet, p, p + 4),
                            "price": self._unpack_int(packet, p + 4, p + 8) / divisor,
                            "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H")
                        })

                    d["depth"] = depth

                data.append(d)

        return data

    def _unpack_int(self, bin, start, end, byte_format="I"):
        """Unpack binary data as unsgined interger."""
        return struct.unpack(">" + byte_format, bin[start:end])[0]

    def _split_packets(self, bin):
        """Split the data to individual packets of ticks."""
        # Ignore heartbeat data.
        if len(bin) < 2:
            return []

        number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H")
        packets = []

        j = 2
        for i in range(number_of_packets):
            packet_length = self._unpack_int(bin, j, j + 2, byte_format="H")
            packets.append(bin[j + 2: j + 2 + packet_length])
            j = j + 2 + packet_length

        return packets

Module variables

var log

Classes

class KiteTicker

The WebSocket client for connecting to Kite Connect's streaming quotes service.

Getting started:

#!python
import logging
from kiteconnect import KiteTicker

logging.basicConfig(level=logging.DEBUG)

# Initialise
kws = KiteTicker("your_api_key", "your_access_token")

def on_ticks(ws, ticks):
    # Callback to receive ticks.
    logging.debug("Ticks: {}".format(ticks))

def on_connect(ws, response):
    # Callback on successful connect.
    # Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
    ws.subscribe([738561, 5633])

    # Set RELIANCE to tick in `full` mode.
    ws.set_mode(ws.MODE_FULL, [738561])

def on_close(ws, code, reason):
    # On connection close stop the event loop.
    # Reconnection will not happen after executing `ws.stop()`
    ws.stop()

# Assign the callbacks.
kws.on_ticks = on_ticks
kws.on_connect = on_connect
kws.on_close = on_close

# Infinite loop on the main thread. Nothing after this will run.
# You have to use the pre-defined callbacks to manage subscriptions.
kws.connect()

Callbacks

In below examples ws is the currently initialised WebSocket object.

  • on_ticks(ws, ticks) - Triggered when ticks are recevied.
    • ticks - List of tick object. Check below for sample structure.
  • on_close(ws, code, reason) - Triggered when connection is closed.
    • code - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
    • reason - DOMString indicating the reason the server closed the connection
  • on_error(ws, code, reason) - Triggered when connection is closed with an error.
    • code - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
    • reason - DOMString indicating the reason the server closed the connection
  • on_connect - Triggered when connection is established successfully.
    • response - Response received from server on successful connection.
  • on_message(ws, payload, is_binary) - Triggered when message is received from the server.
    • payload - Raw response from the server (either text or binary).
    • is_binary - Bool to check if response is binary type.
  • on_reconnect(ws, attempts_count) - Triggered when auto reconnection is attempted.
    • attempts_count - Current reconnect attempt number.
  • on_noreconnect(ws) - Triggered when number of auto reconnection attempts exceeds reconnect_tries.
  • on_order_update(ws, data) - Triggered when there is an order update for the connected user.

Tick structure (passed to the on_ticks callback)

[{
    'instrument_token': 53490439,
    'mode': 'full',
    'volume': 12510,
    'last_price': 4084.0,
    'average_price': 4086.55,
    'last_quantity': 1,
    'buy_quantity': 2356
    'sell_quantity': 2440,
    'change': 0.46740467404674046,
    'last_trade_time': datetime.datetime(2018, 1, 15, 13, 16, 54),
    'timestamp': datetime.datetime(2018, 1, 15, 13, 16, 56),
    'oi': 21845,
    'oi_day_low': 0,
    'oi_day_high': 0,
    'ohlc': {
        'high': 4093.0,
        'close': 4065.0,
        'open': 4088.0,
        'low': 4080.0
    },
    'tradable': True,
    'depth': {
        'sell': [{
            'price': 4085.0,
            'orders': 1048576,
            'quantity': 43
        }, {
            'price': 4086.0,
            'orders': 2752512,
            'quantity': 134
        }, {
            'price': 4087.0,
            'orders': 1703936,
            'quantity': 133
        }, {
            'price': 4088.0,
            'orders': 1376256,
            'quantity': 70
        }, {
            'price': 4089.0,
            'orders': 1048576,
            'quantity': 46
        }],
        'buy': [{
            'price': 4084.0,
            'orders': 589824,
            'quantity': 53
        }, {
            'price': 4083.0,
            'orders': 1245184,
            'quantity': 145
        }, {
            'price': 4082.0,
            'orders': 1114112,
            'quantity': 63
        }, {
            'price': 4081.0,
            'orders': 1835008,
            'quantity': 69
        }, {
            'price': 4080.0,
            'orders': 2752512,
            'quantity': 89
        }]
    }
},
...,
...]

Auto reconnection

Auto reconnection is enabled by default and it can be disabled by passing reconnect param while initialising KiteTicker. On a side note, reconnection mechanism cannot happen if event loop is terminated using stop method inide on_close callback.

Auto reonnection mechanism is based on Exponential backoff algorithm in which next retry interval will be increased exponentially. reconnect_max_delay and reconnect_max_tries params can be used to tewak the alogrithm where reconnect_max_delay is the maximum delay after which subsequent reconnection interval will become constant and reconnect_max_tries is maximum number of retries before its quiting reconnection.

For example if reconnect_max_delay is 60 seconds and reconnect_max_tries is 50 then the first reconnection interval starts from minimum interval which is 2 seconds and keep increasing up to 60 seconds after which it becomes constant and when reconnection attempt is reached upto 50 then it stops reconnecting.

method stop_retry can be used to stop ongoing reconnect attempts and on_reconnect callback will be called with current reconnect attempt and on_noreconnect is called when reconnection attempts reaches max retries.

class KiteTicker(object):
    """
    The WebSocket client for connecting to Kite Connect's streaming quotes service.

    Getting started:
    ---------------
        #!python
        import logging
        from kiteconnect import KiteTicker

        logging.basicConfig(level=logging.DEBUG)

        # Initialise
        kws = KiteTicker("your_api_key", "your_access_token")

        def on_ticks(ws, ticks):
            # Callback to receive ticks.
            logging.debug("Ticks: {}".format(ticks))

        def on_connect(ws, response):
            # Callback on successful connect.
            # Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
            ws.subscribe([738561, 5633])

            # Set RELIANCE to tick in `full` mode.
            ws.set_mode(ws.MODE_FULL, [738561])

        def on_close(ws, code, reason):
            # On connection close stop the event loop.
            # Reconnection will not happen after executing `ws.stop()`
            ws.stop()

        # Assign the callbacks.
        kws.on_ticks = on_ticks
        kws.on_connect = on_connect
        kws.on_close = on_close

        # Infinite loop on the main thread. Nothing after this will run.
        # You have to use the pre-defined callbacks to manage subscriptions.
        kws.connect()

    Callbacks
    ---------
    In below examples `ws` is the currently initialised WebSocket object.

    - `on_ticks(ws, ticks)` -  Triggered when ticks are recevied.
        - `ticks` - List of `tick` object. Check below for sample structure.
    - `on_close(ws, code, reason)` -  Triggered when connection is closed.
        - `code` - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
        - `reason` - DOMString indicating the reason the server closed the connection
    - `on_error(ws, code, reason)` -  Triggered when connection is closed with an error.
        - `code` - WebSocket standard close event code (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
        - `reason` - DOMString indicating the reason the server closed the connection
    - `on_connect` -  Triggered when connection is established successfully.
        - `response` - Response received from server on successful connection.
    - `on_message(ws, payload, is_binary)` -  Triggered when message is received from the server.
        - `payload` - Raw response from the server (either text or binary).
        - `is_binary` - Bool to check if response is binary type.
    - `on_reconnect(ws, attempts_count)` -  Triggered when auto reconnection is attempted.
        - `attempts_count` - Current reconnect attempt number.
    - `on_noreconnect(ws)` -  Triggered when number of auto reconnection attempts exceeds `reconnect_tries`.
    - `on_order_update(ws, data)` -  Triggered when there is an order update for the connected user.


    Tick structure (passed to the `on_ticks` callback)
    ---------------------------
        [{
            'instrument_token': 53490439,
            'mode': 'full',
            'volume': 12510,
            'last_price': 4084.0,
            'average_price': 4086.55,
            'last_quantity': 1,
            'buy_quantity': 2356
            'sell_quantity': 2440,
            'change': 0.46740467404674046,
            'last_trade_time': datetime.datetime(2018, 1, 15, 13, 16, 54),
            'timestamp': datetime.datetime(2018, 1, 15, 13, 16, 56),
            'oi': 21845,
            'oi_day_low': 0,
            'oi_day_high': 0,
            'ohlc': {
                'high': 4093.0,
                'close': 4065.0,
                'open': 4088.0,
                'low': 4080.0
            },
            'tradable': True,
            'depth': {
                'sell': [{
                    'price': 4085.0,
                    'orders': 1048576,
                    'quantity': 43
                }, {
                    'price': 4086.0,
                    'orders': 2752512,
                    'quantity': 134
                }, {
                    'price': 4087.0,
                    'orders': 1703936,
                    'quantity': 133
                }, {
                    'price': 4088.0,
                    'orders': 1376256,
                    'quantity': 70
                }, {
                    'price': 4089.0,
                    'orders': 1048576,
                    'quantity': 46
                }],
                'buy': [{
                    'price': 4084.0,
                    'orders': 589824,
                    'quantity': 53
                }, {
                    'price': 4083.0,
                    'orders': 1245184,
                    'quantity': 145
                }, {
                    'price': 4082.0,
                    'orders': 1114112,
                    'quantity': 63
                }, {
                    'price': 4081.0,
                    'orders': 1835008,
                    'quantity': 69
                }, {
                    'price': 4080.0,
                    'orders': 2752512,
                    'quantity': 89
                }]
            }
        },
        ...,
        ...]

    Auto reconnection
    -----------------

    Auto reconnection is enabled by default and it can be disabled by passing `reconnect` param while initialising `KiteTicker`.
    On a side note, reconnection mechanism cannot happen if event loop is terminated using `stop` method inide `on_close` callback.

    Auto reonnection mechanism is based on [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) algorithm in which
    next retry interval will be increased exponentially. `reconnect_max_delay` and `reconnect_max_tries` params can be used to tewak
    the alogrithm where `reconnect_max_delay` is the maximum delay after which subsequent reconnection interval will become constant and
    `reconnect_max_tries` is maximum number of retries before its quiting reconnection.

    For example if `reconnect_max_delay` is 60 seconds and `reconnect_max_tries` is 50 then the first reconnection interval starts from
    minimum interval which is 2 seconds and keep increasing up to 60 seconds after which it becomes constant and when reconnection attempt
    is reached upto 50 then it stops reconnecting.

    method `stop_retry` can be used to stop ongoing reconnect attempts and `on_reconnect` callback will be called with current reconnect
    attempt and `on_noreconnect` is called when reconnection attempts reaches max retries.
    """

    EXCHANGE_MAP = {
        "nse": 1,
        "nfo": 2,
        "cds": 3,
        "bse": 4,
        "bfo": 5,
        "bsecds": 6,
        "mcx": 7,
        "mcxsx": 8,
        "indices": 9
    }

    # Default connection timeout
    CONNECT_TIMEOUT = 30
    # Default Reconnect max delay.
    RECONNECT_MAX_DELAY = 60
    # Default reconnect attempts
    RECONNECT_MAX_TRIES = 50
    # Default root API endpoint. It's possible to
    # override this by passing the `root` parameter during initialisation.
    ROOT_URI = "wss://ws.kite.trade"

    # Available streaming modes.
    MODE_FULL = "full"
    MODE_QUOTE = "quote"
    MODE_LTP = "ltp"

    # Flag to set if its first connect
    _is_first_connect = True

    # Available actions.
    _message_code = 11
    _message_subscribe = "subscribe"
    _message_unsubscribe = "unsubscribe"
    _message_setmode = "mode"

    # Minimum delay which should be set between retries. User can't set less than this
    _minimum_reconnect_max_delay = 5
    # Maximum number or retries user can set
    _maximum_reconnect_max_tries = 300

    def __init__(self, api_key, access_token, debug=False, root=None,
                 reconnect=True, reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,
                 connect_timeout=CONNECT_TIMEOUT):
        """
        Initialise websocket client instance.

        - `api_key` is the API key issued to you
        - `access_token` is the token obtained after the login flow in
            exchange for the `request_token`. Pre-login, this will default to None,
            but once you have obtained it, you should
            persist it in a database or session to pass
            to the Kite Connect class initialisation for subsequent requests.
        - `root` is the websocket API end point root. Unless you explicitly
            want to send API requests to a non-default endpoint, this
            can be ignored.
        - `reconnect` is a boolean to enable WebSocket autreconnect in case of network failure/disconnection.
        - `reconnect_max_delay` in seconds is the maximum delay after which subsequent reconnection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
        - `reconnect_max_tries` is maximum number reconnection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
        - `connect_timeout` in seconds is the maximum interval after which connection is considered as timeout. Defaults to 30s.
        """
        self.root = root or self.ROOT_URI

        # Set max reconnect tries
        if reconnect_max_tries > self._maximum_reconnect_max_tries:
            log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format(
                val=self._maximum_reconnect_max_tries))
            self.reconnect_max_tries = self._maximum_reconnect_max_tries
        else:
            self.reconnect_max_tries = reconnect_max_tries

        # Set max reconnect delay
        if reconnect_max_delay < self._minimum_reconnect_max_delay:
            log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format(
                val=self._minimum_reconnect_max_delay))
            self.reconnect_max_delay = self._minimum_reconnect_max_delay
        else:
            self.reconnect_max_delay = reconnect_max_delay

        self.connect_timeout = connect_timeout

        self.socket_url = "{root}?api_key={api_key}"\
            "&access_token={access_token}".format(
                root=self.root,
                api_key=api_key,
                access_token=access_token
            )

        # Debug enables logs
        self.debug = debug

        # Placeholders for callbacks.
        self.on_ticks = None
        self.on_open = None
        self.on_close = None
        self.on_error = None
        self.on_connect = None
        self.on_message = None
        self.on_reconnect = None
        self.on_noreconnect = None

        # Text message updates
        self.on_order_update = None

        # List of current subscribed tokens
        self.subscribed_tokens = {}

    def _create_connection(self, url, **kwargs):
        """Create a WebSocket client connection."""
        self.factory = KiteTickerClientFactory(url, **kwargs)

        # Alias for current websocket connection
        self.ws = self.factory.ws

        self.factory.debug = self.debug

        # Register private callbacks
        self.factory.on_open = self._on_open
        self.factory.on_error = self._on_error
        self.factory.on_close = self._on_close
        self.factory.on_message = self._on_message
        self.factory.on_connect = self._on_connect
        self.factory.on_reconnect = self._on_reconnect
        self.factory.on_noreconnect = self._on_noreconnect

        self.factory.maxDelay = self.reconnect_max_delay
        self.factory.maxRetries = self.reconnect_max_tries

    def _user_agent(self):
        return (__title__ + "-python/").capitalize() + __version__

    def connect(self, threaded=False, disable_ssl_verification=False, proxy=None):
        """
        Establish a websocket connection.

        - `threaded` is a boolean indicating if the websocket client has to be run in threaded mode or not
        - `disable_ssl_verification` disables building ssl context
        - `proxy` is a dictionary with keys `host` and `port` which denotes the proxy settings
        """
        # Custom headers
        headers = {
            "X-Kite-Version": "3",  # For version 3
        }

        # Init WebSocket client factory
        self._create_connection(self.socket_url,
                                useragent=self._user_agent(),
                                proxy=proxy, headers=headers)

        # Set SSL context
        context_factory = None
        if self.factory.isSecure and not disable_ssl_verification:
            context_factory = ssl.ClientContextFactory()

        # Establish WebSocket connection to a server
        connectWS(self.factory, contextFactory=context_factory, timeout=self.connect_timeout)

        if self.debug:
            twisted_log.startLogging(sys.stdout)

        # Run in seperate thread of blocking
        opts = {}

        # Run when reactor is not running
        if not reactor.running:
            if threaded:
                # Signals are not allowed in non main thread by twisted so suppress it.
                opts["installSignalHandlers"] = False
                self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts)
                self.websocket_thread.daemon = True
                self.websocket_thread.start()
            else:
                reactor.run(**opts)

    def is_connected(self):
        """Check if WebSocket connection is established."""
        if self.ws and self.ws.state == self.ws.STATE_OPEN:
            return True
        else:
            return False

    def _close(self, code=None, reason=None):
        """Close the WebSocket connection."""
        if self.ws:
            self.ws.sendClose(code, reason)

    def close(self, code=None, reason=None):
        """Close the WebSocket connection."""
        self.stop_retry()
        self._close(code, reason)

    def stop(self):
        """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.
        Reconnection mechanism cannot happen past this method
        """
        reactor.stop()

    def stop_retry(self):
        """Stop auto retry when it is in progress."""
        if self.factory:
            self.factory.stopTrying()

    def subscribe(self, instrument_tokens):
        """
        Subscribe to a list of instrument_tokens.

        - `instrument_tokens` is list of instrument instrument_tokens to subscribe
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_subscribe, "v": instrument_tokens}))
            )

            for token in instrument_tokens:
                self.subscribed_tokens[token] = self.MODE_QUOTE

            return True
        except Exception as e:
            self._close(reason="Error while subscribe: {}".format(str(e)))
            raise

    def unsubscribe(self, instrument_tokens):
        """
        Unsubscribe the given list of instrument_tokens.

        - `instrument_tokens` is list of instrument_tokens to unsubscribe.
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_unsubscribe, "v": instrument_tokens}))
            )

            for token in instrument_tokens:
                try:
                    del(self.subscribed_tokens[token])
                except KeyError:
                    pass

            return True
        except Exception as e:
            self._close(reason="Error while unsubscribe: {}".format(str(e)))
            raise

    def set_mode(self, mode, instrument_tokens):
        """
        Set streaming mode for the given list of tokens.

        - `mode` is the mode to set. It can be one of the following class constants:
            MODE_LTP, MODE_QUOTE, or MODE_FULL.
        - `instrument_tokens` is list of instrument tokens on which the mode should be applied
        """
        try:
            self.ws.sendMessage(
                six.b(json.dumps({"a": self._message_setmode, "v": [mode, instrument_tokens]}))
            )

            # Update modes
            for token in instrument_tokens:
                self.subscribed_tokens[token] = mode

            return True
        except Exception as e:
            self._close(reason="Error while setting mode: {}".format(str(e)))
            raise

    def resubscribe(self):
        """Resubscribe to all current subscribed tokens."""
        modes = {}

        for token in self.subscribed_tokens:
            m = self.subscribed_tokens[token]

            if not modes.get(m):
                modes[m] = []

            modes[m].append(token)

        for mode in modes:
            if self.debug:
                log.debug("Resubscribe and set mode: {} - {}".format(mode, modes[mode]))

            self.subscribe(modes[mode])
            self.set_mode(mode, modes[mode])

    def _on_connect(self, ws, response):
        self.ws = ws
        if self.on_connect:
            self.on_connect(self, response)

    def _on_close(self, ws, code, reason):
        """Call `on_close` callback when connection is closed."""
        log.error("Connection closed: {} - {}".format(code, str(reason)))

        if self.on_close:
            self.on_close(self, code, reason)

    def _on_error(self, ws, code, reason):
        """Call `on_error` callback when connection throws an error."""
        log.error("Connection error: {} - {}".format(code, str(reason)))

        if self.on_error:
            self.on_error(self, code, reason)

    def _on_message(self, ws, payload, is_binary):
        """Call `on_message` callback when text message is received."""
        if self.on_message:
            self.on_message(self, payload, is_binary)

        # If the message is binary, parse it and send it to the callback.
        if self.on_ticks and is_binary and len(payload) > 4:
            self.on_ticks(self, self._parse_binary(payload))

        # Parse text messages
        if not is_binary:
            self._parse_text_message(payload)

    def _on_open(self, ws):
        # Resubscribe if its reconnect
        if not self._is_first_connect:
            self.resubscribe()

        # Set first connect to false once its connected first time
        self._is_first_connect = False

        if self.on_open:
            return self.on_open(self)

    def _on_reconnect(self, attempts_count):
        if self.on_reconnect:
            return self.on_reconnect(self, attempts_count)

    def _on_noreconnect(self):
        if self.on_noreconnect:
            return self.on_noreconnect(self)

    def _parse_text_message(self, payload):
        """Parse text message."""
        # Decode unicode data
        if not six.PY2 and type(payload) == bytes:
            payload = payload.decode("utf-8")

        try:
            data = json.loads(payload)
        except ValueError:
            return

        # Order update callback
        if self.on_order_update and data.get("type") == "order" and data.get("data"):
            self.on_order_update(self, data["data"])

        # Custom error with websocket error code 0
        if data.get("type") == "error":
            self._on_error(self, 0, data.get("data"))

    def _parse_binary(self, bin):
        """Parse binary data to a (list of) ticks structure."""
        packets = self._split_packets(bin)  # split data to individual ticks packet
        data = []

        for packet in packets:
            instrument_token = self._unpack_int(packet, 0, 4)
            segment = instrument_token & 0xff  # Retrive segment constant from instrument_token

            divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0

            # All indices are not tradable
            tradable = False if segment == self.EXCHANGE_MAP["indices"] else True

            # LTP packets
            if len(packet) == 8:
                data.append({
                    "tradable": tradable,
                    "mode": self.MODE_LTP,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor
                })
            # Indices quote and full mode
            elif len(packet) == 28 or len(packet) == 32:
                mode = self.MODE_QUOTE if len(packet) == 28 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "ohlc": {
                        "high": self._unpack_int(packet, 8, 12) / divisor,
                        "low": self._unpack_int(packet, 12, 16) / divisor,
                        "open": self._unpack_int(packet, 16, 20) / divisor,
                        "close": self._unpack_int(packet, 20, 24) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if(d["ohlc"]["close"] != 0):
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Full mode with timestamp
                if len(packet) == 32:
                    try:
                        timestamp = datetime.fromtimestamp(self._unpack_int(packet, 28, 32))
                    except Exception:
                        timestamp = None

                    d["timestamp"] = timestamp

                data.append(d)
            # Quote and full mode
            elif len(packet) == 44 or len(packet) == 184:
                mode = self.MODE_QUOTE if len(packet) == 44 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "last_quantity": self._unpack_int(packet, 8, 12),
                    "average_price": self._unpack_int(packet, 12, 16) / divisor,
                    "volume": self._unpack_int(packet, 16, 20),
                    "buy_quantity": self._unpack_int(packet, 20, 24),
                    "sell_quantity": self._unpack_int(packet, 24, 28),
                    "ohlc": {
                        "open": self._unpack_int(packet, 28, 32) / divisor,
                        "high": self._unpack_int(packet, 32, 36) / divisor,
                        "low": self._unpack_int(packet, 36, 40) / divisor,
                        "close": self._unpack_int(packet, 40, 44) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if(d["ohlc"]["close"] != 0):
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Parse full mode
                if len(packet) == 184:
                    try:
                        last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48))
                    except Exception:
                        last_trade_time = None

                    try:
                        timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64))
                    except Exception:
                        timestamp = None

                    d["last_trade_time"] = last_trade_time
                    d["oi"] = self._unpack_int(packet, 48, 52)
                    d["oi_day_high"] = self._unpack_int(packet, 52, 56)
                    d["oi_day_low"] = self._unpack_int(packet, 56, 60)
                    d["timestamp"] = timestamp

                    # Market depth entries.
                    depth = {
                        "buy": [],
                        "sell": []
                    }

                    # Compile the market depth lists.
                    for i, p in enumerate(range(64, len(packet), 12)):
                        depth["sell" if i >= 5 else "buy"].append({
                            "quantity": self._unpack_int(packet, p, p + 4),
                            "price": self._unpack_int(packet, p + 4, p + 8) / divisor,
                            "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H")
                        })

                    d["depth"] = depth

                data.append(d)

        return data

    def _unpack_int(self, bin, start, end, byte_format="I"):
        """Unpack binary data as unsgined interger."""
        return struct.unpack(">" + byte_format, bin[start:end])[0]

    def _split_packets(self, bin):
        """Split the data to individual packets of ticks."""
        # Ignore heartbeat data.
        if len(bin) < 2:
            return []

        number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H")
        packets = []

        j = 2
        for i in range(number_of_packets):
            packet_length = self._unpack_int(bin, j, j + 2, byte_format="H")
            packets.append(bin[j + 2: j + 2 + packet_length])
            j = j + 2 + packet_length

        return packets

Ancestors (in MRO)

Class variables

var CONNECT_TIMEOUT

var EXCHANGE_MAP

var MODE_FULL

var MODE_LTP

var MODE_QUOTE

var RECONNECT_MAX_DELAY

var RECONNECT_MAX_TRIES

var ROOT_URI

Static methods

def __init__(

self, api_key, access_token, debug=False, root=None, reconnect=True, reconnect_max_tries=50, reconnect_max_delay=60, connect_timeout=30)

Initialise websocket client instance.

  • api_key is the API key issued to you
  • access_token is the token obtained after the login flow in exchange for the request_token. Pre-login, this will default to None, but once you have obtained it, you should persist it in a database or session to pass to the Kite Connect class initialisation for subsequent requests.
  • root is the websocket API end point root. Unless you explicitly want to send API requests to a non-default endpoint, this can be ignored.
  • reconnect is a boolean to enable WebSocket autreconnect in case of network failure/disconnection.
  • reconnect_max_delay in seconds is the maximum delay after which subsequent reconnection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
  • reconnect_max_tries is maximum number reconnection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
  • connect_timeout in seconds is the maximum interval after which connection is considered as timeout. Defaults to 30s.
def __init__(self, api_key, access_token, debug=False, root=None,
             reconnect=True, reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,
             connect_timeout=CONNECT_TIMEOUT):
    """
    Initialise websocket client instance.
    - `api_key` is the API key issued to you
    - `access_token` is the token obtained after the login flow in
        exchange for the `request_token`. Pre-login, this will default to None,
        but once you have obtained it, you should
        persist it in a database or session to pass
        to the Kite Connect class initialisation for subsequent requests.
    - `root` is the websocket API end point root. Unless you explicitly
        want to send API requests to a non-default endpoint, this
        can be ignored.
    - `reconnect` is a boolean to enable WebSocket autreconnect in case of network failure/disconnection.
    - `reconnect_max_delay` in seconds is the maximum delay after which subsequent reconnection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
    - `reconnect_max_tries` is maximum number reconnection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
    - `connect_timeout` in seconds is the maximum interval after which connection is considered as timeout. Defaults to 30s.
    """
    self.root = root or self.ROOT_URI
    # Set max reconnect tries
    if reconnect_max_tries > self._maximum_reconnect_max_tries:
        log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format(
            val=self._maximum_reconnect_max_tries))
        self.reconnect_max_tries = self._maximum_reconnect_max_tries
    else:
        self.reconnect_max_tries = reconnect_max_tries
    # Set max reconnect delay
    if reconnect_max_delay < self._minimum_reconnect_max_delay:
        log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format(
            val=self._minimum_reconnect_max_delay))
        self.reconnect_max_delay = self._minimum_reconnect_max_delay
    else:
        self.reconnect_max_delay = reconnect_max_delay
    self.connect_timeout = connect_timeout
    self.socket_url = "{root}?api_key={api_key}"\
        "&access_token={access_token}".format(
            root=self.root,
            api_key=api_key,
            access_token=access_token
        )
    # Debug enables logs
    self.debug = debug
    # Placeholders for callbacks.
    self.on_ticks = None
    self.on_open = None
    self.on_close = None
    self.on_error = None
    self.on_connect = None
    self.on_message = None
    self.on_reconnect = None
    self.on_noreconnect = None
    # Text message updates
    self.on_order_update = None
    # List of current subscribed tokens
    self.subscribed_tokens = {}

def close(

self, code=None, reason=None)

Close the WebSocket connection.

def close(self, code=None, reason=None):
    """Close the WebSocket connection."""
    self.stop_retry()
    self._close(code, reason)

def connect(

self, threaded=False, disable_ssl_verification=False, proxy=None)

Establish a websocket connection.

  • threaded is a boolean indicating if the websocket client has to be run in threaded mode or not
  • disable_ssl_verification disables building ssl context
  • proxy is a dictionary with keys host and port which denotes the proxy settings
def connect(self, threaded=False, disable_ssl_verification=False, proxy=None):
    """
    Establish a websocket connection.
    - `threaded` is a boolean indicating if the websocket client has to be run in threaded mode or not
    - `disable_ssl_verification` disables building ssl context
    - `proxy` is a dictionary with keys `host` and `port` which denotes the proxy settings
    """
    # Custom headers
    headers = {
        "X-Kite-Version": "3",  # For version 3
    }
    # Init WebSocket client factory
    self._create_connection(self.socket_url,
                            useragent=self._user_agent(),
                            proxy=proxy, headers=headers)
    # Set SSL context
    context_factory = None
    if self.factory.isSecure and not disable_ssl_verification:
        context_factory = ssl.ClientContextFactory()
    # Establish WebSocket connection to a server
    connectWS(self.factory, contextFactory=context_factory, timeout=self.connect_timeout)
    if self.debug:
        twisted_log.startLogging(sys.stdout)
    # Run in seperate thread of blocking
    opts = {}
    # Run when reactor is not running
    if not reactor.running:
        if threaded:
            # Signals are not allowed in non main thread by twisted so suppress it.
            opts["installSignalHandlers"] = False
            self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts)
            self.websocket_thread.daemon = True
            self.websocket_thread.start()
        else:
            reactor.run(**opts)

def is_connected(

self)

Check if WebSocket connection is established.

def is_connected(self):
    """Check if WebSocket connection is established."""
    if self.ws and self.ws.state == self.ws.STATE_OPEN:
        return True
    else:
        return False

def resubscribe(

self)

Resubscribe to all current subscribed tokens.

def resubscribe(self):
    """Resubscribe to all current subscribed tokens."""
    modes = {}
    for token in self.subscribed_tokens:
        m = self.subscribed_tokens[token]
        if not modes.get(m):
            modes[m] = []
        modes[m].append(token)
    for mode in modes:
        if self.debug:
            log.debug("Resubscribe and set mode: {} - {}".format(mode, modes[mode]))
        self.subscribe(modes[mode])
        self.set_mode(mode, modes[mode])

def set_mode(

self, mode, instrument_tokens)

Set streaming mode for the given list of tokens.

  • mode is the mode to set. It can be one of the following class constants: MODE_LTP, MODE_QUOTE, or MODE_FULL.
  • instrument_tokens is list of instrument tokens on which the mode should be applied
def set_mode(self, mode, instrument_tokens):
    """
    Set streaming mode for the given list of tokens.
    - `mode` is the mode to set. It can be one of the following class constants:
        MODE_LTP, MODE_QUOTE, or MODE_FULL.
    - `instrument_tokens` is list of instrument tokens on which the mode should be applied
    """
    try:
        self.ws.sendMessage(
            six.b(json.dumps({"a": self._message_setmode, "v": [mode, instrument_tokens]}))
        )
        # Update modes
        for token in instrument_tokens:
            self.subscribed_tokens[token] = mode
        return True
    except Exception as e:
        self._close(reason="Error while setting mode: {}".format(str(e)))
        raise

def stop(

self)

Stop the event loop. Should be used if main thread has to be closed in on_close method. Reconnection mechanism cannot happen past this method

def stop(self):
    """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.
    Reconnection mechanism cannot happen past this method
    """
    reactor.stop()

def stop_retry(

self)

Stop auto retry when it is in progress.

def stop_retry(self):
    """Stop auto retry when it is in progress."""
    if self.factory:
        self.factory.stopTrying()

def subscribe(

self, instrument_tokens)

Subscribe to a list of instrument_tokens.

  • instrument_tokens is list of instrument instrument_tokens to subscribe
def subscribe(self, instrument_tokens):
    """
    Subscribe to a list of instrument_tokens.
    - `instrument_tokens` is list of instrument instrument_tokens to subscribe
    """
    try:
        self.ws.sendMessage(
            six.b(json.dumps({"a": self._message_subscribe, "v": instrument_tokens}))
        )
        for token in instrument_tokens:
            self.subscribed_tokens[token] = self.MODE_QUOTE
        return True
    except Exception as e:
        self._close(reason="Error while subscribe: {}".format(str(e)))
        raise

def unsubscribe(

self, instrument_tokens)

Unsubscribe the given list of instrument_tokens.

  • instrument_tokens is list of instrument_tokens to unsubscribe.
def unsubscribe(self, instrument_tokens):
    """
    Unsubscribe the given list of instrument_tokens.
    - `instrument_tokens` is list of instrument_tokens to unsubscribe.
    """
    try:
        self.ws.sendMessage(
            six.b(json.dumps({"a": self._message_unsubscribe, "v": instrument_tokens}))
        )
        for token in instrument_tokens:
            try:
                del(self.subscribed_tokens[token])
            except KeyError:
                pass
        return True
    except Exception as e:
        self._close(reason="Error while unsubscribe: {}".format(str(e)))
        raise

Instance variables

var connect_timeout

var debug

var on_close

var on_connect

var on_error

var on_message

var on_noreconnect

var on_open

var on_order_update

var on_reconnect

var on_ticks

var root

var socket_url

var subscribed_tokens

class KiteTickerClientFactory

Autobahn WebSocket client factory to implement reconnection and custom callbacks.

class KiteTickerClientFactory(WebSocketClientFactory, ReconnectingClientFactory):
    """Autobahn WebSocket client factory to implement reconnection and custom callbacks."""

    protocol = KiteTickerClientProtocol
    maxDelay = 5
    maxRetries = 10

    _last_connection_time = None

    def __init__(self, *args, **kwargs):
        """Initialize with default callback method values."""
        self.debug = False
        self.ws = None
        self.on_open = None
        self.on_error = None
        self.on_close = None
        self.on_message = None
        self.on_connect = None
        self.on_reconnect = None
        self.on_noreconnect = None

        super(KiteTickerClientFactory, self).__init__(*args, **kwargs)

    def startedConnecting(self, connector):  # noqa
        """On connecting start or reconnection."""
        if not self._last_connection_time and self.debug:
            log.debug("Start WebSocket connection.")

        self._last_connection_time = time.time()

    def clientConnectionFailed(self, connector, reason):  # noqa
        """On connection failure (When connect request fails)"""
        if self.retries > 0:
            log.error("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay))))

            # on reconnect callback
            if self.on_reconnect:
                self.on_reconnect(self.retries)

        # Retry the connection
        self.retry(connector)
        self.send_noreconnect()

    def clientConnectionLost(self, connector, reason):  # noqa
        """On connection lost (When ongoing connection got disconnected)."""
        if self.retries > 0:
            # on reconnect callback
            if self.on_reconnect:
                self.on_reconnect(self.retries)

        # Retry the connection
        self.retry(connector)
        self.send_noreconnect()

    def send_noreconnect(self):
        """Callback `no_reconnect` if max retries are exhausted."""
        if self.maxRetries is not None and (self.retries > self.maxRetries):
            if self.debug:
                log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries))

            if self.on_noreconnect:
                self.on_noreconnect()

Ancestors (in MRO)

  • KiteTickerClientFactory
  • autobahn.twisted.websocket.WebSocketClientFactory
  • autobahn.twisted.websocket.WebSocketAdapterFactory
  • autobahn.websocket.protocol.WebSocketClientFactory
  • autobahn.websocket.protocol.WebSocketFactory
  • twisted.internet.protocol.ReconnectingClientFactory
  • twisted.internet.protocol.ClientFactory
  • twisted.internet.protocol.Factory
  • builtins.object

Class variables

var clock

var connector

var continueTrying

var delay

var factor

var initialDelay

var isServer

var jitter

var maxDelay

var maxRetries

var noisy

var numPorts

var protocol

var retries

Static methods

def __init__(

self, *args, **kwargs)

Initialize with default callback method values.

def __init__(self, *args, **kwargs):
    """Initialize with default callback method values."""
    self.debug = False
    self.ws = None
    self.on_open = None
    self.on_error = None
    self.on_close = None
    self.on_message = None
    self.on_connect = None
    self.on_reconnect = None
    self.on_noreconnect = None
    super(KiteTickerClientFactory, self).__init__(*args, **kwargs)

def buildProtocol(

self, addr)

Create an instance of a subclass of Protocol.

The returned instance will handle input on an incoming server connection, and an attribute "factory" pointing to the creating factory.

Alternatively, L{None} may be returned to immediately close the new connection.

Override this method to alter how Protocol instances get created.

@param addr: an object implementing L{twisted.internet.interfaces.IAddress}

def buildProtocol(self, addr):
    """
    Create an instance of a subclass of Protocol.
    The returned instance will handle input on an incoming server
    connection, and an attribute "factory" pointing to the creating
    factory.
    Alternatively, L{None} may be returned to immediately close the
    new connection.
    Override this method to alter how Protocol instances get created.
    @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
    """
    p = self.protocol()
    p.factory = self
    return p

def clientConnectionFailed(

self, connector, reason)

On connection failure (When connect request fails)

def clientConnectionFailed(self, connector, reason):  # noqa
    """On connection failure (When connect request fails)"""
    if self.retries > 0:
        log.error("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay))))
        # on reconnect callback
        if self.on_reconnect:
            self.on_reconnect(self.retries)
    # Retry the connection
    self.retry(connector)
    self.send_noreconnect()

def clientConnectionLost(

self, connector, reason)

On connection lost (When ongoing connection got disconnected).

def clientConnectionLost(self, connector, reason):  # noqa
    """On connection lost (When ongoing connection got disconnected)."""
    if self.retries > 0:
        # on reconnect callback
        if self.on_reconnect:
            self.on_reconnect(self.retries)
    # Retry the connection
    self.retry(connector)
    self.send_noreconnect()

def doStart(

self)

Make sure startFactory is called.

Users should not call this function themselves!

def doStart(self):
    """
    Make sure startFactory is called.
    Users should not call this function themselves!
    """
    if not self.numPorts:
        if self.noisy:
            _loggerFor(self).info("Starting factory {factory!r}",
                                  factory=self)
        self.startFactory()
    self.numPorts = self.numPorts + 1

def doStop(

self)

Make sure stopFactory is called.

Users should not call this function themselves!

def doStop(self):
    """
    Make sure stopFactory is called.
    Users should not call this function themselves!
    """
    if self.numPorts == 0:
        # This shouldn't happen, but does sometimes and this is better
        # than blowing up in assert as we did previously.
        return
    self.numPorts = self.numPorts - 1
    if not self.numPorts:
        if self.noisy:
            _loggerFor(self).info("Stopping factory {factory!r}",
                                  factory=self)
        self.stopFactory()

def logPrefix(

self)

Describe this factory for log messages.

def logPrefix(self):
    """
    Describe this factory for log messages.
    """
    return self.__class__.__name__

def prepareMessage(

self, payload, isBinary=False, doNotCompress=False)

Prepare a WebSocket message. This can be later sent on multiple instances of :class:autobahn.websocket.WebSocketProtocol using :meth:autobahn.websocket.WebSocketProtocol.sendPreparedMessage.

By doing so, you can avoid the (small) overhead of framing the same payload into WebSocket messages multiple times when that same payload is to be sent out on multiple connections.

:param payload: The message payload. :type payload: bytes :param isBinary: True iff payload is binary, else the payload must be UTF-8 encoded text. :type isBinary: bool :param doNotCompress: Iff True, never compress this message. This only applies when WebSocket compression has been negotiated on the WebSocket connection. Use when you know the payload incompressible (e.g. encrypted or already compressed). :type doNotCompress: bool

:returns: obj -- An instance of :class:autobahn.websocket.protocol.PreparedMessage.

def prepareMessage(self, payload, isBinary=False, doNotCompress=False):
    """
    Prepare a WebSocket message. This can be later sent on multiple
    instances of :class:`autobahn.websocket.WebSocketProtocol` using
    :meth:`autobahn.websocket.WebSocketProtocol.sendPreparedMessage`.
    By doing so, you can avoid the (small) overhead of framing the
    *same* payload into WebSocket messages multiple times when that
    same payload is to be sent out on multiple connections.
    :param payload: The message payload.
    :type payload: bytes
    :param isBinary: `True` iff payload is binary, else the payload must be
        UTF-8 encoded text.
    :type isBinary: bool
    :param doNotCompress: Iff `True`, never compress this message. This
        only applies when WebSocket compression has been negotiated on the
        WebSocket connection. Use when you know the payload incompressible
        (e.g. encrypted or already compressed).
    :type doNotCompress: bool
    :returns: obj -- An instance of :class:`autobahn.websocket.protocol.PreparedMessage`.
    """
    applyMask = not self.isServer
    return PreparedMessage(payload, isBinary, applyMask, doNotCompress)

def resetDelay(

self)

Call this method after a successful connection: it resets the delay and the retry counter.

def resetDelay(self):
    """
    Call this method after a successful connection: it resets the delay and
    the retry counter.
    """
    self.delay = self.initialDelay
    self.retries = 0
    self._callID = None
    self.continueTrying = 1

def resetProtocolOptions(

self)

Implements :func:autobahn.websocket.interfaces.IWebSocketClientChannelFactory.resetProtocolOptions

def resetProtocolOptions(self):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketClientChannelFactory.resetProtocolOptions`
    """
    self.version = WebSocketProtocol.DEFAULT_SPEC_VERSION
    self.utf8validateIncoming = True
    self.acceptMaskedServerFrames = False
    self.maskClientFrames = True
    self.applyMask = True
    self.maxFramePayloadSize = 0
    self.maxMessagePayloadSize = 0
    self.autoFragmentSize = 0
    self.failByDrop = True
    self.echoCloseCodeReason = False
    self.serverConnectionDropTimeout = 1
    self.openHandshakeTimeout = 5
    self.closeHandshakeTimeout = 1
    self.tcpNoDelay = True
    # permessage-XXX extensions
    #
    self.perMessageCompressionOffers = []
    self.perMessageCompressionAccept = lambda _: None
    # automatic ping/pong ("heartbeating")
    #
    self.autoPingInterval = 0
    self.autoPingTimeout = 0
    self.autoPingSize = 4

def retry(

self, connector=None)

Have this connector connect again, after a suitable delay.

def retry(self, connector=None):
    """
    Have this connector connect again, after a suitable delay.
    """
    if not self.continueTrying:
        if self.noisy:
            log.msg("Abandoning %s on explicit request" % (connector,))
        return
    if connector is None:
        if self.connector is None:
            raise ValueError("no connector to retry")
        else:
            connector = self.connector
    self.retries += 1
    if self.maxRetries is not None and (self.retries > self.maxRetries):
        if self.noisy:
            log.msg("Abandoning %s after %d retries." %
                    (connector, self.retries))
        return
    self.delay = min(self.delay * self.factor, self.maxDelay)
    if self.jitter:
        self.delay = random.normalvariate(self.delay,
                                          self.delay * self.jitter)
    if self.noisy:
        log.msg("%s will retry in %d seconds" % (connector, self.delay,))
    def reconnector():
        self._callID = None
        connector.connect()
    if self.clock is None:
        from twisted.internet import reactor
        self.clock = reactor
    self._callID = self.clock.callLater(self.delay, reconnector)

def send_noreconnect(

self)

Callback no_reconnect if max retries are exhausted.

def send_noreconnect(self):
    """Callback `no_reconnect` if max retries are exhausted."""
    if self.maxRetries is not None and (self.retries > self.maxRetries):
        if self.debug:
            log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries))
        if self.on_noreconnect:
            self.on_noreconnect()

def setProtocolOptions(

self, version=None, utf8validateIncoming=None, acceptMaskedServerFrames=None, maskClientFrames=None, applyMask=None, maxFramePayloadSize=None, maxMessagePayloadSize=None, autoFragmentSize=None, failByDrop=None, echoCloseCodeReason=None, serverConnectionDropTimeout=None, openHandshakeTimeout=None, closeHandshakeTimeout=None, tcpNoDelay=None, perMessageCompressionOffers=None, perMessageCompressionAccept=None, autoPingInterval=None, autoPingTimeout=None, autoPingSize=None)

Implements :func:autobahn.websocket.interfaces.IWebSocketClientChannelFactory.setProtocolOptions

def setProtocolOptions(self,
                       version=None,
                       utf8validateIncoming=None,
                       acceptMaskedServerFrames=None,
                       maskClientFrames=None,
                       applyMask=None,
                       maxFramePayloadSize=None,
                       maxMessagePayloadSize=None,
                       autoFragmentSize=None,
                       failByDrop=None,
                       echoCloseCodeReason=None,
                       serverConnectionDropTimeout=None,
                       openHandshakeTimeout=None,
                       closeHandshakeTimeout=None,
                       tcpNoDelay=None,
                       perMessageCompressionOffers=None,
                       perMessageCompressionAccept=None,
                       autoPingInterval=None,
                       autoPingTimeout=None,
                       autoPingSize=None):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketClientChannelFactory.setProtocolOptions`
    """
    if version is not None:
        if version not in WebSocketProtocol.SUPPORTED_SPEC_VERSIONS:
            raise Exception("invalid WebSocket draft version %s (allowed values: %s)" % (version, str(WebSocketProtocol.SUPPORTED_SPEC_VERSIONS)))
        if version != self.version:
            self.version = version
    if utf8validateIncoming is not None and utf8validateIncoming != self.utf8validateIncoming:
        self.utf8validateIncoming = utf8validateIncoming
    if acceptMaskedServerFrames is not None and acceptMaskedServerFrames != self.acceptMaskedServerFrames:
        self.acceptMaskedServerFrames = acceptMaskedServerFrames
    if maskClientFrames is not None and maskClientFrames != self.maskClientFrames:
        self.maskClientFrames = maskClientFrames
    if applyMask is not None and applyMask != self.applyMask:
        self.applyMask = applyMask
    if maxFramePayloadSize is not None and maxFramePayloadSize != self.maxFramePayloadSize:
        self.maxFramePayloadSize = maxFramePayloadSize
    if maxMessagePayloadSize is not None and maxMessagePayloadSize != self.maxMessagePayloadSize:
        self.maxMessagePayloadSize = maxMessagePayloadSize
    if autoFragmentSize is not None and autoFragmentSize != self.autoFragmentSize:
        self.autoFragmentSize = autoFragmentSize
    if failByDrop is not None and failByDrop != self.failByDrop:
        self.failByDrop = failByDrop
    if echoCloseCodeReason is not None and echoCloseCodeReason != self.echoCloseCodeReason:
        self.echoCloseCodeReason = echoCloseCodeReason
    if serverConnectionDropTimeout is not None and serverConnectionDropTimeout != self.serverConnectionDropTimeout:
        self.serverConnectionDropTimeout = serverConnectionDropTimeout
    if openHandshakeTimeout is not None and openHandshakeTimeout != self.openHandshakeTimeout:
        self.openHandshakeTimeout = openHandshakeTimeout
    if closeHandshakeTimeout is not None and closeHandshakeTimeout != self.closeHandshakeTimeout:
        self.closeHandshakeTimeout = closeHandshakeTimeout
    if tcpNoDelay is not None and tcpNoDelay != self.tcpNoDelay:
        self.tcpNoDelay = tcpNoDelay
    if perMessageCompressionOffers is not None and pickle.dumps(perMessageCompressionOffers) != pickle.dumps(self.perMessageCompressionOffers):
        if type(perMessageCompressionOffers) == list:
            #
            # FIXME: more rigorous verification of passed argument
            #
            self.perMessageCompressionOffers = copy.deepcopy(perMessageCompressionOffers)
        else:
            raise Exception("invalid type %s for perMessageCompressionOffers - expected list" % type(perMessageCompressionOffers))
    if perMessageCompressionAccept is not None and perMessageCompressionAccept != self.perMessageCompressionAccept:
        self.perMessageCompressionAccept = perMessageCompressionAccept
    if autoPingInterval is not None and autoPingInterval != self.autoPingInterval:
        self.autoPingInterval = autoPingInterval
    if autoPingTimeout is not None and autoPingTimeout != self.autoPingTimeout:
        self.autoPingTimeout = autoPingTimeout
    if autoPingSize is not None and autoPingSize != self.autoPingSize:
        assert(type(autoPingSize) == float or type(autoPingSize) in six.integer_types)
        assert(4 <= autoPingSize <= 125)
        self.autoPingSize = autoPingSize

def setSessionParameters(

self, url=None, origin=None, protocols=None, useragent=None, headers=None, proxy=None)

Implements :func:autobahn.websocket.interfaces.IWebSocketClientChannelFactory.setSessionParameters

def setSessionParameters(self,
                         url=None,
                         origin=None,
                         protocols=None,
                         useragent=None,
                         headers=None,
                         proxy=None):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketClientChannelFactory.setSessionParameters`
    """
    # parse WebSocket URI into components
    (isSecure, host, port, resource, path, params) = parse_url(url or "ws://localhost")
    self.url = url
    self.isSecure = isSecure
    self.host = host
    self.port = port
    self.resource = resource
    self.path = path
    self.params = params
    self.origin = origin
    self.protocols = protocols or []
    self.useragent = useragent
    self.headers = headers or {}
    self.proxy = proxy

def startFactory(

self)

This will be called before I begin listening on a Port or Connector.

It will only be called once, even if the factory is connected to multiple ports.

This can be used to perform 'unserialization' tasks that are best put off until things are actually running, such as connecting to a database, opening files, etcetera.

def startFactory(self):
    """
    This will be called before I begin listening on a Port or Connector.
    It will only be called once, even if the factory is connected
    to multiple ports.
    This can be used to perform 'unserialization' tasks that
    are best put off until things are actually running, such
    as connecting to a database, opening files, etcetera.
    """

def startedConnecting(

self, connector)

On connecting start or reconnection.

def startedConnecting(self, connector):  # noqa
    """On connecting start or reconnection."""
    if not self._last_connection_time and self.debug:
        log.debug("Start WebSocket connection.")
    self._last_connection_time = time.time()

def stopFactory(

self)

This will be called before I stop listening on all Ports/Connectors.

This can be overridden to perform 'shutdown' tasks such as disconnecting database connections, closing files, etc.

It will be called, for example, before an application shuts down, if it was connected to a port. User code should not call this function directly.

def stopFactory(self):
    """
    This will be called before I stop listening on all Ports/Connectors.
    This can be overridden to perform 'shutdown' tasks such as disconnecting
    database connections, closing files, etc.
    It will be called, for example, before an application shuts down,
    if it was connected to a port. User code should not call this function
    directly.
    """

def stopTrying(

self)

Put a stop to any attempt to reconnect in progress.

def stopTrying(self):
    """
    Put a stop to any attempt to reconnect in progress.
    """
    # ??? Is this function really stopFactory?
    if self._callID:
        self._callID.cancel()
        self._callID = None
    self.continueTrying = 0
    if self.connector:
        try:
            self.connector.stopConnecting()
        except error.NotConnectingError:
            pass

Instance variables

var debug

var on_close

var on_connect

var on_error

var on_message

var on_noreconnect

var on_open

var on_reconnect

var ws

Methods

def forProtocol(

cls, protocol, *args, **kwargs)

Create a factory for the given protocol.

It sets the C{protocol} attribute and returns the constructed factory instance.

@param protocol: A L{Protocol} subclass

@param args: Positional arguments for the factory.

@param kwargs: Keyword arguments for the factory.

@return: A L{Factory} instance wired up to C{protocol}.

@classmethod
def forProtocol(cls, protocol, *args, **kwargs):
    """
    Create a factory for the given protocol.
    It sets the C{protocol} attribute and returns the constructed factory
    instance.
    @param protocol: A L{Protocol} subclass
    @param args: Positional arguments for the factory.
    @param kwargs: Keyword arguments for the factory.
    @return: A L{Factory} instance wired up to C{protocol}.
    """
    factory = cls(*args, **kwargs)
    factory.protocol = protocol
    return factory

class KiteTickerClientProtocol

Kite ticker autobahn WebSocket protocol.

class KiteTickerClientProtocol(WebSocketClientProtocol):
    """Kite ticker autobahn WebSocket protocol."""

    PING_INTERVAL = 2.5
    KEEPALIVE_INTERVAL = 5

    _ping_message = ""
    _next_ping = None
    _next_pong_check = None
    _last_pong_time = None
    _last_ping_time = None

    def __init__(self, *args, **kwargs):
        """Initialize protocol with all options passed from factory."""
        super(KiteTickerClientProtocol, self).__init__(*args, **kwargs)

    # Overide method
    def onConnect(self, response):  # noqa
        """Called when WebSocket server connection was established"""
        self.factory.ws = self

        if self.factory.on_connect:
            self.factory.on_connect(self, response)

        # Reset reconnect on successful reconnect
        self.factory.resetDelay()

    # Overide method
    def onOpen(self):  # noqa
        """Called when the initial WebSocket opening handshake was completed."""
        # send ping
        self._loop_ping()
        # init last pong check after X seconds
        self._loop_pong_check()

        if self.factory.on_open:
            self.factory.on_open(self)

    # Overide method
    def onMessage(self, payload, is_binary):  # noqa
        """Called when text or binary message is received."""
        if self.factory.on_message:
            self.factory.on_message(self, payload, is_binary)

    # Overide method
    def onClose(self, was_clean, code, reason):  # noqa
        """Called when connection is closed."""
        if not was_clean:
            if self.factory.on_error:
                self.factory.on_error(self, code, reason)

        if self.factory.on_close:
            self.factory.on_close(self, code, reason)

        # Cancel next ping and timer
        self._last_ping_time = None
        self._last_pong_time = None

        if self._next_ping:
            self._next_ping.cancel()

        if self._next_pong_check:
            self._next_pong_check.cancel()

    def onPong(self, response):  # noqa
        """Called when pong message is received."""
        if self._last_pong_time and self.factory.debug:
            log.debug("last pong was {} seconds back.".format(time.time() - self._last_pong_time))

        self._last_pong_time = time.time()

        if self.factory.debug:
            log.debug("pong => {}".format(response))

    """
    Custom helper and exposed methods.
    """
    def _loop_ping(self): # noqa
        """Start a ping loop where it sends ping message every X seconds."""
        if self.factory.debug:
            log.debug("ping => {}".format(self._ping_message))
            if self._last_ping_time:
                log.debug("last ping was {} seconds back.".format(time.time() - self._last_ping_time))

        # Set current time as last ping time
        self._last_ping_time = time.time()
        # Send a ping message to server
        self.sendPing(self._ping_message)

        # Call self after X seconds
        self._next_ping = self.factory.reactor.callLater(self.PING_INTERVAL, self._loop_ping)

    def _loop_pong_check(self):
        """
        Timer sortof to check if connection is still there.

        Checks last pong message time and disconnects the existing connection to make sure it doesn't become a ghost connection.
        """
        if self._last_pong_time:
            # No pong message since long time, so init reconnect
            last_pong_diff = time.time() - self._last_pong_time
            if last_pong_diff > (2 * self.PING_INTERVAL):
                if self.factory.debug:
                    log.debug("Last pong was {} seconds ago. So dropping connection to reconnect.".format(
                        last_pong_diff))
                # drop existing connection to avoid ghost connection
                self.dropConnection(abort=True)

        # Call self after X seconds
        self._next_pong_check = self.factory.reactor.callLater(self.PING_INTERVAL, self._loop_pong_check)

Ancestors (in MRO)

  • KiteTickerClientProtocol
  • autobahn.twisted.websocket.WebSocketClientProtocol
  • autobahn.twisted.websocket.WebSocketAdapterProtocol
  • twisted.internet.protocol.Protocol
  • twisted.internet.protocol.BaseProtocol
  • autobahn.websocket.protocol.WebSocketClientProtocol
  • autobahn.websocket.protocol.WebSocketProtocol
  • autobahn.util.ObservableMixin
  • builtins.object

Class variables

var CLOSE_STATUS_CODES_ALLOWED

var CLOSE_STATUS_CODE_ABNORMAL_CLOSE

var CLOSE_STATUS_CODE_GOING_AWAY

var CLOSE_STATUS_CODE_INTERNAL_ERROR

var CLOSE_STATUS_CODE_INVALID_PAYLOAD

var CLOSE_STATUS_CODE_MANDATORY_EXTENSION

var CLOSE_STATUS_CODE_MESSAGE_TOO_BIG

var CLOSE_STATUS_CODE_NORMAL

var CLOSE_STATUS_CODE_NULL

var CLOSE_STATUS_CODE_POLICY_VIOLATION

var CLOSE_STATUS_CODE_PROTOCOL_ERROR

var CLOSE_STATUS_CODE_RESERVED1

var CLOSE_STATUS_CODE_SERVICE_RESTART

var CLOSE_STATUS_CODE_TLS_HANDSHAKE_FAILED

var CLOSE_STATUS_CODE_TRY_AGAIN_LATER

var CLOSE_STATUS_CODE_UNASSIGNED1

var CLOSE_STATUS_CODE_UNSUPPORTED_DATA

var CONFIG_ATTRS

var CONFIG_ATTRS_CLIENT

var CONFIG_ATTRS_COMMON

var CONFIG_ATTRS_SERVER

var DEFAULT_SPEC_VERSION

var KEEPALIVE_INTERVAL

var MESSAGE_TYPE_BINARY

var MESSAGE_TYPE_TEXT

var PING_INTERVAL

var PROTOCOL_TO_SPEC_VERSION

var SEND_STATE_GROUND

var SEND_STATE_INSIDE_MESSAGE

var SEND_STATE_INSIDE_MESSAGE_FRAME

var SEND_STATE_MESSAGE_BEGIN

var SPEC_TO_PROTOCOL_VERSION

var STATE_CLOSED

var STATE_CLOSING

var STATE_CONNECTING

var STATE_OPEN

var STATE_PROXY_CONNECTING

var SUPPORTED_PROTOCOL_VERSIONS

var SUPPORTED_SPEC_VERSIONS

var connected

var peer

var transport

Static methods

def __init__(

self, *args, **kwargs)

Initialize protocol with all options passed from factory.

def __init__(self, *args, **kwargs):
    """Initialize protocol with all options passed from factory."""
    super(KiteTickerClientProtocol, self).__init__(*args, **kwargs)

def beginMessage(

self, isBinary=False, doNotCompress=False)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.beginMessage

def beginMessage(self, isBinary=False, doNotCompress=False):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.beginMessage`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    # check if sending state is valid for this method
    #
    if self.send_state != WebSocketProtocol.SEND_STATE_GROUND:
        raise Exception("WebSocketProtocol.beginMessage invalid in current sending state")
    self.send_message_opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY if isBinary else WebSocketProtocol.MESSAGE_TYPE_TEXT
    self.send_state = WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN
    # setup compressor
    #
    if self._perMessageCompress is not None and not doNotCompress:
        self.send_compressed = True
        self._perMessageCompress.start_compress_message()
    else:
        self.send_compressed = False
    self.trafficStats.outgoingWebSocketMessages += 1

def beginMessageFrame(

self, length)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.beginMessageFrame

def beginMessageFrame(self, length):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.beginMessageFrame`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    # check if sending state is valid for this method
    #
    if self.send_state not in [WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN, WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE]:
        raise Exception("WebSocketProtocol.beginMessageFrame invalid in current sending state [%d]" % self.send_state)
    if type(length) != int or length < 0 or length > 0x7FFFFFFFFFFFFFFF:  # 2**63
        raise Exception("invalid value for message frame length")
    self.send_message_frame_length = length
    self.trafficStats.outgoingWebSocketFrames += 1
    if (not self.factory.isServer and self.maskClientFrames) or (self.factory.isServer and self.maskServerFrames):
        # automatic mask:
        # - client-to-server masking (if not deactivated)
        # - server-to-client masking (if activated)
        #
        # see note above about getrandbits
        self.send_message_frame_mask = struct.pack("!I", random.getrandbits(32))
    else:
        # no mask
        #
        self.send_message_frame_mask = None
    # payload masker
    #
    if self.send_message_frame_mask and length > 0 and self.applyMask:
        self.send_message_frame_masker = create_xor_masker(self.send_message_frame_mask, length)
    else:
        self.send_message_frame_masker = XorMaskerNull()
    # first byte
    #
    # FIN = false .. since with streaming, we don't know when message ends
    b0 = 0
    if self.send_state == WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN:
        b0 |= self.send_message_opcode % 128
        if self.send_compressed:
            b0 |= (4 % 8) << 4
        self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE
    else:
        pass  # message continuation frame
    # second byte, payload len bytes and mask
    #
    b1 = 0
    if self.send_message_frame_mask:
        b1 |= 1 << 7
        mv = self.send_message_frame_mask
    else:
        mv = b''
    el = b''
    if length <= 125:
        b1 |= length
    elif length <= 0xFFFF:
        b1 |= 126
        el = struct.pack("!H", length)
    elif length <= 0x7FFFFFFFFFFFFFFF:
        b1 |= 127
        el = struct.pack("!Q", length)
    else:
        raise Exception("invalid payload length")
    # write message frame header
    #
    if six.PY3:
        header = b''.join([b0.to_bytes(1, 'big'), b1.to_bytes(1, 'big'), el, mv])
    else:
        header = b''.join([chr(b0), chr(b1), el, mv])
    self.sendData(header)
    # now we are inside message frame ..
    #
    self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE_FRAME

def connectionLost(

self, reason)

Called when the connection is shut down.

Clear any circular references here, and any external references to this Protocol. The connection has been closed.

@type reason: L{twisted.python.failure.Failure}

def connectionLost(self, reason):
    if isinstance(reason.value, ConnectionDone):
        self.log.debug("Connection to/from {peer} was closed cleanly",
                       peer=self.peer)
    elif _is_tls_error(reason.value):
        self.log.error(_maybe_tls_reason(reason.value))
    elif isinstance(reason.value, ConnectionAborted):
        self.log.debug("Connection to/from {peer} was aborted locally",
                       peer=self.peer)
    elif isinstance(reason.value, ConnectionLost):
        message = str(reason.value)
        if hasattr(reason.value, 'message'):
            message = reason.value.message
        self.log.debug(
            "Connection to/from {peer} was lost in a non-clean fashion: {message}",
            peer=self.peer,
            message=message,
        )
    # at least: FileDescriptorOverrun, ConnectionFdescWentAway - but maybe others as well?
    else:
        self.log.debug("Connection to/from {peer} lost ({error_type}): {error})",
                       peer=self.peer, error_type=type(reason.value), error=reason.value)
    self._connectionLost(reason)

def connectionMade(

self)

Called when a connection is made.

This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here.

def connectionMade(self):
    # the peer we are connected to
    try:
        self.peer = peer2str(self.transport.getPeer())
    except AttributeError:
        # ProcessProtocols lack getPeer()
        self.peer = u'process:{}'.format(self.transport.pid)
    self._connectionMade()
    self.log.debug('Connection made to {peer}', peer=self.peer)
    # Set "Nagle"
    try:
        self.transport.setTcpNoDelay(self.tcpNoDelay)
    except:  # don't touch this! does not work: AttributeError, OSError
        # eg Unix Domain sockets throw Errno 22 on this
        pass

def consumeData(

self)

Consume buffered (incoming) data.

def consumeData(self):
    """
    Consume buffered (incoming) data.
    """
    # WebSocket is open (handshake was completed) or close was sent
    #
    if self.state == WebSocketProtocol.STATE_OPEN or self.state == WebSocketProtocol.STATE_CLOSING:
        # process until no more buffered data left or WS was closed
        #
        while self.processData() and self.state != WebSocketProtocol.STATE_CLOSED:
            pass
    # need to establish proxy connection
    #
    elif self.state == WebSocketProtocol.STATE_PROXY_CONNECTING:
        self.processProxyConnect()
    # WebSocket needs handshake
    #
    elif self.state == WebSocketProtocol.STATE_CONNECTING:
        # the implementation of processHandshake() in derived
        # class needs to perform client or server handshake
        # from other party here ..
        #
        self.processHandshake()
    # we failed the connection .. don't process any more data!
    #
    elif self.state == WebSocketProtocol.STATE_CLOSED:
        # ignore any data received after WS was closed
        #
        self.log.debug("received data in STATE_CLOSED")
    # should not arrive here (invalid state)
    #
    else:
        raise Exception("invalid state")

def dataReceived(

self, data)

Called whenever data is received.

Use this method to translate to a higher-level message. Usually, some callback will be made upon the receipt of each complete protocol message.

@param data: a string of indeterminate length. Please keep in mind that you will probably need to buffer some data, as partial (or multiple) protocol messages may be received! I recommend that unit tests for protocols call through to this method with differing chunk sizes, down to one byte at a time.

def dataReceived(self, data):
    self._dataReceived(data)

def dropConnection(

self, abort=False)

Drop the underlying TCP connection.

def dropConnection(self, abort=False):
    """
    Drop the underlying TCP connection.
    """
    self.unregisterProducer()
    if self.state != WebSocketProtocol.STATE_CLOSED:
        if self.wasClean:
            self.log.debug('dropping connection to peer {peer} with abort={abort}', peer=self.peer, abort=abort)
        else:
            self.log.warn('dropping connection to peer {peer} with abort={abort}: {reason}', peer=self.peer, abort=abort, reason=self.wasNotCleanReason)
        self.droppedByMe = True
        # this code-path will be hit (*without* hitting
        # _connectionLost) in some timeout scenarios (unit-tests
        # cover these). However, sometimes we hit both.
        self.state = WebSocketProtocol.STATE_CLOSED
        txaio.resolve(self.is_closed, self)
        self._closeConnection(abort)
    else:
        self.log.debug('dropping connection to peer {peer} skipped - connection already closed', peer=self.peer)

def endMessage(

self)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.endMessage

def endMessage(self):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.endMessage`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    # check if sending state is valid for this method
    #
    # if self.send_state != WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE:
    #   raise Exception("WebSocketProtocol.endMessage invalid in current sending state [%d]" % self.send_state)
    if self.send_compressed:
        payload = self._perMessageCompress.end_compress_message()
        self.trafficStats.outgoingOctetsWebSocketLevel += len(payload)
    else:
        # send continuation frame with empty payload and FIN set to end message
        payload = b''
    self.sendFrame(opcode=0, payload=payload, fin=True)
    self.send_state = WebSocketProtocol.SEND_STATE_GROUND

def failHandshake(

self, reason)

During opening handshake the server response is invalid and we drop the connection.

def failHandshake(self, reason):
    """
    During opening handshake the server response is invalid and we drop the
    connection.
    """
    self.wasNotCleanReason = reason
    self.log.info(
        "failing WebSocket opening handshake ('{reason}')",
        reason=reason,
    )
    self.dropConnection(abort=True)

def failProxyConnect(

self, reason)

During initial explicit proxy connect, the server response indicates some failure and we drop the connection.

def failProxyConnect(self, reason):
    """
    During initial explicit proxy connect, the server response indicates some failure and we drop the
    connection.
    """
    self.log.debug("failing proxy connect ('{reason}')", reason=reason)
    self.dropConnection(abort=True)

def fire(

self, event, *args, **kwargs)

Fire a particular event.

:param event: the event to fire. All other args and kwargs are passed on to the handler(s) for the event.

:return: a Deferred/Future gathering all async results from all handlers and/or parent handlers.

def fire(self, event, *args, **kwargs):
    """
    Fire a particular event.
    :param event: the event to fire. All other args and kwargs are
        passed on to the handler(s) for the event.
    :return: a Deferred/Future gathering all async results from
        all handlers and/or parent handlers.
    """
    # print("firing '{}' from '{}'".format(event, hash(self)))
    if self._listeners is None:
        return txaio.create_future(result=[])
    self._check_event(event)
    res = []
    for handler in self._listeners.get(event, []):
        future = txaio.as_future(handler, *args, **kwargs)
        res.append(future)
    if self._parent is not None:
        res.append(self._parent.fire(event, *args, **kwargs))
    return txaio.gather(res, consume_exceptions=False)

def get_channel_id(

self, channel_id_type='tls-unique')

Implements :func:autobahn.wamp.interfaces.ITransport.get_channel_id

def get_channel_id(self, channel_id_type=u'tls-unique'):
    """
    Implements :func:`autobahn.wamp.interfaces.ITransport.get_channel_id`
    """
    return transport_channel_id(self.transport, False, channel_id_type)

def logPrefix(

self)

Return a prefix matching the class name, to identify log messages related to this protocol instance.

def logPrefix(self):
    """
    Return a prefix matching the class name, to identify log messages
    related to this protocol instance.
    """
    return self.__class__.__name__

def logRxFrame(

self, frameHeader, payload)

Hook fired right after WebSocket frame has been received and decoded, but only when self.logFrames == True.

def logRxFrame(self, frameHeader, payload):
    """
    Hook fired right after WebSocket frame has been received and decoded,
    but only when self.logFrames == True.
    """
    data = b''.join(payload)
    self.log.debug(
        "RX Frame from {peer} : fin = {fin}, rsv = {rsv}, opcode = {opcode}"
        ", mask = {mask}, length = {length}, payload = {payload}",
        peer=self.peer,
        fin=frameHeader.fin,
        rsv=frameHeader.rsv,
        opcode=frameHeader.opcode,
        mask=binascii.b2a_hex(frameHeader.mask) if frameHeader.mask else "-",
        length=frameHeader.length,
        payload=repr(data) if frameHeader.opcode == 1 else _LazyHexFormatter(data),
    )

def logRxOctets(

self, data)

Hook fired right after raw octets have been received, but only when self.logOctets == True.

def logRxOctets(self, data):
    """
    Hook fired right after raw octets have been received, but only when
    self.logOctets == True.
    """
    self.log.debug(
        "RX Octets from {peer} : octets = {octets}",
        peer=self.peer,
        octets=_LazyHexFormatter(data),
    )

def logTxFrame(

self, frameHeader, payload, repeatLength, chopsize, sync)

Hook fired right after WebSocket frame has been encoded and sent, but only when self.logFrames == True.

def logTxFrame(self, frameHeader, payload, repeatLength, chopsize, sync):
    """
    Hook fired right after WebSocket frame has been encoded and sent, but
    only when self.logFrames == True.
    """
    self.log.debug(
        "TX Frame to {peer} : fin = {fin}, rsv = {rsv}, opcode = {opcode}, "
        "mask = {mask}, length = {length}, repeat_length = {repeat_length},"
        " chopsize = {chopsize}, sync = {sync}, payload = {payload}",
        peer=self.peer,
        fin=frameHeader.fin,
        rsv=frameHeader.rsv,
        opcode=frameHeader.opcode,
        mask=binascii.b2a_hex(frameHeader.mask) if frameHeader.mask else "-",
        length=frameHeader.length,
        repeat_length=repeatLength,
        chopsize=chopsize,
        sync=sync,
        payload=repr(payload) if frameHeader.opcode == 1 else _LazyHexFormatter(payload),
    )

def logTxOctets(

self, data, sync)

Hook fired right after raw octets have been sent, but only when self.logOctets == True.

def logTxOctets(self, data, sync):
    """
    Hook fired right after raw octets have been sent, but only when
    self.logOctets == True.
    """
    self.log.debug(
        "TX Octets to {peer} : sync = {sync}, octets = {octets}",
        peer=self.peer,
        sync=sync,
        octets=_LazyHexFormatter(data),
    )

def makeConnection(

self, transport)

Make a connection to a transport and a server.

This sets the 'transport' attribute of this Protocol, and calls the connectionMade() callback.

def makeConnection(self, transport):
    """
    Make a connection to a transport and a server.
    This sets the 'transport' attribute of this Protocol, and calls the
    connectionMade() callback.
    """
    self.connected = 1
    self.transport = transport
    self.connectionMade()

def off(

self, event=None, handler=None)

Stop listening for a single event, or all events.

:param event: if None, remove all listeners. Otherwise, remove listeners for the single named event.

:param handler: if None, remove all handlers for the named event; otherwise remove just the given handler.

def off(self, event=None, handler=None):
    """
    Stop listening for a single event, or all events.
    :param event: if None, remove all listeners. Otherwise, remove
        listeners for the single named event.
    :param handler: if None, remove all handlers for the named
        event; otherwise remove just the given handler.
    """
    if event is None:
        if handler is not None:
            # maybe this should mean "remove the given handler
            # from any event at all that contains it"...?
            raise RuntimeError(
                "Can't specificy a specific handler without an event"
            )
        self._listeners = dict()
    else:
        if self._listeners is None:
            return
        self._check_event(event)
        if event in self._listeners:
            if handler is None:
                del self._listeners[event]
            else:
                self._listeners[event].discard(handler)

def on(

self, event, handler)

Add a handler for an event.

:param event: the name of the event

:param handler: a callable thats invoked when .fire() is called for this events. Arguments will be whatever are given to .fire()

def on(self, event, handler):
    """
    Add a handler for an event.
    :param event: the name of the event
    :param handler: a callable thats invoked when .fire() is
        called for this events. Arguments will be whatever are given
        to .fire()
    """
    # print("adding '{}' to '{}': {}".format(event, hash(self), handler))
    self._check_event(event)
    if self._listeners is None:
        self._listeners = dict()
    if event not in self._listeners:
        self._listeners[event] = []
    self._listeners[event].append(handler)

def onAutoPingTimeout(

self)

When doing automatic ping/pongs to detect broken connection, the peer did not reply in time to our ping. We drop the connection.

def onAutoPingTimeout(self):
    """
    When doing automatic ping/pongs to detect broken connection, the peer
    did not reply in time to our ping. We drop the connection.
    """
    self.wasClean = False
    self.wasNotCleanReason = u'WebSocket ping timeout (peer did not respond with pong in time)'
    self.autoPingTimeoutCall = None
    self.dropConnection(abort=True)

def onClose(

self, was_clean, code, reason)

Called when connection is closed.

def onClose(self, was_clean, code, reason):  # noqa
    """Called when connection is closed."""
    if not was_clean:
        if self.factory.on_error:
            self.factory.on_error(self, code, reason)
    if self.factory.on_close:
        self.factory.on_close(self, code, reason)
    # Cancel next ping and timer
    self._last_ping_time = None
    self._last_pong_time = None
    if self._next_ping:
        self._next_ping.cancel()
    if self._next_pong_check:
        self._next_pong_check.cancel()

def onCloseFrame(

self, code, reasonRaw)

Callback when a Close frame was received. The default implementation answers by sending a Close when no Close was sent before. Otherwise it drops the TCP connection either immediately (when we are a server) or after a timeout (when we are a client and expect the server to drop the TCP).

:param code: Close status code, if there was one (:class:WebSocketProtocol.CLOSE_STATUS_CODE_*). :type code: int :param reasonRaw: Close reason (when present, a status code MUST have been also be present). :type reasonRaw: bytes

def onCloseFrame(self, code, reasonRaw):
    """
    Callback when a Close frame was received. The default implementation answers by
    sending a Close when no Close was sent before. Otherwise it drops
    the TCP connection either immediately (when we are a server) or after a timeout
    (when we are a client and expect the server to drop the TCP).
    :param code: Close status code, if there was one (:class:`WebSocketProtocol`.CLOSE_STATUS_CODE_*).
    :type code: int
    :param reasonRaw: Close reason (when present, a status code MUST have been also be present).
    :type reasonRaw: bytes
    """
    self.remoteCloseCode = None
    self.remoteCloseReason = None
    # reserved close codes: 0-999, 1004, 1005, 1006, 1011-2999, >= 5000
    #
    if code is not None and (code < 1000 or (1000 <= code <= 2999 and code not in WebSocketProtocol.CLOSE_STATUS_CODES_ALLOWED) or code >= 5000):
        if self._protocol_violation(u'invalid close code {}'.format(code)):
            return True
        else:
            self.remoteCloseCode = WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL
    else:
        self.remoteCloseCode = code
    # closing reason
    #
    if reasonRaw is not None:
        # we use our own UTF-8 validator to get consistent and fully conformant
        # UTF-8 validation behavior
        u = Utf8Validator()
        val = u.validate(reasonRaw)
        # the UTF8 must be valid _and_ end on a Unicode code point
        if not (val[0] and val[1]):
            if self._invalid_payload(u'invalid close reason (non-UTF8 payload)'):
                return True
        else:
            self.remoteCloseReason = reasonRaw.decode('utf8')
    # handle receive of close frame depending on protocol state
    #
    if self.state == WebSocketProtocol.STATE_CLOSING:
        # We already initiated the closing handshake, so this
        # is the peer's reply to our close frame.
        # cancel any closing HS timer if present
        #
        if self.closeHandshakeTimeoutCall is not None:
            self.log.debug("connection closed properly: canceling closing handshake timeout")
            self.closeHandshakeTimeoutCall.cancel()
            self.closeHandshakeTimeoutCall = None
        self.wasClean = True
        if self.factory.isServer:
            # When we are a server, we immediately drop the TCP.
            self.dropConnection(abort=True)
        else:
            # When we are a client, the server should drop the TCP
            # If that doesn't happen, we do. And that will set wasClean = False.
            if self.serverConnectionDropTimeout > 0:
                self.serverConnectionDropTimeoutCall = txaio.call_later(
                    self.serverConnectionDropTimeout,
                    self.onServerConnectionDropTimeout,
                )
    elif self.state == WebSocketProtocol.STATE_OPEN:
        # The peer initiates a closing handshake, so we reply
        # by sending close frame.
        self.wasClean = True
        if self.websocket_version == 0:
            self.sendCloseFrame(isReply=True)
        else:
            # Either reply with same code/reason, or code == NORMAL/reason=None
            if self.echoCloseCodeReason:
                self.sendCloseFrame(code=self.remoteCloseCode, reasonUtf8=encode_truncate(self.remoteCloseReason, 123), isReply=True)
            else:
                self.sendCloseFrame(code=WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL, isReply=True)
        if self.factory.isServer:
            # When we are a server, we immediately drop the TCP.
            self.dropConnection(abort=False)
        else:
            # When we are a client, we expect the server to drop the TCP,
            # and when the server fails to do so, a timeout in sendCloseFrame()
            # will set wasClean = False back again.
            pass
    elif self.state == WebSocketProtocol.STATE_CLOSED:
        # The peer initiated a closing handshake but dropped the TCP immediately.
        self.wasClean = False
    else:
        # STATE_PROXY_CONNECTING, STATE_CONNECTING
        raise Exception("logic error")

def onCloseHandshakeTimeout(

self)

We expected the peer to respond to us initiating a close handshake. It didn't respond (in time self.closeHandshakeTimeout) with a close response frame though. So we drop the connection, but set self.wasClean = False.

def onCloseHandshakeTimeout(self):
    """
    We expected the peer to respond to us initiating a close handshake. It didn't
    respond (in time self.closeHandshakeTimeout) with a close response frame though.
    So we drop the connection, but set self.wasClean = False.
    """
    self.closeHandshakeTimeoutCall = None
    if self.state != WebSocketProtocol.STATE_CLOSED:
        self.wasClean = False
        self.wasNotCleanReason = u'WebSocket closing handshake timeout (peer did not finish the opening handshake in time)'
        self.wasCloseHandshakeTimeout = True
        self.dropConnection(abort=True)
    else:
        self.log.debug('skipping closing handshake timeout: WebSocket connection is already closed')

def onConnect(

self, response)

Called when WebSocket server connection was established

def onConnect(self, response):  # noqa
    """Called when WebSocket server connection was established"""
    self.factory.ws = self
    if self.factory.on_connect:
        self.factory.on_connect(self, response)
    # Reset reconnect on successful reconnect
    self.factory.resetDelay()

def onConnecting(

self, transport_details)

:param transport_details: a :class:autobahn.websocket.types.TransportDetails

Callback fired after the connection is established, but before the handshake has started. This may return a :class:autobahn.websocket.types.ConnectingRequest instance (or a future which resolves to one) to control aspects of the handshake (or None for defaults)

def onConnecting(self, transport_details):
    """
    :param transport_details: a :class:`autobahn.websocket.types.TransportDetails`
    Callback fired after the connection is established, but before the
    handshake has started. This may return a
    :class:`autobahn.websocket.types.ConnectingRequest` instance
    (or a future which resolves to one) to control aspects of the
    handshake (or None for defaults)
    """
    pass

def onFrameBegin(

self)

Begin of receive new frame.

def onFrameBegin(self):
    """
    Begin of receive new frame.
    """
    if self.current_frame.opcode > 7:
        self.control_frame_data = []
    else:
        # new message started
        #
        if not self.inside_message:
            self.inside_message = True
            # setup decompressor
            #
            if self._perMessageCompress is not None and self.current_frame.rsv == 4:
                self._isMessageCompressed = True
                self._perMessageCompress.start_decompress_message()
            else:
                self._isMessageCompressed = False
            # setup UTF8 validator
            #
            if self.current_frame.opcode == WebSocketProtocol.MESSAGE_TYPE_TEXT and self.utf8validateIncoming:
                self.utf8validator.reset()
                self.utf8validateIncomingCurrentMessage = True
                self.utf8validateLast = (True, True, 0, 0)
            else:
                self.utf8validateIncomingCurrentMessage = False
            # track timings
            #
            if self.trackedTimings:
                self.trackedTimings.track("onMessageBegin")
            # fire onMessageBegin
            #
            self._onMessageBegin(self.current_frame.opcode == WebSocketProtocol.MESSAGE_TYPE_BINARY)
        self._onMessageFrameBegin(self.current_frame.length)

def onFrameData(

self, payload)

New data received within frame.

def onFrameData(self, payload):
    """
    New data received within frame.
    """
    if self.current_frame.opcode > 7:
        self.control_frame_data.append(payload)
    else:
        # decompress frame payload
        #
        if self._isMessageCompressed:
            compressedLen = len(payload)
            self.log.debug(
                "RX compressed [length]: octets",
                legnth=compressedLen,
                octets=_LazyHexFormatter(payload),
            )
            # XXX oberstet
            payload = self._perMessageCompress.decompress_message_data(payload)
            uncompressedLen = len(payload)
        else:
            l = len(payload)
            compressedLen = l
            uncompressedLen = l
        if self.state == WebSocketProtocol.STATE_OPEN:
            self.trafficStats.incomingOctetsWebSocketLevel += compressedLen
            self.trafficStats.incomingOctetsAppLevel += uncompressedLen
        # incrementally validate UTF-8 payload
        #
        if self.utf8validateIncomingCurrentMessage:
            self.utf8validateLast = self.utf8validator.validate(payload)
            if not self.utf8validateLast[0]:
                if self._invalid_payload(u'encountered invalid UTF-8 while processing text message at payload octet index {}'.format(self.utf8validateLast[3])):
                    return False
        self._onMessageFrameData(payload)

def onFrameEnd(

self)

End of frame received.

def onFrameEnd(self):
    """
    End of frame received.
    """
    if self.current_frame.opcode > 7:
        if self.logFrames:
            self.logRxFrame(self.current_frame, self.control_frame_data)
        self.processControlFrame()
    else:
        if self.state == WebSocketProtocol.STATE_OPEN:
            self.trafficStats.incomingWebSocketFrames += 1
        if self.logFrames:
            self.logRxFrame(self.current_frame, self.frame_data)
        self._onMessageFrameEnd()
        if self.current_frame.fin:
            # handle end of compressed message
            #
            if self._isMessageCompressed:
                self._perMessageCompress.end_decompress_message()
            # verify UTF8 has actually ended
            #
            if self.utf8validateIncomingCurrentMessage:
                if not self.utf8validateLast[1]:
                    if self._invalid_payload(u'UTF-8 text message payload ended within Unicode code point at payload octet index {}'.format(self.utf8validateLast[3])):
                        return False
            if self.state == WebSocketProtocol.STATE_OPEN:
                self.trafficStats.incomingWebSocketMessages += 1
            self._onMessageEnd()
            self.inside_message = False
    self.current_frame = None

def onMessage(

self, payload, is_binary)

Called when text or binary message is received.

def onMessage(self, payload, is_binary):  # noqa
    """Called when text or binary message is received."""
    if self.factory.on_message:
        self.factory.on_message(self, payload, is_binary)

def onMessageBegin(

self, isBinary)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageBegin

def onMessageBegin(self, isBinary):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageBegin`
    """
    self.message_is_binary = isBinary
    self.message_data = []
    self.message_data_total_length = 0

def onMessageEnd(

self)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageEnd

def onMessageEnd(self):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageEnd`
    """
    if not self.failedByMe:
        payload = b''.join(self.message_data)
        if self.trackedTimings:
            self.trackedTimings.track("onMessage")
        self._onMessage(payload, self.message_is_binary)
        # notify any listeners about this message
        f = self.fire("message", payload, is_binary=self.message_is_binary)
        def error(f):
            self.log.error(
                "Firing signal 'message' failed: {fail}",
                fail=f,
            )
            # all we can really do here is log; user code error
        txaio.add_callbacks(f, None, error)
    self.message_data = None

def onMessageFrame(

self, payload)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrame

def onMessageFrame(self, payload):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrame`
    """
    if not self.failedByMe:
        self.message_data.extend(payload)

def onMessageFrameBegin(

self, length)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameBegin

def onMessageFrameBegin(self, length):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameBegin`
    """
    self.frame_length = length
    self.frame_data = []
    self.message_data_total_length += length
    if not self.failedByMe:
        if 0 < self.maxMessagePayloadSize < self.message_data_total_length:
            self.wasMaxMessagePayloadSizeExceeded = True
            self._max_message_size_exceeded(self.message_data_total_length,
                                            self.maxMessagePayloadSize,
                                            u'received WebSocket message size {} exceeds payload limit of {} octets'.format(self.message_data_total_length, self.maxMessagePayloadSize))
        elif 0 < self.maxFramePayloadSize < length:
            self.wasMaxFramePayloadSizeExceeded = True
            self._max_message_size_exceeded(length,
                                            self.maxFramePayloadSize,
                                            u'received WebSocket frame size {} exceeds payload limit of {} octets'.format(length, self.maxFramePayloadSize))

def onMessageFrameData(

self, payload)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameData

def onMessageFrameData(self, payload):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameData`
    """
    if not self.failedByMe:
        if self.websocket_version == 0:
            self.message_data_total_length += len(payload)
            if 0 < self.maxMessagePayloadSize < self.message_data_total_length:
                self.wasMaxMessagePayloadSizeExceeded = True
                self._max_message_size_exceeded(self.message_data_total_length,
                                                self.maxMessagePayloadSize,
                                                u'received (partial) WebSocket message size {} (already) exceeds payload limit of {} octets'.format(self.message_data_total_length, self.maxMessagePayloadSize))
            self.message_data.append(payload)
        else:
            self.frame_data.append(payload)

def onMessageFrameEnd(

self)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameEnd

def onMessageFrameEnd(self):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameEnd`
    """
    if not self.failedByMe:
        self._onMessageFrame(self.frame_data)
    self.frame_data = None

def onOpen(

self)

Called when the initial WebSocket opening handshake was completed.

def onOpen(self):  # noqa
    """Called when the initial WebSocket opening handshake was completed."""
    # send ping
    self._loop_ping()
    # init last pong check after X seconds
    self._loop_pong_check()
    if self.factory.on_open:
        self.factory.on_open(self)

def onOpenHandshakeTimeout(

self)

We expected the peer to complete the opening handshake with to us. It didn't do so (in time self.openHandshakeTimeout). So we drop the connection, but set self.wasClean = False.

def onOpenHandshakeTimeout(self):
    """
    We expected the peer to complete the opening handshake with to us.
    It didn't do so (in time self.openHandshakeTimeout).
    So we drop the connection, but set self.wasClean = False.
    """
    self.openHandshakeTimeoutCall = None
    if self.state in [WebSocketProtocol.STATE_CONNECTING, WebSocketProtocol.STATE_PROXY_CONNECTING]:
        self.wasClean = False
        self.wasNotCleanReason = u'WebSocket opening handshake timeout (peer did not finish the opening handshake in time)'
        self.wasOpenHandshakeTimeout = True
        self.dropConnection(abort=True)
    elif self.state == WebSocketProtocol.STATE_OPEN:
        self.log.debug("skipping opening handshake timeout: WebSocket connection is open (opening handshake already finished)")
    elif self.state == WebSocketProtocol.STATE_CLOSING:
        self.log.debug("skipping opening handshake timeout: WebSocket connection is already closing ..")
    elif self.state == WebSocketProtocol.STATE_CLOSED:
        self.log.debug("skipping opening handshake timeout: WebSocket connection is already closed")
    else:
        # should not arrive here
        raise Exception("logic error")

def onPing(

self, payload)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.onPing

def onPing(self, payload):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onPing`
    """
    self.log.debug(
        "WebSocketProtocol.onPing(payload=<{payload_len} bytes>)",
        payload_len=(len(payload) if payload else 0),
    )
    if self.state == WebSocketProtocol.STATE_OPEN:
        self.sendPong(payload)

def onPong(

self, response)

Called when pong message is received.

def onPong(self, response):  # noqa
    """Called when pong message is received."""
    if self._last_pong_time and self.factory.debug:
        log.debug("last pong was {} seconds back.".format(time.time() - self._last_pong_time))
    self._last_pong_time = time.time()
    if self.factory.debug:
        log.debug("pong => {}".format(response))

def onServerConnectionDropTimeout(

self)

We (a client) expected the peer (a server) to drop the connection, but it didn't (in time self.serverConnectionDropTimeout). So we drop the connection, but set self.wasClean = False.

def onServerConnectionDropTimeout(self):
    """
    We (a client) expected the peer (a server) to drop the connection,
    but it didn't (in time self.serverConnectionDropTimeout).
    So we drop the connection, but set self.wasClean = False.
    """
    self.serverConnectionDropTimeoutCall = None
    if self.state != WebSocketProtocol.STATE_CLOSED:
        self.wasClean = False
        self.wasNotCleanReason = u'WebSocket closing handshake timeout (server did not drop TCP connection in time)'
        self.wasServerConnectionDropTimeout = True
        self.dropConnection(abort=True)
    else:
        self.log.debug("skipping closing handshake timeout: server did indeed drop the connection in time")

def processControlFrame(

self)

Process a completely received control frame.

def processControlFrame(self):
    """
    Process a completely received control frame.
    """
    payload = b''.join(self.control_frame_data)
    self.control_frame_data = None
    # CLOSE frame
    #
    if self.current_frame.opcode == 8:
        code = None
        reasonRaw = None
        ll = len(payload)
        if ll > 1:
            code = struct.unpack("!H", payload[0:2])[0]
            if ll > 2:
                reasonRaw = payload[2:]
        if self.onCloseFrame(code, reasonRaw):
            return False
    # PING frame
    #
    elif self.current_frame.opcode == 9:
        self._onPing(payload)
    # PONG frame
    #
    elif self.current_frame.opcode == 10:
        # auto ping/pong processing
        #
        if self.autoPingPending:
            try:
                if payload == self.autoPingPending:
                    self.log.debug("Auto ping/pong: received pending pong for auto-ping/pong")
                    if self.autoPingTimeoutCall:
                        self.autoPingTimeoutCall.cancel()
                    self.autoPingPending = None
                    self.autoPingTimeoutCall = None
                    if self.autoPingInterval:
                        self.autoPingPendingCall = self.factory._batched_timer.call_later(
                            self.autoPingInterval,
                            self._sendAutoPing,
                        )
                else:
                    self.log.debug("Auto ping/pong: received non-pending pong")
            except:
                self.log.debug("Auto ping/pong: received non-pending pong")
        # fire app-level callback
        #
        self._onPong(payload)
    else:
        # we might arrive here, when protocolViolation
        # wants us to continue anyway
        pass
    return True

def processData(

self)

After WebSocket handshake has been completed, this procedure will do all subsequent processing of incoming bytes.

def processData(self):
    """
    After WebSocket handshake has been completed, this procedure will do
    all subsequent processing of incoming bytes.
    """
    buffered_len = len(self.data)
    # outside a frame, that is we are awaiting data which starts a new frame
    #
    if self.current_frame is None:
        # need minimum of 2 octets to for new frame
        #
        if buffered_len >= 2:
            # FIN, RSV, OPCODE
            #
            if six.PY3:
                b = self.data[0]
            else:
                b = ord(self.data[0])
            frame_fin = (b & 0x80) != 0
            frame_rsv = (b & 0x70) >> 4
            frame_opcode = b & 0x0f
            # MASK, PAYLOAD LEN 1
            #
            if six.PY3:
                b = self.data[1]
            else:
                b = ord(self.data[1])
            frame_masked = (b & 0x80) != 0
            frame_payload_len1 = b & 0x7f
            # MUST be 0 when no extension defining
            # the semantics of RSV has been negotiated
            #
            if frame_rsv != 0:
                if self._perMessageCompress is not None and frame_rsv == 4:
                    pass
                else:
                    if self._protocol_violation(u'RSV = {} and no extension negotiated'.format(frame_rsv)):
                        return False
            # all client-to-server frames MUST be masked
            #
            if self.factory.isServer and self.requireMaskedClientFrames and not frame_masked:
                if self._protocol_violation(u'unmasked client-to-server frame'):
                    return False
            # all server-to-client frames MUST NOT be masked
            #
            if not self.factory.isServer and not self.acceptMaskedServerFrames and frame_masked:
                if self._protocol_violation(u'masked server-to-client frame'):
                    return False
            # check frame
            #
            if frame_opcode > 7:  # control frame (have MSB in opcode set)
                # control frames MUST NOT be fragmented
                #
                if not frame_fin:
                    if self._protocol_violation(u'fragmented control frame'):
                        return False
                # control frames MUST have payload 125 octets or less
                #
                if frame_payload_len1 > 125:
                    if self._protocol_violation(u'control frame with payload length > 125 octets'):
                        return False
                # check for reserved control frame opcodes
                #
                if frame_opcode not in [8, 9, 10]:
                    if self._protocol_violation(u'control frame using reserved opcode {}'.format(frame_opcode)):
                        return False
                # close frame : if there is a body, the first two bytes of the body MUST be a 2-byte
                # unsigned integer (in network byte order) representing a status code
                #
                if frame_opcode == 8 and frame_payload_len1 == 1:
                    if self._protocol_violation(u'received close control frame with payload len 1'):
                        return False
                # control frames MUST NOT be compressed
                #
                if self._perMessageCompress is not None and frame_rsv == 4:
                    if self._protocol_violation(u'received compressed control frame [{}]'.format(self._perMessageCompress.EXTENSION_NAME)):
                        return False
            else:  # data frame
                # check for reserved data frame opcodes
                #
                if frame_opcode not in [0, 1, 2]:
                    if self._protocol_violation(u'data frame using reserved opcode {}'.format(frame_opcode)):
                        return False
                # check opcode vs message fragmentation state 1/2
                #
                if not self.inside_message and frame_opcode == 0:
                    if self._protocol_violation(u'received continuation data frame outside fragmented message'):
                        return False
                # check opcode vs message fragmentation state 2/2
                #
                if self.inside_message and frame_opcode != 0:
                    if self._protocol_violation(u'received non-continuation data frame while inside fragmented message'):
                        return False
                # continuation data frames MUST NOT have the compressed bit set
                #
                if self._perMessageCompress is not None and frame_rsv == 4 and self.inside_message:
                    if self._protocol_violation(u'received continuation data frame with compress bit set [{}]'.format(self._perMessageCompress.EXTENSION_NAME)):
                        return False
            # compute complete header length
            #
            if frame_masked:
                mask_len = 4
            else:
                mask_len = 0
            if frame_payload_len1 < 126:
                frame_header_len = 2 + mask_len
            elif frame_payload_len1 == 126:
                frame_header_len = 2 + 2 + mask_len
            elif frame_payload_len1 == 127:
                frame_header_len = 2 + 8 + mask_len
            else:
                raise Exception("logic error")
            # only proceed when we have enough data buffered for complete
            # frame header (which includes extended payload len + mask)
            #
            if buffered_len >= frame_header_len:
                # minimum frame header length (already consumed)
                #
                i = 2
                # extract extended payload length
                #
                if frame_payload_len1 == 126:
                    frame_payload_len = struct.unpack("!H", self.data[i:i + 2])[0]
                    if frame_payload_len < 126:
                        if self._protocol_violation(u'invalid data frame length (not using minimal length encoding)'):
                            return False
                    i += 2
                elif frame_payload_len1 == 127:
                    frame_payload_len = struct.unpack("!Q", self.data[i:i + 8])[0]
                    if frame_payload_len > 0x7FFFFFFFFFFFFFFF:  # 2**63
                        if self._protocol_violation(u'invalid data frame length (>2^63)'):
                            return False
                    if frame_payload_len < 65536:
                        if self._protocol_violation(u'invalid data frame length (not using minimal length encoding)'):
                            return False
                    i += 8
                else:
                    frame_payload_len = frame_payload_len1
                # when payload is masked, extract frame mask
                #
                frame_mask = None
                if frame_masked:
                    frame_mask = self.data[i:i + 4]
                    i += 4
                if frame_masked and frame_payload_len > 0 and self.applyMask:
                    self.current_frame_masker = create_xor_masker(frame_mask, frame_payload_len)
                else:
                    self.current_frame_masker = XorMaskerNull()
                # remember rest (payload of current frame after header and everything thereafter)
                #
                self.data = self.data[i:]
                # ok, got complete frame header
                #
                self.current_frame = FrameHeader(frame_opcode,
                                                 frame_fin,
                                                 frame_rsv,
                                                 frame_payload_len,
                                                 frame_mask)
                # process begin on new frame
                #
                self.onFrameBegin()
                # reprocess when frame has no payload or and buffered data left
                #
                return frame_payload_len == 0 or len(self.data) > 0
            else:
                return False  # need more data
        else:
            return False  # need more data
    # inside a started frame
    #
    else:
        # cut out rest of frame payload
        #
        rest = self.current_frame.length - self.current_frame_masker.pointer()
        if buffered_len >= rest:
            data = self.data[:rest]
            length = rest
            self.data = self.data[rest:]
        else:
            data = self.data
            length = buffered_len
            self.data = b''
        if length > 0:
            # unmask payload
            #
            payload = self.current_frame_masker.process(data)
        else:
            # we also process empty payloads, since we need to fire
            # our hooks (at least for streaming processing, this is
            # necessary for correct protocol state transitioning)
            #
            payload = b''
        # process frame data
        #
        fr = self.onFrameData(payload)
        # noinspection PySimplifyBooleanCheck
        if fr is False:
            return False
        # fire frame end handler when frame payload is complete
        #
        if self.current_frame_masker.pointer() == self.current_frame.length:
            fr = self.onFrameEnd()
            # noinspection PySimplifyBooleanCheck
            if fr is False:
                return False
        # reprocess when no error occurred and buffered data left
        #
        return len(self.data) > 0

def processHandshake(

self)

Process WebSocket opening handshake response from server.

def processHandshake(self):
    """
    Process WebSocket opening handshake response from server.
    """
    # only proceed when we have fully received the HTTP request line and all headers
    #
    end_of_header = self.data.find(b"\x0d\x0a\x0d\x0a")
    if end_of_header >= 0:
        self.http_response_data = self.data[:end_of_header + 4]
        self.log.debug(
            "received HTTP response:\n\n{response}\n\n",
            response=self.http_response_data,
        )
        # extract HTTP status line and headers
        #
        (self.http_status_line, self.http_headers, http_headers_cnt) = parseHttpHeader(self.http_response_data)
        # validate WebSocket opening handshake server response
        #
        self.log.debug(
            "received HTTP status line in opening handshake : {status}",
            status=self.http_status_line,
        )
        self.log.debug(
            "received HTTP headers in opening handshake : {headers}",
            headers=self.http_headers,
        )
        # Response Line
        #
        sl = self.http_status_line.split()
        if len(sl) < 2:
            return self.failHandshake("Bad HTTP response status line '%s'" % self.http_status_line)
        # HTTP version
        #
        http_version = sl[0].strip()
        if http_version != "HTTP/1.1":
            return self.failHandshake("Unsupported HTTP version ('%s')" % http_version)
        # HTTP status code
        #
        try:
            status_code = int(sl[1].strip())
        except ValueError:
            return self.failHandshake("Bad HTTP status code ('%s')" % sl[1].strip())
        if status_code != 101:  # Switching Protocols
            # FIXME: handle redirects
            # FIXME: handle authentication required
            if len(sl) > 2:
                reason = " - %s" % ''.join(sl[2:])
            else:
                reason = ""
            return self.failHandshake("WebSocket connection upgrade failed (%d%s)" % (status_code, reason))
        # Upgrade
        #
        if 'upgrade' not in self.http_headers:
            return self.failHandshake("HTTP Upgrade header missing")
        if self.http_headers["upgrade"].strip().lower() != "websocket":
            return self.failHandshake("HTTP Upgrade header different from 'websocket' (case-insensitive) : %s" % self.http_headers["upgrade"])
        # Connection
        #
        if 'connection' not in self.http_headers:
            return self.failHandshake("HTTP Connection header missing")
        connectionUpgrade = False
        for c in self.http_headers["connection"].split(","):
            if c.strip().lower() == "upgrade":
                connectionUpgrade = True
                break
        if not connectionUpgrade:
            return self.failHandshake("HTTP Connection header does not include 'upgrade' value (case-insensitive) : %s" % self.http_headers["connection"])
        # compute Sec-WebSocket-Accept
        #
        if 'sec-websocket-accept' not in self.http_headers:
            return self.failHandshake("HTTP Sec-WebSocket-Accept header missing in opening handshake reply")
        else:
            if http_headers_cnt["sec-websocket-accept"] > 1:
                return self.failHandshake("HTTP Sec-WebSocket-Accept header appears more than once in opening handshake reply")
            sec_websocket_accept_got = self.http_headers["sec-websocket-accept"].strip()
            sha1 = hashlib.sha1()
            sha1.update(self.websocket_key + WebSocketProtocol._WS_MAGIC)
            sec_websocket_accept = base64.b64encode(sha1.digest()).decode()
            if sec_websocket_accept_got != sec_websocket_accept:
                return self.failHandshake("HTTP Sec-WebSocket-Accept bogus value : expected %s / got %s" % (sec_websocket_accept, sec_websocket_accept_got))
        # Sec-WebSocket-Extensions
        #
        # extensions effectively in use for this connection
        #
        self.websocket_extensions_in_use = []
        if 'sec-websocket-extensions' in self.http_headers:
            if http_headers_cnt["sec-websocket-extensions"] > 1:
                return self.failHandshake("HTTP Sec-WebSocket-Extensions header appears more than once in opening handshake reply")
            else:
                # extensions select by server
                #
                websocket_extensions = self._parseExtensionsHeader(self.http_headers["sec-websocket-extensions"])
            # process extensions selected by server
            #
            for (extension, params) in websocket_extensions:
                self.log.debug(
                    "parsed WebSocket extension '{extension}' with params '{params}'",
                    extension=extension,
                    params=params,
                )
                # process permessage-compress extension
                #
                if extension in PERMESSAGE_COMPRESSION_EXTENSION:
                    # check that server only responded with 1 configuration ("PMCE")
                    #
                    if self._perMessageCompress is not None:
                        return self.failHandshake("multiple occurrence of a permessage-compress extension")
                    PMCE = PERMESSAGE_COMPRESSION_EXTENSION[extension]
                    try:
                        pmceResponse = PMCE['Response'].parse(params)
                    except Exception as e:
                        return self.failHandshake(str(e))
                    accept = self.perMessageCompressionAccept(pmceResponse)
                    if accept is None:
                        return self.failHandshake("WebSocket permessage-compress extension response from server denied by client")
                    self._perMessageCompress = PMCE['PMCE'].create_from_response_accept(self.factory.isServer, accept)
                    self.websocket_extensions_in_use.append(self._perMessageCompress)
                else:
                    return self.failHandshake("server wants to use extension '%s' we did not request, haven't implemented or did not enable" % extension)
        # handle "subprotocol in use" - if any
        #
        self.websocket_protocol_in_use = None
        if 'sec-websocket-protocol' in self.http_headers:
            if http_headers_cnt["sec-websocket-protocol"] > 1:
                return self.failHandshake("HTTP Sec-WebSocket-Protocol header appears more than once in opening handshake reply")
            sp = str(self.http_headers["sec-websocket-protocol"].strip())
            if sp != "":
                if sp not in self.factory.protocols:
                    return self.failHandshake("subprotocol selected by server (%s) not in subprotocol list requested by client (%s)" % (sp, str(self.factory.protocols)))
                else:
                    # ok, subprotocol in use
                    #
                    self.websocket_protocol_in_use = sp
        # Ok, got complete HS input, remember rest (if any)
        #
        self.data = self.data[end_of_header + 4:]
        # opening handshake completed, move WebSocket connection into OPEN state
        #
        self.state = WebSocketProtocol.STATE_OPEN
        # cancel any opening HS timer if present
        #
        if self.openHandshakeTimeoutCall is not None:
            self.log.debug("openHandshakeTimeoutCall.cancel")
            self.openHandshakeTimeoutCall.cancel()
            self.openHandshakeTimeoutCall = None
        # init state
        #
        self.inside_message = False
        self.current_frame = None
        self.websocket_version = self.version
        # automatic ping/pong
        #
        if self.autoPingInterval:
            self.autoPingPendingCall = self.factory._batched_timer.call_later(
                self.autoPingInterval,
                self._sendAutoPing,
            )
        # we handle this symmetrical to server-side .. that is, give the
        # client a chance to bail out .. i.e. on no subprotocol selected
        # by server
        try:
            response = ConnectionResponse(self.peer,
                                          self.http_headers,
                                          self.websocket_version,
                                          self.websocket_protocol_in_use,
                                          self.websocket_extensions_in_use)
            self._onConnect(response)
        except Exception as e:
            # immediately close the WS connection
            #
            self._fail_connection(1000, u'{}'.format(e))
        else:
            # fire handler on derived class
            #
            if self.trackedTimings:
                self.trackedTimings.track("onOpen")
            self._onOpen()
        txaio.resolve(self.is_open, None)
        # process rest, if any
        #
        if len(self.data) > 0:
            self.consumeData()

def processProxyConnect(

self)

Process HTTP/CONNECT response from server.

def processProxyConnect(self):
    """
    Process HTTP/CONNECT response from server.
    """
    # only proceed when we have fully received the HTTP request line and all headers
    #
    end_of_header = self.data.find(b"\x0d\x0a\x0d\x0a")
    if end_of_header >= 0:
        http_response_data = self.data[:end_of_header + 4]
        self.log.debug(
            "received HTTP response:\n\n{response}\n\n",
            response=http_response_data,
        )
        # extract HTTP status line and headers
        #
        (http_status_line, http_headers, http_headers_cnt) = parseHttpHeader(http_response_data)
        # validate proxy connect response
        #
        self.log.debug(
            "received HTTP status line for proxy connect request : {status}",
            status=http_status_line,
        )
        self.log.debug(
            "received HTTP headers for proxy connect request : {headers}",
            headers=http_headers,
        )
        # Response Line
        #
        sl = http_status_line.split()
        if len(sl) < 2:
            return self.failProxyConnect("Bad HTTP response status line '%s'" % http_status_line)
        # HTTP version
        #
        http_version = sl[0].strip()
        if http_version not in ("HTTP/1.1", "HTTP/1.0"):
            return self.failProxyConnect("Unsupported HTTP version ('%s')" % http_version)
        # HTTP status code
        #
        try:
            status_code = int(sl[1].strip())
        except ValueError:
            return self.failProxyConnect("Bad HTTP status code ('%s')" % sl[1].strip())
        if not (200 <= status_code < 300):
            # FIXME: handle redirects
            # FIXME: handle authentication required
            if len(sl) > 2:
                reason = " - %s" % ''.join(sl[2:])
            else:
                reason = ""
            return self.failProxyConnect("HTTP proxy connect failed (%d%s)" % (status_code, reason))
        # Ok, got complete response for HTTP/CONNECT, remember rest (if any)
        #
        self.data = self.data[end_of_header + 4:]
        # opening handshake completed, move WebSocket connection into OPEN state
        #
        self.state = WebSocketProtocol.STATE_CONNECTING
        # process rest of buffered data, if any
        #
        if len(self.data) > 0:
            self.consumeData()
        # now start WebSocket opening handshake
        #
        if self.factory.isSecure:
            self.startTLS()
        self.startHandshake()

def registerProducer(

self, producer, streaming)

Register a Twisted producer with this protocol.

:param producer: A Twisted push or pull producer. :type producer: object :param streaming: Producer type. :type streaming: bool

def registerProducer(self, producer, streaming):
    """
    Register a Twisted producer with this protocol.
    :param producer: A Twisted push or pull producer.
    :type producer: object
    :param streaming: Producer type.
    :type streaming: bool
    """
    self.transport.registerProducer(producer, streaming)

def sendClose(

self, code=None, reason=None)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendClose

def sendClose(self, code=None, reason=None):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendClose`
    """
    if code is not None:
        if type(code) not in six.integer_types:
            raise Exception("invalid type '{}' for close code (must be an integer)".format(type(code)))
        # 1000 Normal Closure
        # 3000-3999 First Come First Served
        # 4000-4999 Reserved for Private Use
        # See: https://www.iana.org/assignments/websocket/websocket.xml#close-code-number-rules
        #
        if code != 1000 and not (3000 <= code <= 4999):
            raise Exception("invalid close code {} (must be 1000 or from [3000, 4999])".format(code))
    if reason is not None:
        if code is None:
            raise Exception("close reason without close code")
        if type(reason) != six.text_type:
            raise Exception("reason must be of type unicode (was '{}')".format(type(reason)))
        reasonUtf8 = encode_truncate(reason, 123)
    else:
        reasonUtf8 = None
    self.sendCloseFrame(code=code, reasonUtf8=reasonUtf8, isReply=False)

def sendCloseFrame(

self, code=None, reasonUtf8=None, isReply=False)

Send a close frame and update protocol state. Note, that this is an internal method which deliberately allows not send close frame with invalid payload.

def sendCloseFrame(self, code=None, reasonUtf8=None, isReply=False):
    """
    Send a close frame and update protocol state. Note, that this is
    an internal method which deliberately allows not send close
    frame with invalid payload.
    """
    if self.state == WebSocketProtocol.STATE_CLOSING:
        self.log.debug("ignoring sendCloseFrame since connection is closing")
    elif self.state == WebSocketProtocol.STATE_CLOSED:
        self.log.debug("ignoring sendCloseFrame since connection already closed")
    elif self.state in [WebSocketProtocol.STATE_PROXY_CONNECTING, WebSocketProtocol.STATE_CONNECTING]:
        raise Exception("cannot close a connection not yet connected")
    elif self.state == WebSocketProtocol.STATE_OPEN:
        # construct Hybi close frame payload and send frame
        payload = b''
        if code is not None:
            payload += struct.pack("!H", code)
        if reasonUtf8 is not None:
            payload += reasonUtf8
        self.sendFrame(opcode=8, payload=payload)
        # update state
        self.state = WebSocketProtocol.STATE_CLOSING
        self.closedByMe = not isReply
        # remember payload of close frame we sent
        self.localCloseCode = code
        self.localCloseReason = reasonUtf8
        # drop connection when timeout on receiving close handshake reply
        if self.closedByMe and self.closeHandshakeTimeout > 0:
            self.closeHandshakeTimeoutCall = self.factory._batched_timer.call_later(
                self.closeHandshakeTimeout,
                self.onCloseHandshakeTimeout,
            )
    else:
        raise Exception("logic error")

def sendData(

self, data, sync=False, chopsize=None)

Wrapper for self.transport.write which allows to give a chopsize. When asked to chop up writing to TCP stream, we write only chopsize octets and then give up control to select() in underlying reactor so that bytes get onto wire immediately. Note that this is different from and unrelated to WebSocket data message fragmentation. Note that this is also different from the TcpNoDelay option which can be set on the socket.

def sendData(self, data, sync=False, chopsize=None):
    """
    Wrapper for self.transport.write which allows to give a chopsize.
    When asked to chop up writing to TCP stream, we write only chopsize
    octets and then give up control to select() in underlying reactor so
    that bytes get onto wire immediately. Note that this is different from
    and unrelated to WebSocket data message fragmentation. Note that this
    is also different from the TcpNoDelay option which can be set on the
    socket.
    """
    if chopsize and chopsize > 0:
        i = 0
        n = len(data)
        done = False
        while not done:
            j = i + chopsize
            if j >= n:
                done = True
                j = n
            self.send_queue.append((data[i:j], True))
            i += chopsize
        self._trigger()
    else:
        if sync or len(self.send_queue) > 0:
            self.send_queue.append((data, sync))
            self._trigger()
        else:
            self.transport.write(data)
            if self.state == WebSocketProtocol.STATE_OPEN:
                self.trafficStats.outgoingOctetsWireLevel += len(data)
            elif self.state == WebSocketProtocol.STATE_CONNECTING or self.state == WebSocketProtocol.STATE_PROXY_CONNECTING:
                self.trafficStats.preopenOutgoingOctetsWireLevel += len(data)
            if self.logOctets:
                self.logTxOctets(data, False)

def sendFrame(

self, opcode, payload=b'', fin=True, rsv=0, mask=None, payload_len=None, chopsize=None, sync=False)

Send out frame. Normally only used internally via sendMessage(), sendPing(), sendPong() and sendClose().

This method deliberately allows to send invalid frames (that is frames invalid per-se, or frames invalid because of protocol state). Other than in fuzzing servers, calling methods will ensure that no invalid frames are sent.

In addition, this method supports explicit specification of payload length. When payload_len is given, it will always write that many octets to the stream. It'll wrap within payload, resending parts of that when more octets were requested The use case is again for fuzzing server which want to sent increasing amounts of payload data to peers without having to construct potentially large messages themselves.

def sendFrame(self,
              opcode,
              payload=b'',
              fin=True,
              rsv=0,
              mask=None,
              payload_len=None,
              chopsize=None,
              sync=False):
    """
    Send out frame. Normally only used internally via sendMessage(),
    sendPing(), sendPong() and sendClose().
    This method deliberately allows to send invalid frames (that is frames
    invalid per-se, or frames invalid because of protocol state). Other
    than in fuzzing servers, calling methods will ensure that no invalid
    frames are sent.
    In addition, this method supports explicit specification of payload
    length. When payload_len is given, it will always write that many
    octets to the stream. It'll wrap within payload, resending parts of
    that when more octets were requested The use case is again for fuzzing
    server which want to sent increasing amounts of payload data to peers
    without having to construct potentially large messages themselves.
    """
    if payload_len is not None:
        if len(payload) < 1:
            raise Exception("cannot construct repeated payload with length %d from payload of length %d" % (payload_len, len(payload)))
        l = payload_len
        pl = b''.join([payload for _ in range(payload_len // len(payload))]) + payload[:payload_len % len(payload)]
    else:
        l = len(payload)
        pl = payload
    # first byte
    #
    b0 = 0
    if fin:
        b0 |= (1 << 7)
    b0 |= (rsv % 8) << 4
    b0 |= opcode % 128
    # second byte, payload len bytes and mask
    #
    b1 = 0
    if mask or (not self.factory.isServer and self.maskClientFrames) or (self.factory.isServer and self.maskServerFrames):
        b1 |= 1 << 7
        if not mask:
            # note: the RFC mentions "cryptographic randomness"
            # for the masks, which *does* make sense for browser
            # implementations, but not in this case -- for
            # example, a user of this library could just
            # monkey-patch os.urandom (or getrandbits) and predict
            # the masks easily. See issue 758 for more.
            mask = struct.pack("!I", random.getrandbits(32))
            mv = mask
        else:
            mv = b''
        # mask frame payload
        #
        if l > 0 and self.applyMask:
            masker = create_xor_masker(mask, l)
            plm = masker.process(pl)
        else:
            plm = pl
    else:
        mv = b''
        plm = pl
    el = b''
    if l <= 125:
        b1 |= l
    elif l <= 0xFFFF:
        b1 |= 126
        el = struct.pack("!H", l)
    elif l <= 0x7FFFFFFFFFFFFFFF:
        b1 |= 127
        el = struct.pack("!Q", l)
    else:
        raise Exception("invalid payload length")
    if six.PY3:
        raw = b''.join([b0.to_bytes(1, 'big'), b1.to_bytes(1, 'big'), el, mv, plm])
    else:
        raw = b''.join([chr(b0), chr(b1), el, mv, plm])
    if opcode in [0, 1, 2]:
        self.trafficStats.outgoingWebSocketFrames += 1
    if self.logFrames:
        frameHeader = FrameHeader(opcode, fin, rsv, l, mask)
        self.logTxFrame(frameHeader, payload, payload_len, chopsize, sync)
    # send frame octets
    #
    self.sendData(raw, sync, chopsize)

def sendMessage(

self, payload, isBinary=False, fragmentSize=None, sync=False, doNotCompress=False)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendMessage

def sendMessage(self,
                payload,
                isBinary=False,
                fragmentSize=None,
                sync=False,
                doNotCompress=False):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessage`
    """
    assert type(payload) == six.binary_type, '"payload" must have type bytes, but was "{}"'.format(type(payload))
    assert type(isBinary) == bool, '"isBinary" must have type bool, but was "{}"'.format(type(isBinary))
    assert fragmentSize is None or type(fragmentSize) == int, '"fragmentSize" must have type int, but was "{}"'.format(type(fragmentSize))
    assert type(sync) == bool, '"sync" must have type bool, but was "{}"'.format(type(sync))
    assert type(doNotCompress) == bool, '"doNotCompress" must have type bool, but was "{}"'.format(type(doNotCompress))
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    if self.trackedTimings:
        self.trackedTimings.track("sendMessage")
    # (initial) frame opcode
    #
    if isBinary:
        opcode = 2
    else:
        opcode = 1
    self.trafficStats.outgoingWebSocketMessages += 1
    # setup compressor
    #
    if self._perMessageCompress is not None and not doNotCompress:
        sendCompressed = True
        self._perMessageCompress.start_compress_message()
        self.trafficStats.outgoingOctetsAppLevel += len(payload)
        payload1 = self._perMessageCompress.compress_message_data(payload)
        payload2 = self._perMessageCompress.end_compress_message()
        payload = b''.join([payload1, payload2])
        payload_len = len(payload)
        self.trafficStats.outgoingOctetsWebSocketLevel += payload_len
    else:
        sendCompressed = False
        payload_len = len(payload)
        self.trafficStats.outgoingOctetsAppLevel += payload_len
        self.trafficStats.outgoingOctetsWebSocketLevel += payload_len
    if 0 < self.maxMessagePayloadSize < payload_len:
        self.wasMaxMessagePayloadSizeExceeded = True
        emsg = u'tried to send WebSocket message with size {} exceeding payload limit of {} octets'.format(payload_len, self.maxMessagePayloadSize)
        self.log.warn(emsg)
        raise PayloadExceededError(emsg)
    # explicit fragmentSize arguments overrides autoFragmentSize setting
    #
    if fragmentSize is not None:
        pfs = fragmentSize
    else:
        if self.autoFragmentSize > 0:
            pfs = self.autoFragmentSize
        else:
            pfs = None
    # send unfragmented
    #
    if pfs is None or len(payload) <= pfs:
        self.sendFrame(opcode=opcode, payload=payload, sync=sync, rsv=4 if sendCompressed else 0)
    # send data message in fragments
    #
    else:
        if pfs < 1:
            raise Exception("payload fragment size must be at least 1 (was %d)" % pfs)
        n = len(payload)
        i = 0
        done = False
        first = True
        while not done:
            j = i + pfs
            if j > n:
                done = True
                j = n
            if first:
                self.sendFrame(opcode=opcode, payload=payload[i:j], fin=done, sync=sync, rsv=4 if sendCompressed else 0)
                first = False
            else:
                self.sendFrame(opcode=0, payload=payload[i:j], fin=done, sync=sync)
            i += pfs

def sendMessageFrame(

self, payload, sync=False)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrame

def sendMessageFrame(self, payload, sync=False):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrame`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    if self.send_compressed:
        self.trafficStats.outgoingOctetsAppLevel += len(payload)
        payload = self._perMessageCompress.compress_message_data(payload)
    self.beginMessageFrame(len(payload))
    self.sendMessageFrameData(payload, sync)

def sendMessageFrameData(

self, payload, sync=False)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrameData

def sendMessageFrameData(self, payload, sync=False):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrameData`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    if not self.send_compressed:
        self.trafficStats.outgoingOctetsAppLevel += len(payload)
    self.trafficStats.outgoingOctetsWebSocketLevel += len(payload)
    if self.send_state != WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE_FRAME:
        raise Exception("WebSocketProtocol.sendMessageFrameData invalid in current sending state")
    rl = len(payload)
    if self.send_message_frame_masker.pointer() + rl > self.send_message_frame_length:
        l = self.send_message_frame_length - self.send_message_frame_masker.pointer()
        rest = -(rl - l)
        pl = payload[:l]
    else:
        l = rl
        rest = self.send_message_frame_length - self.send_message_frame_masker.pointer() - l
        pl = payload
    # mask frame payload
    #
    plm = self.send_message_frame_masker.process(pl)
    # send frame payload
    #
    self.sendData(plm, sync=sync)
    # if we are done with frame, move back into "inside message" state
    #
    if self.send_message_frame_masker.pointer() >= self.send_message_frame_length:
        self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE
    # when =0 : frame was completed exactly
    # when >0 : frame is still uncomplete and that much amount is still left to complete the frame
    # when <0 : frame was completed and there was this much unconsumed data in payload argument
    #
    return rest

def sendPing(

self, payload=None)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendPing

def sendPing(self, payload=None):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPing`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    if payload:
        l = len(payload)
        if l > 125:
            raise Exception("invalid payload for PING (payload length must be <= 125, was %d)" % l)
        self.sendFrame(opcode=9, payload=payload)
    else:
        self.sendFrame(opcode=9)

def sendPong(

self, payload=None)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendPong

def sendPong(self, payload=None):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPong`
    """
    if self.state != WebSocketProtocol.STATE_OPEN:
        return
    if payload:
        l = len(payload)
        if l > 125:
            raise Exception("invalid payload for PONG (payload length must be <= 125, was %d)" % l)
        self.sendFrame(opcode=10, payload=payload)
    else:
        self.sendFrame(opcode=10)

def sendPreparedMessage(

self, preparedMsg)

Implements :func:autobahn.websocket.interfaces.IWebSocketChannel.sendPreparedMessage

def sendPreparedMessage(self, preparedMsg):
    """
    Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPreparedMessage`
    """
    if self._perMessageCompress is None or preparedMsg.doNotCompress:
        self.sendData(preparedMsg.payloadHybi)
    else:
        self.sendMessage(preparedMsg.payload, preparedMsg.binary)

def setTrackTimings(

self, enable)

Enable/disable tracking of detailed timings.

:param enable: Turn time tracking on/off. :type enable: bool

def setTrackTimings(self, enable):
    """
    Enable/disable tracking of detailed timings.
    :param enable: Turn time tracking on/off.
    :type enable: bool
    """
    if not hasattr(self, 'trackTimings') or self.trackTimings != enable:
        self.trackTimings = enable
        if self.trackTimings:
            self.trackedTimings = Timings()
        else:
            self.trackedTimings = None

def set_valid_events(

self, valid_events=None)

:param valid_events: if non-None, .on() or .fire() with an event not listed in valid_events raises an exception.

def set_valid_events(self, valid_events=None):
    """
    :param valid_events: if non-None, .on() or .fire() with an event
        not listed in valid_events raises an exception.
    """
    self._valid_events = list(valid_events)

def startHandshake(

self)

Start WebSocket opening handshake.

def startHandshake(self):
    """
    Start WebSocket opening handshake.
    """
    # extract framework-specific transport information
    transport_details = self._create_transport_details()
    # ask our specialized framework-specific (or user-code) for a
    # ConnectingRequest instance
    options_d = txaio.as_future(self.onConnecting, transport_details)
    def got_options(request_options):
        """
        onConnecting succeeded and returned options
        """
        if request_options is None:
            # Note, before onConnecting was added, everything came
            # from self.factory so we get the required parameters from
            # there still by default
            request_options = ConnectingRequest(
                # required (no defaults):
                host=self.factory.host,
                port=self.factory.port,
                resource=self.factory.resource,
                # optional (useful defaults):
                headers=self.factory.headers,  # might be None
                useragent=self.factory.useragent,  # might be None
                origin=self.factory.origin,  # might be None
                protocols=self.factory.protocols,  # might be None
            )
        self._actuallyStartHandshake(request_options)
        return request_options
    def options_failed(fail):
        self.log.error(
            "onConnecting failed: {fail}",
            fail=fail,
        )
        self.dropConnection(abort=False)
        return None
    txaio.add_callbacks(options_d, got_options, options_failed)
    return options_d

def startProxyConnect(

self)

Connect to explicit proxy.

def startProxyConnect(self):
    """
    Connect to explicit proxy.
    """
    # construct proxy connect HTTP request
    #
    request = "CONNECT %s:%d HTTP/1.1\x0d\x0a" % (self.factory.host, self.factory.port)
    request += "Host: %s:%d\x0d\x0a" % (self.factory.host, self.factory.port)
    request += "\x0d\x0a"
    self.log.debug("{request}", request=request)
    self.sendData(request.encode('utf8'))

def startTLS(

self)

def startTLS(self):
    self.log.debug("Starting TLS upgrade")
    self.transport.startTLS(self.factory.contextFactory)

def unregisterProducer(

self)

Unregister Twisted producer with this protocol.

def unregisterProducer(self):
    """
    Unregister Twisted producer with this protocol.
    """
    self.transport.unregisterProducer()