Mutithreading in WebSockets creating issue (python)

Detrader
Hi team,
What I am tying to do:
1.Record Tick data in one thread and process it in another thread (as data from the websocket sometimes misses some ticks).
Data is being recorded in csv. Both threads are different from main thread.

Code:
# Main thread
from login_kiteconnect import *  // This is returning me the request token 
from kiteconnect import KiteConnect
from WebSocket_thread import *
from time_correct_thread import time_correct_mythread
from time_final_thread import time_final_mythread
import threading
import time
import logging


api_key="xxxxxxxxxxxxxxxxx"
api_secret="yyyyyyyyyyyyyyyyyyy"

kite=KiteConnect(api_key=api_key)

data = kite.generate_session(request_token[0], api_secret)

#Websocket creation for ticker data
WebSocket=Websocket_mythread(api_key, data["access_token"] )

#time_correct_thread creation for first level of filtering data : removal of same tickers
correct_time=time_correct_mythread()


WebSocket.start() # Adds tick price to minute.csv , raw format


correct_time.start() #adds data with removal of repetitions,adding of empty row for missing values in minute.csv


WebSocket.join()
correct_time.join()

####################################Second thread #########################################
Websocket Thread:

from kiteconnect import KiteTicker
import threading
import csv

count=0


class Websocket_mythread(threading.Thread):
def __init__(self, api_key, access_token ):
threading.Thread.__init__(self)
self.api_key = api_key
self.access_token = access_token
self.user_id = user_id

with open('minute.csv', 'w') as csvfile:
csvwriter = csv.DictWriter(csvfile, fieldnames=["Date","Time","LTP"])
csvwriter.writeheader()


def run(self):
global count

kws = KiteTicker(self.api_key, self.access_token, debug=True)
kws.on_ticks = on_ticks
kws.on_connect = on_connect
# kws.on_close = on_close

kws.on_noreconnect = on_noreconnect
kws.on_reconnect = on_reconnect
kws.on_close = on_close
kws.on_error = on_error

kws.connect(threaded=True)
kws.connect(disable_ssl_verification=False)



def on_ticks(ws, ticks):
# Callback to receive ticks.
for element in ticks:
with open('minute.csv', 'a') as csvfile:
csvwriter = csv.DictWriter(csvfile, fieldnames=["Date","Time","LTP"])
if ((element.get("timestamp").time().second == 00) or (element.get("timestamp").time().second == 59)):
csvwriter.writerow({"Date" :element.get("timestamp").date() , "Time":element.get("timestamp").time() , "LTP":element.get("last_price")})




def on_connect(ws, response):
# Callback on successful connect.
# Subscribe to a list of instrument_tokens (RELIANCE and ACC here).
ws.subscribe([738561])

# Set RELIANCE to tick in `full` mode.
ws.set_mode(ws.MODE_FULL, [738561])

def on_close():
print("closed Websocket")


def on_noreconnect(ws):
logging.error("Reconnecting the websocket failed")


def on_reconnect(ws, attempt_count=10):
logging.debug("Reconnecting the websocket: {}".format(attempt_count))


def on_error(ws, code, reason):
logging.error("closed connection on error: {} {}".format(code, reason))

####################################Error I am getting##########################################

2018-04-17 12:45:22+0530 [-] Log opened.
2018-04-17 12:45:22+0530 [-] Starting factory
2018-04-17 12:45:22,897 - kiteconnect.ticker - DEBUG - Start WebSocket connection.
2018-04-17 12:45:22+0530 [-] Exception in thread Thread-1:
2018-04-17 12:45:22+0530 [-] Traceback (most recent call last):
2018-04-17 12:45:22+0530 [-] File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
2018-04-17 12:45:22+0530 [-] self.run()
2018-04-17 12:45:22+0530 [-] File "/home/deepak/Zero/src/WebSocket_thread.py", line 36, in run
2018-04-17 12:45:22+0530 [-] kws.connect(disable_ssl_verification=False)
2018-04-17 12:45:22+0530 [-] File "build/bdist.linux-x86_64/egg/kiteconnect/ticker.py", line 521, in connect
2018-04-17 12:45:22+0530 [-] reactor.run(**opts)
2018-04-17 12:45:22+0530 [-] File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1260, in run
2018-04-17 12:45:22+0530 [-] self.startRunning(installSignalHandlers=installSignalHandlers)
2018-04-17 12:45:22+0530 [-] File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1240, in startRunning
2018-04-17 12:45:22+0530 [-] ReactorBase.startRunning(self)
2018-04-17 12:45:22+0530 [-] File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 746, in startRunning
2018-04-17 12:45:22+0530 [-] raise error.ReactorAlreadyRunning()
2018-04-17 12:45:22+0530 [-] ReactorAlreadyRunning
2018-04-17 12:45:22+0530 [-]


And then it stops.

I tried several similar posts solution ie. of making websocket connection in main thread but then i am not able to figure out , where to put thread2.start() as after Kws.connect on main thread nothing after that gets executed.
Tagged:
  • sujith
    If you are intending to close the websocket connection by stopping the thread then inside on_close, you need call ws.stop() as mentioned in the example on pykiteconnect repository.
  • Detrader
    Hi Sujith,

    I am not trying to close the connection, the issue is it automatically stops ie. the websocket connection, So as in the code in on_ticks method i am dumping ticks into csv file, which doesn't happen as the connection gets closed.
  • naz
    @sujith Facing similar issue with python multi-threading after migration to Kite api version-3. It was running smoothly previously.
  • vineet_dhandhania
    Hi Detrader,
    I am wondering if you need to run kws.connect twice:
    kws.connect(threaded=True)
    kws.connect(disable_ssl_verification=False)
    Try running kws.connect only once. Set both threaded=True and disable_ssl_verification=False in one call of connect().
    HTH.
  • Vivek
    Vivek edited May 2018
    @vineet_dhandhania You just need to run it once like this kws.connect(threaded=True, disable_ssl_verification=False)
  • vineet_dhandhania
    @ vivek I was actually saying the same thing :)
Sign In or Register to comment.