I have faced this issue a couple of times during my testing now. At a certain point the WebSocket got disconnected. Since I do not call ws.stop(), Kite tries to reconnect automatically. But after the reconnect it stops receiving new ticks for the registered instrument tokens. My WebSocket runs in a separate thread and on every tick just updates the LTP, timestamp, etc. in a watchlist object. My main algo runs in a separate thread and reads the LTP, timestamp, etc. from this watchlist object.
As you can see in the attached log, it received the last tick at around 14:44:58. After that the main loop just kept reading the same price with the same timestamp. A few lines down there was a connection error, then it tried to reconnect, and a few lines later it shows that the WebSocket was connected. The two lines about "Would try to Reconnect" and "Connected to WebSocket" are logged from my algo on calls to on_close and on_connect. So I am not sure whether it actually connected or just printed to the log from the callback function.
Another thing: the instruments that I register with the WebSocket to receive ticks are not registered inside on_connect. So my question is: do I need to register the instrument tokens again on reconnect? My assumption was that once registered, I do not need to register them again on reconnection.
This is all that I have in the three callback functions. on_tick passes the tick to other objects for updating the LTP, timestamp, etc. on_connect initializes a watchlist on its first run, and there is no ws.stop() inside on_close. The WebSocket runs in threaded mode, so the main execution thread doesn't block the execution of these callback functions in any way. Any pointers to what exactly the issue is would be appreciated.
ws_connected is just there to make sure that I do not initialize a new watchlist object on reconnection. RogueDelta is the strategy code that reads values from the watchlist/portfolio and logs the strategy-related details.
Based on your input I realized I need to pass the new WebSocket handle to the watchlist on reconnection. However, that handle is used by the watchlist to subscribe/unsubscribe, so I wouldn't be able to do that until I pass the new WebSocket handle to the watchlist.
But after reconnection it should still call on_tick, and inside on_tick I should be able to call the update functions of portfolio and watchlist, right? The update functions only need the ticks, not the ws handle, to update the LTP, timestamp, etc.
I updated on_connect to pass the new handle to the watchlist. This takes care of subscribe/unsubscribe after a reconnect, but it still doesn't solve the on_tick issue:
    def on_connect(ws, response):
        # Callback on successful connect.
        global ws_connected
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if ws_connected is False:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            ws_connected = True
        else:
            trader_watchlist.update_ws(ws)
pykiteconnect takes care of re-subscribing all the tokens upon reconnection. The end-user app doesn't have to do anything for that; it even keeps track of the mode each token is subscribed in.
@adjas on reconnect Kite connects automatically. As I pointed out before, please pass the new WebSocket handle to your thread.
In Linux / Unix, everything is a file, and thus what you read from or write to is a handle. You can print the pointer / location of such handles.
So when a reconnection happens, the WebSocket handle has to be passed to your thread again. Otherwise your thread will continue to use the old WebSocket handle and thus continue to print old values, because the location it points to has stopped getting updated.
@adjas but in the end, I don't know why you are doing all this passing of the WebSocket to your thread. Why can't you just use a Python Queue to take the data in inside on_ticks and then read the Queue in your thread? In fact, Queues are thread-safe, which means you can scale your threads as well.
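To illustrate the Queue suggestion, here is a minimal sketch, not taken from the thread's code. It assumes ticks arrive as a list of dicts with `instrument_token` and `last_price` keys; the `consume` function, the `latest` dict and the token value are hypothetical names for illustration. The callback only enqueues, and a separate thread owns the price state, so no WebSocket handle has to be shared.

```python
import queue
import threading

tick_queue = queue.Queue()  # thread-safe FIFO between the ticker thread and the consumer
latest = {}                 # instrument_token -> last_price, written only by the consumer


def on_ticks(ws, ticks):
    # Same signature as the ticker callback; the ws handle is not needed here.
    tick_queue.put(ticks)


def consume(stop_event):
    # Keep draining until asked to stop and the queue is empty.
    while not stop_event.is_set() or not tick_queue.empty():
        try:
            ticks = tick_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        for tick in ticks:
            latest[tick["instrument_token"]] = tick["last_price"]
        tick_queue.task_done()


if __name__ == "__main__":
    stop = threading.Event()
    worker = threading.Thread(target=consume, args=(stop,), daemon=True)
    worker.start()
    # Simulated ticks standing in for the live feed (token value is illustrative).
    on_ticks(None, [{"instrument_token": 260105, "last_price": 27000.5}])
    on_ticks(None, [{"instrument_token": 260105, "last_price": 27001.0}])
    tick_queue.join()  # wait until both batches are processed
    stop.set()
    worker.join()
    print(latest)
```

Because the callback returns immediately after `put`, slow strategy code cannot hold up the ticker thread, and more consumer threads can be added later without touching the callback.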
Can you paste the complete code here and mention your setup details?
It seems like you are using an explicit variable to check whether it is connected or not. Kite Ticker already exposes a method to do that. You can read more here.
@tahseen all my strategy instances use the watchlist to manage subscriptions to the WebSocket. So after reconnection, if one of the strategies wants to register a new instrument token, it calls a watchlist function to do that. The watchlist in turn uses the WebSocket handle to subscribe to that instrument, and then the strategy instance starts reading the values from the watchlist.
Now my main thread still has the handles of the watchlist and portfolio whose update functions are called inside on_tick. That's the reason I don't create a new watchlist instance inside on_connect and simply pass the new ws handle to the watchlist for managing future subscription requests from strategies. Inside my on_tick both calls simply take the ticks and update the values based on them. They don't need the WebSocket handle.
    def on_ticks(ws, ticks):
        # Callback to receive ticks.

        # Update LTP in portfolio
        portfolio.update_ltp(ticks)


    def on_connect(ws, response):
        # Callback on successful connect.
        global ws_connected
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if ws_connected is False:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            ws_connected = True
        else:
            trader_watchlist.reconnect(ws)
So my assumption is that after reconnect:
1. The same old instance of watchlist gets the new ws handle to manage future subscription requests.
2. The new WebSocket handle starts calling on_tick.
3. Inside on_tick the ticks are passed to the update functions of portfolio and watchlist. The update functions don't need the ws handle.
4. My main thread has the handles to portfolio and watchlist, so the strategies keep reading the values from the same instances after reconnect.
I am a bit new to threads in Python, so forgive me for asking so many questions, but if the same old instances of portfolio and watchlist are called from inside on_tick, shouldn't they be getting the ticks? Or do I need to put the ticks in a queue and have the portfolio and watchlist objects read from this queue?
@sujith that check_connection was just there to send an email in case the portfolio values stop updating. It wasn't doing anything with the ws handle. In fact it was a stopgap measure and I have removed it now.
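A stale-feed check of the kind described (alerting when values stop updating) does not need the ws handle at all. The following is only a sketch under assumptions: `StalenessMonitor`, its method names and the 5-second threshold are hypothetical and are not part of the thread's code or of any Kite library.

```python
import time


class StalenessMonitor:
    """Remembers when the last tick arrived and reports whether the feed looks stale."""

    def __init__(self, max_age_seconds, clock=time.monotonic):
        self.max_age_seconds = max_age_seconds
        self._clock = clock          # injectable so the check can be tested without sleeping
        self._last_tick_at = None    # None until the first tick is recorded

    def record_tick(self):
        # Call this from inside on_ticks.
        self._last_tick_at = self._clock()

    def is_stale(self):
        # Call this from the main loop; an alert or email can be sent when it returns True.
        if self._last_tick_at is None:
            return False  # nothing received yet, so there is nothing to compare against
        return self._clock() - self._last_tick_at > self.max_age_seconds


monitor = StalenessMonitor(max_age_seconds=5)
monitor.record_tick()
print(monitor.is_stale())
```

Using a monotonic clock rather than the tick's own timestamp keeps the check independent of exchange time and of system clock adjustments.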
@zartimus @sujith as per the code that zartimus posted, it looks like the assumption that all old tokens are resubscribed does not hold in case of an unclean disconnection. And my log shows that the connection was closed uncleanly. Does that mean that in this case the ticker reconnects but skips the subscription of the old tokens?
Also, what's the difference between an unclean and a clean disconnection?
In this case my algo had been running for almost 6 hours without any issue before this disconnect happened. Also, the last Kite request of any kind was at 9:35. After that, for the next 5 hours, it was simply reading the values it was getting from on_tick without posting any request to the API.
Read about normal and abnormal disconnection here: https://tools.ietf.org/html/rfc6455#section-7.4
You see an abnormal disconnection when, for example, you unplug the LAN cable. In Kite Ticker, every WebSocket connection has associated tokens.
1. In case of a normal disconnection, the WebSocket client sends the close frame, and the WebSocket server removes this connection and does any cleanup.
2. In case of an abnormal disconnection, the WebSocket client can't send this close frame. When the reconnection happens, it will use the same underlying connection. That means the client starts receiving the ticks automatically because it is the same old connection.
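The two cases can be told apart in on_close from the close status code. RFC 6455 section 7.4.1 defines 1000 as normal closure, 1001 as "going away", and reserves 1006 for a connection that dropped without a close frame. The sketch below is illustrative only: the helper names are hypothetical, and whether the ticker hands exactly these codes to on_close in every situation is an assumption.

```python
import logging

# Status codes defined in RFC 6455, section 7.4.1.
CLOSE_CODES = {
    1000: "normal closure",
    1001: "going away",
    1006: "abnormal closure (no close frame received)",
}


def describe_close(code):
    # Fall back to a generic label for codes not listed above.
    return CLOSE_CODES.get(code, "other (code {})".format(code))


def is_abnormal_close(code):
    # 1006 is never sent on the wire; it is reported locally when the
    # connection was lost without a closing handshake.
    return code == 1006


def on_close(ws, code, reason):
    # Same callback signature as in the thread; only the logging is new.
    level = logging.WARNING if is_abnormal_close(code) else logging.INFO
    logging.log(level, "WebSocket closed: %s, reason=%s", describe_close(code), reason)
```

Logging the code this way makes it visible in the log file whether a given reconnect followed a clean or an unclean disconnection.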
@zartimus @sujith @tahseen if that's the case, shouldn't the code above start receiving the ticks again on reconnection? A few lines after the disconnection the log shows "Connected to WebSocket", which was logged from inside on_connect, so my assumption is that the reconnection did happen.
My requirement is:
1. My main thread has the handles of watchlist and portfolio.
2. My strategies manage subscriptions and read values using these two objects.
3. The ticker should simply use the update functions of these two objects to pass them the ticks from inside on_tick.
So here is what I have changed for now:
1. On the first run, on_connect initializes a watchlist object whose handle my main thread gets to manage subscriptions, read values, etc.
2. On reconnection, on_connect calls a reconnect function of the watchlist, passing it the new ws handle. The watchlist replaces the old ws handle with this new one for all future subscription requests from strategies.
3. In the same reconnect function the watchlist also re-subscribes to all the tokens it currently holds using the new ws handle.
4. on_tick simply has two calls to the update functions of portfolio and watchlist, which update the new values in these two objects.
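The four changes above can be sketched roughly as follows. This is a simplified stand-in, not the poster's actual Watchlist class: `FakeSocket` is a hypothetical stub that only mimics the `subscribe(tokens)` and `set_mode(mode, tokens)` calls used in the thread, the `add_instrument` signature is reduced to two arguments, and the lock is an added assumption to keep reads and writes from different threads consistent.

```python
import threading


class FakeSocket:
    """Hypothetical stand-in for the ticker handle; it just records calls."""

    def __init__(self):
        self.subscribed = []
        self.modes = {}

    def subscribe(self, tokens):
        self.subscribed.extend(tokens)

    def set_mode(self, mode, tokens):
        for token in tokens:
            self.modes[token] = mode


class Watchlist:
    def __init__(self, web_socket):
        self._lock = threading.Lock()
        self._ws = web_socket
        self._modes = {}  # instrument_token -> subscription mode
        self._ltp = {}    # instrument_token -> last traded price

    def add_instrument(self, token, mode):
        with self._lock:
            self._modes[token] = mode
            self._ws.subscribe([token])
            self._ws.set_mode(mode, [token])

    def reconnect(self, web_socket):
        # Swap the handle, then re-subscribe everything currently held.
        with self._lock:
            self._ws = web_socket
            by_mode = {}
            for token, mode in self._modes.items():
                by_mode.setdefault(mode, []).append(token)
            if self._modes:
                web_socket.subscribe(list(self._modes))
            for mode, tokens in by_mode.items():
                web_socket.set_mode(mode, tokens)

    def update(self, ticks):
        # Called from on_ticks; note that no ws handle is involved.
        with self._lock:
            for tick in ticks:
                self._ltp[tick["instrument_token"]] = tick["last_price"]

    def ltp(self, token):
        with self._lock:
            return self._ltp.get(token)
```

In this arrangement a call such as `watchlist.reconnect(ws)` from on_connect only affects future subscription calls, while `update` keeps working on the same instance regardless of which connection delivered the ticks.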
One extra change I can think of, as suggested by @tahseen, is to use a queue inside on_tick instead of calling the update functions.
@adjas, I think this has more to do with the OS. In some cases we have observed that upon reconnection the system was using the old connection itself. In your case, it looks like it is using a new connection.
@adjas Can you just remove your strategies and show your skeleton code? It would be interesting to see where you coded incorrectly. It is for sure a bug in your code and not in the Kite WebSocket.
And as I said, push the ticks into a Queue in on_ticks.
You can have two queues inside on_ticks and push the same data to both, one read by your portfolio function and the other by the watcher function.
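The two-queue idea amounts to a small fan-out inside the callback. A minimal sketch, with hypothetical queue names and a `drain` helper added only for illustration:

```python
import queue

portfolio_queue = queue.Queue()  # read by the portfolio updater
watchlist_queue = queue.Queue()  # read by the watchlist updater
consumers = (portfolio_queue, watchlist_queue)


def on_ticks(ws, ticks):
    # Push the same batch to every consumer queue. Each queue gets its own
    # list object so one reader cannot reorder or truncate another's batch.
    for q in consumers:
        q.put(list(ticks))


def drain(q):
    # Helper for a reader: take everything that is currently queued.
    batches = []
    while True:
        try:
            batches.append(q.get_nowait())
        except queue.Empty:
            return batches
```

Each reader then calls `drain` (or a blocking `get`) on its own queue at its own pace, so a slow portfolio update does not delay the watchlist update or the ticker thread.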
There are more optimal ways, but since a longer discussion is out of scope for this forum, I would keep it as simple as this for you.
    # Load Traders Profile
    # Load Trader's Portfolio and Order Book
    # Create variable for Watchlist
    logging.info("RogueTrader: Loading Traders Profile, Portfolio and Order Book")
    user = user.User(kite=kite)
    portfolio = portfolio.Portfolio(kite=kite)
    orderbook = orderbook.OrderBook(kite=kite, portfolio=portfolio)
    trader_watchlist = None  # To be initiated at WebSocket connect

    # Initialize Web-socket and assign callback functions for connect, ticks and close
    # Use on_ticks callback to constantly update last price of registered instruments in
    # Portfolio and Watchlist
    kws = KiteTicker(trader_settings.API_KEY, data["access_token"])
    first_time_connection = True


    def on_ticks(ws, ticks):
        # Callback to receive ticks.

        # Update LTP in portfolio
        portfolio.update_ltp(ticks)


    def on_connect(ws, response):
        # Callback on successful connect.
        global first_time_connection
        global trader_watchlist

        logging.info("RogueTrader: Connected to WebSocket")
        if first_time_connection is True:
            # Initialize Watchlist
            trader_watchlist = watchlist.Watchlist(web_socket=ws)
            first_time_connection = False
        else:
            trader_watchlist.reconnect(ws)


    def on_close(ws, code, reason):
        # Tries auto-reconnect unless ws.stop() has been called
        logging.info("RogueTrader: WebSocket Closed - Would try to Reconnect")

        # Reconnection will not happen after executing `ws.stop()`
        # Use only if you do not want system to auto reconnect
        # ws.stop()


    # Infinite loop in a separate thread. Interacts through pre defined callbacks.
    # Have to use the pre-defined callbacks to manage subscriptions to WebSocket after this.
    kws.connect(threaded=True)
    time.sleep(5)

    # Initializing RogueDelta for the list of instruments chosen by the screener
    # For now sending just list of Index Instruments.
    # TODO : Replace with list of Index/Stocks
    rogue_delta_runners = []
    for x in roguedelta.delta_screener([strategy_settings.ROGUE_DELTA_BANKNIFTY_INSTRUMENT]):
        rogue_delta_runners.append(roguedelta.RogueDelta(portfolio, orderbook, x))

    # Add calls to Connection Protocol of each Strategy Runner.
    # Connection Protocol checks if it's the first run of the day or restart after initial run
    # Calling connection protocol for Rogue Delta Runners
    logging.info("RogueTrader: Calling connection protocol for Rogue Delta Runners")
    for z in rogue_delta_runners:
        z.on_connect(trader_watchlist)

    # Infinite Loop on the main flow in parallel to WebSocket Thread containing all user defined strategy calls
    # Ends at Market Close Time
    while datetime.datetime.now() < trader_settings.TIME_NSE_CLOSE:

        # Call on_tick of all strategy runners in a thread pool
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(rogue_delta_runners)) as executor:
            for runner in rogue_delta_runners:
                executor.submit(runner.on_ticks, trader_watchlist)

        # Wait time (in seconds) between subsequent calls to on_ticks of strategy runners
        time.sleep(0.5)
Right now I am testing with just one strategy and just one hardcoded instrument (Bank Nifty). The idea is to create multiple instances of any strategy class, one for each instrument picked by the screener. After that, the on_connect of each strategy instance requests the watchlist to subscribe to whichever instruments it wants. Then the flow goes into an infinite loop where each iteration calls on_tick of all strategy instances in a thread pool, and these instances read the values from the watchlist/portfolio.
So there are two main threads, one with the ticker and one with that infinite loop. The second thread uses a worker thread pool to call on_tick of the strategies.
So the strategy instances have access to just the watchlist and portfolio.
    for x in self.instruments:
        all_tokens.append(x.inst_token)
        if x.mode == self.MODE_LTP:
            ltp_tokens.append(x.inst_token)
        elif x.mode == self.MODE_FULL:
            full_tokens.append(x.inst_token)
        elif x.mode == self.MODE_QUOTE:
            quote_tokens.append(x.inst_token)
    try:
        self.ws.subscribe(all_tokens)
        self.ws.set_mode(self.ws.MODE_FULL, full_tokens)
        self.ws.set_mode(self.ws.MODE_LTP, ltp_tokens)
        self.ws.set_mode(self.ws.MODE_QUOTE, quote_tokens)
    except Exception as e:
        logging.error("Watchlist: Failed to re-subscribe on re-connection - {}".format(e))
This is the on_connect of strategy instance:
    def on_connect(self, watchlist):

        # Subscribe to underlying instrument_tokens.
        watchlist.add_instrument(self.delta_underlying.inst_token,
                                 self.delta_underlying.instrument,
                                 watchlist.MODE_FULL)

        if self.portfolio.is_empty():
            logging.info("RougeDelta: No Previous {} Positions Detected."
                         .format(self.delta_underlying.instrument))
        else:
            < Code to Recover State>
And this is the on_tick of the strategy instance:

    def on_ticks(self, watchlist):

        if self.live_delta is None:
            < Keep checking entry conditions>
            < Add instrument tokens to watchlist if goes into some trade>
            watchlist.add_instrument(<Instrument tokens>)

        else:
            # Update ltp for Live Delta
            self.live_delta.update_price(watchlist)

            ###################### This is the Log that you see in log file ##################
            # Log position info of Live Delta
            logging.info("RogueDelta: {} - {}: {}. Strike: {} EPV: {} CPV: {} P&L: {} ({} Rs)".format(<code>))

            <Code to modify/exit position in case certain conditions are met>

            # Check if position was exited.
            # Unsubscribe tokens and remove Live Delta reference
            if self.live_delta.in_trade is False:
                watchlist.unsubscribe(<instrument tokens>)
                self.live_delta = None
Because you are appending the returned object to the rogue_delta_runners array, and later you loop over its elements and execute their member functions in threads, i.e. executor.submit(runner.on_ticks, trader_watchlist).
This just creates instances of the strategy class (in this case RogueDelta) for the selected instrument (in this case the hardcoded Bank Nifty). Then the rest of the code in the main flow calls on_connect and on_tick of this strategy class instance.
@adjas I think you have complicated the code for no reason. It looks like a deliberate attempt to fit in object-oriented programming, mixed up with threads. The result is a disaster: forget about the algo, even the elementary part is not working.
Just simplify the code. Put the ticks in a queue and let everything read from there.
@tahseen well, my previous coding experience is in Java, so that's the approach that came to my mind. However, I think the approach you suggested with queues would be more appropriate for Python. Will do that. Thanks for taking the time to review my code and for your inputs!
Are you changing the state of the global variable ws_connected again after it has been set to True in on_connect?
Where is the "RogueDelta" log line printed from?
On reconnection, are you sure your thread is getting the connection handle of the new WebSocket? Or is it attempting to use the same old one?
I think my assumption that on reconnect kite automatically switches all the registered tokens might be wrong.
Resubscription happens if the previous disconnection was not an abrupt disconnection.
Given the three requirements I listed above, how would you code it?
Cheers
Do the changes I described above sound like they would work?
Because you are appending the returned object to the rogue_delta_runners array, and later you loop over its elements and execute their member functions in threads like below:

    executor.submit(runner.on_ticks, trader_watchlist)

EDIT: I see you updated your code while I was asking the question. I will check and get back to you later.