Websockets ticks have time gaps

kartik_patekar
Hi,

I have a simple python code to receive data on websocket. It works for a few minutes, and then it stops receiving updates. After a few minutes it starts receiving updates again, so I am not sure what is happening. I have subscribed to very liquid tickers like HDFCBANK, NIFTY Futures, NIFTY index. I also have functions to show errors, which do not print anything on my console.

Can you let me know what is happening? I am sharing my code here.


def listen_to_mktdata(self):
    """Stream live ticks from KiteTicker into ``self.intraday_prices_tracker``.

    Creates ``self.kws``, wires up the tick/connect/close/error callbacks and
    connects with ``threaded=True``. Instruments are taken from the index of
    ``self.contract_token_map``; each "full"-mode tick that carries an
    ``exchange_timestamp`` updates the tracker row of its ``TradingSymbol``.
    """

    def on_ticks(ws, ticks):
        """Update the tracker row for every usable tick in ``ticks``."""
        for json_data in ticks:
            try:
                token = json_data["instrument_token"]
                symbol = self.contract_token_map.loc[token, "TradingSymbol"]

                # Only full-mode ticks with an exchange timestamp are recorded.
                if "exchange_timestamp" not in json_data or json_data["mode"] != "full":
                    continue

                if "depth" in json_data:
                    best_bid = json_data["depth"]["buy"][0]
                    best_ask = json_data["depth"]["sell"][0]
                    b_px = float(best_bid["price"])
                    a_px = float(best_ask["price"])
                    b_qty = int(best_bid["quantity"])
                    a_qty = int(best_ask["quantity"])

                    # Treat a zero price as missing so it cannot skew the mid.
                    b_px = np.nan if b_px == 0 else b_px
                    a_px = np.nan if a_px == 0 else a_px
                    mid_px = (b_px + a_px) / 2

                    voltraded = int(json_data["volume_traded"])
                else:
                    # No depth in this tick: use the last price as the mid.
                    b_px = a_px = b_qty = a_qty = np.nan
                    mid_px = json_data["last_price"]
                    voltraded = np.nan

                last_update_time = int(json_data["exchange_timestamp"].timestamp())

                updates = {
                    "Last_Update_Time": last_update_time,
                    "VolumeTradedToday": voltraded,
                    "B1_Px": b_px,
                    "A1_Px": a_px,
                    "B1_Qty": b_qty,
                    "A1_Qty": a_qty,
                    "MidPx": mid_px,
                }
                for column, value in updates.items():
                    self.intraday_prices_tracker.loc[symbol, column] = value
            except Exception as e:
                # Best effort: report the bad tick and carry on with the rest.
                print(e)
                print(json_data)

    def on_connect(ws, response):
        """Subscribe to every known instrument token in full mode."""
        tokens = list(self.contract_token_map.index)
        if tokens:
            ws.subscribe(tokens)
            # Full mode delivers the more detailed data used by on_ticks.
            ws.set_mode(ws.MODE_FULL, tokens)

    def on_close(ws, code, reason):
        """Report the close reason and stop the ticker."""
        print("Closing connection: {}".format(reason))
        # NOTE(review): the kiteconnect docs say stop() here disables
        # automatic reconnection - confirm that is intended.
        ws.stop()

    def on_error(ws, code, reason):
        """Report a websocket error."""
        print("Some error: {}".format(reason))

    self.kws = KiteTicker(self.API_KEY, self.access_token)

    # Assign the callbacks
    self.kws.on_ticks = on_ticks
    self.kws.on_connect = on_connect
    self.kws.on_close = on_close
    self.kws.on_error = on_error

    # threaded=True runs the websocket loop in a separate thread, so this
    # call returns and the caller must keep the process alive.
    self.kws.connect(threaded=True)

  • MAG

    def on_ticks(ws, ticks):
        """Update ``self.intraday_prices_tracker`` for every usable tick.

        ``self`` is expected to come from the enclosing scope. Only
        "full"-mode ticks that carry an ``exchange_timestamp`` are recorded.
        """
        for json_data in ticks:
            try:
                token = json_data["instrument_token"]
                symbol = self.contract_token_map.loc[token, "TradingSymbol"]

                if "exchange_timestamp" not in json_data or json_data["mode"] != "full":
                    continue

                if "depth" in json_data:
                    best_bid = json_data["depth"]["buy"][0]
                    best_ask = json_data["depth"]["sell"][0]
                    b_px = float(best_bid["price"])
                    a_px = float(best_ask["price"])
                    b_qty = int(best_bid["quantity"])
                    a_qty = int(best_ask["quantity"])

                    # Treat a zero price as missing so it cannot skew the mid.
                    b_px = np.nan if b_px == 0 else b_px
                    a_px = np.nan if a_px == 0 else a_px
                    mid_px = (b_px + a_px) / 2

                    voltraded = int(json_data["volume_traded"])
                else:
                    # No depth in this tick: use the last price as the mid.
                    b_px = a_px = b_qty = a_qty = np.nan
                    mid_px = json_data["last_price"]
                    voltraded = np.nan

                last_update_time = int(json_data["exchange_timestamp"].timestamp())

                updates = {
                    "Last_Update_Time": last_update_time,
                    "VolumeTradedToday": voltraded,
                    "B1_Px": b_px,
                    "A1_Px": a_px,
                    "B1_Qty": b_qty,
                    "A1_Qty": a_qty,
                    "MidPx": mid_px,
                }
                for column, value in updates.items():
                    self.intraday_prices_tracker.loc[symbol, column] = value
            except Exception as e:
                # Best effort: report the bad tick and carry on with the rest.
                print(e)
                print(json_data)


    That's too much processing within the on_ticks method, which is getting blocked, leading to loss of data/disconnection.

    In the onticks method you are supposed to just receive the tick and pass it to another process/queue/db.
    And then you will have another program read from the queue/db and do any processing/computation on the ticks.
  • kartik_patekar
    I see. Thank you.
  • kartik_patekar
    Hi, I modified the code to send the data to a different thread for processing, but I ran into the same issue.
    In the following code, listen_to_mktdata is run from the main thread, while I spawn a different thread to run process_MD.
    Also, I tried replicating the problem on a different computer, but it does not happen on another computer.

    Can you please help?



    def process_MD(self, queue):
        """Consume ticks from ``queue`` forever and update the price tracker.

        Blocks on ``queue.get()`` for each tick. Only "full"-mode ticks that
        carry an ``exchange_timestamp`` are written to
        ``self.intraday_prices_tracker``; the row is chosen by looking the
        tick's ``instrument_token`` up in ``self.contract_token_map``.
        Errors while handling a tick are printed and the loop continues;
        errors raised by ``queue.get()`` itself propagate to the caller.
        """
        while True:
            json_data = queue.get()

            try:
                token = json_data["instrument_token"]
                symbol = self.contract_token_map.loc[token, "TradingSymbol"]

                if "exchange_timestamp" not in json_data or json_data["mode"] != "full":
                    continue

                if "depth" in json_data:
                    best_bid = json_data["depth"]["buy"][0]
                    best_ask = json_data["depth"]["sell"][0]
                    b_px = float(best_bid["price"])
                    a_px = float(best_ask["price"])
                    b_qty = int(best_bid["quantity"])
                    a_qty = int(best_ask["quantity"])

                    # Treat a zero price as missing so it cannot skew the mid.
                    b_px = np.nan if b_px == 0 else b_px
                    a_px = np.nan if a_px == 0 else a_px
                    mid_px = (b_px + a_px) / 2

                    voltraded = int(json_data["volume_traded"])
                else:
                    # No depth in this tick: use the last price as the mid.
                    b_px = a_px = b_qty = a_qty = np.nan
                    mid_px = json_data["last_price"]
                    voltraded = np.nan

                last_update_time = int(json_data["exchange_timestamp"].timestamp())

                updates = {
                    "Last_Update_Time": last_update_time,
                    "VolumeTradedToday": voltraded,
                    "B1_Px": b_px,
                    "A1_Px": a_px,
                    "B1_Qty": b_qty,
                    "A1_Qty": a_qty,
                    "MidPx": mid_px,
                }
                for column, value in updates.items():
                    self.intraday_prices_tracker.loc[symbol, column] = value
            except Exception as e:
                # Best effort: report the bad tick and keep consuming.
                print(e)
                print(json_data)


    def listen_to_mktdata(self):
        """Connect a KiteTicker websocket that forwards raw ticks to ``self.queue``.

        Creates ``self.kws``, wires up the tick/connect/close/error callbacks
        and connects with ``threaded=True``. Instruments are taken from the
        index of ``self.contract_token_map``. No tick processing happens
        here; each tick is only put on ``self.queue``.
        """

        def on_ticks(ws, ticks):
            """Hand every received tick over to the queue."""
            for json_data in ticks:
                self.queue.put(json_data)

        def on_connect(ws, response):
            """Subscribe to every known instrument token in full mode."""
            tokens = list(self.contract_token_map.index)
            if tokens:
                ws.subscribe(tokens)
                # Full mode delivers more detailed data per tick.
                ws.set_mode(ws.MODE_FULL, tokens)

        def on_close(ws, code, reason):
            """Report the close reason and stop the ticker."""
            print("Closing connection: {}".format(reason))
            # NOTE(review): the kiteconnect docs say stop() here disables
            # automatic reconnection - confirm that is intended.
            ws.stop()

        def on_error(ws, code, reason):
            """Report a websocket error."""
            print("Some error: {}".format(reason))

        self.kws = KiteTicker(self.API_KEY, self.access_token)

        # Assign the callbacks
        self.kws.on_ticks = on_ticks
        self.kws.on_connect = on_connect
        self.kws.on_close = on_close
        self.kws.on_error = on_error

        # threaded=True runs the websocket loop in a separate thread, so this
        # call returns and the caller must keep the process alive.
        self.kws.connect(threaded=True)


  • MAG
    MAG edited July 2024
    Sorry, I don't have the time, bandwidth or patience to read and correct other people's code. I can tell you what to do, and then you need the required programming skills and experience to figure things out yourself.
    The only thing I will say is this. Stop using multi-threading unless you are a very very accomplished programmer with lots of experience in writing multi-threaded code.
    Also your code is incomplete. You say you used threading. I don't see any thread initialization or even a threading module import statement.

    I write the ticks to a db. And then I run an entirely separate program that reads the tick from db and does the tick processing.
    In fact each functionality in my setup is a separate python program. With decades of IT experience I don't try to use multi-threading/multi-processing unless absolutely unavoidable.
  • kartik_patekar
    I see, thank you for your help!
Sign In or Register to comment.