Hi, I am using the Python client. I have a very basic unit test in Python. In the test, the websocket stream is opened once per test function. The class contains around 5-6 functions. When the functions are all run at once, I get the error 'Connection is already closed'. When I run each function one at a time, it works perfectly fine. Any idea what is going on? I am using threaded=True. I explicitly close the Kite websocket in the finally block of every function and open it again at the start of the next function. I have tried putting a sleep after every websocket close, but no luck. In the code below, self.socket.send is failing, and therefore it is closing the connection:
for token in instrument_tokens: self.subscribed_tokens.add(token)
return True except: self.socket.close() raise
It looks to me as if closing the stream doesn't kill the thread, and therefore the socket is not freed. I also tried reconnecting just before subscribing, but that didn't work. I think the problem is that one process containing multiple threads opening and closing the same socket does not work. Is the thread killed on every close?
@gully Are you sure you are trying to subscribe only after the websocket connection has been opened? Your tests should call the functions once on_connect is triggered. If you can share your code, we will look into it.
The above code is the wrapper for Webstreaming from kiteconnect import WebSocket from pubsub import pub #@Gully
class KiteWebsocketOperation(object): # Initialise. kws = WebSocket("****", "***", "**") scripIdArray=[]
def __init__(self): print "on init in KiteWebSocketOperation"
# Callback for tick reception. def on_tick(self,tick, ws): print tick tickLength=len(tick) for i in range(0,tickLength): instName=tick[i]['instrument_token'] pub.sendMessage(str(instName),tickData=tick)
def close(self): self.onCloseReference()
# Callback for successful connection. def on_connect(self,ws): # Subscribe to a list of instrument_tokens (RELIANCE and ACC here). #ws.subscribe([53230599,53274375]) self.ws=ws print "Connected to Kite websocket"
def on_tick(self,tick, ws): for each in tick: print each eachInstrument=each['instrument_token'] if self.instrument1==eachInstrument: self.instrument1Found=True elif self.instrument2==eachInstrument: self.instrument2Found=True else: self.otherFound=True
def addScripForStreaming_oneScrip_AfterConnect(self): print "in after connect of One scrip" self.kiteWebOpsObject.addScripForStreaming([self.instrument1])
def addScripForStreaming_TwoScrip_AfterConnect(self): print "in after connect of TwoScrip" self.kiteWebOpsObject.addScripForStreaming([self.instrument1,self.instrument2])
def addScripForStreaming_TwoScrip_withStopping(self): print "in after connect for two scrip with stopping" self.kiteWebOpsObject.addScripForStreaming([self.instrument1]) time.sleep(5) self.kiteWebOpsObject.addScripForStreaming([self.instrument2])
def test_addScripForStreaming_oneScrip(self): print "Testing addition of one scrip without stopping"
@vivek I changed my application to subscribe inside on_connect, but it is still not working. The connection is getting closed after on_connect. Note that each individual function runs perfectly fine.
@Kailash - Here is what I have done. I backed out of the idea of running this as a separate thread in my main application process, because it coupled my application too tightly to the Socket API and used too much memory. Instead, I now run KiteWebsocket as a separate process and communicate with my main process via TCP socket queues (RabbitMQ/ZMQ in particular). I am posting this information as it might be useful for someone who runs into this problem. But the above problem still needs to be looked into, and I assume you are looking into it. Thanks.
on_connect is triggered. If you can share your code, we will look into it.
from kiteconnect import WebSocket
from pubsub import pub
#@Gully
class KiteWebsocketOperation(object):
    """Wrapper around kiteconnect's ``WebSocket`` streaming client.

    Tracks the list of subscribed instrument tokens and forwards
    subscribe / unsubscribe / mode requests to the connected socket.

    Each instance owns its own ``WebSocket``. Previously the ``WebSocket``
    was created once as a class attribute and therefore shared by every
    instance; after one instance closed it, the next instance was handed the
    same, already closed, connection. That sharing is presumably what
    produced "Connection is already closed" when several tests ran in one
    process -- TODO confirm against the kiteconnect version in use.
    """

    def __init__(self):
        print("on init in KiteWebSocketOperation")
        # Fresh connection object per wrapper, so closing one wrapper never
        # affects another.
        self.kws = WebSocket("****", "***", "**")
        # Instrument tokens currently subscribed through this wrapper.
        self.scripIdArray = []
        # Socket handle passed to on_connect; None until a connect callback
        # stores it.
        self.ws = None

    def on_tick(self, tick, ws):
        """Default tick callback: print the batch and publish it via pubsub.

        The whole ``tick`` batch is published once per entry, under a topic
        named after that entry's ``instrument_token``.
        """
        print(tick)
        for entry in tick:
            pub.sendMessage(str(entry['instrument_token']), tickData=tick)

    def close(self):
        # NOTE(review): ``onCloseReference`` is never assigned in this class,
        # so this raises AttributeError unless a caller sets it beforehand.
        self.onCloseReference()

    def on_connect(self, ws):
        """Default connect callback: remember the socket for later calls."""
        # Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
        # ws.subscribe([53230599,53274375])
        self.ws = ws
        print("Connected to Kite websocket")

    def execute(self, tickReference, onConnectReference, closeReference):
        """Register the three callbacks and open the stream in a thread."""
        self.kws.on_tick = tickReference
        self.kws.on_connect = onConnectReference
        self.kws.on_close = closeReference
        self.kws.connect(threaded=True)

    def closeStreaming(self):
        """Close this wrapper's websocket connection."""
        print("Connection closed")
        self.kws.close()

    def addScripForStreaming(self, addedScripArray):
        """Add tokens to the tracked list and (re)subscribe in full mode.

        Does nothing when ``addedScripArray`` is empty. Requires ``self.ws``
        to have been set by a connect callback.
        """
        if len(addedScripArray) == 0:
            return
        self.scripIdArray = self._getUniqueListOfInstrumentToken(
            self.scripIdArray + addedScripArray)
        self.ws.subscribe(self.scripIdArray)
        self.ws.set_mode(self.ws.MODE_FULL, self.scripIdArray)

    def removeScripForStreaming(self, removedScripArray):
        """Drop tokens from the tracked list and unsubscribe them.

        Does nothing when ``removedScripArray`` is empty.
        """
        if len(removedScripArray) == 0:
            return
        remaining = self._removeFromArray(self.scripIdArray, removedScripArray)
        self.scripIdArray = self._getUniqueListOfInstrumentToken(remaining)
        self.ws.unsubscribe(removedScripArray)

    def _removeFromArray(self, originalList, removeList):
        """Return the distinct items of ``originalList`` not in ``removeList``.

        Order of the returned list is not guaranteed (set-based).
        """
        return list(set(originalList).difference(removeList))

    def reconnect(self):
        """Ask the underlying websocket client to reconnect."""
        self.kws.reconnect()

    def _getUniqueListOfInstrumentToken(self, scripArray):
        """Return ``scripArray`` with duplicates removed (order not kept)."""
        return list(set(scripArray))
"""if __name__=="__main__":
obj=KiteWebsocketOperation()
#eventlet.spawn()
try:
obj.execute(obj.on_tick,obj.on_connect,obj.closeStreaming)
except Exception,e:
print str(e)"""
The code below is the smoke test, where I open the stream multiple times within one process.
from KiteApp.KiteWebsocketOperation import KiteWebsocketOperation
import unittest
import time
#@Gully
class WebsocketStreamingTest(unittest.TestCase):
    """Smoke tests that open the Kite stream and check which ticks arrive.

    Every test opens its own ``KiteWebsocketOperation``, subscribes from the
    connect callback, waits for ticks, and then closes the stream. The
    stream is always closed before any assertion runs, so a failed
    expectation cannot leave a connection open for the next test.
    """

    instrument1 = 53287175  # 53287175 CRUDEOIL16OCTFUT
    instrument2 = 53287431  # 11559426 # 53287431 # CRUDEOILM16OCTFUT
    # Flags flipped by on_tick according to the instrument_token of each tick.
    instrument1Found = False
    instrument2Found = False
    otherFound = False
    # Wrapper under test; only set while a test is streaming.
    kiteWebOpsObject = None

    def on_tick(self, tick, ws):
        """Tick callback: record which instruments have been seen."""
        for each in tick:
            print(each)
            eachInstrument = each['instrument_token']
            if self.instrument1 == eachInstrument:
                self.instrument1Found = True
            elif self.instrument2 == eachInstrument:
                self.instrument2Found = True
            else:
                self.otherFound = True

    def on_connect(self, ws):
        """Connect callback: hand the socket to the wrapper, then subscribe."""
        self.kiteWebOpsObject.ws = ws
        print("Connected")
        self.onAfterConnect()

    def close(self, ws):
        """Close callback: only logs."""
        print("closed")

    def addScripForStreaming_oneScrip_AfterConnect(self):
        print("in after connect of One scrip")
        self.kiteWebOpsObject.addScripForStreaming([self.instrument1])

    def addScripForStreaming_TwoScrip_AfterConnect(self):
        print("in after connect of TwoScrip")
        self.kiteWebOpsObject.addScripForStreaming(
            [self.instrument1, self.instrument2])

    def addScripForStreaming_TwoScrip_withStopping(self):
        print("in after connect for two scrip with stopping")
        self.kiteWebOpsObject.addScripForStreaming([self.instrument1])
        time.sleep(5)
        self.kiteWebOpsObject.addScripForStreaming([self.instrument2])

    def _streamAndClose(self, afterConnect):
        """Open a stream, run ``afterConnect`` on connect, wait, then close.

        Resets the instrument flags first. The stream is closed in
        ``finally`` so it is released even when opening or waiting fails;
        callers assert on the flags only after this returns.
        """
        self.instrument1Found = False
        self.instrument2Found = False
        try:
            self.kiteWebOpsObject = KiteWebsocketOperation()
            self.onAfterConnect = afterConnect
            self.kiteWebOpsObject.execute(
                self.on_tick, self.on_connect, self.close)
            time.sleep(15)
        finally:
            # Guard against the constructor itself having failed.
            if self.kiteWebOpsObject is not None:
                self.kiteWebOpsObject.closeStreaming()
                self.kiteWebOpsObject = None
            time.sleep(5)

    def test_addScripForStreaming_oneScrip(self):
        print("Testing addition of one scrip without stopping")
        self._streamAndClose(self.addScripForStreaming_oneScrip_AfterConnect)
        self.assertTrue(self.instrument1Found)

    def test_addScripForStreaming_TwoScrip(self):
        print("Testing addition of two scrips without stopping")
        self._streamAndClose(self.addScripForStreaming_TwoScrip_AfterConnect)
        self.assertTrue(self.instrument1Found)
        self.assertTrue(self.instrument2Found)

    def test_addScripForStreaming_OneScrip_withStopping(self):
        print("Testing addition of two scrips WITH stopping")
        self._streamAndClose(self.addScripForStreaming_TwoScrip_withStopping)
        self.assertTrue(self.instrument1Found)
        self.assertTrue(self.instrument2Found)
def main():
    """Run the test cases of this module via ``unittest.main()``."""
    unittest.main()


if __name__ == '__main__':
    main()
@vivek I changed my application to subscribe inside on_connect, but it is still not working. The connection is getting closed after on_connect. Note that each individual function runs perfectly fine.