It looks like you're new here. If you want to get involved, click one of these buttons!
import mysql.connector as mysql
db = mysql.connect(host = "localhost",user = "abc",passwd = "pwd")
api_key=open('api_key.txt','r').read()
access_token=open('access_token.txt','r').read()
tokens = [2825473, 2825985, 2827521, 2828801, 2830849, 2832641, 2834433]
from kiteconnect import KiteTicker
import pandas as pd
import time
import sys
kws1=KiteTicker(api_key,access_token)
from datetime import datetime as dt
from datetime import timedelta as td
x = dt.now()
def on_ticks(ws,ticks):
print(ticks)
def on_connect(ws,response):
ws.subscribe(tokens)
ws.set_mode(ws.MODE_FULL,tokens)
def on_close(ws,code,response):
print(code, "\n", response)
ws.stop()
# Callback when connection closed with error.
def on_error(ws, code, reason):
print("Connection error: {code} - {reason}".format(code=code, reason=reason))
# Callback when reconnect is on progress
def on_reconnect(ws, attempts_count):
print("Reconnecting: {}".format(attempts_count))
# Callback when all reconnect failed (exhausted max retries)
def on_noreconnect(ws):
print("Reconnect failed.")
# Assign the callbacks.
kws1.on_error = on_error
kws1.on_reconnect = on_reconnect
kws1.on_noreconnect = on_noreconnect
kws1.on_ticks=on_ticks
kws1.on_connect=on_connect
kws1.on_close=on_close
kws1.connect(threaded=True, disable_ssl_verification=True)
while True:
def on_ticks(ws, ticks):
archive(ticks)
print("committed ticks at "+str(time.time()))
def archive(ticks):
lis = pd.DataFrame(ticks)
lis = lis.astype({'tradable':'object', 'last_trade_time':'object', 'timestamp':'object'})
lis['opn'] = lis['ohlc'].apply(lambda row:row['open'])
lis['high'] = lis['ohlc'].apply(lambda row:row['high'])
lis['low'] = lis['ohlc'].apply(lambda row:row['low'])
lis['clse'] = lis['ohlc'].apply(lambda row:row['close'])
for i in ['buy', 'sell']:
for j in range(5):
for k in ['quantity', 'price', 'orders']:
lis['depth_'+i+'_'+str(j+1)+'_'+k] = lis['depth'].apply(lambda row:row[i][j][k])
lis = lis.rename(columns={"change":"changex"})
lis = lis.drop(columns=['depth', 'ohlc'])
#append to mysql
currentd = dt.today()
try:
cursor = db.cursor()
cursor.execute("create table if not exists Finmin.daily_stream_"+currentd.strftime("%y_%m_%d")+" (tradable varchar(5), mode varchar(10), \
instrument_token int, last_price float, last_quantity int, average_price float, volume int, buy_quantity int, \
sell_quantity int, changex float, last_trade_time varchar(26), oi int, oi_day_high int, oi_day_low int, \
timestamp varchar(26), opn float, high float, low float, clse float, \
depth_buy_1_quantity int, depth_buy_1_price float, depth_buy_1_orders int, \
depth_buy_2_quantity int, depth_buy_2_price float, depth_buy_2_orders int, \
depth_buy_3_quantity int, depth_buy_3_price float, depth_buy_3_orders int, \
depth_buy_4_quantity int, depth_buy_4_price float, depth_buy_4_orders int, \
depth_buy_5_quantity int, depth_buy_5_price float, depth_buy_5_orders int, \
depth_sell_1_quantity int, depth_sell_1_price float, depth_sell_1_orders int, \
depth_sell_2_quantity int, depth_sell_2_price float, depth_sell_2_orders int, \
depth_sell_3_quantity int, depth_sell_3_price float, depth_sell_3_orders int, \
depth_sell_4_quantity int, depth_sell_4_price float, depth_sell_4_orders int, \
depth_sell_5_quantity int, depth_sell_5_price float, depth_sell_5_orders int);")
colstring = "`,`".join(str(i) for i in lis.columns)
insrt_string = "insert into Finmin.daily_stream_"+currentd.strftime("%y_%m_%d")+" (`"+colstring+"`) values ("+"%s,"*48+"%s)"
cursor.fast_executemany = True
cursor.executemany(insrt_string, [row for row in lis.itertuples(index=False, name=None)])
db.commit()
except:
print(lis)
print(" ".join([str(i) for i in sys.exc_info()]))
kws1.on_ticks=on_ticks
Just to confirm - Is this error happening because the threaded ticker is not finished with processing the previous tick when it's hit with a new tick of data? I suppose it makes sense.
This thread can be closed now.