Connection is already closed error

gully
gully edited August 2016 in Python client
Hi,
I am using the python client. I have a very basic unit test in python.In the test the websocket streaming is opened per function. The class contains around 5-6 function.When the functions are run all at once I am getting the error 'Connection is already closed'. When I try to run each function at a time ,it works perfectly fine. Any idea what is going on? I am using threading=true. I am explicitly closing the kite websocket in finally block of every function and opening again at the start of the function.I have tried putting sleep after every websocket close,but no luck.In the below code self.socket.send is failing and therefore its closing the connection

try:
self.socket.send(json.dumps({"a": self._message_subscribe, "v": instrument_tokens}))

for token in instrument_tokens:
self.subscribed_tokens.add(token)

return True
except:
self.socket.close()
raise

What looks to me is close of the streaming,doesnt kill the thread and therefore the socket is not free.
Also just before subscribing I tried reconnecting didnt work. I think the problem is 1 process containing multiple threads- opening and closing at the same socket is not working.For every close is the thread killed?
  • Vivek
    @gully Are you sure you are trying to subscribe once the websocket connection is opened? Your tests should call the functions when on_connect is triggered. If you can share your code we will look into it.
  • gully
    gully edited September 2016
    The above code is the wrapper for Webstreaming

    from kiteconnect import WebSocket
    from pubsub import pub
    #@Gully

    class KiteWebsocketOperation(object):
    # Initialise.
    kws = WebSocket("****", "***", "**")
    scripIdArray=[]

    def __init__(self):
    print "on init in KiteWebSocketOperation"


    # Callback for tick reception.
    def on_tick(self,tick, ws):
    print tick
    tickLength=len(tick)
    for i in range(0,tickLength):
    instName=tick[i]['instrument_token']
    pub.sendMessage(str(instName),tickData=tick)

    def close(self):
    self.onCloseReference()

    # Callback for successful connection.
    def on_connect(self,ws):
    # Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
    #ws.subscribe([53230599,53274375])
    self.ws=ws
    print "Connected to Kite websocket"

    # Assign the callbacks .
    def execute(self,tickReference,onConnectReference,closeReference):
    self.kws.on_tick = tickReference
    self.kws.on_connect = onConnectReference
    self.kws.on_close = closeReference

    self.kws.connect(threaded=True)

    def closeStreaming(self):
    print "Connection closed"
    self.kws.close()

    def addScripForStreaming(self,addedScripArray):
    arrLength=len(addedScripArray)
    if(arrLength)>0:
    self.scripIdArray=self.scripIdArray+addedScripArray
    self.scripIdArray=self._getUniqueListOfInstrumentToken(self.scripIdArray)
    self.ws.subscribe(self.scripIdArray)
    self.ws.set_mode(self.ws.MODE_FULL,self.scripIdArray)

    def removeScripForStreaming(self, removedScripArray):
    arrLength=len(removedScripArray)
    if arrLength>0:
    self.scripIdArray = self._removeFromArray(self.scripIdArray,removedScripArray)
    self.scripIdArray=self._getUniqueListOfInstrumentToken(self.scripIdArray)
    self.ws.unsubscribe(removedScripArray)

    def _removeFromArray(self,originalList,removeList):
    originalListSet=set(originalList)
    removeListSet=set(removeList)
    minusSet=originalListSet.difference(removeListSet)
    return list(minusSet)

    def reconnect(self):
    self.kws.reconnect()

    def _getUniqueListOfInstrumentToken(self,scripArray):
    setArray=set(scripArray)
    listScrip=list(setArray)
    return listScrip



    """if __name__=="__main__":
    obj=KiteWebsocketOperation()
    #eventlet.spawn()
    try:
    obj.execute(obj.on_tick,obj.on_connect,obj.closeStreaming)
    except Exception,e:
    print str(e)"""




    The below code is the smoke tests where I am opening the streaming muliple times within a process.


    from KiteApp.KiteWebsocketOperation import KiteWebsocketOperation
    import unittest
    import time
    #@Gully

    class WebsocketStreamingTest(unittest.TestCase):
    instrument1 = 53287175 #53287175 #CRUDEOIL16OCTFUT
    instrument2 = 53287431 #11559426 #53287431 #CRUDEOILM16OCTFUT
    instrument1Found=False
    instrument2Found=False
    otherFound=False
    kiteWebOpsObject=None

    def on_tick(self,tick, ws):
    for each in tick:
    print each
    eachInstrument=each['instrument_token']
    if self.instrument1==eachInstrument:
    self.instrument1Found=True
    elif self.instrument2==eachInstrument:
    self.instrument2Found=True
    else:
    self.otherFound=True

    def on_connect(self,ws):
    self.kiteWebOpsObject.ws=ws
    print "Connected"
    self.onAfterConnect()

    def close(self,ws):
    print "closed"

    def addScripForStreaming_oneScrip_AfterConnect(self):
    print "in after connect of One scrip"
    self.kiteWebOpsObject.addScripForStreaming([self.instrument1])

    def addScripForStreaming_TwoScrip_AfterConnect(self):
    print "in after connect of TwoScrip"
    self.kiteWebOpsObject.addScripForStreaming([self.instrument1,self.instrument2])

    def addScripForStreaming_TwoScrip_withStopping(self):
    print "in after connect for two scrip with stopping"
    self.kiteWebOpsObject.addScripForStreaming([self.instrument1])
    time.sleep(5)
    self.kiteWebOpsObject.addScripForStreaming([self.instrument2])


    def test_addScripForStreaming_oneScrip(self):
    print "Testing addition of one scrip without stopping"

    self.instrument1Found=False
    self.instrument2Found=False
    try:
    self.kiteWebOpsObject=KiteWebsocketOperation()
    self.onAfterConnect=self.addScripForStreaming_oneScrip_AfterConnect
    self.kiteWebOpsObject.execute(self.on_tick,self.on_connect,self.close)
    time.sleep(15)

    finally:
    assert self.instrument1Found==True
    self.kiteWebOpsObject.closeStreaming()
    self.kiteWebOpsObject=None
    time.sleep(5)

    def test_addScripForStreaming_TwoScrip(self):
    print "Testing addition of two scrips without stopping"
    self.instrument1Found = False
    self.instrument2Found = False
    try:
    self.kiteWebOpsObject = KiteWebsocketOperation()
    self.onAfterConnect = self.addScripForStreaming_TwoScrip_AfterConnect
    self.kiteWebOpsObject.execute(self.on_tick, self.on_connect, self.close)
    time.sleep(15)

    finally:
    assert self.instrument1Found == True
    assert self.instrument2Found == True
    self.kiteWebOpsObject.closeStreaming()
    self.kiteWebOpsObject=None
    time.sleep(5)

    def test_addScripForStreaming_OneScrip_withStopping(self):
    print "Testing addition of two scrips WITH stopping"

    self.instrument1Found = False
    self.instrument2Found = False
    try:
    self.kiteWebOpsObject = KiteWebsocketOperation()
    self.onAfterConnect = self.addScripForStreaming_TwoScrip_withStopping
    self.kiteWebOpsObject.execute(self.on_tick, self.on_connect, self.close)
    time.sleep(15)

    finally:
    assert self.instrument1Found == True
    assert self.instrument2Found == True
    self.kiteWebOpsObject.closeStreaming()
    self.kiteWebOpsObject=None
    time.sleep(5)



    def main():
    unittest.main()

    if __name__ == '__main__':
    main()


    @vivek I changed my application to subscribe inside on_connect,but still its not working.The connection is getting closed.after on_connect.Note that each individual functions run perfectly fine
  • gully
    @vivek @Kailash any update on this?
  • Kailash
    Hi @gully, apologies. This thread got buried. I'm getting someone to look into this and will post an update soon.
  • gully
    @Kailash -Here is what I have done. I backed out from the idea to use this as a separate thread in my main application process.The reason being too much dependency on Socket Api with my application and memory consumption.Therefore I am using KiteWebsocket as a separate process and communicating with my main process via tcp socket queues-rabbitmq/zmq to be in particular. Posting this information as this might be useful for someone who steps on this problem. But the above problem still needs to be looked into,and I assume you are looking into.Thanks
  • Kailash
    @gully Ah, that works. Yes, this is being looked into.
  • syamkiranv
    @Kailash , Is the problem fixed or is there any workaround as I am also facing similar to issue.
Sign In or Register to comment.