Not receiving new ticks after reconnection

adjas
I have faced this issue a couple of times during my testing now. At a certain point the WebSocket got disconnected. Since I do not call ws.stop(), Kite tries to reconnect automatically. But after the reconnect it stops receiving new ticks for the registered instrument tokens. My WebSocket runs in a separate thread and on every tick just updates the ltp, timestamp, etc. in a watchlist object. My main algo runs in a separate thread and reads the ltp, timestamp, etc. from this watchlist object.

As you can see in the attached log, it received the last tick at around 14:44:58. After that the main loop just kept reading the same price with the same timestamp.
A few lines down there was a connection error, then it tried to reconnect, and a few lines later it shows that the WebSocket was connected.
The two lines "Would try to Reconnect" and "Connected to WebSocket" are logged by my algo from the on_close and on_connect callbacks. So I am not sure whether it actually connected or just printed to the log from the callback function.

Another thing: the instruments that I register with the WebSocket to receive ticks are not registered inside on_connect. So my question is, do I need to register the instrument tokens again on reconnect? My assumption was that once registered, I do not need to register them again on reconnection.
Log.jpg 819.5K
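The symptom described above (the main loop re-reading the same price and timestamp) can be caught explicitly. Below is a minimal sketch, not part of the original code: `TickFreshness`, `mark_tick` and `is_stale` are hypothetical names, and the 5-second threshold is an arbitrary assumption that would need tuning per instrument.

```python
import threading
import time


class TickFreshness:
    """Tracks when the last tick arrived so stale data can be detected."""

    def __init__(self, max_age_seconds):
        self.max_age_seconds = max_age_seconds
        self._lock = threading.Lock()
        self._last_tick_at = None  # monotonic time of the most recent tick

    def mark_tick(self, now=None):
        # Intended to be called from on_ticks (the WebSocket thread).
        with self._lock:
            self._last_tick_at = time.monotonic() if now is None else now

    def is_stale(self, now=None):
        # Intended to be called from the main loop before trusting the ltp.
        now = time.monotonic() if now is None else now
        with self._lock:
            if self._last_tick_at is None:
                return True  # no tick has arrived yet
            return (now - self._last_tick_at) > self.max_age_seconds


# Explicit timestamps are passed here only to make the demo deterministic.
freshness = TickFreshness(max_age_seconds=5.0)
freshness.mark_tick(now=100.0)
print(freshness.is_stale(now=102.0))  # False
print(freshness.is_stale(now=110.0))  # True
```

With something like this in place, the strategy loop could skip trading decisions (or raise an alert) while `is_stale()` is true instead of acting on a frozen price.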
  • adjas
    This is all that I have in the three callback functions. on_tick passes the tick to other objects for updating the ltp, timestamp, etc. on_connect initializes a watchlist on its first run, and there is no ws.stop() inside on_close. The WebSocket is run in threaded mode, so the main execution thread doesn't block the execution of these callback functions in any way. Any pointers to what exactly the issue is would be appreciated.
  • tahseen
    @adjas

    Are you changing the state of the global variable ws_connected again after it is set to True in on_connect?

    Where is the log line "RogueDelta" being printed from?

    On reconnection, are you sure your thread is getting the connection handle of the new WebSocket? Or is it attempting to use the same old one?
  • adjas
    ws_connected is just there to make sure that I do not initialize a new watchlist object on reconnection. RogueDelta is the strategy code that reads values from the watchlist/portfolio and logs the strategy-related details.

    Based on your input I realized I need to pass the new WebSocket handle to the watchlist on reconnection. That handle is used by the watchlist to subscribe/unsubscribe, so it wouldn't be able to do that until I pass it the new handle.

    But after reconnection it should still call on_tick, and on_tick should still be able to call the update functions of portfolio and watchlist, right? The update functions only need the ticks, not the ws handle, to update the ltp, timestamp, etc.
  • adjas
    Updated on_connect to pass the new handle to watchlist
    This would take care of subscribe/unsubscribe after reconnect but it still doesn't solve the on_tick issue
    def on_connect(ws, response):
        # Callback on successful connect.
        global ws_connected
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if ws_connected is False:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            ws_connected = True
        else:
            trader_watchlist.update_ws(ws)
  • adjas
    ok.... if it's a new WebSocket handle, does that mean I need to subscribe to all the instruments again with this new handle?

    I think my assumption that Kite automatically re-subscribes all the registered tokens on reconnect might be wrong.
  • sujith
    pykiteconnect takes care of re-subscribing all the tokens upon a reconnection. The end-user app doesn't have to do anything for that; it even keeps track of the mode each token is subscribed in.
  • adjas
    Hi Sujith, In that case can you go through the post and lemme know what might be the issue?
  • tahseen
    @adjas on reconnect Kite connects again automatically. As I pointed out before, please pass the new WebSocket handle to your thread.

    In Linux / Unix, everything is a file, so what you read from and write to is a handle. You can print the pointer / location of such handles.

    So when a reconnection happens, the WebSocket handle has to be passed to your thread again; otherwise your thread will keep using the old WebSocket handle and keep printing old values, because the location it points to has stopped getting updated.


    @adjas but in the end, I don't know why you are doing all this passing of the WebSocket to your thread. Why can't you just use a Python Queue to take the data in inside on_ticks and then read the Queue in your thread? In fact, Queues are thread safe, which means you can scale your threads as well.
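A minimal sketch of the queue hand-off suggested above. It assumes ticks arrive as a list of dicts with `instrument_token` and `last_price` keys; `tick_queue`, `latest` and `consume` are hypothetical names, the token 260105 is only a sample value, and the ticks are simulated here rather than delivered by KiteTicker.

```python
import queue
import threading

tick_queue = queue.Queue()
latest = {}  # instrument_token -> last tick dict; written only by the consumer


def on_ticks(ws, ticks):
    # Runs on the WebSocket thread: enqueue and return quickly.
    tick_queue.put(ticks)


def consume(stop_event):
    # Runs on a worker thread: drain the queue and update shared state.
    while not stop_event.is_set() or not tick_queue.empty():
        try:
            ticks = tick_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        for tick in ticks:
            latest[tick["instrument_token"]] = tick
        tick_queue.task_done()


stop = threading.Event()
worker = threading.Thread(target=consume, args=(stop,), daemon=True)
worker.start()

# Simulated ticks standing in for what the ticker would deliver.
on_ticks(None, [{"instrument_token": 260105, "last_price": 32000.5}])
on_ticks(None, [{"instrument_token": 260105, "last_price": 32001.0}])

tick_queue.join()  # wait until both batches have been processed
stop.set()
worker.join()
print(latest[260105]["last_price"])  # 32001.0
```

The point of the design is that on_ticks never touches strategy state, so the consumer does not need the ws handle at all and is unaffected by which connection delivered the tick.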
  • sujith
    Can you paste the complete code here and mention your setup details?

    It seems like you are using an explicit variable to check if it is connected or not. Kite Ticker already exposes a method to do that. You can know more here.
  • adjas
    @tahseen all my strategy instances use the watchlist to manage subscriptions to the WebSocket. So after reconnection, if one of the strategies wants to register a new instrument token, it calls a watchlist function to do that. The watchlist in turn uses the WebSocket handle to subscribe to that instrument, and then the strategy instance starts reading the values from the watchlist.

    Now my main thread still has the handles of the watchlist and portfolio, whose update functions are called inside on_tick. That's the reason I don't create a new watchlist instance inside on_connect and simply pass the new ws handle to the watchlist for managing future subscription requests from strategies. Inside my on_tick, both calls simply take the ticks and update the values based on them. They don't need the WebSocket handle.
    def on_ticks(ws, ticks):
        # Callback to receive ticks.

        # Update LTP in portfolio
        portfolio.update_ltp(ticks)

        # Update Watchlist
        trader_watchlist.update_quotes(ticks)


    def on_connect(ws, response):
        # Callback on successful connect.
        global ws_connected
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if ws_connected is False:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            ws_connected = True
        else:
            trader_watchlist.reconnect(ws)
    So my assumption is that after reconnect:
    1. The same old instance of watchlist gets the new ws handle to manage future subscription requests.
    2. The new WebSocket handle starts calling on_tick.
    3. Inside on_tick the ticks are passed to the update functions of portfolio and watchlist. The update functions don't need the ws handle.
    4. My main thread has the handles to portfolio and watchlist, so it keeps reading the values from the same instances after reconnect.

    I am a bit new to threads in Python, so forgive me for asking so many questions, but if the same old instances of portfolio and watchlist are called from inside on_tick, shouldn't they be getting the ticks? Or do I need to put the ticks in a queue and have the portfolio and watchlist objects read from this queue?

    @sujith that check_connection was just to send an email in case the portfolio values stop updating. It wasn't doing anything with the ws handle. In fact it was a stopgap measure and I have removed it now.
  • zartimus
    https://github.com/zerodhatech/pykiteconnect/blob/master/kiteconnect/ticker.py#L683
    The resubscribe happens only if the previous disconnection was not an abrupt disconnection.
  • adjas
    adjas edited December 2019
    @zartimus @sujith as per the code that zartimus posted, it looks like the assumption that all old tokens are resubscribed does not hold in the case of an unclean disconnection. And my log shows that the connection was closed uncleanly. Does that mean that in this case the ticker reconnects but skips the subscription of the old tokens?

    Also, what's the difference between an unclean and a clean disconnection?

    In this case my algo had been running for almost 6 hrs without any issue before this disconnect happened. Also, the last Kite request of any kind was at 9:35.... after that, for the next 5 hrs, it was simply reading the values it was getting from on_tick without posting any request to the API.
  • zartimus
    zartimus edited December 2019
    Read about normal and abnormal disconnection here: https://tools.ietf.org/html/rfc6455#section-7.4
    You can see an abnormal disconnection when you unplug the LAN cable, for example. In Kite ticker, every WebSocket connection has associated tokens.
    1. In case of a normal disconnection, the WebSocket client sends the close frame, and the WebSocket server removes this connection and does any cleanup.
    2. In case of an abnormal disconnection, the WebSocket client can't send this close frame. When the reconnection happens, it will use the same underlying connection. That means the client starts receiving the ticks automatically because it was the same old connection.
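To make the normal/abnormal distinction concrete, here is a small sketch based on the status codes defined in RFC 6455 section 7.4.1. `describe_close` is a hypothetical helper; it assumes the `code` argument passed to on_close carries the RFC status code (or None when no close frame was seen), which is not confirmed in this thread.

```python
NORMAL_CLOSURE = 1000    # RFC 6455: normal closure
GOING_AWAY = 1001        # RFC 6455: endpoint is going away
ABNORMAL_CLOSURE = 1006  # RFC 6455: reserved, connection lost without a close frame


def describe_close(code):
    """Classify a WebSocket close code as 'clean', 'unclean' or 'other'."""
    if code in (NORMAL_CLOSURE, GOING_AWAY):
        return "clean"
    if code is None or code == ABNORMAL_CLOSURE:
        return "unclean"
    return "other"


print(describe_close(1000))  # clean
print(describe_close(1006))  # unclean
```

Logging this classification from on_close would show which kind of disconnection preceded a reconnect where ticks stopped arriving.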
  • adjas
    @zartimus @sujith @tahseen if that's the case, shouldn't the code above start receiving the ticks again on reconnection?
    In the log, a few lines after the disconnection, it shows "Connected to WebSocket", which was logged from inside on_connect, so my assumption is that the reconnection did happen.

    My requirement is:
    1. My main thread has handles of Watchlist and portfolio.
    2. My Strategies manage subscription/read values using these two objects.
    3. Ticker should simply use the update function of these two objects to pass them the ticks from inside on_tick

    How would you code it?
  • zartimus
    @adjas Looks like the assumption that unclean closure will always use same old connection is wrong. Reverting this patch.
    Cheers
  • adjas
    @zartimus Thanks for that info!

    So here is what I have changed for now:
    1. On its first run, on_connect initializes a watchlist object whose handle my main thread gets in order to manage subscriptions, read values, etc.
    2. On reconnection, on_connect calls a reconnect function of the watchlist, passing it the new ws handle. The watchlist replaces the old ws handle with this new one for all future subscription requests from strategies.
    3. In the same reconnect function, the watchlist also re-subscribes to all the tokens it currently holds using the new ws handle.
    4. on_tick simply has two calls to the update functions of portfolio and watchlist, which update the new values in these two objects.

    One extra change I can think of, as suggested by @tahseen, is using a queue inside on_tick instead of calling the update functions.

    Does this sound like it would work?
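The steps listed above could be sketched roughly as follows. This is an illustration under stated assumptions, not the author's implementation: `FakeTicker` is a stand-in that only records calls and mirrors the `subscribe(tokens)` / `set_mode(mode, tokens)` calls used in this thread, the simplified `add_instrument(token, mode)` signature is hypothetical, and the tokens are sample values.

```python
from collections import defaultdict


class FakeTicker:
    """Stand-in for the ticker handle; records calls instead of sending them."""
    MODE_LTP, MODE_QUOTE, MODE_FULL = "ltp", "quote", "full"

    def __init__(self):
        self.calls = []

    def subscribe(self, tokens):
        self.calls.append(("subscribe", list(tokens)))

    def set_mode(self, mode, tokens):
        self.calls.append(("set_mode", mode, list(tokens)))


class Watchlist:
    def __init__(self, web_socket):
        self.ws = web_socket
        self.modes = {}  # instrument_token -> mode string

    def add_instrument(self, token, mode):
        # Remember the subscription so it can be replayed later.
        self.modes[token] = mode
        self.ws.subscribe([token])
        self.ws.set_mode(mode, [token])

    def reconnect(self, web_socket):
        # Swap in the new handle, then replay every known subscription on it.
        self.ws = web_socket
        if not self.modes:
            return
        by_mode = defaultdict(list)
        for token, mode in self.modes.items():
            by_mode[mode].append(token)
        self.ws.subscribe(list(self.modes))
        for mode, tokens in by_mode.items():
            self.ws.set_mode(mode, tokens)


old_ws, new_ws = FakeTicker(), FakeTicker()
wl = Watchlist(old_ws)
wl.add_instrument(260105, FakeTicker.MODE_FULL)
wl.add_instrument(256265, FakeTicker.MODE_LTP)
wl.reconnect(new_ws)
print(new_ws.calls)
```

Keeping the token-to-mode mapping in one dict means the replay on reconnect needs no per-mode bookkeeping elsewhere in the watchlist.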
  • sujith
    @adjas,
    I think this is more to do with the OS, in some cases we had observed that upon reconnection, the system was using the old connection itself. In your case, it looks like it is using a new connection.
  • tahseen
    @adjas Can you just remove your strategies and show your skeleton code? It would be interesting to see where you coded it incorrectly. It is for sure a bug in your code and not in the Kite WebSocket.

    And as I said, push the ticks into a Queue in on_ticks.

    You can have two queues inside on_ticks and push the same data to both, one read by your portfolio function and the other by the watcher function.

    There are more optimal ways, but since a longer discussion is out of scope for this forum, I would keep it as simple as this for you.
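A rough sketch of the two-queue fan-out described above, assuming one consumer for the portfolio and one for the watchlist. The names `portfolio_queue`, `watchlist_queue` and `consumers` are hypothetical, and the tick is a simulated sample.

```python
import queue

portfolio_queue = queue.Queue()
watchlist_queue = queue.Queue()
consumers = [portfolio_queue, watchlist_queue]


def on_ticks(ws, ticks):
    # Push the same batch to every consumer queue.
    for q in consumers:
        q.put(list(ticks))  # shallow copy so consumers do not share one list


# Simulated tick standing in for what the ticker would deliver.
on_ticks(None, [{"instrument_token": 260105, "last_price": 32000.5}])
print(portfolio_queue.qsize(), watchlist_queue.qsize())  # 1 1
```

Each consumer thread would then call `get()` on its own queue, so a slow portfolio update cannot delay the watchlist update or the WebSocket thread.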
  • adjas
    adjas edited December 2019
    @tahseen This is my main flow:
    # Load Traders Profile
    # Load Trader's Portfolio and Order Book
    # Create variable for Watchlist
    logging.info("RogueTrader: Loading Traders Profile, Portfolio and Order Book")
    user = user.User(kite=kite)
    portfolio = portfolio.Portfolio(kite=kite)
    orderbook = orderbook.OrderBook(kite=kite, portfolio=portfolio)
    trader_watchlist = None # To be initiated at WebSocket connect


    # Initialize Web-socket and assign callback functions for connect, ticks and close
    # Use on_ticks callback to constantly update last price of registered instruments in
    # Portfolio and Watchlist
    kws = KiteTicker(trader_settings.API_KEY, data["access_token"])
    first_time_connection = True


    def on_ticks(ws, ticks):
        # Callback to receive ticks.

        # Update LTP in portfolio
        portfolio.update_ltp(ticks)

        # Update Watchlist
        trader_watchlist.update_quotes(ticks)


    def on_connect(ws, response):
        # Callback on successful connect.
        global first_time_connection
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if first_time_connection is True:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            first_time_connection = False
        else:
            trader_watchlist.reconnect(ws)


    def on_close(ws, code, reason):
        # Tries auto-reconnect unless ws.stop() has been called
        logging.info("RogueTrader: WebSocket Closed - Would try to Reconnect")

        # Reconnection will not happen after executing `ws.stop()`
        # Use only if you do not want system to auto reconnect
        # ws.stop()


    # Assign the callbacks.
    kws.on_ticks = on_ticks
    kws.on_connect = on_connect
    kws.on_close = on_close

    # Infinite loop in a separate thread. Interacts through pre defined callbacks.
    # Have to use the pre-defined callbacks to manage subscriptions to WebSocket after this.
    kws.connect(threaded=True)
    time.sleep(5)

    ###############################################################################################
    # STRATEGY BLOCK #
    # #
    # Strategy List: #
    # 1) Rogue Delta - Index (BANKNIFTY) #
    ###############################################################################################
    # Initialize Strategies - Add calls to create strategy runners
    logging.info("RogueTrader: Initializing Strategies...")

    # Initializing RogueDelta for the list of instruments chosen by the screener
    # For now sending just list of Index Instruments.
    # TODO : Replace with list of Index/Stocks
    rogue_delta_runners = []
    for x in roguedelta.delta_screener([strategy_settings.ROGUE_DELTA_BANKNIFTY_INSTRUMENT]):
        rogue_delta_runners.append(roguedelta.RogueDelta(portfolio, orderbook, x))

    # Add calls to Connection Protocol of each Strategy Runner.
    # Connection Protocol checks if it's the first run of the day or restart after initial run
    # Calling connection protocol for Rogue Delta Runners
    logging.info("RogueTrader: Calling connection protocol for Rogue Delta Runners")
    for z in rogue_delta_runners:
        z.on_connect(trader_watchlist)


    # Infinite Loop on the main flow in parallel to WebSocket Thread containing all user defined strategy calls
    # Ends at Market Close Time
    while datetime.datetime.now() < trader_settings.TIME_NSE_CLOSE:

        # Call on_tick of all strategy runners in a thread pool
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(rogue_delta_runners)) as executor:
            for runner in rogue_delta_runners:
                executor.submit(runner.on_ticks, trader_watchlist)

        # Wait time (in seconds) between subsequent calls to on_ticks of strategy runners
        time.sleep(0.5)
    Right now I am testing with just one strategy and just one hardcoded instrument (Bank Nifty).
    The idea is to create multiple instances of any strategy class, one for each instrument picked by the screener.
    After that, the on_connect of each strategy instance asks the watchlist to subscribe to whichever instruments it wants.
    Then the flow goes into an infinite loop where, in each iteration, it calls on_tick of all strategy instances in a thread pool, and these instances read the values from the watchlist/portfolio.

    So there are two main threads, one with the ticker and one with that infinite loop. The second thread uses a worker thread pool to call on_tick of the strategies.

    So the strategy instances have access to just the watchlist and portfolio.
  • adjas
    @tahseen This is the reconnect() function of watchlist after I made changes to subscribe again to tokens.
    def reconnect(self, web_socket):

        self.ws = web_socket

        all_tokens = []
        ltp_tokens = []
        full_tokens = []
        quote_tokens = []

        for x in self.instruments:
            all_tokens.append(x.inst_token)
            if x.mode == self.MODE_LTP:
                ltp_tokens.append(x.inst_token)
            elif x.mode == self.MODE_FULL:
                full_tokens.append(x.inst_token)
            elif x.mode == self.MODE_QUOTE:
                quote_tokens.append(x.inst_token)
        try:
            self.ws.subscribe(all_tokens)
            self.ws.set_mode(self.ws.MODE_FULL, full_tokens)
            self.ws.set_mode(self.ws.MODE_LTP, ltp_tokens)
            self.ws.set_mode(self.ws.MODE_QUOTE, quote_tokens)
        except Exception as e:
            logging.error("Watchlist: Failed to re-subscribe on re-connection - {}".format(e))
    This is the on_connect of strategy instance:
    def on_connect(self, watchlist):

        # Subscribe to underlying instrument_tokens.
        watchlist.add_instrument(self.delta_underlying.inst_token, self.delta_underlying.instrument,
                                 watchlist.MODE_FULL)

        if self.portfolio.is_empty():
            logging.info("RougeDelta: No Previous {} Positions Detected."
                         .format(self.delta_underlying.instrument))
        else:
            < Code to Recover State>

            watchlist.add_instrument(<Required instrument tokens>)

    And this is the on_tick of strategy instances:
    def on_ticks(self, watchlist):

        self.delta_underlying.update_price(watchlist)

        if self.live_delta is None:
            < Keep checking entry conditions>
            < Add instrument tokens to watchlist if goes into some trade>
            watchlist.add_instrument(<Instrument tokens>)

        else:
            # Update ltp for Live Delta
            self.live_delta.update_price(watchlist)

            ######################This is the Log that you see in log file##################
            # Log position info of Live Delta
            logging.info("RogueDelta: {} - {}: {}. Strike: {} EPV: {} CPV: {} P&L: {} ({} Rs)".format(<code>))

            <Code to modify/exit position in case certain conditions are met>

            # Check if position was exited.
            # Unsubscribe tokens and remove Live Delta reference
            if self.live_delta.in_trade is False:
                watchlist.unsubscribe(<instrument tokens>)
                self.live_delta = None
  • tahseen
    tahseen edited December 2019
    @adjas
    roguedelta.RogueDelta(portfolio, orderbook, x)
    Does this function return KiteTicker Object ?

    Because you are appending the returned object to the rogue_delta_runners array, over whose elements you later loop, executing their member functions in threads like below

    executor.submit(runner.on_ticks, trader_watchlist)

    #EDIT: I see you updated your code, by the time I asked the question. I will check and revert later
  • adjas
    roguedelta.RogueDelta(portfolio, orderbook, x)

    This just creates instances of the strategy class (in this case RogueDelta) for the selected instrument (in this case hardcoded Bank Nifty). Then the rest of the code in the main flow calls on_connect and on_tick of this strategy class instance.
  • tahseen
    @adjas I think you have complicated the code for no reason. It looks like a deliberate attempt to fit in object-oriented programming, mixed up with threads. The result is a disaster: forget about the algo, even the elementary part is not working.

    Just simplify the code. Put the ticks in a queue and let everything read from there.
  • adjas
    adjas edited December 2019
    @tahseen well, my previous coding experience is in Java, so that's the approach that came to my mind :|
    However, I think the approach you suggested with queues would be more appropriate for Python..... Will do that. Thanks for taking the time to review my code and for your inputs!! :smile: