Connection error: 1006 - connection was closed uncleanly (peer dropped the TCP connection without pr

sbalaji987
sbalaji987 edited October 2021 in Python client
I know there have been some previous discussions about this error. However, nothing in those older threads or in the FAQs helped me:
Connection error: 1006 - connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)

What I'm trying to do: Stream websocket data to my laptop and save in my local mysql server instance.

What's happening: I run the code attached below, but the error above shows up immediately.

What I've tried to troubleshoot:
1) api_key and access token are fine.
2) threaded=True is enabled, along with an infinite while loop after that so that the compute/insert operation is not done on the main thread
3) indentation is fine
4) I see some people in other threads have had issues with dynamic subscriptions, but my list of tokens is static.
5) In the .connect() method, I've tried with or without the disable_ssl_verification=True parameter. Doesn't seem to make a difference.
6) Confirmed that the mysql server is active and I'm able to write dataframes to it with similar code as below.

I would be very grateful if someone is able to point me to a solution. (hoping @rakeshr , @sujith , @MAG , @Vivek , @sauravkedia , @Shaha or other experienced folks know an answer. Thanks in advance!)

Code:


import mysql.connector as mysql
db = mysql.connect(host = "localhost",user = "abc",passwd = "pwd")

api_key=open('api_key.txt','r').read()
access_token=open('access_token.txt','r').read()

tokens = [2825473, 2825985, 2827521, 2828801, 2830849, 2832641, 2834433]

from kiteconnect import KiteTicker
import pandas as pd
import time
import sys
kws1=KiteTicker(api_key,access_token)
from datetime import datetime as dt
from datetime import timedelta as td
x = dt.now()

def on_ticks(ws,ticks):
    print(ticks)

def on_connect(ws,response):
    ws.subscribe(tokens)
    ws.set_mode(ws.MODE_FULL,tokens)

def on_close(ws,code,response):
    print(code, "\n", response)
    ws.stop()


# Callback when connection closed with error.
def on_error(ws, code, reason):
    print("Connection error: {code} - {reason}".format(code=code, reason=reason))

# Callback when reconnect is in progress
def on_reconnect(ws, attempts_count):
    print("Reconnecting: {}".format(attempts_count))

# Callback when all reconnect failed (exhausted max retries)
def on_noreconnect(ws):
    print("Reconnect failed.")


# Assign the callbacks.
kws1.on_error = on_error
kws1.on_reconnect = on_reconnect
kws1.on_noreconnect = on_noreconnect

kws1.on_ticks=on_ticks
kws1.on_connect=on_connect
kws1.on_close=on_close

kws1.connect(threaded=True, disable_ssl_verification=True)

while True:
    def on_ticks(ws, ticks):
        archive(ticks)
        print("committed ticks at "+str(time.time()))

    def archive(ticks):
        lis = pd.DataFrame(ticks)
        lis = lis.astype({'tradable':'object', 'last_trade_time':'object', 'timestamp':'object'})

        lis['opn'] = lis['ohlc'].apply(lambda row:row['open'])
        lis['high'] = lis['ohlc'].apply(lambda row:row['high'])
        lis['low'] = lis['ohlc'].apply(lambda row:row['low'])
        lis['clse'] = lis['ohlc'].apply(lambda row:row['close'])

        for i in ['buy', 'sell']:
            for j in range(5):
                for k in ['quantity', 'price', 'orders']:
                    lis['depth_'+i+'_'+str(j+1)+'_'+k] = lis['depth'].apply(lambda row:row[i][j][k])

        lis = lis.rename(columns={"change":"changex"})
        lis = lis.drop(columns=['depth', 'ohlc'])

        #append to mysql
        currentd = dt.today()
        try:
            cursor = db.cursor()
            cursor.execute("create table if not exists Finmin.daily_stream_"+currentd.strftime("%y_%m_%d")+" (tradable varchar(5), mode varchar(10), \
            instrument_token int, last_price float, last_quantity int, average_price float, volume int, buy_quantity int, \
            sell_quantity int, changex float, last_trade_time varchar(26), oi int, oi_day_high int, oi_day_low int, \
            timestamp varchar(26), opn float, high float, low float, clse float, \
            depth_buy_1_quantity int, depth_buy_1_price float, depth_buy_1_orders int, \
            depth_buy_2_quantity int, depth_buy_2_price float, depth_buy_2_orders int, \
            depth_buy_3_quantity int, depth_buy_3_price float, depth_buy_3_orders int, \
            depth_buy_4_quantity int, depth_buy_4_price float, depth_buy_4_orders int, \
            depth_buy_5_quantity int, depth_buy_5_price float, depth_buy_5_orders int, \
            depth_sell_1_quantity int, depth_sell_1_price float, depth_sell_1_orders int, \
            depth_sell_2_quantity int, depth_sell_2_price float, depth_sell_2_orders int, \
            depth_sell_3_quantity int, depth_sell_3_price float, depth_sell_3_orders int, \
            depth_sell_4_quantity int, depth_sell_4_price float, depth_sell_4_orders int, \
            depth_sell_5_quantity int, depth_sell_5_price float, depth_sell_5_orders int);")

            colstring = "`,`".join(str(i) for i in lis.columns)
            insrt_string = "insert into Finmin.daily_stream_"+currentd.strftime("%y_%m_%d")+" (`"+colstring+"`) values ("+"%s,"*48+"%s)"
            cursor.fast_executemany = True
            cursor.executemany(insrt_string, [row for row in lis.itertuples(index=False, name=None)])
            db.commit()
        except:
            print(lis)
            print(" ".join([str(i) for i in sys.exc_info()]))

    kws1.on_ticks=on_ticks
  • rakeshr
    You have heavy computation inside the threaded ticker. You need to follow the task queue method mentioned as solution1 in this thread. You can go through this example of the task queue method.
  • sbalaji987
    Thanks @rakeshr ! I'll try out celery and other queue managers and respond here if they work out.

    Just to confirm - Is this error happening because the threaded ticker is not finished with processing the previous tick when it's hit with a new tick of data? I suppose it makes sense.
  • rakeshr
    "Is this error happening because the threaded ticker is not finished with processing the previous tick when it's hit with a new tick of data?"
    Yes
  • sbalaji987
    sbalaji987 edited October 2021
    Thank you very much, @rakeshr. I am able to stream without any issue now after starting to use Celery (~2000 instruments over one websocket).

    This thread can be closed now.
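
For readers landing on this thread later, here is a minimal sketch of the task-queue idea discussed above: the `on_ticks` callback only hands the ticks off, and a separate worker does the slow work. It uses Python's standard `queue.Queue` and a worker thread as a stand-in for Celery, which is what was actually used in this thread. The names `slow_archive`, `worker` and `run_demo` are hypothetical, the ticks are simulated, and `slow_archive` merely imitates the DataFrame and MySQL step with a short sleep.

```python
import queue
import threading
import time


def slow_archive(ticks, store):
    """Hypothetical stand-in for the heavy DataFrame + MySQL insert."""
    time.sleep(0.01)  # pretend this is slow database work
    store.extend(ticks)


def worker(tick_queue, store):
    """Consume tick batches off the queue, away from the ticker thread."""
    while True:
        ticks = tick_queue.get()
        try:
            if ticks is None:  # sentinel: stop the worker
                return
            slow_archive(ticks, store)
        finally:
            tick_queue.task_done()


def run_demo(n_batches=5):
    tick_queue = queue.Queue()
    store = []  # stand-in for the database table

    def on_ticks(ws, ticks):
        # Runs on the ticker thread: only enqueue, never compute or insert here.
        tick_queue.put(ticks)

    consumer = threading.Thread(target=worker, args=(tick_queue, store), daemon=True)
    consumer.start()

    # Simulated ticks; with the real client these would come from the websocket.
    for i in range(n_batches):
        on_ticks(None, [{"instrument_token": 2825473, "last_price": 100.0 + i}])

    tick_queue.put(None)  # ask the worker to stop once the backlog is drained
    consumer.join(timeout=5)
    return store
```

With the real ticker, the same `on_ticks` would be assigned as `kws1.on_ticks = on_ticks` before calling `kws1.connect(threaded=True)`, so the callback returns immediately and the websocket thread is free to receive the next tick. A single in-process queue is only one option; an external queue manager such as Celery moves the work out of the process entirely.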