web socket dynamic subscription

Guhan
@sujith, is it possible to dynamically subscribe and unsubscribe tokens in live web socket connection?
If possible can you explain me with an example
  • sujith
    You can call subscribe and unsubscribe at any point of time.
    There is no special call for that. It is just the normal subscribe you use when you start Kite Ticker connection.
  • enigma
    enigma edited December 2018
    Hi,
    I am trying to do the same thing but the problem is I am little confused how to make this work.
    For example I am streaming NIFTY 50 and while the connection is live, I want to add another instrument. So if I want to add it you are saying that I need to use the subscribe() function to add the new instrument.

    So with my understanding, will the below code snippet work??
    kws = KiteTicker("your_api_key", "your_access_token")
    kws.subscribe(instrument_token_list)

    Will the code work in Python?
  • rakeshr
    @enigma
    Yeah, you need to use kws.subscribe(token/list of tokens) in on_connect method to subscribe to new instrument each time.
  • Guhan
    @rakeshr how to change the subscription list or call kms.subscribe in a live streaming Websocket. I still couldn't figure it out.
  • sujith
    You can call unsubscribe for the tokens which you don't want to listen to.
  • sujith
    You can check out Kite Connect documentation.
  • rakeshr
    @Guhan
    Do you have any specific condition to subscribe to the specific list of instrument token?
    You can try something below:
    Eg.
    def on_connect(ws, response): 
    if(condition for instrument):
    ws.subscribe(token/list of token)
    ws.set_mode(ws.MODE_FULL,token/list of token)
  • Guhan
    @sujith, @rakeshr thanks for replying. I will explain how I am doing it, I start web socket and screener code in the morning, the web socket will have predefined subscription list based on my live positions. When screener code finds potential trades I want it to add those instruments to the live websocket code without interrupting the data-feed.
  • rakeshr
    @Guhan
    Yeah, you can use above sample code for condition-based token subscription.
  • Guhan
    @rakeshr once web socket code starts running control doesn't go to on_connect, it will be checked only once right?
  • Guhan
    Guhan edited January 2019
    @sujith, @rakeshr, @zartimus
    def on_ticks(ws, ticks):  
    ## insert ticks
    def on_connect(ws, response): # noqa
    ins_token_list = get_ins_token_list_func()
    ws.subscribe(ins_token_list)
    ws.set_mode(ws.MODE_FULL,ins_token_list)

    kws.on_ticks = on_ticks
    kws.on_connect = on_connect

    kws.connect()
    in the above example if I want to to add an instrument token to live Websocket I have to manually restart the code to receive new ins_token list from get_ins_token_list_func(). Is there any other way to do it? is it possible to alter subscription?
  • sujith
    The Kite Ticker module of pykiteconnect exposes a method called subscribe, you can keep the instance of ticker in your code and call the method.

    on_connect is only called once after a new connection is established. You don't actually need to kill connection and re-connect in order to subscribe for a bunch of tokens.
  • Guhan
    @sujith, thanks for replying, but still couldn't figure it out, it is driving me crazy here, is there an example or something?
  • rakeshr
    @Guhan
    As @sujith said, you can use subscribe and unsubscribe method for subscribing/unsubscribing token in WebSocket connection.Eg:
    def on_ticks(ws, ticks):
    if(condition for instrument):
    ws.subscribe(new token/list of tokens)
    ws.unsubscribe(remove old token/list of old tokens,if not required)
  • Guhan
    Guhan edited February 2019
    Thanks guys I will try that
  • rjamitsharm
    @rakeshr
    I have this code
    sub_list=[738561]
    subs_list=[]
    from kiteconnect import KiteTicker
    kws = KiteTicker(api, access_token)
    def on_ticks(ws, ticks):
    print(ticks)
    for i in sub_list:
    if i not in subs_list:
    ws.subscribe(i)
    ws.set_mode(ws.MODE_FULL, i)
    subs_list.append(i)

    def on_connect(ws, response):
    ws.subscribe(sub_list)
    ws.set_mode(ws.MODE_FULL, sub_list)
    for i in sub_list:
    subs_list.append(i)
    def on_close(ws, code, reason):
    ws.stop()
    kws.on_ticks = on_ticks
    kws.on_connect = on_connect
    kws.on_close = on_close
    kws.connect(threaded=True)
    now when I add one more token to sub_list using sub_list.append(2125)

    it should subscribe this token too as in on_ticks

    but it shows error

    Unhandled Error
    Connection error: 1006 - connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)
    Connection closed: 1006 - connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)
  • kumar_krishnan
    @rjamitsharm Any luck with above code working?

    Or any one succeeded with Dynamic subscribe and unsubscribe in python?
  • sujith
    @kumar_krishnan,
    One can subscribe and unsubscribe for ticks at any time. There is no restrction.
  • kumar_krishnan
    @sujith Thanks for the response, I have tried ticker level kws.subscribe(subscribe_list) it is thowing "Connection error: 0 - error parsing request" and also I have tried inside on_ticks function with conditional subscription there also I got Connection error, do you have any example on subscribe and unsubscribe dynamically other than initial on_connect method or point out any documentation?
  • sujith
    @kumar_krishnan,
    You need to connect ticker first and then inside on_connect callback you can call subscribe.
    You can check out the documentation here.
  • kumar_krishnan
    @sujith Yes it is already connected and I can see there are ticks which is showing in on_ticks method, during the live streaming time, based on a condition I want to subscribe a new instrument Id and that's the one which throwing error. I have tried subscribe in on_ticks method and also I made ticker as out side variable ,after connecting I have called subscribe function and both are failing.

  • kumar_krishnan
    @sujith In the code I am using kws.connect(threaded=True) and then in another main thread I am calling kws.subscribe(subscribe_list) and it is failing
  • rakeshr
    @kumar_krishnan
    I am using kws.connect(threaded=True) and then in another main thread I am calling kws.subscribe(subscribe_list)
    It should work properly, if you are assigning required call backs.
    Basic example flow:
    .......
    ......

    kws.on_ticks = on_ticks
    kws.on_connect = on_connect
    kws.connect(threaded=True)

    while True:
    def on_ticks(ws, ticks):
    # Methods that subscribe/unsubscribe based on certain condition
    subscriptionStatus(ticks)
    ......

    def subscriptionStatus(ticks):
    # condition statement to subscribe or unsubscribe
    ws.unsubscribe([tokens])
    .........
    ws.subscribe([tokens])

    #Assign callback
    kws.on_ticks=on_ticks
    it is thowing "Connection error: 0 - error parsing request"
    You need to paste here a complete error stack trace, this doesn't give enough info.
  • kumar_krishnan
    @rakeshr I have written python flask application, and also I am exposing subscribe flask method to subscribe new instrument during streaming. In subscribe method I am appending the instrument id to subscribe_list that the same list is used in on_ticks method to subscribe the methods looks like as below


    subscribe_list = ['16492034','16491522','264969'] #Default list
    unsubscribe_list = []

    def kite_ticker_connect():
    # Assign the callbacks.
    kws.on_ticks = on_ticks
    kws.on_connect = on_connect
    kws.on_close = on_close
    # Infinite loop on the main thread. Nothing after this will run.
    # You have to use the pre-defined callbacks to manage subscriptions.
    kws.connect(threaded=True)


    @app.route("/subscribe",methods=['GET'])
    def dynamic_subscribe():
    print(" /subscribe : Request Params :"+str(request.args))
    inst_id = request.args.get("inst_id")
    symbol = request.args.get("symbol")
    #Globalization
    global subscribe_list

    subscribe_list.append(str(inst_id))
    symbol_token_dict.update({str(inst_id):str(symbol)})
    print("Subcribed the instrument id :"+str(inst_id))
    print(" Current Subscription list :"+str(subscribe_list))
    print(" Current symbol_token_dict :"+str(symbol_token_dict))

    return str(symbol)+": "+str(inst_id)+ " Added to subscribe list, OK ",200



    def on_ticks(ws, ticks):
    print("subscribe list :"+str(subscribe_list))
    print("Unsubscribe list :"+str(unsubscribe_list))
    if(len(subscribe_list) > 0 ):
    print(" New subscription found ... "+str(subscribe_list))
    try:
    ws.subscribe(str(subscribe_list))
    #Clear the list
    subscribe_list.clear()
    '''
    if ws.is_connected == True:
    ws.subscribe(subscribe_list)
    #Clear the list
    subscribe_list.clear()
    else:
    ws.connect()
    ws.subscribe(subscribe_list)
    #Clear the list
    subscribe_list.clear()
    '''
    except Exception as e:
    print (e)

    if(len(unsubscribe_list) > 0 ):
    print(" New Unsubscription found ... "+str(unsubscribe_list))
    try:
    ws.unsubscribe(unsubscribe_list)
    #Clear the list
    unsubscribe_list.clear()
    except Exception as e:
    print (e)

    for tick in ticks:
    timestamp = datetime.now()
    instrument_token=tick['instrument_token']
    last_price=tick['last_price']
    ltp = last_price
    #print(ltp)
    ltp_dict.update({'instrument_token':instrument_token,'ltp':ltp})
    #print(str(ltp_dict))
    symbol=symbol_token_dict[str(instrument_token)]
    #SET the KEYS with symbolname and ltp
    #print('symbol: '+symbol+' ltp:'+str(ltp_dict))

    app = Flask(__name__)

    if __name__ == "__main__":
    #Ticker Connect
    kite_ticker_connect()
    #Initiate the Flask server
    app.run(port=7000)

    And after running I can see ticker values also, however when call subscribe flask method http://localhost:7000/subscribe?inst_id=15262978&symbol=NIFTY2140114550CE
    I am getting the below error trace

    /subscribe : Request Params :ImmutableMultiDict([('inst_id', '15262978'), ('symbol', 'NIFTY2140114550CE')])
    Subcribed the instrument id :15262978
    Current Subscription list :['15262978']
    Current symbol_token_dict :{'264969': 'INDIA VIX', '16492034': 'NIFTY21APRFUT', '16491522': 'BANKNIFTY21APRFUT', '15262978': 'NIFTY2140114550CE'}
    127.0.0.1 - - [01/Apr/2021 16:29:01] "GET /subscribe?inst_id=15262978&symbol=NIFTY2140114550CE HTTP/1.1" 200 -
    subscribe list :['15262978']
    Unsubscribe list :[]
    New subscription found ... ['15262978']
    Connection error: 0 - error parsing request
    Generally which I have noticed I am getting connection error when I call ws.subscribe() or kws.subscribe() method, Please advise me is anything wrong in this approach?

  • kumar_krishnan
    @rakeshr and @sujith I found the issue, I am using instrument token as string with str() function, and subscribe method takes list of integer or long and not string. So that was the problem. Thank you very much to both of you
This discussion has been closed.