ticker.js

var WebSocket = require("ws");
var utils = require("./utils");

/**
 * The WebSocket client for connecting to Kite connect streaming quotes service.
 *
 * Getting started:
 * ---------------
 *
 * 	var KiteTicker = require("kiteconnect").KiteTicker;
 * 	var ticker = new KiteTicker({
 * 		api_key: "api_key",
 * 		access_token: "access_token"
 *	});
 *
 * 	ticker.connect();
 * 	ticker.on("ticks", onTicks);
 * 	ticker.on("connect", subscribe);
 *
 * 	function onTicks(ticks) {
 * 		console.log("Ticks", ticks);
 * 	}
 *
 * 	function subscribe() {
 * 		var items = [738561];
 * 		ticker.subscribe(items);
 * 		ticker.setMode(ticker.modeFull, items);
 * 	}
 *
 * Tick structure (passed to the tick callback you assign):
 * ---------------------------
 * [{ tradable: true,
 *    mode: 'full',
 *    instrument_token: 208947,
 *    last_price: 3939,
 *    last_quantity: 1,
 *    average_price: 3944.77,
 *    volume: 28940,
 *    buy_quantity: 4492,
 *    sell_quantity: 4704,
 *    ohlc: { open: 3927, high: 3955, low: 3927, close: 3906 },
 *    change: 0.8448540706605223,
 *    last_trade_time: 1515491369,
 *    timestamp: 1515491373,
 *    oi: 24355,
 *    oi_day_high: 0,
 *    oi_day_low: 0,
 *    depth:
 *			buy: [{
 *				 quantity: 59,
 *				 price: 3223,
 *				 orders: 5
 *			  },
 *			  {
 *				 quantity: 164,
 *				 price: 3222,
 *				 orders: 15
 *			  },
 *			  {
 *				 quantity: 123,
 *				 price: 3221,
 *				 orders: 7
 *			  },
 *			  {
 *				 quantity: 48,
 *				 price: 3220,
 *				 orders: 7
 *			  },
 *			  {
 *				 quantity: 33,
 *				 price: 3219,
 *				 orders: 5
 *			  }],
 *		   sell: [{
 *				 quantity: 115,
 *				 price: 3224,
 *				 orders: 15
 *			  },
 *			  {
 *				 quantity: 50,
 *				 price: 3225,
 *				 orders: 5
 *			  },
 *			  {
 *				 quantity: 175,
 *				 price: 3226,
 *				 orders: 14
 *			  },
 *			  {
 *				 quantity: 49,
 *				 price: 3227,
 *				 orders: 10
 *			  },
 *			  {
 *				 quantity: 106,
 *				 price: 3228,
 *				 orders: 13
 *			  }]
 *		}
 *	}, ...]
 *
 * Auto reconnection
 * -----------------
 * Auto reonnection is enabled by default and it can be disabled by passing `reconnect` param while initialising `KiteTicker`.
 *
 * Auto reonnection mechanism is based on [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) algorithm in which
 * next retry interval will be increased exponentially. `max_delay` and `max_tries` params can be used to tweak
 * the alogrithm where `max_delay` is the maximum delay after which subsequent reconnection interval will become constant and
 * `max_tries` is maximum number of retries before it quits reconnection.
 * For example if `max_delay` is 60 seconds and `max_tries` is 50 then the first reconnection interval starts from
 * minimum interval which is 2 seconds and keep increasing up to 60 seconds after which it becomes constant and when reconnection attempt
 * is reached upto 50 then it stops reconnecting.
 * Callback `reconnect` will be called with current reconnect attempt and next reconnect interval and
 * `on_noreconnect` is called when reconnection attempts reaches max retries.
 *
 * Here is an example demonstrating auto reconnection.
 *
 * 	var KiteTicker = require("kiteconnect").KiteTicker;
 * 	var ticker = new KiteTicker({
 * 		api_key: "api_key",
 * 		access_token: "access_token"
 *	});
 *
 * 	// set autoreconnect with 10 maximum reconnections and 5 second interval
 * 	ticker.autoReconnect(true, 10, 5)
 * 	ticker.connect();
 * 	ticker.on("ticks", onTicks);
 * 	ticker.on("connect", subscribe);
 *
 * 	ticker.on("noreconnect", function() {
 * 		console.log("noreconnect");
 * 	});
 *
 * 	ticker.on("reconnect", function(reconnect_count, reconnect_interval) {
 * 		console.log("Reconnecting: attempt - ", reconnect_count, " interval - ", reconnect_interval);
 * 	});
 *
 * 	function onTicks(ticks) {
 * 		console.log("Ticks", ticks);
 * 	}
 *
 * 	function subscribe() {
 * 		var items = [738561];
 * 		ticker.subscribe(items);
 * 		ticker.setMode(ticker.modeFull, items);
 * 	}
 *
 *
 * @constructor
 * @name KiteTicker
 * @param {Object} params
 * @param {string} params.api_key API key issued you.
 * @param {string} params.access_token Access token obtained after successful login flow.
 * @param {bool}   [params.reconnect] Enable/Disable auto reconnect. Enabled by default.
 * @param {number} [params.max_retry=50] is maximum number re-connection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
 * @param {number} [params.max_delay=60] in seconds is the maximum delay after which subsequent re-connection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
 * #param {string} [params.root="wss://websocket.kite.trade/"] Kite websocket root.
 */
var KiteTicker = function(params) {
	var root = params.root || "wss://ws.kite.trade/";

	var read_timeout = 5, // seconds
		reconnect_max_delay = 0,
		reconnect_max_tries = 0,

		// message flags (outgoing)
		mSubscribe = "subscribe",
		mUnSubscribe = "unsubscribe",
		mSetMode = "mode",

		// incoming
		mAlert = 10,
		mMessage = 11,
		mLogout = 12,
		mReload = 13,
		mClearCache = 14,

		// public constants
		modeFull  = "full", // Full quote including market depth. 164 bytes.
		modeQuote = "quote", // Quote excluding market depth. 52 bytes.
		modeLTP   = "ltp";

	// public constants
	/**
	 * @memberOf KiteTicker
	 * @desc Set mode full
	 */
	this.modeFull = modeFull;

	/**
	 * @memberOf KiteTicker
	 * @desc Set mode quote
	 */
	this.modeQuote = modeQuote;

	/**
	 * @memberOf KiteTicker
	 * @desc Set mode LTP
	 */
	this.modeLTP = modeLTP;

	var ws = null,
		triggers = {"connect": [],
					"ticks": [],
					"disconnect": [],
					"error": [],
					"close": [],
					"reconnect": [],
					"noreconnect": [],
					"message": [],
					"order_update": []},
		read_timer = null,
		last_read = 0,
		reconnect_timer = null,
		auto_reconnect = false,
		current_reconnection_count = 0,
		last_reconnect_interval = 0;
		current_ws_url = null,
		token_modes = {},
		defaultReconnectMaxDelay = 60,
		defaultReconnectMaxRetries = 50,
		maximumReconnectMaxRetries = 300,
		minimumReconnectMaxDelay = 5;

	// segment constants
	var NseCM = 1,
		NseFO = 2,
		NseCD = 3,
		BseCM = 4,
		BseFO = 5,
		BseCD = 6,
		McxFO = 7,
		McxSX = 8,
		Indices = 9;


	// Enable auto reconnect by default
	if (!params.reconnect) params.reconnect = true;
	autoReconnect(params.reconnect, params.max_retry, params.max_delay);

	/**
	 * Auto reconnect settings
	 * @param  {bool} Enable or disable auto disconnect, defaults to false
	 * @param  {number} [max_retry=50] is maximum number re-connection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
	 * @param  {number} [max_delay=60] in seconds is the maximum delay after which subsequent re-connection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
	 * @memberOf KiteTicker
	 * @method autoReconnect
	 */
	this.autoReconnect = function(t, max_retry, max_delay) {
		autoReconnect(t, max_retry, max_delay)
	};

	/**
	 * Initiate a websocket connection
	 * @memberOf KiteTicker
	 * @method connect
	 * @instance
	 */
	this.connect = function() {
		// Skip if its already connected
		if(ws && (ws.readyState == ws.CONNECTING || ws.readyState == ws.OPEN)) return;

		var url = root + "?api_key=" + params.api_key +
					"&access_token=" + params.access_token + "&uid=" + (new Date().getTime().toString());

		ws = new WebSocket(url, {
			headers: {
				"X-Kite-Version": "3",
				"User-Agent": utils.getUserAgent()
			}
		});

		ws.binaryType = "arraybuffer";

		ws.onopen = function() {
			// Reset last reconnect interval
			last_reconnect_interval = null;
			// Reset current_reconnection_count attempt
			current_reconnection_count = 0
			// Store current open connection url to check for auto re-connection.
			if (!current_ws_url) current_ws_url = this.url
			// Trigger on connect event
			trigger("connect");
			// If there isn't an incoming message in n seconds, assume disconnection.
			clearInterval(read_timer);

			last_read = new Date();
			read_timer = setInterval(function() {
				if((new Date() - last_read ) / 1000 >= read_timeout) {
					// reset current_ws_url incase current connection times out
					// This is determined when last heart beat received time interval
					// exceeds read_timeout value
					current_ws_url = null;
					if(ws) ws.close();
					clearInterval(read_timer);
					triggerDisconnect();
				}
			}, read_timeout * 1000);
		};

		ws.onmessage = function(e) {
			// Binary tick data.
			if(e.data instanceof ArrayBuffer) {
				if(e.data.byteLength > 2) {
					var d = parseBinary(e.data);
					if(d) trigger("ticks", [d]);
				}
			} else {
				parseTextMessage(e.data)
			}

			// Set last read time to check for connection timeout
			last_read = new Date();
		};

		ws.onerror = function(e) {
			trigger("error", [e]);

			// Force close to avoid ghost connections
			if(this && this.readyState == this.OPEN) this.close();
		};

		ws.onclose = function(e) {
			trigger("close", [e]);

			// the ws id doesn't match the current global id,
			// meaning it's a ghost close event. just ignore.
			if(current_ws_url && (this.url != current_ws_url)) return;

			triggerDisconnect(e);
		};
	};

	/**
	 * @memberOf KiteTicker
	 * @method disconnect
	 * @instance
	 */
	this.disconnect = function() {
		if(ws && ws.readyState != ws.CLOSING && ws.readyState != ws.CLOSED) {
			ws.close();
		}
	}

	/**
	 * Check if the ticker is connected
	 * @memberOf KiteTicker
	 * @method connected
	 * @instance
	 * @returns {bool}
	 */
	this.connected = function() {
		if(ws && ws.readyState == ws.OPEN) {
			return true;
		} else {
			return false;
		}
	};

	/**
	 * Register websocket event callbacks
	 * Available events
	 * ~~~~
	 * connect -  when connection is successfully established.
	 * ticks - when ticks are available (Arrays of `ticks` object as the first argument).
	 * disconnect - when socket connection is disconnected. Error is received as a first param.
	 * error - when socket connection is closed with error. Error is received as a first param.
	 * close - when socket connection is closed cleanly.
	 * reconnect - When reconnecting (current re-connection count and reconnect interval as arguments respectively).
	 * noreconnect - When re-connection fails after n number times.
	 * order_update - When order update (postback) is received for the connected user (Data object is received as first argument).
	 * ~~~~
	 *
	 * @memberOf KiteTicker
	 * @method on
	 * @instance
	 *
	 * @example
	 * ticker.on("ticks", callback);
	 * ticker.on("connect", callback);
	 * ticker.on("disconnect", callback);
	 */
	this.on = function(e, callback) {
		if(triggers.hasOwnProperty(e)) {
			triggers[e].push(callback);
		}
	};

	/**
	 * Subscribe to array of tokens
	 * @memberOf KiteTicker
	 * @method subscribe
	 * @instance
	 * @param {array} tokens Array of tokens to be subscribed
	 *
	 * @example
	 * ticker.subscribe([738561]);
	 */
	this.subscribe = function(tokens) {
		if(tokens.length > 0) {
			send({"a": mSubscribe, "v": tokens});
		}
		return tokens;
	};

	/**
	 * Unsubscribe to array of tokens
	 * @memberOf KiteTicker
	 * @method unsubscribe
	 * @instance
	 * @param {array} tokens Array of tokens to be subscribed
	 *
	 * @example
	 * ticker.unsubscribe([738561]);
	 */
	this.unsubscribe = function(tokens) {
		if(tokens.length > 0) {
			send({"a": mUnSubscribe, "v": tokens});
		}
		return tokens;
	};

	/**
	 * Set modes to array of tokens
	 * @memberOf KiteTicker
	 * @method setMode
	 * @instance
	 * @param {string} mode - mode to set
	 * @param {array} tokens Array of tokens to be subscribed
	 *
	 * @example
	 * ticker.setMode(ticker.modeFull, [738561]);
	 */
	this.setMode = function(mode, tokens) {
		if(tokens.length > 0) {
			send({"a": mSetMode, "v": [mode, tokens]});
		}
		return tokens;
	};

	function autoReconnect(t, max_retry, max_delay) {
		auto_reconnect = (t == true ? true : false);

		// Set default values
		max_retry = max_retry || defaultReconnectMaxRetries;
		max_delay = max_delay || defaultReconnectMaxDelay;

		// Set reconnect constraints
		reconnect_max_tries = max_retry >= maximumReconnectMaxRetries ? maximumReconnectMaxRetries : max_retry;
		reconnect_max_delay = max_delay <= minimumReconnectMaxDelay  ? minimumReconnectMaxDelay : max_delay;
	}

	function triggerDisconnect(e) {
		ws = null;
		trigger("disconnect", [e]);
		if(auto_reconnect) attemptReconnection();
	}

	// send a message via the socket
	// automatically encodes json if possible
	function send(message) {
		if(!ws || ws.readyState != ws.OPEN) return;

		try {
			if(typeof(message) == "object") {
				message = JSON.stringify(message);
			}
			ws.send(message);
		} catch(e) { ws.close(); };
	}

	// trigger event callbacks
	function trigger(e, args) {
		if (!triggers[e]) return
		for(var n=0; n<triggers[e].length; n++) {
			triggers[e][n].apply(triggers[e][n], args ? args : []);
		}
	}

	function parseTextMessage(data) {
        try {
            data = JSON.parse(data)
		} catch (e) {
			return
		}

		if (data.type === "order") {
			trigger("order_update", [data.data]);
		}
	}

	// parse received binary message. each message is a combination of multiple tick packets
	// [2-bytes num packets][size1][tick1][size2][tick2] ...
	function parseBinary(binpacks) {
		var packets = splitPackets(binpacks),
			ticks = [];

		for(var n=0; n<packets.length; n++) {
			var bin = packets[n],
				instrument_token = buf2long(bin.slice(0, 4)),
				segment = instrument_token & 0xff;

			var tradable = true;
			if (segment === Indices) tradable = false;

			// Add price divisor based on segment
			var divisor = 100.0;
			if (segment === NseCD) { 
				divisor = 10000000.0;

			} else if (segment == BseCD) {
				divisor = 10000.0;
			}

			// Parse LTP
			if (bin.byteLength === 8) {
				ticks.push({
					tradable: tradable,
					mode: modeLTP,
					instrument_token: instrument_token,
					last_price: buf2long(bin.slice(4,8)) / divisor
				});
			// Parse indices quote and full mode
			} else if (bin.byteLength === 28 || bin.byteLength === 32) {
				var mode = modeQuote;
				if (bin.byteLength === 32) mode = modeFull;

                var tick = {
                    tradable: tradable,
                    mode: mode,
                    instrument_token: instrument_token,
                    last_price: buf2long(bin.slice(4,8)) / divisor,
                    ohlc: {
                        high: buf2long(bin.slice(8, 12)) / divisor,
                        low: buf2long(bin.slice(12, 16)) / divisor,
                        open: buf2long(bin.slice(16, 20)) / divisor,
                        close: buf2long(bin.slice(20, 24)) / divisor
					},
					change: buf2long(bin.slice(24, 28))
				};

                // Compute the change price using close price and last price
                if(tick.ohlc.close != 0) {
                    tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
				}

                // Full mode with timestamp in seconds
                if (bin.byteLength === 32) {
					tick.exchange_timestamp = null;
					var timestamp = buf2long(bin.slice(28, 32));
					if (timestamp) tick.exchange_timestamp = new Date(timestamp * 1000);
				}

				ticks.push(tick);
			} else if (bin.byteLength === 44 || bin.byteLength === 184) {
				var mode = modeQuote;
				if (bin.byteLength === 184) mode = modeFull;

				var tick = {
                    tradable: tradable,
                    mode: mode,
                    instrument_token: instrument_token,
                    last_price: buf2long(bin.slice(4, 8)) / divisor,
                    last_traded_quantity: buf2long(bin.slice(8, 12)),
                    average_traded_price: buf2long(bin.slice(12, 16)) / divisor,
                    volume_traded: buf2long(bin.slice(16, 20)),
                    total_buy_quantity: buf2long(bin.slice(20, 24)),
                    total_sell_quantity: buf2long(bin.slice(24, 28)),
                    ohlc: {
                        open: buf2long(bin.slice(28, 32)) / divisor,
                        high: buf2long(bin.slice(32, 36)) / divisor,
                        low: buf2long(bin.slice(36, 40)) / divisor,
                        close: buf2long(bin.slice(40, 44)) / divisor
                    }
				};

                // Compute the change price using close price and last price
                if (tick.ohlc.close != 0) {
                    tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
				}

				// Parse full mode
				if (bin.byteLength === 184) {
					// Parse last trade time
					tick.last_trade_time = null;
					var last_trade_time = buf2long(bin.slice(44, 48));
					if (last_trade_time) tick.last_trade_time = new Date(last_trade_time * 1000);

					// Parse timestamp
					tick.exchange_timestamp = null;
					var timestamp = buf2long(bin.slice(60, 64));
					if (timestamp) tick.exchange_timestamp = new Date(timestamp * 1000);

					// Parse OI
					tick.oi = buf2long(bin.slice(48, 52));
                    tick.oi_day_high = buf2long(bin.slice(52, 56));
					tick.oi_day_low = buf2long(bin.slice(56, 60));
					tick.depth = {
						buy: [],
						sell: []
					};

					var s = 0, depth = bin.slice(64, 184);
					for (var i=0; i<10; i++) {
						s = i * 12;
						tick.depth[i < 5 ? "buy" : "sell"].push({
							quantity:	buf2long(depth.slice(s, s + 4)),
							price:		buf2long(depth.slice(s + 4, s + 8)) / divisor,
							orders: 	buf2long(depth.slice(s + 8, s + 10))
						});
					}
				}

				ticks.push(tick);
			}
		}

		return ticks;
	}

	// split one long binary message into individual tick packets
	function splitPackets(bin) {
		// number of packets
		var num = buf2long(bin.slice(0, 2)),
			j = 2,
			packets = [];

		for(var i=0; i<num; i++) {
			// first two bytes is the packet length
			var size = buf2long(bin.slice(j, j+2)),
				packet = bin.slice(j+2, j+2+size);

			packets.push(packet);

			j += 2 + size;
		}

		return packets;
	}

	function attemptReconnection() {
		// Try reconnecting only so many times.
		if(current_reconnection_count > reconnect_max_tries) {
			trigger("noreconnect");
			process.exit(1);
		}

		if (current_reconnection_count > 0) {
			last_reconnect_interval = Math.pow(2, current_reconnection_count);
		} else if (!last_reconnect_interval) {
			last_reconnect_interval = 1;
		}

		if (last_reconnect_interval > reconnect_max_delay) {
			last_reconnect_interval = reconnect_max_delay;
		}

		current_reconnection_count++;

		trigger("reconnect", [current_reconnection_count, last_reconnect_interval]);

		reconnect_timer = setTimeout(function() {
			self.connect();
		}, last_reconnect_interval * 1000);
	}

	// Big endian byte array to long.
	function buf2long(buf) {
		var b = new Uint8Array(buf),
			val = 0,
			len = b.length;

		for(var i=0, j=len-1; i<len; i++, j--) {
			val += b[j] << (i*8);
		}

		return val;
	}

	// de-duplicate an array
	function arrayUnique() {
		var u = {}, a = [];
		for(var i = 0, l = this.length; i < l; ++i){
			if(u.hasOwnProperty(this[i])) {
				continue;
			}

			a.push(this[i]);
			u[this[i]] = 1;
		}

		return a;
	}

	var self = this;
};

module.exports = KiteTicker;