I have some simple Python code that receives data over a websocket. It works for a few minutes and then stops receiving updates. After a few more minutes it starts receiving updates again, so I am not sure what is happening. I have subscribed to very liquid tickers such as HDFCBANK, NIFTY Futures and the NIFTY index. I also have callbacks that report errors, but they do not print anything to my console.
Can you let me know what is happening? I am sharing my code here.
def listen_to_mktdata(self):
    """Define the ticker callbacks (abridged excerpt of the posted code).

    NOTE(review): this snippet is truncated in the paste -- the quote
    handling stops after the depth fields are read, and no ticker object is
    created or connected here.
    """

    def on_ticks(ws, ticks):
        # Callback to receive live market data
        for json_data in ticks:
            try:
                token = json_data["instrument_token"]
                symbol = self.contract_token_map.loc[token, "TradingSymbol"]
                if "exchange_timestamp" in json_data and json_data["mode"] == "full":
                    if "depth" in json_data:
                        b_px = float(json_data["depth"]["buy"][0]["price"])
                        a_px = float(json_data["depth"]["sell"][0]["price"])
                        b_qty = int(json_data["depth"]["buy"][0]["quantity"])
                        a_qty = int(json_data["depth"]["sell"][0]["quantity"])
            except Exception as e:
                # Best effort: report the bad tick and carry on with the next.
                print(e)
                print(json_data)

    def on_connect(ws, response):
        # Callback on successful connect
        tokens = list(self.contract_token_map.index)
        if tokens:
            ws.subscribe(tokens)
            # Set to full mode to receive more detailed data
            ws.set_mode(ws.MODE_FULL, tokens)

    # NOTE(review): the paste repeats
    # "except Exception as e: print(e) print(json_data) pass" at this point;
    # it duplicates the handler inside on_ticks and has no matching try.
That's too much processing within the on_ticks method; the callback gets blocked, which leads to loss of data or disconnection.
In the on_ticks method you are supposed to just receive the tick and pass it to another process/queue/db. You then have another program read from the queue/db and do any processing/computation on the ticks.
Hi, I modified the code to send the data to a different thread for processing, but I ran into the same issue. In the following code, listen_to_mktdata is run from the main thread, while I spawn a different thread to run process_MD. Also, I tried replicating the problem on a different computer, but it does not happen there.
Can you please help?
def process_MD(self, queue):
    """Consume ticks from ``queue`` (abridged excerpt of the posted code).

    NOTE(review): this snippet is truncated in the paste -- it stops after
    the depth fields are read, so nothing is stored here.

    A ``None`` item is treated as a shutdown sentinel so the worker thread
    can be stopped and joined; every other item is handled as a tick.
    """
    while True:
        json_data = queue.get()
        if json_data is None:
            # Shutdown sentinel: leave the loop instead of blocking forever.
            break
        try:
            token = json_data["instrument_token"]
            symbol = self.contract_token_map.loc[token, "TradingSymbol"]
            if "exchange_timestamp" in json_data and json_data["mode"] == "full":
                if "depth" in json_data:
                    b_px = float(json_data["depth"]["buy"][0]["price"])
                    a_px = float(json_data["depth"]["sell"][0]["price"])
                    b_qty = int(json_data["depth"]["buy"][0]["quantity"])
                    a_qty = int(json_data["depth"]["sell"][0]["quantity"])
        except Exception as e:
            # Best effort: report the bad tick and keep consuming.
            print(e)
            print(json_data)
def listen_to_mktdata(self):
    """Define the ticker callbacks (abridged excerpt of the posted code).

    NOTE(review): this snippet is truncated in the paste -- no ticker object
    is created or connected here.
    """

    def on_ticks(ws, ticks):
        # Callback to receive live market data: only enqueue each tick so the
        # callback returns quickly.
        for json_data in ticks:
            self.queue.put(json_data)

    def on_connect(ws, response):
        # Callback on successful connect
        tokens = list(self.contract_token_map.index)
        if tokens:
            ws.subscribe(tokens)
            # Set to full mode to receive more detailed data
            ws.set_mode(ws.MODE_FULL, tokens)
Sorry, I don't have the time, bandwidth or patience to read and correct other people's code. I can tell you what to do, and then you need the required programming skills and experience to figure things out yourself. The only thing I will say is this: stop using multi-threading unless you are a very, very accomplished programmer with lots of experience in writing multi-threaded code. Also, your code is incomplete. You say you used threading, but I don't see any thread initialization or even an import statement for the threading module.
I write the ticks to a db. And then I run an entirely separate program that reads the tick from db and does the tick processing. In fact each functionality in my setup is a separate python program. With decades of IT experience I don't try to use multi-threading/multi-processing unless absolutely unavoidable.
def on_ticks(ws, ticks):
    """Callback to receive live market data.

    For every full-mode tick that carries an exchange timestamp, derive the
    best bid/ask, mid price, traded volume and update time, and write them
    into the symbol's row of ``self.intraday_prices_tracker``. ``self`` is
    taken from the enclosing scope. Ticks that fail are printed and skipped.
    """
    for tick in ticks:
        try:
            instrument = tick["instrument_token"]
            symbol = self.contract_token_map.loc[instrument, "TradingSymbol"]

            # Only full-mode ticks with an exchange timestamp are recorded.
            if "exchange_timestamp" not in tick or tick["mode"] != "full":
                continue

            if "depth" in tick:
                bid = float(tick["depth"]["buy"][0]["price"])
                ask = float(tick["depth"]["sell"][0]["price"])
                bid_qty = int(tick["depth"]["buy"][0]["quantity"])
                ask_qty = int(tick["depth"]["sell"][0]["quantity"])
                # A zero price is replaced with NaN.
                if bid == 0:
                    bid = np.nan
                if ask == 0:
                    ask = np.nan
                mid = (bid + ask) / 2
                volume = int(tick["volume_traded"])
                updated_at = int(tick["exchange_timestamp"].timestamp())
            else:
                # No depth in this tick: use the last price as the mid.
                bid = ask = bid_qty = ask_qty = np.nan
                mid = tick["last_price"]
                volume = np.nan
                updated_at = int(tick["exchange_timestamp"].timestamp())

            row = {
                "Last_Update_Time": updated_at,
                "VolumeTradedToday": volume,
                "B1_Px": bid,
                "A1_Px": ask,
                "B1_Qty": bid_qty,
                "A1_Qty": ask_qty,
                "MidPx": mid,
            }
            for column, value in row.items():
                self.intraday_prices_tracker.loc[symbol, column] = value
        except Exception as err:
            print(err)
            print(tick)
That's too much processing within the on_ticks method; the callback gets blocked, which leads to loss of data or disconnection.
In the on_ticks method you are supposed to just receive the tick and pass it to another process/queue/db.
And then you will have another program read from the queue/db and do any processing/computation on the ticks.
In the following code, listen_to_mktdata is run from the main thread, while I spawn a different thread to run process_MD.
Also, I tried replicating the problem on a different computer, but it does not happen on another computer.
Can you please help?
def process_MD(self, queue):
    """Consume ticks from ``queue`` and refresh ``self.intraday_prices_tracker``.

    Blocks on ``queue.get()`` and handles one tick dict at a time. For a
    full-mode tick that carries an exchange timestamp, the best bid/ask, mid
    price, traded volume and update time are written into the row of the
    symbol that ``self.contract_token_map`` maps the instrument token to.
    Other ticks are ignored.

    Args:
        queue: Object with a blocking ``get()`` that yields tick dicts. A
            ``None`` item is a shutdown sentinel: the loop ends so the worker
            thread can be joined.

    A tick that raises while being processed is printed and skipped, so one
    bad tick does not stop the worker.
    """
    while True:
        json_data = queue.get()
        if json_data is None:
            # Shutdown sentinel: leave the loop instead of blocking forever.
            break
        try:
            token = json_data["instrument_token"]
            symbol = self.contract_token_map.loc[token, "TradingSymbol"]
            if "exchange_timestamp" in json_data and json_data["mode"] == "full":
                if "depth" in json_data:
                    b_px = float(json_data["depth"]["buy"][0]["price"])
                    a_px = float(json_data["depth"]["sell"][0]["price"])
                    b_qty = int(json_data["depth"]["buy"][0]["quantity"])
                    a_qty = int(json_data["depth"]["sell"][0]["quantity"])
                    # A zero price is replaced with NaN.
                    b_px = np.nan if b_px == 0 else b_px
                    a_px = np.nan if a_px == 0 else a_px
                    mid_px = (b_px + a_px) / 2
                    voltraded = int(json_data["volume_traded"])
                    last_update_time = int(json_data["exchange_timestamp"].timestamp())
                else:
                    # No depth in this tick: use the last price as the mid.
                    b_px = np.nan
                    a_px = np.nan
                    b_qty = np.nan
                    a_qty = np.nan
                    mid_px = json_data["last_price"]
                    voltraded = np.nan
                    last_update_time = int(json_data["exchange_timestamp"].timestamp())
                # Written only in this branch, so every value below is bound
                # to the current tick.
                updates = {
                    "Last_Update_Time": last_update_time,
                    "VolumeTradedToday": voltraded,
                    "B1_Px": b_px,
                    "A1_Px": a_px,
                    "B1_Qty": b_qty,
                    "A1_Qty": a_qty,
                    "MidPx": mid_px,
                }
                for column, value in updates.items():
                    self.intraday_prices_tracker.loc[symbol, column] = value
        except Exception as e:
            # Best effort: report the bad tick and keep consuming.
            print(e)
            print(json_data)
def listen_to_mktdata(self):
    """Open the KiteTicker websocket and forward every tick to ``self.queue``.

    Reads ``self.API_KEY``, ``self.access_token``, ``self.contract_token_map``
    and ``self.queue``; stores the ticker on ``self.kws``.
    """

    def on_ticks(ws, ticks):
        # Callback to receive live market data. It only enqueues each tick so
        # the callback returns quickly.
        for json_data in ticks:
            self.queue.put(json_data)

    def on_connect(ws, response):
        # Callback on successful connect
        tokens = list(self.contract_token_map.index)
        if tokens:
            ws.subscribe(tokens)
            # Set to full mode to receive more detailed data
            ws.set_mode(ws.MODE_FULL, tokens)

    def on_close(ws, code, reason):
        print("Closing connection: {}".format(reason))
        # NOTE(review): per the KiteTicker documentation, ws.stop() ends the
        # ticker's event loop, so no automatic reconnection happens after
        # this -- confirm that is intended.
        ws.stop()

    def on_error(ws, code, reason):
        print("Some error: {}".format(reason))

    self.kws = KiteTicker(self.API_KEY, self.access_token)

    # Assign the callbacks
    self.kws.on_ticks = on_ticks
    self.kws.on_connect = on_connect
    self.kws.on_close = on_close
    self.kws.on_error = on_error

    # NOTE(review): per the KiteTicker documentation, threaded=True runs the
    # event loop in a background thread and connect() returns immediately, so
    # the caller has to keep the main thread alive -- confirm against the
    # installed client version.
    self.kws.connect(threaded=True)
The only thing I will say is this: stop using multi-threading unless you are a very, very accomplished programmer with lots of experience in writing multi-threaded code.
Also your code is incomplete. You say you used threading. I don't see any thread initialization or even a threading module import statement.
I write the ticks to a db. And then I run an entirely separate program that reads the tick from db and does the tick processing.
In fact each functionality in my setup is a separate python program. With decades of IT experience I don't try to use multi-threading/multi-processing unless absolutely unavoidable.