ticker.js

  1. var WebSocket = require("ws");
  2. var utils = require("./utils");
  3. /**
  4. * The WebSocket client for connecting to Kite connect streaming quotes service.
  5. *
  6. * Getting started:
  7. * ---------------
  8. *
  9. * var KiteTicker = require("kiteconnect").KiteTicker;
  10. * var ticker = new KiteTicker({
  11. * api_key: "api_key",
  12. * access_token: "access_token"
  13. * });
  14. *
  15. * ticker.connect();
  16. * ticker.on("ticks", onTicks);
  17. * ticker.on("connect", subscribe);
  18. *
  19. * function onTicks(ticks) {
  20. * console.log("Ticks", ticks);
  21. * }
  22. *
  23. * function subscribe() {
  24. * var items = [738561];
  25. * ticker.subscribe(items);
  26. * ticker.setMode(ticker.modeFull, items);
  27. * }
  28. *
  29. * Tick structure (passed to the tick callback you assign):
  30. * ---------------------------
  31. * [{ tradable: true,
  32. * mode: 'full',
  33. * instrument_token: 208947,
  34. * last_price: 3939,
  35. * last_quantity: 1,
  36. * average_price: 3944.77,
  37. * volume: 28940,
  38. * buy_quantity: 4492,
  39. * sell_quantity: 4704,
  40. * ohlc: { open: 3927, high: 3955, low: 3927, close: 3906 },
  41. * change: 0.8448540706605223,
  42. * last_trade_time: 1515491369,
  43. * timestamp: 1515491373,
  44. * oi: 24355,
  45. * oi_day_high: 0,
  46. * oi_day_low: 0,
  47. * depth:
  48. * buy: [{
  49. * quantity: 59,
  50. * price: 3223,
  51. * orders: 5
  52. * },
  53. * {
  54. * quantity: 164,
  55. * price: 3222,
  56. * orders: 15
  57. * },
  58. * {
  59. * quantity: 123,
  60. * price: 3221,
  61. * orders: 7
  62. * },
  63. * {
  64. * quantity: 48,
  65. * price: 3220,
  66. * orders: 7
  67. * },
  68. * {
  69. * quantity: 33,
  70. * price: 3219,
  71. * orders: 5
  72. * }],
  73. * sell: [{
  74. * quantity: 115,
  75. * price: 3224,
  76. * orders: 15
  77. * },
  78. * {
  79. * quantity: 50,
  80. * price: 3225,
  81. * orders: 5
  82. * },
  83. * {
  84. * quantity: 175,
  85. * price: 3226,
  86. * orders: 14
  87. * },
  88. * {
  89. * quantity: 49,
  90. * price: 3227,
  91. * orders: 10
  92. * },
  93. * {
  94. * quantity: 106,
  95. * price: 3228,
  96. * orders: 13
  97. * }]
  98. * }
  99. * }, ...]
  100. *
  101. * Auto reconnection
  102. * -----------------
  103. * Auto reconnection is enabled by default and can be disabled by passing the `reconnect` param while initialising `KiteTicker`.
  104. *
  105. * The auto reconnection mechanism is based on the [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) algorithm, in which
  106. * the next retry interval is increased exponentially. The `max_delay` and `max_retry` params can be used to tweak
  107. * the algorithm, where `max_delay` is the maximum delay after which subsequent reconnection intervals become constant and
  108. * `max_retry` is the maximum number of retries before it quits reconnecting.
  109. * For example, if `max_delay` is 60 seconds and `max_retry` is 50, then the first reconnection interval starts from the
  110. * minimum interval, which is 2 seconds, and keeps increasing up to 60 seconds, after which it becomes constant; once the number of
  111. * reconnection attempts reaches 50, it stops reconnecting.
  112. * The `reconnect` event callback is called with the current reconnect attempt and the next reconnect interval, and the
  113. * `noreconnect` event callback is called when the reconnection attempts reach the maximum number of retries.
  114. *
  115. * Here is an example demonstrating auto reconnection.
  116. *
  117. * var KiteTicker = require("kiteconnect").KiteTicker;
  118. * var ticker = new KiteTicker({
  119. * api_key: "api_key",
  120. * access_token: "access_token"
  121. * });
  122. *
  123. * // set autoreconnect with 10 maximum reconnections and 5 second interval
  124. * ticker.autoReconnect(true, 10, 5)
  125. * ticker.connect();
  126. * ticker.on("ticks", onTicks);
  127. * ticker.on("connect", subscribe);
  128. *
  129. * ticker.on("noreconnect", function() {
  130. * console.log("noreconnect");
  131. * });
  132. *
  133. * ticker.on("reconnect", function(reconnect_count, reconnect_interval) {
  134. * console.log("Reconnecting: attempt - ", reconnect_count, " interval - ", reconnect_interval);
  135. * });
  136. *
  137. * function onTicks(ticks) {
  138. * console.log("Ticks", ticks);
  139. * }
  140. *
  141. * function subscribe() {
  142. * var items = [738561];
  143. * ticker.subscribe(items);
  144. * ticker.setMode(ticker.modeFull, items);
  145. * }
  146. *
  147. *
  148. * @constructor
  149. * @name KiteTicker
  150. * @param {Object} params
  151. * @param {string} params.api_key API key issued to you.
  152. * @param {string} params.access_token Access token obtained after successful login flow.
  153. * @param {bool} [params.reconnect] Enable/Disable auto reconnect. Enabled by default.
  154. * @param {number} [params.max_retry=50] is maximum number re-connection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
  155. * @param {number} [params.max_delay=60] in seconds is the maximum delay after which subsequent re-connection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
  156. * @param {string} [params.root="wss://ws.kite.trade/"] Kite websocket root.
  157. */
  var KiteTicker = function(params) {
    // Captured up front so timer callbacks can reach the public API of this instance.
    var self = this;

    var root = params.root || "wss://ws.kite.trade/";

    var read_timeout = 5, // seconds without any incoming frame before the link is presumed dead
      reconnect_max_delay = 0,
      reconnect_max_tries = 0,

      // message flags (outgoing)
      mSubscribe = "subscribe",
      mUnSubscribe = "unsubscribe",
      mSetMode = "mode",

      // incoming (declared for reference; not used by this client)
      mAlert = 10,
      mMessage = 11,
      mLogout = 12,
      mReload = 13,
      mClearCache = 14,

      // public constants
      modeFull = "full", // Full quote including market depth. Parsed from 184 byte packets (32 for indices).
      modeQuote = "quote", // Quote excluding market depth. Parsed from 44 byte packets (28 for indices).
      modeLTP = "ltp"; // Last traded price only. Parsed from 8 byte packets.

    // public constants
    /**
     * @memberOf KiteTicker
     * @desc Set mode full
     */
    this.modeFull = modeFull;

    /**
     * @memberOf KiteTicker
     * @desc Set mode quote
     */
    this.modeQuote = modeQuote;

    /**
     * @memberOf KiteTicker
     * @desc Set mode LTP
     */
    this.modeLTP = modeLTP;

    // Per-instance connection state. Everything here must live in this single `var`
    // list: a stray `;` in the middle would turn the remaining names into globals
    // shared by every ticker instance.
    var ws = null,
      triggers = {
        "connect": [],
        "ticks": [],
        "disconnect": [],
        "error": [],
        "close": [],
        "reconnect": [],
        "noreconnect": [],
        "message": [],
        "order_update": []
      },
      read_timer = null,
      last_read = 0,
      reconnect_timer = null,
      auto_reconnect = false,
      current_reconnection_count = 0,
      last_reconnect_interval = 0,
      current_ws_url = null,
      defaultReconnectMaxDelay = 60,
      defaultReconnectMaxRetries = 50,
      maximumReconnectMaxRetries = 300,
      minimumReconnectMaxDelay = 5;

    // segment constants (low byte of the instrument token)
    var NseCM = 1,
      NseFO = 2,
      NseCD = 3,
      BseCM = 4,
      BseFO = 5,
      BseCD = 6,
      McxFO = 7,
      McxSX = 8,
      Indices = 9;

    // Enable auto reconnect by default, but honour an explicit `reconnect: false`.
    // The caller's params object is left untouched.
    var reconnect_param = (params.reconnect === undefined || params.reconnect === null) ? true : params.reconnect;
    autoReconnect(reconnect_param, params.max_retry, params.max_delay);

    /**
     * Auto reconnect settings
     * @param {bool} t Enable or disable auto reconnect.
     * @param {number} [max_retry=50] is maximum number re-connection attempts. Defaults to 50 attempts and maximum up to 300 attempts.
     * @param {number} [max_delay=60] in seconds is the maximum delay after which subsequent re-connection interval will become constant. Defaults to 60s and minimum acceptable value is 5s.
     * @memberOf KiteTicker
     * @method autoReconnect
     */
    this.autoReconnect = function(t, max_retry, max_delay) {
      autoReconnect(t, max_retry, max_delay);
    };

    /**
     * Initiate a websocket connection
     * @memberOf KiteTicker
     * @method connect
     * @instance
     */
    this.connect = function() {
      // Skip if its already connected
      if (ws && (ws.readyState === ws.CONNECTING || ws.readyState === ws.OPEN)) return;

      // The uid query parameter makes every connection URL unique; the close
      // handler compares URLs to recognise events from superseded sockets.
      var url = root + "?api_key=" + params.api_key +
        "&access_token=" + params.access_token + "&uid=" + (new Date().getTime().toString());

      ws = new WebSocket(url, {
        headers: {
          "X-Kite-Version": "3",
          "User-Agent": utils.getUserAgent()
        }
      });

      ws.binaryType = "arraybuffer";

      ws.onopen = function() {
        // Reset last reconnect interval
        last_reconnect_interval = null;
        // Reset current_reconnection_count attempt
        current_reconnection_count = 0;
        // Store current open connection url to check for auto re-connection.
        // NOTE(review): this is only written while the stored value is empty, so after a
        // clean close followed by a reconnect it still holds the previous URL - confirm intended.
        if (!current_ws_url) current_ws_url = this.url;
        // Trigger on connect event
        trigger("connect");

        // If there isn't an incoming message in n seconds, assume disconnection.
        clearInterval(read_timer);
        last_read = new Date();
        read_timer = setInterval(function() {
          if ((new Date() - last_read) / 1000 >= read_timeout) {
            // reset current_ws_url incase current connection times out
            // This is determined when last heart beat received time interval
            // exceeds read_timeout value
            current_ws_url = null;
            if (ws) ws.close();
            // triggerDisconnect() also stops this watchdog.
            triggerDisconnect();
          }
        }, read_timeout * 1000);
      };

      ws.onmessage = function(e) {
        // Binary tick data.
        if (e.data instanceof ArrayBuffer) {
          // Frames of 2 bytes or fewer carry no packets and only refresh last_read.
          if (e.data.byteLength > 2) {
            var d = parseBinary(e.data);
            if (d) trigger("ticks", [d]);
          }
        } else {
          parseTextMessage(e.data);
        }

        // Set last read time to check for connection timeout
        last_read = new Date();
      };

      ws.onerror = function(e) {
        trigger("error", [e]);

        // Force close to avoid ghost connections
        if (this && this.readyState === this.OPEN) this.close();
      };

      ws.onclose = function(e) {
        trigger("close", [e]);

        // the ws id doesn't match the current global id,
        // meaning it's a ghost close event. just ignore.
        if (current_ws_url && (this.url !== current_ws_url)) return;

        triggerDisconnect(e);
      };
    };

    /**
     * Close the websocket connection if it is not already closing or closed.
     * @memberOf KiteTicker
     * @method disconnect
     * @instance
     */
    this.disconnect = function() {
      if (ws && ws.readyState !== ws.CLOSING && ws.readyState !== ws.CLOSED) {
        ws.close();
      }
    };

    /**
     * Check if the ticker is connected
     * @memberOf KiteTicker
     * @method connected
     * @instance
     * @returns {bool}
     */
    this.connected = function() {
      return !!(ws && ws.readyState === ws.OPEN);
    };

    /**
     * Register websocket event callbacks
     * Available events
     * ~~~~
     * connect -  when connection is successfully established.
     * ticks - when ticks are available (Arrays of `ticks` object as the first argument).
     * disconnect - when socket connection is disconnected. Error is received as a first param.
     * error - when socket connection is closed with error. Error is received as a first param.
     * close - when socket connection is closed cleanly.
     * reconnect - When reconnecting (current re-connection count and reconnect interval as arguments respectively).
     * noreconnect - When re-connection fails after n number times.
     * order_update - When order update (postback) is received for the connected user (Data object is received as first argument).
     * ~~~~
     *
     * Unknown event names are ignored.
     *
     * @memberOf KiteTicker
     * @method on
     * @instance
     *
     * @example
     * ticker.on("ticks", callback);
     * ticker.on("connect", callback);
     * ticker.on("disconnect", callback);
     */
    this.on = function(e, callback) {
      if (triggers.hasOwnProperty(e)) {
        triggers[e].push(callback);
      }
    };

    /**
     * Subscribe to array of tokens
     * @memberOf KiteTicker
     * @method subscribe
     * @instance
     * @param {array} tokens Array of tokens to be subscribed
     * @returns {array} The tokens array that was passed in.
     *
     * @example
     * ticker.subscribe([738561]);
     */
    this.subscribe = function(tokens) {
      if (tokens.length > 0) {
        send({"a": mSubscribe, "v": tokens});
      }
      return tokens;
    };

    /**
     * Unsubscribe to array of tokens
     * @memberOf KiteTicker
     * @method unsubscribe
     * @instance
     * @param {array} tokens Array of tokens to be unsubscribed
     * @returns {array} The tokens array that was passed in.
     *
     * @example
     * ticker.unsubscribe([738561]);
     */
    this.unsubscribe = function(tokens) {
      if (tokens.length > 0) {
        send({"a": mUnSubscribe, "v": tokens});
      }
      return tokens;
    };

    /**
     * Set modes to array of tokens
     * @memberOf KiteTicker
     * @method setMode
     * @instance
     * @param {string} mode - mode to set
     * @param {array} tokens Array of tokens whose mode is to be set
     * @returns {array} The tokens array that was passed in.
     *
     * @example
     * ticker.setMode(ticker.modeFull, [738561]);
     */
    this.setMode = function(mode, tokens) {
      if (tokens.length > 0) {
        send({"a": mSetMode, "v": [mode, tokens]});
      }
      return tokens;
    };

    // Store the auto reconnect switch and clamp its limits:
    // at most maximumReconnectMaxRetries tries, at least minimumReconnectMaxDelay seconds.
    function autoReconnect(t, max_retry, max_delay) {
      // Loose comparison kept on purpose so 1 / "1" keep enabling reconnection.
      auto_reconnect = (t == true);

      // Set default values
      max_retry = max_retry || defaultReconnectMaxRetries;
      max_delay = max_delay || defaultReconnectMaxDelay;

      // Set reconnect constraints
      reconnect_max_tries = max_retry >= maximumReconnectMaxRetries ? maximumReconnectMaxRetries : max_retry;
      reconnect_max_delay = max_delay <= minimumReconnectMaxDelay ? minimumReconnectMaxDelay : max_delay;
    }

    // Drop the current socket reference, notify listeners and, when enabled,
    // schedule a reconnection attempt.
    function triggerDisconnect(e) {
      // Stop the read watchdog of the connection that just went away. Left running,
      // it could later close a socket created by a reconnect and report a second,
      // spurious disconnect.
      clearInterval(read_timer);
      read_timer = null;

      ws = null;
      trigger("disconnect", [e]);
      if (auto_reconnect) attemptReconnection();
    }

    // send a message via the socket
    // automatically encodes json if possible
    function send(message) {
      if (!ws || ws.readyState !== ws.OPEN) return;

      try {
        if (typeof(message) === "object") {
          message = JSON.stringify(message);
        }
        ws.send(message);
      } catch (e) {
        // A failed send is treated as a broken link; closing lets the close handler take over.
        ws.close();
      }
    }

    // trigger event callbacks
    function trigger(e, args) {
      if (!triggers[e]) return;

      for (var n = 0; n < triggers[e].length; n++) {
        triggers[e][n].apply(triggers[e][n], args ? args : []);
      }
    }

    // Handle a text frame: only JSON messages of type "order" are forwarded
    // (as "order_update"); anything unparsable or of another type is ignored.
    function parseTextMessage(data) {
      try {
        data = JSON.parse(data);
      } catch (e) {
        return;
      }

      if (data.type === "order") {
        trigger("order_update", [data.data]);
      }
    }

    // parse received binary message. each message is a combination of multiple tick packets
    // [2-bytes num packets][size1][tick1][size2][tick2] ...
    // Packets whose length matches none of the known layouts are skipped.
    function parseBinary(binpacks) {
      var packets = splitPackets(binpacks),
        ticks = [];

      for (var n = 0; n < packets.length; n++) {
        var bin = packets[n],
          instrument_token = buf2long(bin.slice(0, 4)),
          segment = instrument_token & 0xff;

        var tradable = true;
        if (segment === Indices) tradable = false;

        // Add price divisor based on segment
        var divisor = 100.0;
        if (segment === NseCD) {
          divisor = 10000000.0;
        } else if (segment === BseCD) {
          divisor = 10000.0;
        }

        var mode, tick, timestamp;

        // Parse LTP
        if (bin.byteLength === 8) {
          ticks.push({
            tradable: tradable,
            mode: modeLTP,
            instrument_token: instrument_token,
            last_price: buf2long(bin.slice(4, 8)) / divisor
          });
        // Parse indices quote and full mode
        } else if (bin.byteLength === 28 || bin.byteLength === 32) {
          mode = modeQuote;
          if (bin.byteLength === 32) mode = modeFull;

          tick = {
            tradable: tradable,
            mode: mode,
            instrument_token: instrument_token,
            last_price: buf2long(bin.slice(4, 8)) / divisor,
            ohlc: {
              high: buf2long(bin.slice(8, 12)) / divisor,
              low: buf2long(bin.slice(12, 16)) / divisor,
              open: buf2long(bin.slice(16, 20)) / divisor,
              close: buf2long(bin.slice(20, 24)) / divisor
            },
            // Raw value from the packet; replaced below whenever close is non-zero.
            change: buf2long(bin.slice(24, 28))
          };

          // Compute the change price using close price and last price
          if (tick.ohlc.close !== 0) {
            tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
          }

          // Full mode with timestamp in seconds
          if (bin.byteLength === 32) {
            tick.exchange_timestamp = null;
            timestamp = buf2long(bin.slice(28, 32));
            if (timestamp) tick.exchange_timestamp = new Date(timestamp * 1000);
          }

          ticks.push(tick);
        } else if (bin.byteLength === 44 || bin.byteLength === 184) {
          mode = modeQuote;
          if (bin.byteLength === 184) mode = modeFull;

          tick = {
            tradable: tradable,
            mode: mode,
            instrument_token: instrument_token,
            last_price: buf2long(bin.slice(4, 8)) / divisor,
            last_traded_quantity: buf2long(bin.slice(8, 12)),
            average_traded_price: buf2long(bin.slice(12, 16)) / divisor,
            volume_traded: buf2long(bin.slice(16, 20)),
            total_buy_quantity: buf2long(bin.slice(20, 24)),
            total_sell_quantity: buf2long(bin.slice(24, 28)),
            ohlc: {
              open: buf2long(bin.slice(28, 32)) / divisor,
              high: buf2long(bin.slice(32, 36)) / divisor,
              low: buf2long(bin.slice(36, 40)) / divisor,
              close: buf2long(bin.slice(40, 44)) / divisor
            }
          };

          // Compute the change price using close price and last price
          if (tick.ohlc.close !== 0) {
            tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
          }

          // Parse full mode
          if (bin.byteLength === 184) {
            // Parse last trade time
            tick.last_trade_time = null;
            var last_trade_time = buf2long(bin.slice(44, 48));
            if (last_trade_time) tick.last_trade_time = new Date(last_trade_time * 1000);

            // Parse timestamp
            tick.exchange_timestamp = null;
            timestamp = buf2long(bin.slice(60, 64));
            if (timestamp) tick.exchange_timestamp = new Date(timestamp * 1000);

            // Parse OI
            tick.oi = buf2long(bin.slice(48, 52));
            tick.oi_day_high = buf2long(bin.slice(52, 56));
            tick.oi_day_low = buf2long(bin.slice(56, 60));

            tick.depth = {
              buy: [],
              sell: []
            };

            // Ten 12-byte depth entries: the first five are bids, the last five offers.
            // Each entry is quantity (4 bytes), price (4 bytes), orders (2 bytes);
            // the remaining 2 bytes of the entry are not read.
            var s = 0, depth = bin.slice(64, 184);
            for (var i = 0; i < 10; i++) {
              s = i * 12;
              tick.depth[i < 5 ? "buy" : "sell"].push({
                quantity: buf2long(depth.slice(s, s + 4)),
                price: buf2long(depth.slice(s + 4, s + 8)) / divisor,
                orders: buf2long(depth.slice(s + 8, s + 10))
              });
            }
          }

          ticks.push(tick);
        }
      }

      return ticks;
    }

    // split one long binary message into individual tick packets
    function splitPackets(bin) {
      // number of packets
      var num = buf2long(bin.slice(0, 2)),
        j = 2,
        packets = [];

      for (var i = 0; i < num; i++) {
        // first two bytes is the packet length
        var size = buf2long(bin.slice(j, j + 2)),
          packet = bin.slice(j + 2, j + 2 + size);

        packets.push(packet);
        j += 2 + size;
      }

      return packets;
    }

    // Schedule the next connect() using exponential backoff (1s, 2s, 4s, ...),
    // capped at reconnect_max_delay seconds.
    function attemptReconnection() {
      // Try reconnecting only so many times.
      if (current_reconnection_count > reconnect_max_tries) {
        trigger("noreconnect");
        // Give up quietly: what to do next (including exiting the process) is the
        // decision of the application's "noreconnect" handler, not of this library.
        return;
      }

      if (current_reconnection_count > 0) {
        last_reconnect_interval = Math.pow(2, current_reconnection_count);
      } else if (!last_reconnect_interval) {
        last_reconnect_interval = 1;
      }

      if (last_reconnect_interval > reconnect_max_delay) {
        last_reconnect_interval = reconnect_max_delay;
      }

      current_reconnection_count++;

      trigger("reconnect", [current_reconnection_count, last_reconnect_interval]);

      reconnect_timer = setTimeout(function() {
        self.connect();
      }, last_reconnect_interval * 1000);
    }

    // Big endian byte array to long.
    // NOTE(review): `<<` works on 32-bit signed integers, so a 4-byte value with the
    // top bit set comes out negative - confirm that is what the feed expects.
    function buf2long(buf) {
      var b = new Uint8Array(buf),
        val = 0,
        len = b.length;

      for (var i = 0, j = len - 1; i < len; i++, j--) {
        val += b[j] << (i * 8);
      }

      return val;
    }
  };
  // Export the KiteTicker constructor as this module's public interface.
  620. module.exports = KiteTicker;