Query regarding threaded=False / threaded=True

mpujara
I am extracting live data from the WebSocket, and for additional tasks I created a separate thread so that it can run alongside the WebSocket, which runs in the main thread with threaded=False. I am confused: should I continue with this, or should I use threaded=True to handle such things?
  • Nivas
    There are two ways to pass tick data out of the on_ticks callback so that further processing does not block the main on_ticks thread. You may refer to this discussion for more details.
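One common way to hand ticks off without blocking the callback is a thread-safe queue. The sketch below uses only the standard library and simulated ticks; whether it matches either approach in the linked discussion is an assumption, and `tick_queue`, `worker`, and `processed` are hypothetical names.

```python
import queue
import threading

tick_queue = queue.Queue()   # thread-safe hand-off between threads
processed = []               # stand-in for slow work (e.g. a sheet write)

def on_ticks(ws, ticks):
    """Ticker callback: only enqueue, never do slow work here."""
    tick_queue.put(ticks)

def worker():
    """Consumer thread: drains the queue until it sees the None sentinel."""
    while True:
        ticks = tick_queue.get()
        if ticks is None:
            break
        for t in ticks:
            processed.append((t["instrument_token"], t["last_price"]))

consumer = threading.Thread(target=worker, daemon=True)
consumer.start()

# Simulate two callbacks arriving from the ticker thread (made-up values).
on_ticks(None, [{"instrument_token": 111, "last_price": 5600.0}])
on_ticks(None, [{"instrument_token": 222, "last_price": 120.5}])

tick_queue.put(None)         # sentinel: ask the worker to stop
consumer.join(timeout=5)
print(processed)
```

With this shape the callback returns almost immediately, and the slow work happens in the consumer thread regardless of which thread the ticker itself runs in.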
  • mpujara
    I am aware of them but have never worked with them. Basically, I was confused about whether I should create a thread of my own and run the WebSocket in the main thread, or run the WebSocket in a different background thread and let my main thread run on its own. If you get my question, please do respond.
  • mpujara
    import os
    import json
    import threading, time
    from datetime import datetime, timedelta, timezone
    from kiteconnect import KiteConnect, KiteTicker
    from dotenv import load_dotenv
    import gspread

    load_dotenv()

    API_KEY = os.getenv("KITE_API_KEY")
    TOKEN_FILE = "token_store.json"
    INSTRUMENT_FILE = "crude_instruments.json"
    creds_path = os.getenv("GOOGLE_CREDS_PATH")
    sheet_id = os.getenv("SHEET_ID")

    gc = gspread.service_account(filename=creds_path)
    sh = gc.open_by_key(sheet_id)
    ws = sh.sheet1

    # ------------------ Configuration ------------------
    STRIKE_INTERVAL = 50 # Crude oil strike interval (adjust as needed: 25, 50, 100)
    STRIKES_RANGE = 5 # Show ±5 strikes around ATM (total 11 strikes)

    # ------------------ Loaders ------------------
    def load_token():
        with open(TOKEN_FILE, "r") as f:
            return json.load(f)

    def load_instruments():
        with open(INSTRUMENT_FILE, "r") as f:
            return json.load(f)

    # ------------------ Globals ------------------
    fut_token = None
    token_to_option = {}

    # Thread-safe data storage
    data_lock = threading.Lock()
    last_known = {} # token -> last LTP
    fut_price = None

    # Control flags
    stop_timer = threading.Event()
    websocket_connected = threading.Event()

    # ------------------ Enhanced Data Storage ------------------
    previous_close = {} # For calculating percentage changes
    daily_high = {} # Track daily highs
    daily_low = {} # Track daily lows
    volume_data = {} # Track volume
    oi_data = {} # Track Open Interest (if available)

    def calculate_percentage_change(current_price, prev_close):
        """Calculate percentage change from previous close."""
        if not prev_close or prev_close == 0:
            return ""
        return round(((current_price - prev_close) / prev_close) * 100, 2)

    def format_percentage_change(pct_change):
        """Format percentage change with + or - sign."""
        if pct_change == "" or pct_change == 0:
            return ""
        return f"{'+' if pct_change > 0 else ''}{pct_change}%"

    # ------------------ Helpers ------------------
    def minute_of(dt):
        """Truncate datetime to the minute."""
        return dt.replace(second=0, microsecond=0)

    # ------------------ Per-minute finalization ------------------
    def process_minute(minute_dt):
        """Enhanced minute processing for comprehensive option chain."""
        with data_lock:
            # Create snapshot of current data
            current_snapshot = last_known.copy()
            prev_close_snapshot = previous_close.copy()

        # Get futures data
        fut_data = current_snapshot.get(fut_token, {})
        fut_close = fut_data.get('last_price') if isinstance(fut_data, dict) else fut_data

        # Build comprehensive CE/PE map
        strikes_map = {}
        for token, tick_data in current_snapshot.items():
            if token == fut_token:
                continue
            if token not in token_to_option:
                continue

            strike, typ = token_to_option[token]
            if strike not in strikes_map:
                strikes_map[strike] = {"CE": {}, "PE": {}}

            # Store comprehensive data for each option
            if isinstance(tick_data, dict):
                ltp = tick_data.get('last_price', 0)
                prev_close = prev_close_snapshot.get(token, ltp)
                pct_change = calculate_percentage_change(ltp, prev_close)

                strikes_map[strike][typ] = {
                    'last_price': ltp,
                    'pct_change': pct_change,
                    'oi': tick_data.get('oi', 0),
                    'volume': tick_data.get('volume', 0)
                }
            else:
                # Backward compatibility for simple price data
                strikes_map[strike][typ] = {'last_price': tick_data, 'pct_change': '', 'oi': 0, 'volume': 0}

        # Find ATM strike
        atm_strike = None
        if fut_close is not None and strikes_map:
            atm_strike = min(strikes_map.keys(), key=lambda s: abs(s - fut_close))

        print(f"\nProfessional Option Chain - {minute_dt}")
        print(f"FUT: {fut_close} | ATM: {atm_strike}")
        print("=" * 70)
        print(f"{'Strike':<8} {'CE LTP':<8} {'CE %':<8} {'PE LTP':<8} {'PE %':<8}")
        print("=" * 70)

        for strike in sorted(strikes_map.keys()):
            ce_data = strikes_map[strike]["CE"]
            pe_data = strikes_map[strike]["PE"]

            ce_ltp = ce_data.get('last_price', '-') if ce_data else '-'
            pe_ltp = pe_data.get('last_price', '-') if pe_data else '-'
            ce_pct = format_percentage_change(ce_data.get('pct_change', '')) if ce_data else ''
            pe_pct = format_percentage_change(pe_data.get('pct_change', '')) if pe_data else ''

            atm_marker = " <- ATM" if strike == atm_strike else ""
            print(f"{strike:<8} {ce_ltp:<8} {ce_pct:<8} {pe_ltp:<8} {pe_pct:<8}{atm_marker}")

        # Update sheet in background thread
        threading.Thread(
            target=update_sheet_snapshot,
            args=(minute_dt, fut_close, atm_strike, strikes_map),
            daemon=True
        ).start()

    def update_sheet_snapshot(minute_dt, fut_close, atm_strike, strikes_map):
        try:
            if ws is None or atm_strike is None:
                return

            # Clear the sheet first
            ws.clear()

            # 1. Add FUT info at top (like trading platforms)
            fut_info = [
                [f"CRUDE OIL FUTURES - {minute_dt.strftime('%d %b %Y')}", "", "", ""],
                [f"LTP: {fut_close}", f"Time: {minute_dt.strftime('%H:%M:%S')}", "", ""],
                ["", "", "", ""],  # Empty row for spacing
            ]
            ws.update('A1:D3', fut_info)

            # 2. Determine strike range (ATM ± 5 strikes)
            strikes_to_show = []
            for i in range(-STRIKES_RANGE, STRIKES_RANGE + 1):
                strike = atm_strike + (i * STRIKE_INTERVAL)
                strikes_to_show.append(strike)

            # 3. Prepare header (simple format like your request)
            header = [["Strike", "CE", "PE", "Sum"]]
            ws.update('A4:D4', header)

            # 4. Prepare data with professional option chain format
            values = []
            atm_row_index = None

            for idx, strike in enumerate(strikes_to_show):
                ce_data = strikes_map.get(strike, {}).get("CE", {})
                pe_data = strikes_map.get(strike, {}).get("PE", {})

                # Extract prices (handle both dict and simple number formats)
                if isinstance(ce_data, dict):
                    ce_ltp = ce_data.get('last_price', '')
                else:
                    ce_ltp = ce_data if ce_data else ''

                if isinstance(pe_data, dict):
                    pe_ltp = pe_data.get('last_price', '')
                else:
                    pe_ltp = pe_data if pe_data else ''

                # Calculate sum
                sum_value = ""
                if (ce_ltp and pe_ltp and
                        isinstance(ce_ltp, (int, float)) and isinstance(pe_ltp, (int, float))):
                    sum_value = round(ce_ltp + pe_ltp, 2)

                row_data = [strike, ce_ltp, pe_ltp, sum_value]
                values.append(row_data)

                # Track ATM row for highlighting
                if strike == atm_strike:
                    atm_row_index = idx

            # 5. Update data
            if values:
                end_row = 4 + len(values)
                ws.update(f'A5:D{end_row}', values)

                # 6. Apply professional formatting with simple layout
                apply_professional_simple_formatting(atm_row_index, len(values))

            print("✅ Professional option chain updated successfully")

        except Exception as e:
            print("⚠️ Error writing to sheet:", e)

    def apply_professional_simple_formatting(atm_row_index, total_rows):
        """formatting the sheet here"""

    # ------------------ Dynamic ATM Updates ------------------
    def check_atm_change():
        """Check if ATM has changed and update subscriptions if needed."""
        global current_atm_strike, kws_instance

        with data_lock:
            current_fut_price = last_known.get(fut_token)
            if isinstance(current_fut_price, dict):
                current_fut_price = current_fut_price.get('last_price')

        if current_fut_price is None:
            return

        # Calculate new ATM (adjust strike interval for crude)
        new_atm = round(current_fut_price / STRIKE_INTERVAL) * STRIKE_INTERVAL

        if hasattr(check_atm_change, 'last_atm') and check_atm_change.last_atm == new_atm:
            return

        check_atm_change.last_atm = new_atm
        print(f"ATM changed to {new_atm} (FUT: {current_fut_price})")

        # Here you could add logic to subscribe to new ATM options
        # and unsubscribe from far OTM options if needed

    # ------------------ Enhanced Timer thread ------------------
    def minute_timer():
        """Run every minute boundary and finalize the previous minute."""
        print("⏰ Minute timer started, waiting for WebSocket connection...")

        # Wait for WebSocket to connect
        websocket_connected.wait()
        print("✅ WebSocket connected, starting minute processing...")

        while not stop_timer.is_set():
            now = datetime.now(timezone.utc)
            next_minute = minute_of(now) + timedelta(minutes=1)
            sleep_seconds = (next_minute - now).total_seconds()

            # Sleep in small chunks to allow for clean shutdown
            while sleep_seconds > 0 and not stop_timer.is_set():
                chunk = min(sleep_seconds, 1.0)
                time.sleep(chunk)
                sleep_seconds -= chunk

            if stop_timer.is_set():
                break

            finished_minute = next_minute - timedelta(minutes=1)
            process_minute(finished_minute)

            # Check for ATM changes every minute
            check_atm_change()

    # ------------------ WebSocket Callbacks ------------------
    def on_ticks(ws_obj, ticks):
        """Enhanced tick processing for comprehensive option chain data."""
        global fut_price

        with data_lock:
            for t in ticks:
                token = t.get("instrument_token")
                ltp = t.get("last_price")
                if ltp is None:
                    continue

                # Store comprehensive tick data
                tick_data = {
                    'last_price': ltp,
                    'volume': t.get('volume', 0),
                    'oi': t.get('oi', 0),  # Open Interest if available
                    'high': t.get('ohlc', {}).get('high', ltp),
                    'low': t.get('ohlc', {}).get('low', ltp),
                    'close': t.get('ohlc', {}).get('close', ltp),
                    'timestamp': datetime.now()
                }

                # Keep comprehensive data
                last_known[token] = tick_data

                # Update daily tracking
                if token not in daily_high or ltp > daily_high[token]:
                    daily_high[token] = ltp
                if token not in daily_low or ltp < daily_low[token]:
                    daily_low[token] = ltp

                # Store previous close for percentage calculations
                if token not in previous_close:
                    previous_close[token] = tick_data.get('close', ltp)

                if token == fut_token:
                    fut_price = ltp

    def on_connect(ws_obj, response):
        print("✅ WebSocket Connected. Subscribing to tokens...")
        all_tokens = [fut_token] + list(token_to_option.keys())
        ws_obj.subscribe(all_tokens)
        ws_obj.set_mode(ws_obj.MODE_FULL, all_tokens)
        print(f"Subscribed to {len(all_tokens)} instruments")

        # Signal that WebSocket is ready
        websocket_connected.set()

    def on_close(ws_obj, code, reason):
        print(f"❌ WebSocket Closed: {code} - {reason}")
        websocket_connected.clear()

    def on_error(ws_obj, code, reason):
        print(f"⚠️ WebSocket Error: {code} - {reason}")

    # ------------------ Main Runner ------------------
    kws_instance = None

    def run_stream():
        global kws_instance, fut_token, token_to_option

        token_data = load_token()
        access_token = token_data["access_token"]

        # Create KiteTicker
        kws_instance = KiteTicker(API_KEY, access_token)
        inst = load_instruments()
        crude_fut_oct = inst["futures"]
        crude_opt_oct = inst["options"]

        # Pick first FUT as main underlying
        fut_token = crude_fut_oct[0]["instrument_token"]
        print(f"Tracking FUT token: {fut_token}")

        # Map option tokens -> (strike, type)
        token_to_option = {}
        for o in crude_opt_oct:
            strike = o["strike"]
            typ = o["instrument_type"]  # CE or PE
            token_to_option[o["instrument_token"]] = (strike, typ)

        print(f"Loaded {len(token_to_option)} option contracts")

        # Assign callbacks
        kws_instance.on_ticks = on_ticks
        kws_instance.on_connect = on_connect
        kws_instance.on_close = on_close
        kws_instance.on_error = on_error

        # Start the per-minute timer in background
        timer_thread = threading.Thread(target=minute_timer, daemon=True)
        timer_thread.start()

        # Start WebSocket (non-blocking with threaded=True)
        print("Starting WebSocket connection...")
        kws_instance.connect(threaded=True)

        # Keep main thread alive and handle graceful shutdown
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutdown requested...")
            stop_timer.set()
            kws_instance.close()
            timer_thread.join(timeout=2)
            print("✅ Clean shutdown completed")

    if __name__ == "__main__":
        try:
            run_stream()
        except KeyboardInterrupt:
            print("Exiting...")
            stop_timer.set()
  • Nivas
    "Should I create a thread of my own and run the WebSocket in the main thread only, or should I run the WebSocket in a different background thread and let my main thread run on its own?"
    You can run the WebSocket either in the main thread or in a background (daemon) thread, and the choice really depends on how your program is structured and what you want the main thread to do.
    • `threaded=False` → The WebSocket runs in the current thread (usually your main thread), so `connect()` blocks that thread. This is simple and works well if your program doesn’t need the main thread for other tasks.
    • `threaded=True` → The WebSocket runs in a separate background thread and `connect()` returns immediately. This is useful if you want your main thread free to handle other logic while the WebSocket keeps running in parallel.
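The difference between the two modes can be illustrated with a small stand-in. `FakeTicker` below is a hypothetical class written for this sketch, not the KiteTicker implementation; it only mimics the blocking and non-blocking behaviour described above so that the thread in which the callback runs can be recorded.

```python
import threading

class FakeTicker:
    """Hypothetical stand-in for a ticker; NOT the KiteTicker implementation."""

    def __init__(self):
        self.on_ticks = None
        self._thread = None

    def _run(self):
        # A real ticker would loop on the socket; here we deliver one made-up tick.
        self.on_ticks(self, [{"instrument_token": 1, "last_price": 100.0}])

    def connect(self, threaded=False):
        if threaded:
            # Background mode: start the loop in its own thread and return at once.
            self._thread = threading.Thread(
                target=self._run, name="ticker-thread", daemon=True
            )
            self._thread.start()
        else:
            # Foreground mode: the caller is blocked until the loop ends.
            self._run()

    def join(self):
        if self._thread is not None:
            self._thread.join(timeout=5)

callback_threads = []

def on_ticks(ws, ticks):
    # Record the name of the thread that executed the callback.
    callback_threads.append(threading.current_thread().name)

for mode in (False, True):
    ticker = FakeTicker()
    ticker.on_ticks = on_ticks
    ticker.connect(threaded=mode)
    ticker.join()

print(callback_threads)
```

The first recorded name is the calling thread and the second is `ticker-thread`: in this sketch the only thing that changes between the modes is which thread executes the callback and whether the caller is blocked.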
  • mpujara
    @Nivas Actually, I know this, but I was confused about which one to use, because either way I end up creating my own thread for the other task. My only need is to fetch live data and update the Google Sheet every minute.
  • mpujara
    So basically the only difference is where on_ticks runs, nothing else? Either way I am creating threads of my own because of my requirement. There is no UI or anything similar running in the main thread, and the main-thread code also needs threads.
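Because the per-minute job needs its own thread in either mode, the timer can be written independently of the ticker. A minimal sketch, assuming a stop flag like the `stop_timer` event in the code above; `seconds_to_next_minute` and `minute_worker` are hypothetical helper names. It waits on the event with a timeout instead of sleeping in one-second chunks, so a shutdown request is noticed promptly.

```python
import threading
from datetime import datetime, timedelta

def seconds_to_next_minute(now):
    """Seconds from `now` until the next whole-minute boundary."""
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()

def minute_worker(stop_event, job, clock=datetime.now):
    """Call job(finished_minute) at each minute boundary until stop_event is set."""
    while True:
        # Event.wait returns True as soon as stop_event is set, ending the loop.
        if stop_event.wait(timeout=seconds_to_next_minute(clock())):
            break
        current_minute = clock().replace(second=0, microsecond=0)
        job(current_minute - timedelta(minutes=1))

print(seconds_to_next_minute(datetime(2024, 1, 1, 10, 15, 42)))  # 18.0

# Typical start-up (left commented out because it would wait for real minutes):
# stop = threading.Event()
# threading.Thread(target=minute_worker, args=(stop, print), daemon=True).start()
```

The same worker can be started before calling `connect()` with either value of `threaded`; the choice then only decides whether the main thread is occupied by the ticker or left free.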
  • Nivas
    Kite Connect is purely an execution platform. I am afraid we can't provide support for writing code. You may refer to our GitHub repo to know more details.