CPPKiteConnect
internal.hpp
1 /*
2  * Licensed under the MIT License <http://opensource.org/licenses/MIT>.
3  * SPDX-License-Identifier: MIT
4  *
5  * Copyright (c) 2020-2022 Bhumit Attarde
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
18  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
20  * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
21  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
22  * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
23  * USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 
26 #pragma once
27 
28 #include <algorithm> //reverse
29 #include <atomic>
30 #include <chrono>
31 #include <cstdint>
32 #include <cstring> //memcpy
33 #include <functional>
34 #include <ios>
35 #include <iostream>
36 #include <limits>
37 #include <string>
38 #include <thread>
39 #include <unordered_map>
40 #include <utility>
41 #include <vector>
42 
43 #include "../exceptions.hpp"
44 #include "../responses/responses.hpp"
45 #include "../userconstants.hpp" //modes
46 #include "../utils.hpp"
47 #include "ws.hpp"
48 
49 #include "rapidjson/include/rapidjson/document.h"
50 #include "rapidjson/include/rapidjson/rapidjson.h"
51 #include "rapidjson/include/rapidjson/writer.h"
52 #include <uWS/uWS.h>
53 
54 namespace kiteconnect {
55 // To make sure doubles are parsed correctly
56 static_assert(std::numeric_limits<double>::is_iec559,
57  "Requires IEEE 754 floating point!");
58 using std::string;
59 namespace rj = rapidjson;
60 namespace kc = kiteconnect;
61 namespace utils = kc::internal::utils;
62 
63 // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
64 inline ticker::ticker(string Key, unsigned int ConnectTimeout,
65  bool EnableReconnect, unsigned int maxreconnectdelay,
66  unsigned int MaxReconnectTries)
67  : key(std::move(Key)),
68  connectTimeout(ConnectTimeout * utils::MILLISECONDS_IN_A_SECOND),
69  enableReconnect(EnableReconnect), maxReconnectDelay(maxreconnectdelay),
70  maxReconnectTries(MaxReconnectTries),
71  group(hub.createGroup<uWS::CLIENT>()) {};
72 
73 inline void ticker::setApiKey(const string& Key) { key = Key; };
74 
75 inline string ticker::getApiKey() const { return key; };
76 
77 inline void ticker::setAccessToken(const string& Token) { token = Token; };
78 
79 inline string ticker::getAccessToken() const { return token; };
80 
81 inline void ticker::connect() {
82  assignCallbacks();
83  connectInternal();
84 };
85 
86 inline bool ticker::isConnected() const { return ws != nullptr; };
87 
88 inline std::chrono::time_point<std::chrono::system_clock> ticker::
90  return lastBeatTime;
91 };
92 
93 inline void ticker::run() { hub.run(); };
94 
95 inline void ticker::stop() {
96  if (isConnected()) { ws->close(); };
97 };
98 
99 inline void ticker::subscribe(const std::vector<int>& instrumentTokens) {
100  utils::json::json<utils::json::JsonObject> req;
101  req.field("a", "subscribe");
102  req.field("v", instrumentTokens);
103  string reqStr = req.serialize();
104 
105  if (isConnected()) {
106  ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
107  for (const int tok : instrumentTokens) {
108  subbedInstruments[tok] = DEFAULT_MODE;
109  };
110  } else {
111  throw kc::libException("not connected to websocket server");
112  };
113 };
114 
115 inline void ticker::unsubscribe(const std::vector<int>& instrumentTokens) {
116  utils::json::json<utils::json::JsonObject> req;
117  req.field("a", "unsubscribe");
118  req.field("v", instrumentTokens);
119  string reqStr = req.serialize();
120 
121  if (isConnected()) {
122  ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
123  for (const int tok : instrumentTokens) {
124  auto it = subbedInstruments.find(tok);
125  if (it != subbedInstruments.end()) { subbedInstruments.erase(it); };
126  };
127  } else {
128  throw kc::libException("not connected to websocket server");
129  };
130 };
131 
132 inline void ticker::setMode(
133  const string& mode, const std::vector<int>& instrumentTokens) {
134  // create request json
135  rj::Document req;
136  req.SetObject();
137  auto& reqAlloc = req.GetAllocator();
138  rj::Value val;
139  rj::Value valArr(rj::kArrayType);
140  rj::Value toksArr(rj::kArrayType);
141 
142  val.SetString("mode", reqAlloc);
143  req.AddMember("a", val, reqAlloc);
144 
145  val.SetString(mode.c_str(), mode.size(), reqAlloc);
146  valArr.PushBack(val, reqAlloc);
147  for (const int tok : instrumentTokens) { toksArr.PushBack(tok, reqAlloc); }
148  valArr.PushBack(toksArr, reqAlloc);
149  req.AddMember("v", valArr, reqAlloc);
150 
151  // send the request
152  string reqStr = utils::json::serialize(req);
153  if (isConnected()) {
154  ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
155  for (const int tok : instrumentTokens) {
156  if (mode == MODE_LTP) {
157  subbedInstruments[tok] = MODES::LTP;
158  } else if (mode == MODE_QUOTE) {
159  subbedInstruments[tok] = MODES::QUOTE;
160  } else {
161  subbedInstruments[tok] = MODES::FULL;
162  }
163  };
164  } else {
165  throw kc::libException("not connected to websocket server");
166  };
167 };
168 
169 inline void ticker::connectInternal() {
170  hub.connect(FMT(connectUrlFmt, key, token), nullptr, {},
171  static_cast<int>(connectTimeout), group);
172 };
173 
174 inline void ticker::reconnect() {
175  if (isConnected()) { return; };
176  isReconnecting = true;
177  reconnectTries++;
178 
179  if (reconnectTries <= maxReconnectTries) {
180  std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay));
181  reconnectDelay = (reconnectDelay * 2 > maxReconnectDelay) ?
182  maxReconnectDelay :
183  reconnectDelay * 2;
184 
185  if (onTryReconnect) { onTryReconnect(this, reconnectTries); };
186  connectInternal();
187 
188  if (isConnected()) { return; };
189  } else {
190  if (onReconnectFail) { onReconnectFail(this); };
191  isReconnecting = false;
192  };
193 };
194 
195 inline void ticker::processTextMessage(const string& message) {
196  rj::Document res;
197  utils::json::parse(res, message);
198  if (!res.IsObject()) { throw libException("Expected a JSON object"); };
199 
200  auto type = utils::json::get<string>(res, "type");
201  if (type.empty()) {
202  throw kc::libException(
203  FMT("Cannot recognize websocket message type {0}", type));
204  }
205 
206  if (type == "order" && onOrderUpdate) {
207  onOrderUpdate(this, kc::postback(utils::json::extractObject(res)));
208  }
209  if (type == "message" && onMessage) { onMessage(this, message); };
210  if (type == "error" && onError) {
211  onError(this, 0, utils::json::extractString(res));
212  };
213 };
214 
215 template <typename T>
216 T ticker::unpack(const std::vector<char>& bytes, size_t start, size_t end) {
217  // FIXME directly iterate over bytes instead of making a copy or reversing
218  T value;
219  std::vector<char> requiredBytes(bytes.begin() + static_cast<int64_t>(start),
220  bytes.begin() + static_cast<int64_t>(end) + 1);
221 
222  // clang-format off
223  #ifndef WORDS_BIGENDIAN
224  std::reverse(requiredBytes.begin(), requiredBytes.end());
225  #endif
226  // clang-format on
227 
228  std::memcpy(&value, requiredBytes.data(), sizeof(T));
229  return value;
230 };
231 
232 inline std::vector<std::vector<char>> ticker::splitPackets(
233  const std::vector<char>& bytes) {
234  const auto numberOfPackets = unpack<int16_t>(bytes, 0, 1);
235  std::vector<std::vector<char>> packets;
236 
237  unsigned int packetLengthStartIdx = 2;
238  for (int i = 1; i <= numberOfPackets; i++) {
239  unsigned int packetLengthEndIdx = packetLengthStartIdx + 1;
240  auto packetLength =
241  unpack<int16_t>(bytes, packetLengthStartIdx, packetLengthEndIdx);
242  packetLengthStartIdx = packetLengthEndIdx + packetLength + 1;
243  packets.emplace_back(bytes.begin() + packetLengthEndIdx + 1,
244  bytes.begin() + packetLengthStartIdx);
245  };
246  return packets;
247 };
248 
249 inline std::vector<kc::tick> ticker::parseBinaryMessage(
250  char* bytes, size_t size) {
251  static constexpr uint8_t SEGMENT_MASK = 0xff;
252  static constexpr double CDS_DIVISOR = 10000000.0;
253  static constexpr double BSECDS_DIVISOR = 10000.0;
254  static constexpr double GENERIC_DIVISOR = 100.0;
255  static constexpr size_t LTP_PACKET_SIZE = 8;
256  static constexpr size_t INDICES_QUOTE_PACKET_SIZE = 28;
257  static constexpr size_t INDICES_FULL_PACKET_SIZE = 32;
258  static constexpr size_t QUOTE_PACKET_SIZE = 44;
259  static constexpr size_t FULL_PACKET_SIZE = 184;
260 
261  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
262  std::vector<std::vector<char>> packets =
263  splitPackets(std::vector<char>(bytes, bytes + size));
264  if (packets.empty()) { return {}; };
265 
266  std::vector<kc::tick> ticks;
267  for (const auto& packet : packets) {
268  const size_t packetSize = packet.size();
269  const auto instrumentToken = unpack<int32_t>(packet, 0, 3);
270  // NOLINTNEXTLINE(hicpp-signed-bitwise)
271  const uint8_t segment = instrumentToken & SEGMENT_MASK;
272  const bool tradable =
273  segment != static_cast<uint8_t>(SEGMENTS::INDICES);
274  double divisor = 0.0;
275  if (segment == static_cast<uint8_t>(SEGMENTS::CDS)) {
276  divisor = CDS_DIVISOR;
277 
278  } else if (segment == static_cast<uint8_t>(SEGMENTS::BSECDS)) {
279  divisor = BSECDS_DIVISOR;
280 
281  } else {
282  divisor = GENERIC_DIVISOR;
283  }
284 
285  kc::tick Tick;
286  Tick.isTradable = tradable;
287  Tick.instrumentToken = instrumentToken;
288 
289  // NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers)
290  // LTP packet
291  if (packetSize == LTP_PACKET_SIZE) {
292  Tick.mode = MODE_LTP;
293  Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
294  } else if (packetSize == INDICES_QUOTE_PACKET_SIZE ||
295  packetSize == INDICES_FULL_PACKET_SIZE) {
296  // indices quote and full mode
297  Tick.mode = (packetSize == INDICES_QUOTE_PACKET_SIZE) ? MODE_QUOTE :
298  MODE_FULL;
299  Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
300  Tick.ohlc.high = unpack<int32_t>(packet, 8, 11) / divisor;
301  Tick.ohlc.low = unpack<int32_t>(packet, 12, 15) / divisor;
302  Tick.ohlc.open = unpack<int32_t>(packet, 16, 19) / divisor;
303  Tick.ohlc.close = unpack<int32_t>(packet, 20, 23) / divisor;
304  Tick.netChange = unpack<int32_t>(packet, 24, 27) / divisor;
305  if (packetSize == INDICES_FULL_PACKET_SIZE) {
306  Tick.timestamp = unpack<int32_t>(packet, 28, 31);
307  }
308  } else if (packetSize == QUOTE_PACKET_SIZE ||
309  packetSize == FULL_PACKET_SIZE) {
310  // Quote and full mode
311  Tick.mode =
312  (packetSize == QUOTE_PACKET_SIZE) ? MODE_QUOTE : MODE_FULL;
313  Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
314  Tick.lastTradedQuantity = unpack<int32_t>(packet, 8, 11);
315  Tick.averageTradePrice = unpack<int32_t>(packet, 12, 15) / divisor;
316  Tick.volumeTraded = unpack<int32_t>(packet, 16, 19);
317  Tick.totalBuyQuantity = unpack<int32_t>(packet, 20, 23);
318  Tick.totalSellQuantity = unpack<int32_t>(packet, 24, 27);
319  Tick.ohlc.open = unpack<int32_t>(packet, 28, 31) / divisor;
320  Tick.ohlc.high = unpack<int32_t>(packet, 32, 35) / divisor;
321  Tick.ohlc.low = unpack<int32_t>(packet, 36, 39) / divisor;
322  Tick.ohlc.close = unpack<int32_t>(packet, 40, 43) / divisor;
323  Tick.netChange =
324  (Tick.lastPrice - Tick.ohlc.close) * 100 / Tick.ohlc.close;
325 
326  // parse full mode
327  if (packetSize == FULL_PACKET_SIZE) {
328  Tick.lastTradeTime = unpack<int32_t>(packet, 44, 47);
329  Tick.oi = unpack<int32_t>(packet, 48, 51);
330  Tick.oiDayHigh = unpack<int32_t>(packet, 52, 55);
331  Tick.oiDayLow = unpack<int32_t>(packet, 56, 59);
332  Tick.timestamp = unpack<int32_t>(packet, 60, 63);
333 
334  unsigned int depthStartIdx = 64;
335  for (int i = 0; i <= 9; i++) {
336  kc::depthWS depth;
337  depth.quantity = unpack<int32_t>(
338  packet, depthStartIdx, depthStartIdx + 3);
339  depth.price = unpack<int32_t>(packet, depthStartIdx + 4,
340  depthStartIdx + 7) /
341  divisor;
342  depth.orders = unpack<int16_t>(
343  packet, depthStartIdx + 8, depthStartIdx + 9);
344 
345  (i >= 5) ? Tick.marketDepth.sell.emplace_back(depth) :
346  Tick.marketDepth.buy.emplace_back(depth);
347  depthStartIdx = depthStartIdx + 12;
348  };
349  };
350  };
351  // NOLINTEND(cppcoreguidelines-avoid-magic-numbers)
352  ticks.emplace_back(Tick);
353  };
354  return ticks;
355 };
356 
357 inline void ticker::resubInstruments() {
358  std::vector<int> ltpInstruments;
359  std::vector<int> quoteInstruments;
360  std::vector<int> fullInstruments;
361  for (const auto& i : subbedInstruments) {
362  if (i.second == MODES::LTP) { ltpInstruments.push_back(i.first); };
363  if (i.second == MODES::QUOTE) { quoteInstruments.push_back(i.first); };
364  if (i.second == MODES::FULL) { fullInstruments.push_back(i.first); };
365  };
366 
367  if (!ltpInstruments.empty()) { setMode(MODE_LTP, ltpInstruments); };
368  if (!quoteInstruments.empty()) { setMode(MODE_QUOTE, quoteInstruments); };
369  if (!fullInstruments.empty()) { setMode(MODE_FULL, fullInstruments); };
370 };
371 
372 inline void ticker::assignCallbacks() {
373  // NOLINTNEXTLINE(readability-implicit-bool-conversion)
374  group->onConnection(
375  [&](uWS::WebSocket<uWS::CLIENT>* Ws, uWS::HttpRequest /*req*/) {
376  ws = Ws;
379  lastPongTime = std::chrono::system_clock::now();
380 
381  reconnectTries = 0;
382  reconnectDelay = initReconnectDelay;
383  isReconnecting = false;
384  if (!subbedInstruments.empty()) { resubInstruments(); };
385  if (onConnect) { onConnect(this); };
386  });
387 
388  // NOLINTNEXTLINE(readability-implicit-bool-conversion)
389  group->onMessage([&](uWS::WebSocket<uWS::CLIENT>* /*ws*/, char* message,
390  size_t length, uWS::OpCode opCode) {
391  if (opCode == uWS::OpCode::BINARY && onTicks) {
392  if (length == 1) {
393  // is a heartbeat
394  lastBeatTime = std::chrono::system_clock::now();
395  } else {
396  onTicks(this, parseBinaryMessage(message, length));
397  };
398  } else if (opCode == uWS::OpCode::TEXT) {
399  processTextMessage(string(message, length));
400  };
401  });
402 
403  // NOLINTNEXTLINE(readability-implicit-bool-conversion)
404  group->onPong([&](uWS::WebSocket<uWS::CLIENT>* /*ws*/, char* /*message*/,
405  size_t /*length*/) {
406  lastPongTime = std::chrono::system_clock::now();
407  });
408 
409  group->onError([&](void*) {
410  if (onConnectError) { onConnectError(this); }
411  // close the non-responsive connection
412  if (isConnected()) { ws->close(utils::ws::ERROR_CODE::NO_REASON); };
413  if (enableReconnect) { reconnect(); };
414  });
415 
416  // NOLINTNEXTLINE(readability-implicit-bool-conversion)
417  group->onDisconnection([&](uWS::WebSocket<uWS::CLIENT>* /*ws*/, int code,
418  char* reason, size_t length) {
419  ws = nullptr;
420 
421  if (code != utils::ws::ERROR_CODE::NORMAL_CLOSURE) {
422  if (onError) { onError(this, code, string(reason, length)); };
423  };
424  if (onClose) { onClose(this, code, string(reason, length)); };
425  if (code != utils::ws::ERROR_CODE::NORMAL_CLOSURE) {
426  if (enableReconnect && !isReconnecting) { reconnect(); };
427  };
428  });
429 
430  group->startAutoPing(static_cast<int>(pingInterval), pingMessage);
431 };
432 
433 } // namespace kiteconnect
kiteconnect::ticker::connect
void connect()
Connect to the websocket server.
Definition: internal.hpp:81
kiteconnect::libException
This exception is thrown when an error occurs at the library level.
Definition: exceptions.hpp:206
kiteconnect::ticker::onOrderUpdate
std::function< void(ticker *ws, const kc::postback &postback)> onOrderUpdate
Called when an order update is received.
Definition: ws.hpp:77
kiteconnect::ticker::onConnect
std::function< void(ticker *ws)> onConnect
Called on successful connect.
Definition: ws.hpp:71
kiteconnect::ticker::onMessage
std::function< void(ticker *ws, const string &message)> onMessage
Called when a message is received.
Definition: ws.hpp:80
kiteconnect::ticker::run
void run()
Start the client. Should always be called after connect().
Definition: internal.hpp:93
kiteconnect::depthWS
Represents a single entry in market depth returned by ticker.
Definition: ws.hpp:43
kiteconnect::ticker::setAccessToken
void setAccessToken(const string &token)
Set the access token.
Definition: internal.hpp:77
kiteconnect::ticker::subscribe
void subscribe(const std::vector< int > &instrumentTokens)
Subscribe to a list of instrument tokens.
Definition: internal.hpp:99
kiteconnect::postback
Represents a postback.
Definition: ws.hpp:79
kiteconnect::ticker::onConnectError
std::function< void(ticker *ws)> onConnectError
Called when an error occurs while trying to connect.
Definition: ws.hpp:87
kiteconnect::ticker::setMode
void setMode(const string &mode, const std::vector< int > &instrumentTokens)
Set the subscription mode for a list of instrument tokens.
Definition: internal.hpp:132
kiteconnect::ticker::onTryReconnect
std::function< void(ticker *ws, unsigned int attemptCount)> onTryReconnect
Called when reconnection is being attempted.
Definition: ws.hpp:103
kiteconnect::ticker::isConnected
bool isConnected() const
Check if client is connected.
Definition: internal.hpp:86
kiteconnect::ticker::onClose
std::function< void(ticker *ws, int code, const string &message)> onClose
Called when connection is closed.
Definition: ws.hpp:112
kiteconnect::tick
Represents a single market data tick.
Definition: ws.hpp:50
kiteconnect::ticker::onReconnectFail
std::function< void(ticker *ws)> onReconnectFail
Called when reconnect attempts exceed maximum reconnect attempts set by user i.e.,...
Definition: ws.hpp:109
kiteconnect::ticker::onError
std::function< void(ticker *ws, int code, const string &message)> onError
Called when connection is closed with an error or websocket server sends an error message.
Definition: ws.hpp:84
kiteconnect::ticker::setApiKey
void setApiKey(const string &key)
Set the API key.
Definition: internal.hpp:73
kiteconnect::ticker::getAccessToken
string getAccessToken() const
Get access token set at the moment.
Definition: internal.hpp:79
kiteconnect::ticker::getLastBeatTime
std::chrono::time_point< std::chrono::system_clock > getLastBeatTime() const
Get the last time heartbeat was received. Should be used in conjunction with the isConnected() method...
Definition: internal.hpp:89
kiteconnect::ticker::getApiKey
string getApiKey() const
Get API key set at the moment.
Definition: internal.hpp:75
kiteconnect::ticker::ticker
ticker(string Key, unsigned int ConnectTimeout=DEFAULT_CONNECT_TIMEOUT, bool EnableReconnect=false, unsigned int MaxReconnectDelay=DEFAULT_MAX_RECONNECT_DELAY, unsigned int MaxReconnectTries=DEFAULT_MAX_RECONNECT_TRIES)
Construct a new kiteWS object.
Definition: internal.hpp:64
kiteconnect::ticker::onTicks
std::function< void(ticker *ws, const std::vector< kc::tick > &ticks)> onTicks
Called when ticks are received.
Definition: ws.hpp:74
kiteconnect::ticker::stop
void stop()
Stop the client. Closes the connection if connected. Should be the last method that is called.
Definition: internal.hpp:95
kiteconnect::ticker::unsubscribe
void unsubscribe(const std::vector< int > &instrumentTokens)
Unsubscribe.
Definition: internal.hpp:115