Jaisingh Dhurvey

jsdhurvey
"""
algo.py — Excel -> Paper/Live trading with Zerodha Kite Connect
Features:
- Reads Stoploss_Calculator.xlsx (Sheet1) for inputs (capital, risk%, entry, stoploss%, symbol, qty optional)
- Computes position size from capital & risk%
- Builds targets 1:1 .. 1:7
- PAPER mode: simulate trades by manual or programmatic LTP feed
- LIVE mode: place market buy, create initial GTT (SL + target1), poll LTP, when target hit -> create next GTT with new SL
- Emergency exit function: cancel GTTs and square-off positions (LIVE) or close simulated trades (PAPER)

Requirements:
pip install pandas kiteconnect python-dotenv

Environment variables for LIVE:
- KITE_API_KEY
- KITE_API_SECRET (only for session generation)
- KITE_ACCESS_TOKEN (after generate via login flow)

Usage:
- Put Stoploss_Calculator.xlsx in same folder (Sheet1 expected layout; fallback parser is tolerant)
- Run: python algo.py --mode PAPER
- Or for live: python algo.py --mode LIVE
"""

import os
import time
import math
import argparse
import pandas as pd
from datetime import datetime

# Optional: only needed for live
try:
from kiteconnect import KiteConnect
except Exception:
KiteConnect = None

# ---------- Helpers: Excel parser ----------
def parse_sheet(path="Stoploss_Calculator.xlsx", sheet="Sheet1"):
"""
Tries to find Capital, Risk %, Entry Price, Stoploss %, Symbol, Quantity in Sheet1.
Accepts few label variants. Returns dict with values (floats/ints).
"""
df = pd.read_excel(path, sheet_name=sheet, header=None)
# flatten first two columns to search
labels = df.iloc[:,0].astype(str).str.strip().str.lower().fillna("")
vals = df.iloc[:,1] if df.shape[1] > 1 else df.iloc[:,0]
def find(keys):
for i, lab in enumerate(labels):
for k in keys:
if k in lab:
return vals.iat[i]
return None

capital = find(["capital size","capital","capital size (₹)","capital size (rs)"])
risk_pct = find(["risk %","risk%","risk percent","risk % of capital"])
entry_price = find(["entry price","entry price (₹)","entry"])
stoploss_pct = find(["stoploss %","stoploss%","stoploss percent"])
symbol = find(["symbol","tradingsymbol","scrip"])
qty = find(["quantity","qty","lotsize","lot"])

# fallback: sometimes sheet placed values in specific known rows (common earlier)
if capital is None and df.shape[0] >= 1:
try:
capital = float(df.iat[0,1])
except:
capital = None
# convert types and percent strings
def to_float(x):
if x is None or (isinstance(x, float) and pd.isna(x)):
return None
if isinstance(x, str):
s = x.strip()
if s.endswith("%"):
try:
return float(s[:-1]) / 100.0
except:
pass
try: return float(s)
except: return None
try: return float(x)
except: return None

capital = to_float(capital)
entry_price = to_float(entry_price)
stoploss_pct = to_float(stoploss_pct) if stoploss_pct is not None else None
# user might write stoploss percent as 5 for 5% -> normalize
if stoploss_pct is not None and stoploss_pct > 1:
stoploss_pct = stoploss_pct / 100.0

risk_pct = to_float(risk_pct) if risk_pct is not None else None
if risk_pct is not None and risk_pct > 1:
risk_pct = risk_pct / 100.0

qty = None
try:
if qty is not None:
qty = int(float(qty))
except:
qty = None

symbol = None if symbol is None else str(symbol).strip().upper()

return {
"capital": capital,
"risk_pct": risk_pct,
"entry_price": entry_price,
"stoploss_pct": stoploss_pct,
"symbol": symbol,
"qty": qty,
"raw": df
}

# ---------- Risk/targets ----------
def compute_trade_params(capital, risk_pct, entry_price, stoploss_pct):
"""
returns dict with stoploss_price, stoploss_distance, position_size (per capital risk),
targets list (1..7)
"""
risk_amount = capital * risk_pct
stoploss_price = entry_price * (1 - stoploss_pct) # BUY case; SELL we'll invert later
stoploss_distance = abs(entry_price - stoploss_price)
if stoploss_distance <= 0:
raise ValueError("Stoploss distance computed zero or negative.")
position_size = int(math.floor(risk_amount / stoploss_distance))
# targets for BUY
targets = [round(entry_price + stoploss_distance * i, 4) for i in range(1, 8)]
return {
"risk_amount": risk_amount,
"stoploss_price": round(stoploss_price, 4),
"stoploss_distance": stoploss_distance,
"position_size": position_size,
"targets": targets
}

# ---------- PAPER broker ----------
class PaperEngine:
def __init__(self):
self.active_trade = None
self.history = []

def place_trade(self, symbol, side, entry_price, qty, stoploss_price, targets):
trade = {
"symbol": symbol,
"side": side,
"entry": entry_price,
"qty": qty,
"stoploss": stoploss_price,
"targets": targets.copy(),
"active_target_idx": 0,
"open": True,
"events": [("ENTRY", entry_price, qty, now())]
}
self.active_trade = trade
self.history.append(trade)
print_log(f"[PAPER] Placed {side} {symbol} @ {entry_price} qty {qty}")
return trade

def feed_price(self, price):
"""
feed price updates (manual or from feed). Returns list of events triggered.
"""
events = []
t = self.active_trade
if t is None or not t["open"]:
return events
side = t["side"]
sl = t["stoploss"]
if side == "BUY":
if price <= sl:
t["open"] = False
t["events"].append(("SL_HIT", price, t["qty"], now()))
events.append(("SL", t, price))
print_log(f"[PAPER] STOPLOSS hit @ {price}. Trade closed.")
return events
# check active target
idx = t["active_target_idx"]
if idx < len(t["targets"]) and price >= t["targets"][idx]:
t["events"].append((f"TARGET_{idx+1}_HIT", t["targets"][idx], t["qty"], now()))
events.append(("TARGET", t, t["targets"][idx]))
print_log(f"[PAPER] TARGET {idx+1} hit @ {t['targets'][idx]}")
# trail SL to this target price
t["stoploss"] = t["targets"][idx]
t["active_target_idx"] += 1
if t["active_target_idx"] >= len(t["targets"]):
t["open"] = False
t["events"].append(("ALL_TARGETS_DONE", price, t["qty"], now()))
print_log("[PAPER] All targets achieved. Trade closed.")
else: # SELL side logic (mirror)
if price >= sl:
t["open"] = False
t["events"].append(("SL_HIT", price, t["qty"], now()))
events.append(("SL", t, price))
print_log(f"[PAPER] STOPLOSS hit @ {price}. Trade closed.")
return events
idx = t["active_target_idx"]
if idx < len(t["targets"]) and price <= t["targets"][idx]:
t["events"].append((f"TARGET_{idx+1}_HIT", t["targets"][idx], t["qty"], now()))
events.append(("TARGET", t, t["targets"][idx]))
print_log(f"[PAPER] TARGET {idx+1} hit @ {t['targets'][idx]}")
t["stoploss"] = t["targets"][idx]
t["active_target_idx"] += 1
if t["active_target_idx"] >= len(t["targets"]):
t["open"] = False
t["events"].append(("ALL_TARGETS_DONE", price, t["qty"], now()))
print_log("[PAPER] All targets achieved. Trade closed.")
return events

def emergency_exit(self):
if self.active_trade and self.active_trade["open"]:
self.active_trade["open"] = False
self.active_trade["events"].append(("EMERGENCY_EXIT", now()))
print_log("[PAPER] Emergency exit executed - trade closed.")

# ---------- LIVE broker (Kite) ----------
class LiveEngine:
def __init__(self, kite_client):
self.kc = kite_client
self.tracked = {} # store data keyed by symbol or a client id

def place_market_and_create_initial_gtt(self, symbol, exchange, side, qty, entry_price, stoploss_price, first_target):
"""
Places a market order (entry) and then creates a GTT OCO for Stoploss and first target.
NOTE: adapt place_gtt signature to your kiteconnect version.
"""
txn = self.kc.TRANSACTION_TYPE_BUY if side == "BUY" else self.kc.TRANSACTION_TYPE_SELL
# place market order
print_log(f"[LIVE] Placing market order {side} {symbol} qty {qty}")
try:
order_id = self.kc.place_order(
variety=self.kc.VARIETY_REGULAR,
exchange=exchange,
tradingsymbol=symbol,
transaction_type=txn,
quantity=qty,
order_type=self.kc.ORDER_TYPE_MARKET,
product=self.kc.PRODUCT_MIS
)
except Exception as e:
raise RuntimeError("Order placement failed: " + str(e))
print_log(f"[LIVE] Market order placed. order_id={order_id}")

# Place GTT OCO - example payload (may need adaptation)
try:
# For kiteconnect v3+ signature differs; check your kiteconnect docs.
trigger_vals = [stoploss_price, first_target]
orders = [
{
"exchange": exchange,
"tradingsymbol": symbol,
"transaction_type": "SELL" if side == "BUY" else "BUY",
"quantity": qty,
"order_type": "LIMIT",
"price": stoploss_price
},
{
"exchange": exchange,
"tradingsymbol": symbol,
"transaction_type": "SELL" if side == "BUY" else "BUY",
"quantity": qty,
"order_type": "LIMIT",
"price": first_target
}
]
resp = self.kc.place_gtt(self.kc.GTT_TYPE_OCO, symbol, exchange, trigger_vals, entry_price, orders)
gtt_id = resp.get("id", resp)
print_log(f"[LIVE] GTT placed (id:{gtt_id})")
except Exception as e:
print_log("[LIVE] GTT placement failed or not supported: " + str(e))
gtt_id = None

# store tracking
self.tracked[symbol] = {
"side": side,
"qty": qty,
"stoploss": stoploss_price,
"targets": None, # we will set targets externally
"active_idx": 0,
"gtt_id": gtt_id,
"exchange": exchange
}
return order_id, gtt_id

def get_ltp(self, symbol, exchange):
# Kite LTP expects "EXCHANGE:SYMBOL" key
key = f"{exchange}:{symbol}"
try:
data = self.kc.ltp([key])
# structure: {'NSE:RELIANCE': {'instrument_token':..., 'last_price':...}}
lp = list(data.values())[0]['last_price']
return float(lp)
except Exception as e:
raise RuntimeError("Failed to fetch LTP: " + str(e))

def cancel_gtt(self, gtt_id):
try:
self.kc.delete_gtt(gtt_id)
print_log(f"[LIVE] Cancelled GTT {gtt_id}")
except Exception as e:
print_log(f"[LIVE] Cancel GTT failed: {e}")

def emergency_exit_all(self):
"""
Cancel all GTTs and try to square off positions.
For square-off you may need to fetch positions and place opposing market orders.
"""
try:
# cancel all GTTs (example)
gtts = self.kc.get_gtts()
for g in gtts.get("data", []):
gid = g.get("id")
try:
self.kc.delete_gtt(gid)
print_log(f"[LIVE] Cancelled GTT {gid}")
except:
pass
# Square off positions (simple approach: fetch positions and reverse)
positions = self.kc.positions()
# positions contains 'net' list we can iterate
for pos in positions.get("net", []):
if pos.get("quantity", 0) != 0:
symbol = pos.get("tradingsymbol")
exchange = pos.get("exchange")
qty = abs(int(pos.get("quantity")))
side = "SELL" if pos.get("netquantity") > 0 else "BUY"
try:
self.kc.place_order(
variety=self.kc.VARIETY_REGULAR,
exchange=exchange,
tradingsymbol=symbol,
transaction_type=self.kc.TRANSACTION_TYPE_SELL if side=="SELL" else self.kc.TRANSACTION_TYPE_BUY,
quantity=qty,
order_type=self.kc.ORDER_TYPE_MARKET,
product=self.kc.PRODUCT_MIS
)
print_log(f"[LIVE] Squared off {symbol} qty {qty} side {side}")
except Exception as e:
print_log(f"[LIVE] Square off failed for {symbol}: {e}")
except Exception as e:
print_log("[LIVE] Emergency exit failed: " + str(e))


# ---------- Utility ----------
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def print_log(msg):
print(f"[{now()}] {msg}")

# ---------- Main Runner ----------
def run(mode="PAPER", excel="Stoploss_Calculator.xlsx", poll_interval=5):
parsed = parse_sheet(excel, sheet="Sheet1")
if not parsed or parsed["capital"] is None:
raise RuntimeError("Could not parse required inputs from sheet. Ensure Sheet1 has Capital, Risk %, Entry Price, Stoploss % labels.")

capital = float(parsed["capital"])
risk_pct = float(parsed["risk_pct"])
entry = float(parsed["entry_price"])
stoploss_pct = float(parsed["stoploss_pct"])
symbol = parsed["symbol"] or input("Enter symbol (e.g. RELIANCE): ").strip().upper()
user_qty = parsed["qty"]

params = compute_trade_params(capital, risk_pct, entry, stoploss_pct)
stoploss_price = params["stoploss_price"]
qty = user_qty if user_qty and user_qty > 0 else params["position_size"]
targets = params["targets"]
if qty <= 0:
raise RuntimeError("Computed position size is zero. Increase capital or risk% or reduce SL%.")

side = "BUY" # could be parameterized; currently assumes BUY. For SELL invert targets/stoploss.

print_log(f"Mode: {mode} | Symbol: {symbol} | Entry: {entry} | SL: {stoploss_price} | Qty: {qty}")
print_log("Targets (1..7): " + ", ".join(map(str, targets)))

if mode == "PAPER":
paper = PaperEngine()
trade = paper.place_trade(symbol, side, entry, qty, stoploss_price, targets)
print_log("Feed prices (manual) or press Ctrl+C to exit. You can also modify this script to read live feed.")
try:
while trade["open"]:
# manual price input loop
s = input("Enter current price (or EXIT to emergency-exit): ").strip()
if s.upper() == "EXIT":
paper.emergency_exit()
break
try:
price = float(s)
except:
print("Invalid price")
continue
evts = paper.feed_price(price)
# print basic trade status
if trade["open"]:
print_log(f"Open. Active target idx: {trade['active_target_idx']+1 if trade['active_target_idx']<7 else 'None'} | Current SL: {trade['stoploss']}")
else:
print_log(f"Trade closed. Reason events: {trade['events'][-1]}")
break
except KeyboardInterrupt:
print_log("Interrupted by user. Exiting.")
return

# LIVE mode
if mode == "LIVE":
if KiteConnect is None:
raise RuntimeError("kiteconnect library not available. pip install kiteconnect")
# read env vars
api_key = os.getenv("KITE_API_KEY")
access_token = os.getenv("KITE_ACCESS_TOKEN")
if not api_key or not access_token:
raise RuntimeError("Set KITE_API_KEY and KITE_ACCESS_TOKEN as environment variables (see instructions).")

kc = KiteConnect(api_key=api_key)
kc.set_access_token(access_token)
live = LiveEngine(kc)

# place entry and initial GTT (first target)
try:
order_id, gtt_id = live.place_market_and_create_initial_gtt(symbol, "NSE", side, qty, entry, stoploss_price, targets[0])
except Exception as e:
print_log("Entry or GTT creation failed: " + str(e))
return

# store targets & state
live.tracked[symbol]["targets"] = targets
live.tracked[symbol]["active_idx"] = 0

print_log("Entering LTP poll loop. Press Ctrl+C to stop. Poll interval: {}s".format(poll_interval))
try:
while True:
try:
ltp = live.get_ltp(symbol, "NSE")
except Exception as e:
print_log("LTP fetch error: " + str(e))
time.sleep(poll_interval)
continue

data = live.tracked.get(symbol, None)
if data is None:
time.sleep(poll_interval)
continue
side = data["side"]
idx = data["active_idx"]
cur_target = data["targets"][idx] if idx < len(data["targets"]) else None
# BUY flow:
if side == "BUY":
# check SL
if ltp <= data["stoploss"]:
print_log(f"[LIVE] SL hit at {ltp}. Attempting square-off and cancel GTTs.")
live.emergency_exit_all()
break
# check target
if cur_target and ltp >= cur_target:
print_log(f"[LIVE] Target {idx+1} hit at {cur_target}")
# move stoploss to this target (breakeven/lock)
new_sl = cur_target
data["stoploss"] = new_sl
data["active_idx"] += 1
next_idx = data["active_idx"]
if next_idx < len(data["targets"]):
next_target = data["targets"][next_idx]
# Cancel previous GTT and place new GTT for new_sl and next_target
if data.get("gtt_id"):
try:
live.cancel_gtt(data["gtt_id"])
except:
pass
try:
resp = kc.place_gtt(kc.GTT_TYPE_OCO, symbol, "NSE", [new_sl, next_target], ltp, [
{"transaction_type":"SELL","exchange":"NSE","tradingsymbol":symbol,"quantity":qty,"order_type":"LIMIT","price":new_sl},
{"transaction_type":"SELL","exchange":"NSE","tradingsymbol":symbol,"quantity":qty,"order_type":"LIMIT","price":next_target},
])
gid = resp.get("id", resp)
data["gtt_id"] = gid
print_log(f"[LIVE] Created new GTT id {gid} with SL={new_sl}, Target={next_target}")
except Exception as e:
print_log("[LIVE] Failed to create next GTT: " + str(e))
else:
print_log("[LIVE] All targets achieved. Exiting and cancelling GTTs.")
if data.get("gtt_id"):
try: live.cancel_gtt(data["gtt_id"])
except: pass
break

else: # SELL side (mirror)
if ltp >= data["stoploss"]:
print_log(f"[LIVE] SL hit at {ltp}. Emergency exit.")
live.emergency_exit_all()
break
if cur_target and ltp <= cur_target:
print_log(f"[LIVE] Target {idx+1} hit at {cur_target}")
new_sl = cur_target
data["stoploss"] = new_sl
data["active_idx"] += 1
next_idx = data["active_idx"]
if next_idx < len(data["targets"]):
next_target = data["targets"][next_idx]
if data.get("gtt_id"):
try:
live.cancel_gtt(data["gtt_id"])
except:
pass
try:
resp = kc.place_gtt(kc.GTT_TYPE_OCO, symbol, "NSE", [new_sl, next_target], ltp, [
{"transaction_type":"BUY","exchange":"NSE","tradingsymbol":symbol,"quantity":qty,"order_type":"LIMIT","price":new_sl},
{"transaction_type":"BUY","exchange":"NSE","tradingsymbol":symbol,"quantity":qty,"order_type":"LIMIT","price":next_target},
])
gid = resp.get("id", resp)
data["gtt_id"] = gid
print_log(f"[LIVE] Created new GTT id {gid} with SL={new_sl}, Target={next_target}")
except Exception as e:
print_log("[LIVE] Failed to create next GTT: " + str(e))
else:
print_log("[LIVE] All targets achieved. Exiting and cancelling GTTs.")
if data.get("gtt_id"):
try: live.cancel_gtt(data["gtt_id"])
except: pass
break

time.sleep(poll_interval)
except KeyboardInterrupt:
print_log("Interrupted by user. Exiting loop.")
# optional: leave GTTs active, or cancel
# live.emergency_exit_all()
return

raise RuntimeError("Unknown mode")

# ---------- CLI ----------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Kite Algo: PAPER or LIVE (Zerodha)")
parser.add_argument("--mode", choices=["PAPER","LIVE"], default="PAPER", help="PAPER or LIVE")
parser.add_argument("--excel", default="Stoploss_Calculator.xlsx", help="Excel file path")
parser.add_argument("--poll", type=int, default=5, help="Poll interval (sec) for LTP checks (LIVE)")
args = parser.parse_args()
run(mode=args.mode, excel=args.excel, poll_interval=args.poll)
Sign In or Register to comment.