I am extracting live data from the WebSocket, and for additional tasks I created a separate thread so that it can run alongside the WebSocket, which runs in the main thread with threaded=False. I am confused: should I continue with this approach, or should I use threaded=True to handle such things?
There are two ways to pass tick data from on_tick thread to perform any operation without blocking the main on_ticks thread. You may refer to this discussion for more details.
I am aware of them, but I have never worked with them. Basically, I am confused about whether I should create a thread of my own and run the WebSocket in the main thread, or run the WebSocket in a different background thread and let my main thread run on its own. If you understand my question, please do respond.
import os import json import threading, time from datetime import datetime, timedelta, timezone from kiteconnect import KiteConnect, KiteTicker from dotenv import load_dotenv import gspread
def calculate_percentage_change(current_price, prev_close): """Calculate percentage change from previous close.""" if not prev_close or prev_close == 0: return "" return round(((current_price - prev_close) / prev_close) * 100, 2)
def format_percentage_change(pct_change): """Format percentage change with + or - sign.""" if pct_change == "" or pct_change == 0: return "" return f"{'+' if pct_change > 0 else ''}{pct_change}%"
# ------------------ Helpers ------------------ def minute_of(dt): """Truncate datetime to the minute.""" return dt.replace(second=0, microsecond=0)
# ------------------ Per-minute finalization ------------------ def process_minute(minute_dt): """Enhanced minute processing for comprehensive option chain.""" with data_lock: # Create snapshot of current data current_snapshot = last_known.copy() prev_close_snapshot = previous_close.copy()
# Get futures data fut_data = current_snapshot.get(fut_token, {}) fut_close = fut_data.get('last_price') if isinstance(fut_data, dict) else fut_data
# Build comprehensive CE/PE map strikes_map = {} for token, tick_data in current_snapshot.items(): if token == fut_token: continue if token not in token_to_option: continue
strike, typ = token_to_option[token] if strike not in strikes_map: strikes_map[strike] = {"CE": {}, "PE": {}}
# Store comprehensive data for each option if isinstance(tick_data, dict): ltp = tick_data.get('last_price', 0) prev_close = prev_close_snapshot.get(token, ltp) pct_change = calculate_percentage_change(ltp, prev_close)
def update_sheet_snapshot(minute_dt, fut_close, atm_strike, strikes_map): try: if ws is None or atm_strike is None: return
# Clear the sheet first ws.clear()
# 1. Add FUT info at top (like trading platforms) fut_info = [ [f"CRUDE OIL FUTURES - {minute_dt.strftime('%d %b %Y')}", "", "", ""], [f"LTP: {fut_close}", f"Time: {minute_dt.strftime('%H:%M:%S')}", "", ""], ["", "", "", ""], # Empty row for spacing ] ws.update('A1:D3', fut_info)
# 2. Determine strike range (ATM ± 5 strikes) strikes_to_show = [] for i in range(-STRIKES_RANGE, STRIKES_RANGE + 1): strike = atm_strike + (i * STRIKE_INTERVAL) strikes_to_show.append(strike)
# 3. Prepare header (simple format like your request) header = [["Strike", "CE", "PE", "Sum"]] ws.update('A4:D4', header)
# 4. Prepare data with professional option chain format values = [] atm_row_index = None
for idx, strike in enumerate(strikes_to_show): ce_data = strikes_map.get(strike, {}).get("CE", {}) pe_data = strikes_map.get(strike, {}).get("PE", {})
# Extract prices (handle both dict and simple number formats) if isinstance(ce_data, dict): ce_ltp = ce_data.get('last_price', '') else: ce_ltp = ce_data if ce_data else ''
if isinstance(pe_data, dict): pe_ltp = pe_data.get('last_price', '') else: pe_ltp = pe_data if pe_data else ''
# Calculate sum sum_value = "" if (ce_ltp and pe_ltp and isinstance(ce_ltp, (int, float)) and isinstance(pe_ltp, (int, float))): sum_value = round(ce_ltp + pe_ltp, 2)
# Track ATM row for highlighting if strike == atm_strike: atm_row_index = idx
# 5. Update data if values: end_row = 4 + len(values) ws.update(f'A5:D{end_row}', values)
# 6. Apply professional formatting with simple layout apply_professional_simple_formatting(atm_row_index, len(values))
print("✅ Professional option chain updated successfully")
except Exception as e: print("⚠️ Error writing to sheet:", e)
def apply_professional_simple_formatting(atm_row_index, total_rows): """formatting the sheet here ")
# ------------------ Dynamic ATM Updates ------------------ def check_atm_change(): """Check if ATM has changed and update subscriptions if needed.""" global current_atm_strike, kws_instance
with data_lock: current_fut_price = last_known.get(fut_token) if isinstance(current_fut_price, dict): current_fut_price = current_fut_price.get('last_price')
if current_fut_price is None: return
# Calculate new ATM (adjust strike interval for crude) new_atm = round(current_fut_price / STRIKE_INTERVAL) * STRIKE_INTERVAL
if hasattr(check_atm_change, 'last_atm') and check_atm_change.last_atm == new_atm: return
check_atm_change.last_atm = new_atm print(f"???? ATM changed to {new_atm} (FUT: {current_fut_price})")
# Here you could add logic to subscribe to new ATM options # and unsubscribe from far OTM options if needed
# ------------------ Enhanced Timer thread ------------------ def minute_timer(): """Run every minute boundary and finalize the previous minute.""" print("⏰ Minute timer started, waiting for WebSocket connection...")
# Wait for WebSocket to connect websocket_connected.wait() print("✅ WebSocket connected, starting minute processing...")
while not stop_timer.is_set(): now = datetime.now(timezone.utc) next_minute = minute_of(now) + timedelta(minutes=1) sleep_seconds = (next_minute - now).total_seconds()
# Sleep in small chunks to allow for clean shutdown while sleep_seconds > 0 and not stop_timer.is_set(): chunk = min(sleep_seconds, 1.0) time.sleep(chunk) sleep_seconds -= chunk
# Check for ATM changes every minute check_atm_change()
# ------------------ WebSocket Callbacks ------------------ def on_ticks(ws_obj, ticks): """Enhanced tick processing for comprehensive option chain data.""" global fut_price
with data_lock: for t in ticks: token = t.get("instrument_token") ltp = t.get("last_price") if ltp is None: continue
# Store comprehensive tick data tick_data = { 'last_price': ltp, 'volume': t.get('volume', 0), 'oi': t.get('oi', 0), # Open Interest if available 'high': t.get('ohlc', {}).get('high', ltp), 'low': t.get('ohlc', {}).get('low', ltp), 'close': t.get('ohlc', {}).get('close', ltp), 'timestamp': datetime.now() }
# Keep comprehensive data last_known[token] = tick_data
# Update daily tracking if token not in daily_high or ltp > daily_high[token]: daily_high[token] = ltp if token not in daily_low or ltp < daily_low[token]: daily_low[token] = ltp
# Store previous close for percentage calculations if token not in previous_close: previous_close[token] = tick_data.get('close', ltp)
# Pick first FUT as main underlying fut_token = crude_fut_oct[0]["instrument_token"] print(f"????️ Tracking FUT token: {fut_token}")
# Map option tokens -> (strike, type) token_to_option = {} for o in crude_opt_oct: strike = o["strike"] typ = o["instrument_type"] # CE or PE token_to_option[o["instrument_token"]] = (strike, typ)
Should I create a thread of my own and run the WebSocket in the main thread only, or should I run the WebSocket in a different background thread and let my main thread run on its own?
You can run the WebSocket either in the main thread or in a background (daemon) thread, and the choice really depends on how your program is structured and what you want the main thread to do.
`threaded=False` → The WebSocket runs in the current thread (usually your main thread). This is simple and works well if your program doesn’t need the main thread for other tasks.
`threaded=True` → The WebSocket runs in a separate background thread. This is useful if you want your main thread free to handle other logic while the WebSocket keeps running in parallel.
@Nivas Actually, I know this, but I was confused about which one to use, because either way I end up creating my own thread for the other task. My only need is to fetch live data and update the Google Sheet every minute.
Basically, the only difference lies in where on_ticks runs, nothing else? Either way I am creating threads of my own because of my requirement: there is no UI or anything similar running in the main thread, and the main-thread code also requires threads.
Kite Connect is purely an execution platform. I am afraid we can't provide support for writing code. You may refer to our GitHub repo to know more details.
import json
import os
import threading
import time
from datetime import datetime, timedelta, timezone

import gspread
from dotenv import load_dotenv
from kiteconnect import KiteConnect, KiteTicker
# Load variables from a local .env file into the process environment so the
# os.getenv() calls below can see them.
load_dotenv()
# Kite Connect API key; passed to KiteTicker in run_stream().
API_KEY = os.getenv("KITE_API_KEY")
# JSON file read by load_token(); run_stream() reads its "access_token" key.
TOKEN_FILE = "token_store.json"
# JSON file read by load_instruments(); run_stream() reads its "futures" and
# "options" lists.
INSTRUMENT_FILE = "crude_instruments.json"
# Google service-account credentials file and target spreadsheet id.
creds_path = os.getenv("GOOGLE_CREDS_PATH")
sheet_id = os.getenv("SHEET_ID")
# NOTE(review): the three lines below run at import time and presumably
# contact Google; a missing env var or bad credentials fails here — confirm.
gc = gspread.service_account(filename=creds_path)
sh = gc.open_by_key(sheet_id)
# Worksheet that update_sheet_snapshot() clears and rewrites every minute.
ws = sh.sheet1
# ------------------ Configuration ------------------
STRIKE_INTERVAL = 50 # Distance between adjacent strikes (adjust as needed: 25, 50, 100)
STRIKES_RANGE = 5 # Show ±5 strikes around ATM (total 11 strikes)
# ------------------ Loaders ------------------
def load_token():
    """Return the parsed JSON content of ``TOKEN_FILE``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Explicit encoding so the result does not depend on the platform locale.
    with open(TOKEN_FILE, "r", encoding="utf-8") as f:
        return json.load(f)
def load_instruments():
    """Return the parsed JSON content of ``INSTRUMENT_FILE``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Explicit encoding so the result does not depend on the platform locale.
    with open(INSTRUMENT_FILE, "r", encoding="utf-8") as f:
        return json.load(f)
# ------------------ Globals ------------------
# Instrument token of the tracked future; assigned in run_stream().
fut_token = None
# Option instrument token -> (strike, instrument_type); rebuilt in run_stream().
token_to_option = {}
# Thread-safe data storage
# on_ticks() writes the shared dicts below while holding this lock;
# process_minute() and check_atm_change() take it while reading last_known.
data_lock = threading.Lock()
last_known = {} # token -> latest tick dict stored by on_ticks() ('last_price', 'volume', 'oi', ...)
fut_price = None # last traded price of the tracked future, updated by on_ticks()
# Control flags
stop_timer = threading.Event() # set on shutdown to stop minute_timer()
websocket_connected = threading.Event() # set in on_connect(), cleared in on_close()
# ------------------ Enhanced Data Storage ------------------
previous_close = {} # token -> 'close' of the first tick seen; used for percentage changes
daily_high = {} # token -> highest last_price seen since start
daily_low = {} # token -> lowest last_price seen since start
volume_data = {} # Track volume (not written anywhere in the visible code)
oi_data = {} # Track Open Interest (not written anywhere in the visible code)
def calculate_percentage_change(current_price, prev_close):
    """Return the percentage change of *current_price* relative to *prev_close*.

    Args:
        current_price: Latest price.
        prev_close: Reference price the change is measured against.

    Returns:
        The change in percent rounded to two decimals, or ``""`` when
        *prev_close* is missing or zero (no meaningful reference, and it
        avoids a division by zero).
    """
    # ``not prev_close`` already covers None, 0 and 0.0.
    if not prev_close:
        return ""
    return round(((current_price - prev_close) / prev_close) * 100, 2)
def format_percentage_change(pct_change):
    """Format a percentage change for display with an explicit sign.

    Args:
        pct_change: Numeric percentage, or ``""``/``None`` when unavailable.

    Returns:
        ``"+1.5%"`` / ``"-2.0%"`` style text, or ``""`` when the value is
        missing or exactly zero.
    """
    # Falsy covers "", 0 and None; None previously fell through to the
    # ``> 0`` comparison and raised TypeError.
    if not pct_change:
        return ""
    sign = "+" if pct_change > 0 else ""
    return f"{sign}{pct_change}%"
# ------------------ Helpers ------------------
def minute_of(dt):
    """Return *dt* truncated to the start of its minute.

    Seconds and microseconds are zeroed; the date, hour, minute and tzinfo
    of *dt* are kept as they are.
    """
    return dt.replace(second=0, microsecond=0)
# ------------------ Per-minute finalization ------------------
def _build_strikes_map(snapshot, prev_close_snapshot):
    """Group the option ticks in *snapshot* as ``{strike: {"CE": {...}, "PE": {...}}}``."""
    strikes_map = {}
    for token, tick_data in snapshot.items():
        # The future itself and unknown tokens are not part of the chain.
        if token == fut_token or token not in token_to_option:
            continue

        strike, typ = token_to_option[token]
        legs = strikes_map.setdefault(strike, {"CE": {}, "PE": {}})

        if isinstance(tick_data, dict):
            ltp = tick_data.get('last_price', 0)
            prev_close = prev_close_snapshot.get(token, ltp)
            legs[typ] = {
                'last_price': ltp,
                'pct_change': calculate_percentage_change(ltp, prev_close),
                'oi': tick_data.get('oi', 0),
                'volume': tick_data.get('volume', 0),
            }
        else:
            # Backward compatibility for simple price data
            legs[typ] = {'last_price': tick_data, 'pct_change': '', 'oi': 0, 'volume': 0}
    return strikes_map


def _print_option_chain(minute_dt, fut_close, atm_strike, strikes_map):
    """Print the option chain as a fixed-width table, marking the ATM row."""
    print(f"\n???? Professional Option Chain - {minute_dt}")
    print(f"FUT: {fut_close} | ATM: {atm_strike}")
    print("=" * 70)
    print(f"{'Strike':<8} {'CE LTP':<8} {'CE %':<8} {'PE LTP':<8} {'PE %':<8}")
    print("=" * 70)

    for strike in sorted(strikes_map):
        ce_data = strikes_map[strike]["CE"]
        pe_data = strikes_map[strike]["PE"]

        # An empty leg means no tick has been received for that option yet.
        ce_ltp = ce_data.get('last_price', '-') if ce_data else '-'
        pe_ltp = pe_data.get('last_price', '-') if pe_data else '-'
        ce_pct = format_percentage_change(ce_data.get('pct_change', '')) if ce_data else ''
        pe_pct = format_percentage_change(pe_data.get('pct_change', '')) if pe_data else ''

        atm_marker = " <- ATM" if strike == atm_strike else ""
        print(f"{strike:<8} {ce_ltp:<8} {ce_pct:<8} {pe_ltp:<8} {pe_pct:<8}{atm_marker}")


def process_minute(minute_dt):
    """Finalize one minute: snapshot ticks, print the chain, update the sheet.

    Args:
        minute_dt: Datetime of the minute being finalized; used for the
            printed heading and passed on to ``update_sheet_snapshot``.

    The shared tick stores are copied while holding ``data_lock`` and all
    further work uses the copies, so ``on_ticks`` is only blocked for the
    duration of the copy. The sheet update is started on a daemon thread so
    that it does not delay the caller.
    """
    with data_lock:
        # Shallow copies suffice: on_ticks replaces each token's entry with
        # a new dict rather than mutating the stored one.
        current_snapshot = last_known.copy()
        prev_close_snapshot = previous_close.copy()

    # Futures price: entries are tick dicts, but a bare number is tolerated.
    fut_data = current_snapshot.get(fut_token, {})
    fut_close = fut_data.get('last_price') if isinstance(fut_data, dict) else fut_data

    strikes_map = _build_strikes_map(current_snapshot, prev_close_snapshot)

    # ATM is the strike (among those that have ticked) nearest the future.
    atm_strike = None
    if fut_close is not None and strikes_map:
        atm_strike = min(strikes_map, key=lambda s: abs(s - fut_close))

    _print_option_chain(minute_dt, fut_close, atm_strike, strikes_map)

    # Update sheet in background thread
    threading.Thread(
        target=update_sheet_snapshot,
        args=(minute_dt, fut_close, atm_strike, strikes_map),
        daemon=True,
    ).start()
def _leg_ltp(leg):
    """Return the last price of an option leg (dict or bare number), or ``''``."""
    if isinstance(leg, dict):
        return leg.get('last_price', '')
    return leg if leg else ''


def update_sheet_snapshot(minute_dt, fut_close, atm_strike, strikes_map):
    """Rewrite the worksheet with the option chain around the ATM strike.

    Args:
        minute_dt: Datetime shown in the title and time cells.
        fut_close: Futures price shown in the "LTP" cell.
        atm_strike: Centre strike; nothing is written when it is ``None``.
        strikes_map: ``{strike: {"CE": leg, "PE": leg}}`` as built by
            ``process_minute``.

    Layout: rows 1-3 hold the futures info, row 4 the header, and the
    following rows one strike each for ATM ± ``STRIKES_RANGE`` strikes.
    Any exception is caught and printed, because this function runs as the
    target of a background thread where it would otherwise be lost.
    """
    try:
        if ws is None or atm_strike is None:
            return

        # Strike range: ATM ± STRIKES_RANGE strikes.
        strikes_to_show = [
            atm_strike + offset * STRIKE_INTERVAL
            for offset in range(-STRIKES_RANGE, STRIKES_RANGE + 1)
        ]

        # FUT info block (rows 1-3) followed by the header (row 4).
        rows = [
            [f"CRUDE OIL FUTURES - {minute_dt.strftime('%d %b %Y')}", "", "", ""],
            [f"LTP: {fut_close}", f"Time: {minute_dt.strftime('%H:%M:%S')}", "", ""],
            ["", "", "", ""],  # Empty row for spacing
            ["Strike", "CE", "PE", "Sum"],
        ]

        values = []
        atm_row_index = None
        for idx, strike in enumerate(strikes_to_show):
            legs = strikes_map.get(strike, {})
            ce_ltp = _leg_ltp(legs.get("CE", {}))
            pe_ltp = _leg_ltp(legs.get("PE", {}))

            # Sum is only shown when both legs have a non-zero numeric price.
            sum_value = ""
            if (ce_ltp and pe_ltp and
                    isinstance(ce_ltp, (int, float)) and isinstance(pe_ltp, (int, float))):
                sum_value = round(ce_ltp + pe_ltp, 2)

            values.append([strike, ce_ltp, pe_ltp, sum_value])

            # Track ATM row for highlighting
            if strike == atm_strike:
                atm_row_index = idx
        rows.extend(values)

        # Everything is computed before the sheet is touched, then written
        # in a single call instead of three. Keyword arguments are used so
        # the call does not depend on the positional order of update().
        ws.clear()
        ws.update(range_name=f"A1:D{len(rows)}", values=rows)

        apply_professional_simple_formatting(atm_row_index, len(values))

        print("✅ Professional option chain updated successfully")

    except Exception as e:
        print("⚠️ Error writing to sheet:", e)
def apply_professional_simple_formatting(atm_row_index, total_rows):
    """Apply formatting to the option-chain sheet (placeholder).

    Args:
        atm_row_index: Index of the ATM strike within the data rows as
            computed by ``update_sheet_snapshot``, or ``None``.
        total_rows: Number of data rows written below the header.

    The formatting logic was not included in the posted source; this stub
    does nothing and returns ``None``.
    """
# ------------------ Dynamic ATM Updates ------------------
def check_atm_change():
    """Report when the ATM strike derived from the futures price changes.

    The ATM is the futures last price rounded to the nearest multiple of
    ``STRIKE_INTERVAL``. The most recently reported value is kept on the
    function itself as ``check_atm_change.last_atm``; a message is printed
    only when the value differs from it (or on the first call with a price).
    Nothing happens while no futures tick has been received.
    """
    with data_lock:
        current_fut_price = last_known.get(fut_token)

    # Entries are tick dicts, but a bare number is tolerated.
    if isinstance(current_fut_price, dict):
        current_fut_price = current_fut_price.get('last_price')
    if current_fut_price is None:
        return

    # Calculate new ATM (adjust strike interval for crude)
    new_atm = round(current_fut_price / STRIKE_INTERVAL) * STRIKE_INTERVAL
    if getattr(check_atm_change, 'last_atm', None) == new_atm:
        return

    check_atm_change.last_atm = new_atm
    print(f"???? ATM changed to {new_atm} (FUT: {current_fut_price})")
    # Here you could add logic to subscribe to new ATM options
    # and unsubscribe from far OTM options if needed
# ------------------ Enhanced Timer thread ------------------
def minute_timer():
    """Finalize the previous minute at every minute boundary (thread target).

    Waits for ``websocket_connected`` first, then loops until ``stop_timer``
    is set: sleep until the next UTC minute boundary, call
    ``process_minute`` for the minute that just ended, then
    ``check_atm_change``.
    """
    print("⏰ Minute timer started, waiting for WebSocket connection...")

    # Wait for the WebSocket, but poll so that a shutdown requested before
    # the connection is established still ends this thread.
    while not websocket_connected.wait(timeout=1.0):
        if stop_timer.is_set():
            return
    print("✅ WebSocket connected, starting minute processing...")

    while not stop_timer.is_set():
        now = datetime.now(timezone.utc)
        next_minute = minute_of(now) + timedelta(minutes=1)

        # Event.wait returns True as soon as stop_timer is set, so shutdown
        # does not have to wait for the boundary.
        if stop_timer.wait(timeout=(next_minute - now).total_seconds()):
            break

        finished_minute = next_minute - timedelta(minutes=1)
        try:
            process_minute(finished_minute)
            # Check for ATM changes every minute
            check_atm_change()
        except Exception as e:
            # Keep the timer alive: an uncaught error here would end the
            # thread and silently stop all further per-minute updates.
            print("⚠️ Error in minute processing:", e)
# ------------------ WebSocket Callbacks ------------------
def on_ticks(ws_obj, ticks):
    """KiteTicker ``on_ticks`` callback: record the latest data per instrument.

    Args:
        ws_obj: The ticker instance invoking the callback (unused).
        ticks: Iterable of tick dicts; ticks without ``last_price`` are skipped.

    Under ``data_lock`` this updates ``last_known``, ``daily_high``,
    ``daily_low`` and ``previous_close``, plus ``fut_price`` for the tracked
    future. Only in-memory bookkeeping happens here.
    """
    global fut_price

    with data_lock:
        for t in ticks:
            token = t.get("instrument_token")
            ltp = t.get("last_price")
            if ltp is None:
                continue

            # Look up the OHLC block once; tolerate it being absent or None.
            ohlc = t.get('ohlc') or {}

            # NOTE(review): timestamp is naive local time while minute_timer
            # uses UTC-aware datetimes — confirm before comparing the two.
            tick_data = {
                'last_price': ltp,
                'volume': t.get('volume', 0),
                'oi': t.get('oi', 0),  # Open Interest if available
                'high': ohlc.get('high', ltp),
                'low': ohlc.get('low', ltp),
                'close': ohlc.get('close', ltp),
                'timestamp': datetime.now(),
            }

            # A new dict is stored each time (never mutated in place), so
            # readers can take shallow copies of last_known.
            last_known[token] = tick_data

            # Update daily tracking
            if token not in daily_high or ltp > daily_high[token]:
                daily_high[token] = ltp
            if token not in daily_low or ltp < daily_low[token]:
                daily_low[token] = ltp

            # The 'close' of the first tick seen is kept as the reference
            # for percentage calculations.
            previous_close.setdefault(token, tick_data['close'])

            if token == fut_token:
                fut_price = ltp
def on_connect(ws_obj, response):
    """KiteTicker ``on_connect`` callback: subscribe and signal readiness.

    Subscribes to the tracked future plus every option in
    ``token_to_option``, switches them to ``MODE_FULL``, and sets
    ``websocket_connected`` so ``minute_timer`` can start.

    Args:
        ws_obj: The ticker instance that connected.
        response: Connection response passed by the ticker (unused).
    """
    print("✅ WebSocket Connected. Subscribing to tokens...")

    all_tokens = [fut_token, *token_to_option]
    ws_obj.subscribe(all_tokens)
    ws_obj.set_mode(ws_obj.MODE_FULL, all_tokens)
    print(f"???? Subscribed to {len(all_tokens)} instruments")

    # Signal that WebSocket is ready
    websocket_connected.set()
def on_close(ws_obj, code, reason):
    """KiteTicker ``on_close`` callback: log the closure and clear the ready flag.

    Args:
        ws_obj: The ticker instance that closed (unused).
        code: Close code reported by the ticker.
        reason: Close reason reported by the ticker.
    """
    print(f"❌ WebSocket Closed: {code} - {reason}")
    websocket_connected.clear()
def on_error(ws_obj, code, reason):
    """KiteTicker ``on_error`` callback: print the reported error.

    Args:
        ws_obj: The ticker instance reporting the error (unused).
        code: Error code reported by the ticker.
        reason: Error description reported by the ticker.
    """
    print(f"⚠️ WebSocket Error: {code} - {reason}")
# ------------------ Main Runner ------------------
# KiteTicker instance created by run_stream(); None until then.
kws_instance = None


def run_stream():
    """Load credentials and instruments, start the ticker, block until Ctrl+C.

    Steps:
        1. Read the access token and create the ``KiteTicker``.
        2. Load instruments; the first future becomes ``fut_token`` and the
           options populate ``token_to_option``.
        3. Register the WebSocket callbacks.
        4. Start ``minute_timer`` on a daemon thread.
        5. Connect with ``threaded=True`` so ticks are handled off the main
           thread, which then idles until ``KeyboardInterrupt`` and performs
           the shutdown.

    Raises:
        OSError, json.JSONDecodeError: from the token/instrument loaders.
        KeyError, IndexError: if the loaded JSON lacks the expected keys or
            contains no futures.
    """
    global kws_instance, fut_token, token_to_option

    token_data = load_token()
    access_token = token_data["access_token"]

    # Create KiteTicker
    kws_instance = KiteTicker(API_KEY, access_token)

    inst = load_instruments()
    crude_fut_oct = inst["futures"]
    crude_opt_oct = inst["options"]

    # Pick first FUT as main underlying
    fut_token = crude_fut_oct[0]["instrument_token"]
    print(f"????️ Tracking FUT token: {fut_token}")

    # Map option tokens -> (strike, type), where type is CE or PE
    token_to_option = {
        o["instrument_token"]: (o["strike"], o["instrument_type"])
        for o in crude_opt_oct
    }
    print(f"???? Loaded {len(token_to_option)} option contracts")

    # Assign callbacks
    kws_instance.on_ticks = on_ticks
    kws_instance.on_connect = on_connect
    kws_instance.on_close = on_close
    kws_instance.on_error = on_error

    # Start the per-minute timer in background
    timer_thread = threading.Thread(target=minute_timer, daemon=True)
    timer_thread.start()

    # Start WebSocket (non-blocking with threaded=True)
    print("???? Starting WebSocket connection...")
    kws_instance.connect(threaded=True)

    # Keep main thread alive and handle graceful shutdown
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n???? Shutdown requested...")
        stop_timer.set()
        kws_instance.close()
        # Bounded wait: the timer is a daemon thread, so exit is not blocked
        # if it is still busy.
        timer_thread.join(timeout=2)
        print("✅ Clean shutdown completed")
if __name__ == "__main__":
    try:
        run_stream()
    except KeyboardInterrupt:
        # run_stream() handles Ctrl+C inside its idle loop; this covers an
        # interrupt that arrives outside that loop (e.g. during startup).
        print("Exiting...")
        stop_timer.set()