39 #include <unordered_map>
43 #include "../exceptions.hpp"
44 #include "../responses/responses.hpp"
45 #include "../userconstants.hpp"
46 #include "../utils.hpp"
49 #include "rapidjson/include/rapidjson/document.h"
50 #include "rapidjson/include/rapidjson/rapidjson.h"
51 #include "rapidjson/include/rapidjson/writer.h"
54 namespace kiteconnect {
56 static_assert(std::numeric_limits<double>::is_iec559,
57 "Requires IEEE 754 floating point!");
59 namespace rj = rapidjson;
60 namespace kc = kiteconnect;
61 namespace utils = kc::internal::utils;
65 bool EnableReconnect,
unsigned int maxreconnectdelay,
66 unsigned int MaxReconnectTries)
67 : key(std::move(Key)),
68 connectTimeout(ConnectTimeout * utils::MILLISECONDS_IN_A_SECOND),
69 enableReconnect(EnableReconnect), maxReconnectDelay(maxreconnectdelay),
70 maxReconnectTries(MaxReconnectTries),
71 group(hub.createGroup<uWS::CLIENT>()) {};
88 inline std::chrono::time_point<std::chrono::system_clock>
ticker::
100 utils::json::json<utils::json::JsonObject> req;
101 req.field(
"a",
"subscribe");
102 req.field(
"v", instrumentTokens);
103 string reqStr = req.serialize();
106 ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
107 for (
const int tok : instrumentTokens) {
108 subbedInstruments[tok] = DEFAULT_MODE;
116 utils::json::json<utils::json::JsonObject> req;
117 req.field(
"a",
"unsubscribe");
118 req.field(
"v", instrumentTokens);
119 string reqStr = req.serialize();
122 ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
123 for (
const int tok : instrumentTokens) {
124 auto it = subbedInstruments.find(tok);
125 if (it != subbedInstruments.end()) { subbedInstruments.erase(it); };
133 const string& mode,
const std::vector<int>& instrumentTokens) {
137 auto& reqAlloc = req.GetAllocator();
139 rj::Value valArr(rj::kArrayType);
140 rj::Value toksArr(rj::kArrayType);
142 val.SetString(
"mode", reqAlloc);
143 req.AddMember(
"a", val, reqAlloc);
145 val.SetString(mode.c_str(), mode.size(), reqAlloc);
146 valArr.PushBack(val, reqAlloc);
147 for (
const int tok : instrumentTokens) { toksArr.PushBack(tok, reqAlloc); }
148 valArr.PushBack(toksArr, reqAlloc);
149 req.AddMember(
"v", valArr, reqAlloc);
152 string reqStr = utils::json::serialize(req);
154 ws->send(reqStr.data(), reqStr.size(), uWS::OpCode::TEXT);
155 for (
const int tok : instrumentTokens) {
156 if (mode == MODE_LTP) {
157 subbedInstruments[tok] = MODES::LTP;
158 }
else if (mode == MODE_QUOTE) {
159 subbedInstruments[tok] = MODES::QUOTE;
161 subbedInstruments[tok] = MODES::FULL;
169 inline void ticker::connectInternal() {
170 hub.connect(FMT(connectUrlFmt, key, token),
nullptr, {},
171 static_cast<int>(connectTimeout), group);
174 inline void ticker::reconnect() {
176 isReconnecting =
true;
179 if (reconnectTries <= maxReconnectTries) {
180 std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay));
181 reconnectDelay = (reconnectDelay * 2 > maxReconnectDelay) ?
191 isReconnecting =
false;
195 inline void ticker::processTextMessage(
const string& message) {
197 utils::json::parse(res, message);
198 if (!res.IsObject()) {
throw libException(
"Expected a JSON object"); };
200 auto type = utils::json::get<string>(res,
"type");
203 FMT(
"Cannot recognize websocket message type {0}", type));
210 if (type ==
"error" &&
onError) {
211 onError(
this, 0, utils::json::extractString(res));
215 template <
typename T>
216 T ticker::unpack(
const std::vector<char>& bytes,
size_t start,
size_t end) {
219 std::vector<char> requiredBytes(bytes.begin() +
static_cast<int64_t
>(start),
220 bytes.begin() +
static_cast<int64_t
>(end) + 1);
223 #ifndef WORDS_BIGENDIAN
224 std::reverse(requiredBytes.begin(), requiredBytes.end());
228 std::memcpy(&value, requiredBytes.data(),
sizeof(T));
232 inline std::vector<std::vector<char>> ticker::splitPackets(
233 const std::vector<char>& bytes) {
234 const auto numberOfPackets = unpack<int16_t>(bytes, 0, 1);
235 std::vector<std::vector<char>> packets;
237 unsigned int packetLengthStartIdx = 2;
238 for (
int i = 1; i <= numberOfPackets; i++) {
239 unsigned int packetLengthEndIdx = packetLengthStartIdx + 1;
241 unpack<int16_t>(bytes, packetLengthStartIdx, packetLengthEndIdx);
242 packetLengthStartIdx = packetLengthEndIdx + packetLength + 1;
243 packets.emplace_back(bytes.begin() + packetLengthEndIdx + 1,
244 bytes.begin() + packetLengthStartIdx);
249 inline std::vector<kc::tick> ticker::parseBinaryMessage(
250 char* bytes,
size_t size) {
251 static constexpr uint8_t SEGMENT_MASK = 0xff;
252 static constexpr
double CDS_DIVISOR = 10000000.0;
253 static constexpr
double BSECDS_DIVISOR = 10000.0;
254 static constexpr
double GENERIC_DIVISOR = 100.0;
255 static constexpr
size_t LTP_PACKET_SIZE = 8;
256 static constexpr
size_t INDICES_QUOTE_PACKET_SIZE = 28;
257 static constexpr
size_t INDICES_FULL_PACKET_SIZE = 32;
258 static constexpr
size_t QUOTE_PACKET_SIZE = 44;
259 static constexpr
size_t FULL_PACKET_SIZE = 184;
262 std::vector<std::vector<char>> packets =
263 splitPackets(std::vector<char>(bytes, bytes + size));
264 if (packets.empty()) {
return {}; };
266 std::vector<kc::tick> ticks;
267 for (
const auto& packet : packets) {
268 const size_t packetSize = packet.size();
269 const auto instrumentToken = unpack<int32_t>(packet, 0, 3);
271 const uint8_t segment = instrumentToken & SEGMENT_MASK;
272 const bool tradable =
273 segment !=
static_cast<uint8_t
>(SEGMENTS::INDICES);
274 double divisor = 0.0;
275 if (segment ==
static_cast<uint8_t
>(SEGMENTS::CDS)) {
276 divisor = CDS_DIVISOR;
278 }
else if (segment ==
static_cast<uint8_t
>(SEGMENTS::BSECDS)) {
279 divisor = BSECDS_DIVISOR;
282 divisor = GENERIC_DIVISOR;
286 Tick.isTradable = tradable;
287 Tick.instrumentToken = instrumentToken;
291 if (packetSize == LTP_PACKET_SIZE) {
292 Tick.mode = MODE_LTP;
293 Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
294 }
else if (packetSize == INDICES_QUOTE_PACKET_SIZE ||
295 packetSize == INDICES_FULL_PACKET_SIZE) {
297 Tick.mode = (packetSize == INDICES_QUOTE_PACKET_SIZE) ? MODE_QUOTE :
299 Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
300 Tick.ohlc.high = unpack<int32_t>(packet, 8, 11) / divisor;
301 Tick.ohlc.low = unpack<int32_t>(packet, 12, 15) / divisor;
302 Tick.ohlc.open = unpack<int32_t>(packet, 16, 19) / divisor;
303 Tick.ohlc.close = unpack<int32_t>(packet, 20, 23) / divisor;
304 Tick.netChange = unpack<int32_t>(packet, 24, 27) / divisor;
305 if (packetSize == INDICES_FULL_PACKET_SIZE) {
306 Tick.timestamp = unpack<int32_t>(packet, 28, 31);
308 }
else if (packetSize == QUOTE_PACKET_SIZE ||
309 packetSize == FULL_PACKET_SIZE) {
312 (packetSize == QUOTE_PACKET_SIZE) ? MODE_QUOTE : MODE_FULL;
313 Tick.lastPrice = unpack<int32_t>(packet, 4, 7) / divisor;
314 Tick.lastTradedQuantity = unpack<int32_t>(packet, 8, 11);
315 Tick.averageTradePrice = unpack<int32_t>(packet, 12, 15) / divisor;
316 Tick.volumeTraded = unpack<int32_t>(packet, 16, 19);
317 Tick.totalBuyQuantity = unpack<int32_t>(packet, 20, 23);
318 Tick.totalSellQuantity = unpack<int32_t>(packet, 24, 27);
319 Tick.ohlc.open = unpack<int32_t>(packet, 28, 31) / divisor;
320 Tick.ohlc.high = unpack<int32_t>(packet, 32, 35) / divisor;
321 Tick.ohlc.low = unpack<int32_t>(packet, 36, 39) / divisor;
322 Tick.ohlc.close = unpack<int32_t>(packet, 40, 43) / divisor;
324 (Tick.lastPrice - Tick.ohlc.close) * 100 / Tick.ohlc.close;
327 if (packetSize == FULL_PACKET_SIZE) {
328 Tick.lastTradeTime = unpack<int32_t>(packet, 44, 47);
329 Tick.oi = unpack<int32_t>(packet, 48, 51);
330 Tick.oiDayHigh = unpack<int32_t>(packet, 52, 55);
331 Tick.oiDayLow = unpack<int32_t>(packet, 56, 59);
332 Tick.timestamp = unpack<int32_t>(packet, 60, 63);
334 unsigned int depthStartIdx = 64;
335 for (
int i = 0; i <= 9; i++) {
337 depth.quantity = unpack<int32_t>(
338 packet, depthStartIdx, depthStartIdx + 3);
339 depth.price = unpack<int32_t>(packet, depthStartIdx + 4,
342 depth.orders = unpack<int16_t>(
343 packet, depthStartIdx + 8, depthStartIdx + 9);
345 (i >= 5) ? Tick.marketDepth.sell.emplace_back(depth) :
346 Tick.marketDepth.buy.emplace_back(depth);
347 depthStartIdx = depthStartIdx + 12;
352 ticks.emplace_back(Tick);
357 inline void ticker::resubInstruments() {
358 std::vector<int> ltpInstruments;
359 std::vector<int> quoteInstruments;
360 std::vector<int> fullInstruments;
361 for (
const auto& i : subbedInstruments) {
362 if (i.second == MODES::LTP) { ltpInstruments.push_back(i.first); };
363 if (i.second == MODES::QUOTE) { quoteInstruments.push_back(i.first); };
364 if (i.second == MODES::FULL) { fullInstruments.push_back(i.first); };
367 if (!ltpInstruments.empty()) {
setMode(MODE_LTP, ltpInstruments); };
368 if (!quoteInstruments.empty()) {
setMode(MODE_QUOTE, quoteInstruments); };
369 if (!fullInstruments.empty()) {
setMode(MODE_FULL, fullInstruments); };
372 inline void ticker::assignCallbacks() {
375 [&](uWS::WebSocket<uWS::CLIENT>* Ws, uWS::HttpRequest ) {
379 lastPongTime = std::chrono::system_clock::now();
382 reconnectDelay = initReconnectDelay;
383 isReconnecting =
false;
384 if (!subbedInstruments.empty()) { resubInstruments(); };
389 group->onMessage([&](uWS::WebSocket<uWS::CLIENT>* ,
char* message,
390 size_t length, uWS::OpCode opCode) {
391 if (opCode == uWS::OpCode::BINARY &&
onTicks) {
394 lastBeatTime = std::chrono::system_clock::now();
396 onTicks(
this, parseBinaryMessage(message, length));
398 }
else if (opCode == uWS::OpCode::TEXT) {
399 processTextMessage(
string(message, length));
404 group->onPong([&](uWS::WebSocket<uWS::CLIENT>* ,
char* ,
406 lastPongTime = std::chrono::system_clock::now();
409 group->onError([&](
void*) {
412 if (
isConnected()) { ws->close(utils::ws::ERROR_CODE::NO_REASON); };
413 if (enableReconnect) { reconnect(); };
417 group->onDisconnection([&](uWS::WebSocket<uWS::CLIENT>* ,
int code,
418 char* reason,
size_t length) {
421 if (code != utils::ws::ERROR_CODE::NORMAL_CLOSURE) {
425 if (code != utils::ws::ERROR_CODE::NORMAL_CLOSURE) {
426 if (enableReconnect && !isReconnecting) { reconnect(); };
430 group->startAutoPing(
static_cast<int>(pingInterval), pingMessage);