#include <stdexcept>
#include <unordered_map>

#include "kiteppexceptions.hpp"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"
#include "rapidjson/writer.h"
#include "rjutils.hpp"
53 namespace kiteconnect {
// Compile-time guard: the build is rejected when double is not an IEC 559 (IEEE 754) type.
56 static_assert(std::numeric_limits<double>::is_iec559,
"Requires IEEE 754 floating point!");
// Short aliases for the namespaces used throughout this file.
59 namespace rj = rapidjson;
60 namespace kc = kiteconnect;
61 namespace rju = kc::rjutils;
// Callback slot receiving the emitting kiteWS instance and a batch of ticks.
// The message handler only parses binary frames when this slot is set, and calls it
// with the result of _parseBinaryMessage().
82 std::function<void(
kiteWS* ws,
const std::vector<kc::tick>& ticks)>
onTicks;
// Callback slot receiving the emitting kiteWS instance, an integer code and a message.
// _processTextMessage() calls it with code 0 and the "data" string of a text message
// whose "type" is "error".
97 std::function<void(
kiteWS* ws,
int code,
const string& message)>
onError;
142 kiteWS(
const string& apikey,
unsigned int connecttimeout = 5,
bool enablereconnect =
false,
143 unsigned int maxreconnectdelay = 60,
unsigned int maxreconnecttries = 30)
144 : _apiKey(apikey), _connectTimeout(connecttimeout * 1000), _enableReconnect(enablereconnect),
145 _maxReconnectDelay(maxreconnectdelay), _maxReconnectTries(maxreconnecttries),
146 _hubGroup(_hub.createGroup<uWS::CLIENT>()) {};
203 std::chrono::time_point<std::chrono::system_clock>
getLastBeatTime() {
return _lastBeatTime; };
209 void run() { _hub.run(); };
225 void subscribe(
const std::vector<int>& instrumentToks) {
229 auto& reqAlloc = req.GetAllocator();
231 rj::Value toksArr(rj::kArrayType);
233 val.SetString(
"subscribe", reqAlloc);
234 req.AddMember(
"a", val, reqAlloc);
236 for (
const int tok : instrumentToks) { toksArr.PushBack(tok, reqAlloc); }
237 req.AddMember(
"v", toksArr, reqAlloc);
239 string reqStr = rju::_dump(req);
241 _WS->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
242 for (
const int tok : instrumentToks) { _subbedInstruments[tok] =
""; };
// NOTE(review): the signature line of this function is not visible in this view; judging by the
// "unsubscribe" action string this is presumably the body of unsubscribe(instrumentToks) — confirm.
// Builds a request with "a" = "unsubscribe" and "v" = the token array, sends it as a text
// frame and then removes the tokens from _subbedInstruments.
257 auto& reqAlloc = req.GetAllocator();
259 rj::Value toksArr(rj::kArrayType);
261 val.SetString(
"unsubscribe", reqAlloc);
262 req.AddMember(
"a", val, reqAlloc);
// copy every token into the JSON array
264 for (
const int tok : instrumentToks) { toksArr.PushBack(tok, reqAlloc); }
265 req.AddMember(
"v", toksArr, reqAlloc);
267 string reqStr = rju::_dump(req);
// NOTE(review): _WS is initialised to nullptr and is dereferenced here without a check.
269 _WS->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
// forget the tokens locally; tokens that were never recorded are ignored
270 for (
const int tok : instrumentToks) {
271 auto it = _subbedInstruments.find(tok);
272 if (it != _subbedInstruments.end()) { _subbedInstruments.erase(it); };
285 void setMode(
const string& mode,
const std::vector<int>& instrumentToks) {
289 auto& reqAlloc = req.GetAllocator();
291 rj::Value valArr(rj::kArrayType);
292 rj::Value toksArr(rj::kArrayType);
294 val.SetString(
"mode", reqAlloc);
295 req.AddMember(
"a", val, reqAlloc);
297 val.SetString(mode.c_str(), mode.size(), reqAlloc);
298 valArr.PushBack(val, reqAlloc);
299 for (
const int tok : instrumentToks) { toksArr.PushBack(tok, reqAlloc); }
300 valArr.PushBack(toksArr, reqAlloc);
301 req.AddMember(
"v", valArr, reqAlloc);
303 string reqStr = rju::_dump(req);
305 _WS->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
306 for (
const int tok : instrumentToks) { _subbedInstruments[tok] = mode; };
// Grants the named test class access to non-public members.
314 friend class kWSTest_binaryParsingTest_Test;
// Format string for the connection URL; {0} and {1} are filled with _apiKey and _accessToken.
316 const string _connectURLFmt =
"wss://ws.kite.trade/?api_key={0}&access_token={1}";
// Segment name -> numeric segment id; _parseBinaryMessage() looks up "cds" and "indices".
// NOTE(review): the initializer entries are not visible in this view.
319 const std::unordered_map<string, int> _segmentConstants = {
// Instrument token -> mode string; subscribe() stores "" and setMode() stores the mode.
330 std::unordered_map<int, string> _subbedInstruments;
// Client group created from the hub in the constructor; all handlers are registered on it.
333 uWS::Group<uWS::CLIENT>* _hubGroup;
// Socket used by subscribe()/setMode() to send requests; nullptr until one is stored.
334 uWS::WebSocket<uWS::CLIENT>* _WS =
nullptr;
// Passed to _hub.connect(); the constructor overrides this default with connecttimeout * 1000.
335 const unsigned int _connectTimeout = 5000;
// Message and interval handed to startAutoPing().
337 const string _pingMessage =
"";
338 const unsigned int _pingInterval = 3000;
// Reconnection settings; the constructor overrides _enableReconnect, _maxReconnectDelay
// and _maxReconnectTries.
340 const bool _enableReconnect =
false;
// Starting value of _reconnectDelay, restored by the connection handler.
341 const unsigned int _initReconnectDelay = 2;
// Current delay (passed to std::chrono::seconds) slept before a reconnect attempt.
342 unsigned int _reconnectDelay = _initReconnectDelay;
343 const unsigned int _maxReconnectDelay = 0;
344 unsigned int _reconnectTries = 0;
345 const unsigned int _maxReconnectTries = 0;
// Set while a reconnect is in progress; the disconnection handler checks it before reconnecting.
346 std::atomic<bool> _isReconnecting {
false };
// Updated by the connection and pong handlers.
348 std::chrono::time_point<std::chrono::system_clock> _lastPongTime;
// Updated by the message handler for binary frames; returned by getLastBeatTime().
349 std::chrono::time_point<std::chrono::system_clock> _lastBeatTime;
// NOTE(review): the enclosing function's signature is not visible in this view.
// Formats _connectURLFmt with _apiKey and _accessToken and asks the hub to connect to that URL,
// passing _connectTimeout and the client group the handlers are registered on.
355 _hub.connect(FMT(_connectURLFmt, _apiKey, _accessToken),
nullptr, {}, _connectTimeout, _hubGroup);
// NOTE(review): the enclosing function's signature is not visible in this view; presumably this is
// the body of _reconnect(), which the error and disconnection handlers call — confirm.
// Flag the attempt so the disconnection handler does not start a second one.
362 _isReconnecting =
true;
// NOTE(review): with `<=` the branch is still taken when _reconnectTries equals
// _maxReconnectTries; confirm whether one extra attempt is intended.
365 if (_reconnectTries <= _maxReconnectTries) {
// NOTE(review): this sleep blocks the calling thread; the visible callers are hub handlers.
366 std::this_thread::sleep_for(std::chrono::seconds(_reconnectDelay));
// Double the delay for the next attempt, capped at _maxReconnectDelay.
367 _reconnectDelay = (_reconnectDelay * 2 > _maxReconnectDelay) ? _maxReconnectDelay : _reconnectDelay * 2;
375 _isReconnecting =
false;
379 void _processTextMessage(
char* message,
size_t length) {
382 rju::_parse(res,
string(message, length));
383 if (!res.IsObject()) {
throw libException(
"Expected a JSON object"); };
386 rju::_getIfExists(res, type,
"type");
387 if (type.empty()) {
throw kc::libException(FMT(
"Cannot recognize websocket message type {0}", type)); }
391 if (type ==
"error" &&
onError) {
onError(
this, 0, res[
"data"].GetString()); };
/**
 * @brief Decode a number stored most-significant-byte first.
 *
 * Reads bytes[start..end] (both inclusive), reverses them unless WORDS_BIGENDIAN is defined,
 * and copies the result into a T.
 *
 * The range is validated before any byte is touched: it used to be possible to read past the
 * end of the buffer, or to copy sizeof(T) bytes out of a shorter temporary.
 *
 * @tparam T    type to decode; the range has to be exactly sizeof(T) bytes wide
 * @param bytes buffer to read from
 * @param start index of the first byte of the number
 * @param end   index of the last byte of the number (inclusive)
 *
 * @return T the decoded value
 *
 * @throws std::out_of_range     if end < start or the range extends past the end of bytes
 * @throws std::invalid_argument if the range is not exactly sizeof(T) bytes wide
 */
template <typename T> T _getNum(const std::vector<char>& bytes, size_t start, size_t end) {
    if (end < start || end >= bytes.size()) {
        throw std::out_of_range("_getNum: byte range lies outside of the buffer");
    }
    if (end - start + 1 != sizeof(T)) {
        throw std::invalid_argument("_getNum: byte range width does not match the requested type");
    }

    const auto first = bytes.begin() + start;
    const auto last = first + sizeof(T);

    // fixed-size scratch buffer: no heap allocation per decoded number
    char ordered[sizeof(T)];
#ifndef WORDS_BIGENDIAN
    // flip the bytes so the last byte of the range ends up first in memory
    std::reverse_copy(first, last, ordered);
#else
    std::copy(first, last, ordered);
#endif

    T value;
    std::memcpy(&value, ordered, sizeof(T));
    return value;
}
411 std::vector<std::vector<char>> _splitPackets(
const std::vector<char>& bytes) {
413 const int16_t numberOfPackets = _getNum<int16_t>(bytes, 0, 1);
415 std::vector<std::vector<char>> packets;
417 unsigned int packetLengthStartIdx = 2;
418 for (
int i = 1; i <= numberOfPackets; i++) {
419 unsigned int packetLengthEndIdx = packetLengthStartIdx + 1;
420 int16_t packetLength = _getNum<int16_t>(bytes, packetLengthStartIdx, packetLengthEndIdx);
421 packetLengthStartIdx = packetLengthEndIdx + packetLength + 1;
422 packets.emplace_back(bytes.begin() + packetLengthEndIdx + 1, bytes.begin() + packetLengthStartIdx);
428 std::vector<kc::tick> _parseBinaryMessage(
char* bytes,
size_t size) {
430 std::vector<std::vector<char>> packets = _splitPackets(std::vector<char>(bytes, bytes + size));
431 if (packets.empty()) {
return {}; };
433 std::vector<kc::tick> ticks;
434 for (
const auto& packet : packets) {
435 size_t packetSize = packet.size();
436 int32_t instrumentToken = _getNum<int32_t>(packet, 0, 3);
437 int segment = instrumentToken & 0xff;
438 double divisor = (segment == _segmentConstants.at(
"cds")) ? 10000000.0 : 100.0;
439 bool tradable = (segment == _segmentConstants.at(
"indices")) ?
false :
true;
443 Tick.isTradable = tradable;
444 Tick.instrumentToken = instrumentToken;
447 if (packetSize == 8) {
448 Tick.mode = MODE_LTP;
449 Tick.lastPrice = _getNum<int32_t>(packet, 4, 7) / divisor;
451 }
else if (packetSize == 28 || packetSize == 32) {
454 Tick.mode = (packetSize == 28) ? MODE_QUOTE : MODE_FULL;
455 Tick.lastPrice = _getNum<int32_t>(packet, 4, 7) / divisor;
456 Tick.OHLC.high = _getNum<int32_t>(packet, 8, 11) / divisor;
457 Tick.OHLC.low = _getNum<int32_t>(packet, 12, 15) / divisor;
458 Tick.OHLC.open = _getNum<int32_t>(packet, 16, 19) / divisor;
459 Tick.OHLC.close = _getNum<int32_t>(packet, 20, 23) / divisor;
461 Tick.netChange = _getNum<int32_t>(packet, 24, 27) / divisor;
464 if (packetSize == 32) { Tick.timestamp = _getNum<int32_t>(packet, 28, 33); }
466 }
else if (packetSize == 44 || packetSize == 184) {
469 Tick.mode = (packetSize == 44) ? MODE_QUOTE : MODE_FULL;
470 Tick.lastPrice = _getNum<int32_t>(packet, 4, 7) / divisor;
471 Tick.lastTradedQuantity = _getNum<int32_t>(packet, 8, 11);
472 Tick.averageTradePrice = _getNum<int32_t>(packet, 12, 15) / divisor;
473 Tick.volumeTraded = _getNum<int32_t>(packet, 16, 19);
474 Tick.totalBuyQuantity = _getNum<int32_t>(packet, 20, 23);
475 Tick.totalSellQuantity = _getNum<int32_t>(packet, 24, 27);
476 Tick.OHLC.open = _getNum<int32_t>(packet, 28, 31) / divisor;
477 Tick.OHLC.high = _getNum<int32_t>(packet, 32, 35) / divisor;
478 Tick.OHLC.low = _getNum<int32_t>(packet, 36, 39) / divisor;
479 Tick.OHLC.close = _getNum<int32_t>(packet, 40, 43) / divisor;
481 Tick.netChange = (Tick.lastPrice - Tick.OHLC.close) * 100 / Tick.OHLC.close;
484 if (packetSize == 184) {
485 Tick.lastTradeTime = _getNum<int32_t>(packet, 44, 47);
486 Tick.OI = _getNum<int32_t>(packet, 48, 51);
487 Tick.OIDayHigh = _getNum<int32_t>(packet, 52, 55);
488 Tick.OIDayLow = _getNum<int32_t>(packet, 56, 59);
489 Tick.timestamp = _getNum<int32_t>(packet, 60, 63);
491 unsigned int depthStartIdx = 64;
492 for (
int i = 0; i <= 9; i++) {
494 depth.quantity = _getNum<int32_t>(packet, depthStartIdx, depthStartIdx + 3);
495 depth.price = _getNum<int32_t>(packet, depthStartIdx + 4, depthStartIdx + 7) / divisor;
496 depth.orders = _getNum<int16_t>(packet, depthStartIdx + 8, depthStartIdx + 9);
498 (i >= 5) ? Tick.marketDepth.sell.emplace_back(depth) : Tick.marketDepth.buy.emplace_back(depth);
499 depthStartIdx = depthStartIdx + 12;
504 ticks.emplace_back(Tick);
510 void _resubInstruments() {
512 std::vector<int> LTPInstruments;
513 std::vector<int> quoteInstruments;
514 std::vector<int> fullInstruments;
515 for (
const auto& i : _subbedInstruments) {
516 if (i.second == MODE_LTP) { LTPInstruments.push_back(i.first); };
517 if (i.second == MODE_QUOTE) { quoteInstruments.push_back(i.first); };
518 if (i.second == MODE_FULL) { fullInstruments.push_back(i.first); };
520 if (i.second.empty()) { quoteInstruments.push_back(i.first); };
523 if (!LTPInstruments.empty()) {
setMode(MODE_LTP, LTPInstruments); };
524 if (!quoteInstruments.empty()) {
setMode(MODE_QUOTE, quoteInstruments); };
525 if (!fullInstruments.empty()) {
setMode(MODE_FULL, fullInstruments); };
// Registers the connection, message, pong, error and disconnection handlers on _hubGroup and
// starts automatic pings.
// NOTE(review): the closing lines of the lambdas and of this function are not visible in this
// view, and further statements may sit between the visible lines.
552 void _assignCallbacks() {
// Connection established: refresh the pong time, reset the reconnect delay and flag, and
// re-apply the recorded subscriptions.
554 _hubGroup->onConnection([&](uWS::WebSocket<uWS::CLIENT>* ws, uWS::HttpRequest req) {
558 _lastPongTime = std::chrono::system_clock::now();
561 _reconnectDelay = _initReconnectDelay;
562 _isReconnecting =
false;
563 if (!_subbedInstruments.empty()) { _resubInstruments(); };
// Incoming frame: binary frames are parsed into ticks, text frames go to _processTextMessage().
567 _hubGroup->onMessage([&](uWS::WebSocket<uWS::CLIENT>* ws,
char* message,
size_t length, uWS::OpCode opCode) {
// NOTE(review): binary frames are only handled (and _lastBeatTime only refreshed) when onTicks is set.
568 if (opCode == uWS::OpCode::BINARY &&
onTicks) {
571 _lastBeatTime = std::chrono::system_clock::now();
573 onTicks(
this, _parseBinaryMessage(message, length));
575 }
else if (opCode == uWS::OpCode::TEXT) {
576 _processTextMessage(message, length);
// Pong received: remember when.
580 _hubGroup->onPong([&](uWS::WebSocket<uWS::CLIENT>* ws,
char* message,
size_t length) {
581 _lastPongTime = std::chrono::system_clock::now();
// Error reported by the group: reconnect if enabled.
584 _hubGroup->onError([&](
void*) {
588 if (_enableReconnect) { _reconnect(); };
// Socket closed: reconnect if enabled and no reconnect is already in progress.
591 _hubGroup->onDisconnection([&](uWS::WebSocket<uWS::CLIENT>* ws,
int code,
char* reason,
size_t length) {
599 if (_enableReconnect && !_isReconnecting) { _reconnect(); };
// Have the group ping automatically with _pingMessage every _pingInterval.
603 _hubGroup->startAutoPing(_pingInterval, _pingMessage);