//! Live trading bot using Alpaca API. use anyhow::Result; use chrono::{Datelike, Duration, NaiveDate, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::time::{sleep, Duration as TokioDuration}; use crate::alpaca::AlpacaClient; use crate::config::{ get_all_symbols, get_sector, Timeframe, ATR_STOP_MULTIPLIER, ATR_TRAIL_ACTIVATION_MULTIPLIER, ATR_TRAIL_MULTIPLIER, BOT_CHECK_INTERVAL_SECONDS, DRAWDOWN_HALT_BARS, HOURS_PER_DAY, MAX_CONCURRENT_POSITIONS, MAX_DRAWDOWN_HALT, MAX_POSITION_SIZE, MAX_SECTOR_POSITIONS, MIN_CASH_RESERVE, RAMPUP_PERIOD_BARS, REENTRY_COOLDOWN_BARS, TOP_MOMENTUM_COUNT, }; use crate::indicators::{calculate_all_indicators, generate_signal}; use crate::paths::{ LIVE_DAY_TRADES_FILE, LIVE_ENTRY_ATRS_FILE, LIVE_EQUITY_FILE, LIVE_HIGH_WATER_MARKS_FILE, LIVE_POSITIONS_FILE, LIVE_POSITION_META_FILE, }; use crate::strategy::Strategy; use crate::types::{EquitySnapshot, PositionInfo, Signal, TradeSignal}; /// Per-position metadata persisted to disk. #[derive(Debug, Clone, Serialize, Deserialize)] struct PositionMeta { bars_held: usize, /// Date (YYYY-MM-DD) when this position was opened, for PDT tracking. #[serde(default)] entry_date: Option, } /// PDT (Pattern Day Trading) constants. const PDT_MAX_DAY_TRADES: usize = 3; const PDT_ROLLING_BUSINESS_DAYS: i64 = 5; /// Live trading bot for paper trading. pub struct TradingBot { client: AlpacaClient, strategy: Strategy, timeframe: Timeframe, position_meta: HashMap, equity_history: Vec, peak_portfolio_value: f64, drawdown_halt: bool, /// Cycle count when drawdown halt started (for time-based resume) drawdown_halt_start: Option, /// Current trading cycle count trading_cycle_count: usize, /// Tracks when each symbol can be re-entered after stop-loss (cycle index) cooldown_timers: HashMap, /// Tracks new positions opened in current cycle (for gradual ramp-up) new_positions_this_cycle: usize, /// Rolling list of day trade dates for PDT tracking. 
day_trades: Vec, /// Current portfolio value (updated each cycle), used for PDT exemption check. current_portfolio_value: f64, } impl TradingBot { /// Create a new trading bot. pub async fn new( api_key: String, api_secret: String, timeframe: Timeframe, ) -> Result { let client = AlpacaClient::new(api_key, api_secret)?; let mut bot = Self { client, strategy: Strategy::new(timeframe), timeframe, position_meta: HashMap::new(), equity_history: Vec::new(), peak_portfolio_value: 0.0, drawdown_halt: false, drawdown_halt_start: None, trading_cycle_count: 0, cooldown_timers: HashMap::new(), new_positions_this_cycle: 0, day_trades: Vec::new(), current_portfolio_value: 0.0, }; // Load persisted state bot.load_entry_prices(); bot.load_high_water_marks(); bot.load_entry_atrs(); bot.load_position_meta(); bot.load_cooldown_timers(); bot.load_day_trades(); bot.load_equity_history(); // Log account info bot.log_account_info().await; tracing::info!("Trading bot initialized successfully (Paper Trading Mode)"); Ok(bot) } // ── Persistence helpers ────────────────────────────────────────── fn load_json_map( path: &std::path::Path, label: &str, ) -> HashMap { if path.exists() { match std::fs::read_to_string(path) { Ok(content) if !content.is_empty() => { match serde_json::from_str(&content) { Ok(map) => return map, Err(e) => tracing::error!("Error parsing {} file: {}", label, e), } } Ok(_) => {} // Empty file is valid, return empty map Err(e) => tracing::error!("Error loading {} file: {}", label, e), } } HashMap::new() } fn save_json_map(map: &HashMap, path: &std::path::Path, label: &str) { match serde_json::to_string_pretty(map) { Ok(json) => { if let Err(e) = std::fs::write(path, json) { tracing::error!("Error saving {} file: {}", label, e); } } Err(e) => tracing::error!("Error serializing {}: {}", label, e), } } fn load_entry_prices(&mut self) { self.strategy.entry_prices = Self::load_json_map(&LIVE_POSITIONS_FILE, "positions"); if !self.strategy.entry_prices.is_empty() { 
tracing::info!("Loaded entry prices for {} positions.", self.strategy.entry_prices.len()); } } fn save_entry_prices(&self) { Self::save_json_map(&self.strategy.entry_prices, &LIVE_POSITIONS_FILE, "positions"); } fn load_high_water_marks(&mut self) { self.strategy.high_water_marks = Self::load_json_map(&LIVE_HIGH_WATER_MARKS_FILE, "high water marks"); if !self.strategy.high_water_marks.is_empty() { tracing::info!("Loaded high water marks for {} positions.", self.strategy.high_water_marks.len()); } } fn save_high_water_marks(&self) { Self::save_json_map(&self.strategy.high_water_marks, &LIVE_HIGH_WATER_MARKS_FILE, "high water marks"); } fn load_entry_atrs(&mut self) { self.strategy.entry_atrs = Self::load_json_map(&LIVE_ENTRY_ATRS_FILE, "entry ATRs"); if !self.strategy.entry_atrs.is_empty() { tracing::info!("Loaded entry ATRs for {} positions.", self.strategy.entry_atrs.len()); } } fn save_entry_atrs(&self) { Self::save_json_map(&self.strategy.entry_atrs, &LIVE_ENTRY_ATRS_FILE, "entry ATRs"); } fn load_position_meta(&mut self) { self.position_meta = Self::load_json_map(&LIVE_POSITION_META_FILE, "position meta"); if !self.position_meta.is_empty() { tracing::info!("Loaded position meta for {} positions.", self.position_meta.len()); } } fn save_position_meta(&self) { Self::save_json_map(&self.position_meta, &LIVE_POSITION_META_FILE, "position meta"); } fn load_cooldown_timers(&mut self) { if let Ok(path_str) = std::env::var("HOME") { let path = std::path::PathBuf::from(path_str) .join(".local/share/invest-bot/cooldown_timers.json"); self.cooldown_timers = Self::load_json_map(&path, "cooldown timers"); if !self.cooldown_timers.is_empty() { tracing::info!("Loaded cooldown timers for {} symbols.", self.cooldown_timers.len()); } } } fn save_cooldown_timers(&self) { if let Ok(path_str) = std::env::var("HOME") { let path = std::path::PathBuf::from(path_str) .join(".local/share/invest-bot/cooldown_timers.json"); Self::save_json_map(&self.cooldown_timers, &path, "cooldown 
timers"); } } // ── PDT (Pattern Day Trading) protection ─────────────────────── fn load_day_trades(&mut self) { if LIVE_DAY_TRADES_FILE.exists() { match std::fs::read_to_string(&*LIVE_DAY_TRADES_FILE) { Ok(content) if !content.is_empty() => { match serde_json::from_str::>(&content) { Ok(date_strings) => { self.day_trades = date_strings .iter() .filter_map(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok()) .collect(); self.prune_old_day_trades(); if !self.day_trades.is_empty() { tracing::info!( "Loaded {} day trades in rolling window.", self.day_trades.len() ); } } Err(e) => tracing::error!("Error parsing day trades file: {}", e), } } _ => {} } } } fn save_day_trades(&self) { let date_strings: Vec = self .day_trades .iter() .map(|d| d.format("%Y-%m-%d").to_string()) .collect(); match serde_json::to_string_pretty(&date_strings) { Ok(json) => { if let Err(e) = std::fs::write(&*LIVE_DAY_TRADES_FILE, json) { tracing::error!("Error saving day trades file: {}", e); } } Err(e) => tracing::error!("Error serializing day trades: {}", e), } } /// Remove day trades older than the 5-business-day rolling window. fn prune_old_day_trades(&mut self) { let cutoff = Self::business_days_before(Utc::now().date_naive(), PDT_ROLLING_BUSINESS_DAYS); self.day_trades.retain(|&d| d >= cutoff); } /// Get the date N business days before the given date. fn business_days_before(from: NaiveDate, n: i64) -> NaiveDate { let mut count = 0i64; let mut date = from; while count < n { date -= Duration::days(1); let wd = date.weekday(); if wd != chrono::Weekday::Sat && wd != chrono::Weekday::Sun { count += 1; } } date } /// Count how many day trades have occurred in the rolling 5-business-day window. fn day_trades_in_window(&self) -> usize { let cutoff = Self::business_days_before(Utc::now().date_naive(), PDT_ROLLING_BUSINESS_DAYS); self.day_trades.iter().filter(|&&d| d >= cutoff).count() } /// Check if selling this symbol today would be a day trade (bought today). 
fn would_be_day_trade(&self, symbol: &str) -> bool {
        // Entry dates are stored as UTC `YYYY-MM-DD` strings, so compare in the
        // same representation. Unknown symbols and positions without a recorded
        // entry date are treated as not opened today.
        let today = Utc::now().date_naive().format("%Y-%m-%d").to_string();
        self.position_meta
            .get(symbol)
            .and_then(|meta| meta.entry_date.as_deref())
            == Some(today.as_str())
    }

    /// Check if a day trade is allowed (under PDT limit).
    /// PDT rule only applies to accounts under $25,000.
    fn can_day_trade(&self) -> bool {
        if self.current_portfolio_value >= 25_000.0 {
            return true;
        }
        self.day_trades_in_window() < PDT_MAX_DAY_TRADES
    }

    /// Record a day trade.
    ///
    /// Appends today's UTC date to the rolling list and persists it immediately.
    fn record_day_trade(&mut self) {
        self.day_trades.push(Utc::now().date_naive());
        self.save_day_trades();
    }

    /// Restore the equity history from disk and seed the drawdown peak from it.
    ///
    /// Best-effort: a missing or empty file is a no-op; read and parse failures
    /// are logged and leave the in-memory state untouched.
    ///
    /// NOTE(review): the peak is taken as the maximum over the whole stored
    /// history, so a peak that was deliberately reset after a drawdown halt is
    /// restored to its older, higher value on restart — confirm this is intended.
    fn load_equity_history(&mut self) {
        if !LIVE_EQUITY_FILE.exists() {
            return;
        }
        let content = match std::fs::read_to_string(&*LIVE_EQUITY_FILE) {
            Ok(content) => content,
            Err(e) => {
                tracing::error!("Error loading equity history: {}", e);
                return;
            }
        };
        if content.is_empty() {
            return;
        }
        match serde_json::from_str::<Vec<EquitySnapshot>>(&content) {
            Ok(history) => {
                tracing::info!("Loaded {} equity data points.", history.len());
                // Restore peak from history
                self.peak_portfolio_value = history
                    .iter()
                    .map(|s| s.portfolio_value)
                    .fold(0.0_f64, f64::max);
                self.equity_history = history;
            }
            Err(e) => tracing::error!("Error parsing equity history: {}", e),
        }
    }

    /// Save equity snapshot.
async fn save_equity_snapshot(&mut self) -> Result<()> {
        // Fetches account and positions, updates the drawdown circuit breaker,
        // appends a snapshot to the in-memory history and rewrites the history file.
        let account = self.client.get_account().await?;
        let positions = self.client.get_positions().await?;

        // Numeric fields arrive as strings. Ratio fields are scaled by 100 so the
        // snapshot stores percentages.
        let positions_map: HashMap<_, _> = positions
            .iter()
            .map(|pos| {
                (
                    pos.symbol.clone(),
                    PositionInfo {
                        qty: pos.qty.parse().unwrap_or(0.0),
                        market_value: pos.market_value.parse().unwrap_or(0.0),
                        avg_entry_price: pos.avg_entry_price.parse().unwrap_or(0.0),
                        current_price: pos.current_price.parse().unwrap_or(0.0),
                        unrealized_pnl: pos.unrealized_pl.parse().unwrap_or(0.0),
                        pnl_pct: pos.unrealized_plpc.parse::<f64>().unwrap_or(0.0) * 100.0,
                        change_today: pos
                            .change_today
                            .as_ref()
                            .and_then(|s| s.parse::<f64>().ok())
                            .unwrap_or(0.0)
                            * 100.0,
                    },
                )
            })
            .collect();

        // An unparseable value must not be coerced to 0.0 here: that would read
        // as a 100% drawdown and trip the circuit breaker on a data glitch.
        let portfolio_value: f64 = account.portfolio_value.parse().map_err(|e| {
            anyhow::anyhow!(
                "unparseable portfolio value {:?}: {}",
                account.portfolio_value,
                e
            )
        })?;

        // Update peak and drawdown halt status
        if portfolio_value > self.peak_portfolio_value {
            self.peak_portfolio_value = portfolio_value;
        }

        let drawdown_pct = if self.peak_portfolio_value > 0.0 {
            (self.peak_portfolio_value - portfolio_value) / self.peak_portfolio_value
        } else {
            0.0
        };

        // Trigger halt if drawdown exceeds threshold
        if drawdown_pct >= MAX_DRAWDOWN_HALT && !self.drawdown_halt {
            tracing::warn!(
                "DRAWDOWN CIRCUIT BREAKER: {:.2}% drawdown exceeds {:.0}% limit. Halting for {} cycles.",
                drawdown_pct * 100.0,
                MAX_DRAWDOWN_HALT * 100.0,
                DRAWDOWN_HALT_BARS
            );
            self.drawdown_halt = true;
            self.drawdown_halt_start = Some(self.trading_cycle_count);
        }

        // Auto-resume after time-based cooldown
        if self.drawdown_halt {
            if let Some(halt_start) = self.drawdown_halt_start {
                if self.trading_cycle_count >= halt_start + DRAWDOWN_HALT_BARS {
                    tracing::info!(
                        "Drawdown halt expired after {} cycles. Resuming trading. \
                         Peak reset from ${:.2} to ${:.2} (was {:.2}% drawdown).",
                        DRAWDOWN_HALT_BARS,
                        self.peak_portfolio_value,
                        portfolio_value,
                        drawdown_pct * 100.0
                    );
                    self.drawdown_halt = false;
                    self.drawdown_halt_start = None;
                    // Reset peak to current value to prevent cascading re-triggers.
                    self.peak_portfolio_value = portfolio_value;
                }
            }
        }

        self.equity_history.push(EquitySnapshot {
            timestamp: Utc::now().to_rfc3339(),
            portfolio_value,
            cash: account.cash.parse().unwrap_or(0.0),
            buying_power: account.buying_power.parse().unwrap_or(0.0),
            positions_count: positions.len(),
            positions: positions_map,
        });

        // Keep last 7 trading days of equity data
        const SNAPSHOTS_PER_MINUTE: usize = 4;
        const MINUTES_PER_HOUR: usize = 60;
        const DAYS_TO_KEEP: usize = 7;
        const MAX_SNAPSHOTS: usize =
            DAYS_TO_KEEP * HOURS_PER_DAY * MINUTES_PER_HOUR * SNAPSHOTS_PER_MINUTE;

        if self.equity_history.len() > MAX_SNAPSHOTS {
            // Drop the oldest entries in place rather than cloning the retained tail.
            let excess = self.equity_history.len() - MAX_SNAPSHOTS;
            self.equity_history.drain(..excess);
        }

        // Save to file
        match serde_json::to_string_pretty(&self.equity_history) {
            Ok(json) => {
                if let Err(e) = std::fs::write(&*LIVE_EQUITY_FILE, json) {
                    tracing::error!("Error saving equity history: {}", e);
                }
            }
            Err(e) => tracing::error!("Error serializing equity history: {}", e),
        }

        tracing::info!("Saved equity snapshot: ${:.2}", portfolio_value);
        Ok(())
    }

    // ── Account helpers ──────────────────────────────────────────────

    /// Log account status and balances, and refresh `current_portfolio_value`.
    ///
    /// A failed account fetch is logged and leaves the cached value unchanged.
    async fn log_account_info(&mut self) {
        match self.client.get_account().await {
            Ok(account) => {
                let portfolio_value: f64 = account.portfolio_value.parse().unwrap_or(0.0);
                let buying_power: f64 = account.buying_power.parse().unwrap_or(0.0);
                let cash: f64 = account.cash.parse().unwrap_or(0.0);

                self.current_portfolio_value = portfolio_value;

                tracing::info!("Account Status: {}", account.status);
                tracing::info!("Buying Power: ${:.2}", buying_power);
                tracing::info!("Portfolio Value: ${:.2}", portfolio_value);
                tracing::info!("Cash: ${:.2}", cash);
            }
            Err(e) => tracing::error!("Failed to get account info: {}", e),
        }
    }

    /// Return the held quantity for `symbol`.
    ///
    /// Returns `None` when there is no position, when the quantity does not
    /// parse, or when the lookup fails (the failure is logged).
    async fn get_position(&self, symbol: &str) -> Option<f64> {
        match self.client.get_position(symbol).await {
            Ok(Some(pos)) => pos.qty.parse().ok(),
            Ok(None) => None,
            Err(e) => {
                tracing::error!("Failed to get position for {}: {}", symbol, e);
                None
            }
        }
    }

    // ── Volatility-adjusted position sizing ──────────────────────────

    /// Number of shares to buy for `signal`, as decided by the strategy.
    ///
    /// Funds available to the strategy are cash minus the reserved fraction
    /// (`MIN_CASH_RESERVE`) of portfolio value. Returns 0 when the account
    /// cannot be fetched.
    async fn calculate_position_size(&self, signal: &TradeSignal) -> u64 {
        let account = match self.client.get_account().await {
            Ok(a) => a,
            Err(e) => {
                tracing::error!("Failed to get account: {}", e);
                return 0;
            }
        };

        let portfolio_value: f64 = account.portfolio_value.parse().unwrap_or(0.0);
        let cash: f64 = account.cash.parse().unwrap_or(0.0);
        let available_funds = cash - (portfolio_value * MIN_CASH_RESERVE);

        self.strategy.calculate_position_size(
            signal.current_price,
            portfolio_value,
            available_funds,
            signal,
        )
    }

    // ── ATR-based stop/trailing logic ────────────────────────────────

    /// Ask the strategy whether `symbol` should be exited at `current_price`.
    ///
    /// Persists the high water marks afterwards whenever the symbol has one.
    fn check_stop_loss_take_profit(&mut self, symbol: &str, current_price: f64) -> Option<Signal> {
        let bars_held = self.position_meta.get(symbol).map_or(0, |m| m.bars_held);
        let signal = self
            .strategy
            .check_stop_loss_take_profit(symbol, current_price, bars_held);

        if self.strategy.high_water_marks.contains_key(symbol) {
            self.save_high_water_marks();
        }

        signal
    }

    // ── Sector concentration check ───────────────────────────────────

    /// Number of currently tracked positions that belong to `sector`.
    fn sector_position_count(&self, sector: &str) -> usize {
        self.strategy
            .sector_position_count(sector, self.strategy.entry_prices.keys())
    }

    // ── Order execution ──────────────────────────────────────────────

    /// Try to open a position in `symbol`; returns `true` if a buy order was submitted.
    ///
    /// The buy is skipped (returning `false`) when the symbol is already held,
    /// is in its re-entry cooldown, the drawdown circuit breaker is active, a
    /// position or sector limit is reached, the ramp-up limit for this cycle is
    /// used up, or the computed size is zero.
    ///
    /// NOTE(review): cooldown deadlines are persisted as cycle indices while
    /// `trading_cycle_count` starts from zero in each process — confirm how
    /// cooldowns are meant to behave across restarts.
    async fn execute_buy(&mut self, symbol: &str, signal: &TradeSignal) -> bool {
        // Check if already holding
        if let Some(qty) = self.get_position(symbol).await {
            if qty > 0.0 {
                tracing::info!("{}: Already holding {} shares, skipping buy", symbol, qty);
                return false;
            }
        }

        // Cooldown guard: prevent whipsaw re-entry after stop-loss
        if let Some(&cooldown_until) = self.cooldown_timers.get(symbol) {
            if self.trading_cycle_count < cooldown_until {
                tracing::info!(
                    "{}: In cooldown period until cycle {} (currently {})",
                    symbol,
                    cooldown_until,
                    self.trading_cycle_count
                );
                return false;
            }
        }

        // Portfolio-level guards
        if self.drawdown_halt {
            tracing::info!("{}: Skipping buy — drawdown circuit breaker active", symbol);
            return false;
        }

        if self.strategy.entry_prices.len() >= MAX_CONCURRENT_POSITIONS {
            tracing::info!(
                "{}: Skipping buy — at max {} concurrent positions",
                symbol,
                MAX_CONCURRENT_POSITIONS
            );
            return false;
        }

        let sector = get_sector(symbol);
        if self.sector_position_count(sector) >= MAX_SECTOR_POSITIONS {
            tracing::info!(
                "{}: Skipping buy — sector '{}' at max {} positions",
                symbol,
                sector,
                MAX_SECTOR_POSITIONS
            );
            return false;
        }

        // Gradual ramp-up: limit new positions during initial period
        if self.trading_cycle_count < RAMPUP_PERIOD_BARS && self.new_positions_this_cycle >= 1 {
            tracing::info!(
                "{}: Ramp-up period (cycle {}/{}) — already opened 1 position this cycle",
                symbol,
                self.trading_cycle_count,
                RAMPUP_PERIOD_BARS
            );
            return false;
        }

        let shares = self.calculate_position_size(signal).await;
        if shares == 0 {
            tracing::info!("{}: Insufficient funds for purchase", symbol);
            return false;
        }

        match self
            .client
            .submit_market_order(symbol, shares as f64, "buy")
            .await
        {
            Ok(order) => {
                // Use filled price if available, otherwise signal price
                let fill_price = order
                    .filled_avg_price
                    .as_ref()
                    .and_then(|s| s.parse::<f64>().ok())
                    .unwrap_or(signal.current_price);

                self.strategy
                    .entry_prices
                    .insert(symbol.to_string(), fill_price);
                self.strategy
                    .entry_atrs
                    .insert(symbol.to_string(), signal.atr);
                self.strategy
                    .high_water_marks
                    .insert(symbol.to_string(), fill_price);
                self.position_meta.insert(
                    symbol.to_string(),
                    PositionMeta {
                        bars_held: 0,
                        entry_date: Some(Utc::now().format("%Y-%m-%d").to_string()),
                    },
                );

                self.save_entry_prices();
                self.save_entry_atrs();
                self.save_high_water_marks();
                self.save_position_meta();

                self.new_positions_this_cycle += 1;

                tracing::info!(
                    "BUY ORDER EXECUTED: {} - {} shares @ ~${:.2} \n (RSI: {:.1}, MACD: {:.3}, ATR: ${:.2}, Confidence: {:.2})",
                    symbol,
                    shares,
                    fill_price,
                    signal.rsi,
                    signal.macd_histogram,
                    signal.atr,
                    signal.confidence
                );

                true
            }
            Err(e) => {
                tracing::error!("Failed to execute buy for {}: {}", symbol, e);
                false
            }
        }
    }

    /// Try to close the whole position in `symbol`; returns `true` if a sell
    /// order was submitted.
    ///
    /// Returns `false` when nothing is held, when the sale is deferred by PDT
    /// protection, or when order submission fails. On success all per-symbol
    /// state is cleared and, for stop-loss exits, a re-entry cooldown is recorded.
    async fn execute_sell(
        &mut self,
        symbol: &str,
        signal: &TradeSignal,
        was_stop_loss: bool,
    ) -> bool {
        let current_position = match self.get_position(symbol).await {
            Some(qty) if qty > 0.0 => qty,
            _ => {
                tracing::info!("{}: No position to sell", symbol);
                return false;
            }
        };

        // PDT protection: if selling today would create a day trade, check the limit.
        // EXCEPTION: stop-loss exits are NEVER blocked -- risk management takes priority
        // over PDT compliance. The correct defense against PDT violations is to prevent
        // entries that would need same-day exits, not to trap capital in losing positions.
        let is_day_trade = self.would_be_day_trade(symbol);
        if is_day_trade && !was_stop_loss && !self.can_day_trade() {
            let count = self.day_trades_in_window();
            tracing::warn!(
                "{}: SKIPPING SELL — would trigger PDT violation ({}/{} day trades in rolling 5-day window). \
                 Position opened today, will sell tomorrow.",
                symbol,
                count,
                PDT_MAX_DAY_TRADES
            );
            return false;
        }

        match self
            .client
            .submit_market_order(symbol, current_position, "sell")
            .await
        {
            Ok(_order) => {
                // Record the day trade if applicable
                if is_day_trade {
                    self.record_day_trade();
                    tracing::info!(
                        "{}: Day trade recorded ({}/{} in rolling window)",
                        symbol,
                        self.day_trades_in_window(),
                        PDT_MAX_DAY_TRADES
                    );
                }

                // P&L is estimated from the signal price, not the actual fill.
                if let Some(entry) = self.strategy.entry_prices.remove(symbol) {
                    let pnl_pct = (signal.current_price - entry) / entry;
                    tracing::info!("{}: Realized P&L: {:.2}%", symbol, pnl_pct * 100.0);
                    self.save_entry_prices();
                }

                self.strategy.high_water_marks.remove(symbol);
                self.strategy.entry_atrs.remove(symbol);
                self.position_meta.remove(symbol);
                self.save_high_water_marks();
                self.save_entry_atrs();
                self.save_position_meta();

                // Record cooldown if this was a stop-loss exit
                if was_stop_loss {
                    let cooldown_until = self.trading_cycle_count + REENTRY_COOLDOWN_BARS;
                    self.cooldown_timers
                        .insert(symbol.to_string(), cooldown_until);
                    self.save_cooldown_timers();
                    tracing::info!(
                        "{}: Stop-loss exit — cooldown until cycle {}",
                        symbol,
                        cooldown_until
                    );
                }

                tracing::info!(
                    "SELL ORDER EXECUTED: {} - {} shares @ ~${:.2} \n (RSI: {:.1}, MACD: {:.3})",
                    symbol,
                    current_position,
                    signal.current_price,
                    signal.rsi,
                    signal.macd_histogram
                );

                true
            }
            Err(e) => {
                tracing::error!("Failed to execute sell for {}: {}", symbol, e);
                false
            }
        }
    }

    // Partial exits removed: they systematically halve winning trade size
    // while losing trades remain at full size, creating unfavorable avg win/loss ratio.

    // ── Analysis ─────────────────────────────────────────────────────

    /// Fetch recent bars for `symbol`, compute indicators and derive a signal.
    ///
    /// Returns `None` when the bars cannot be fetched, when there are fewer bars
    /// than the strategy requires, or when the latest RSI or MACD is NaN.
    async fn analyze_symbol(&self, symbol: &str) -> Option<TradeSignal> {
        let min_bars = self.strategy.params.min_bars();

        // Request more calendar days than strictly needed (1.5x plus a fixed
        // margin) so that enough bars come back.
        let days = if self.timeframe == Timeframe::Hourly {
            (min_bars as f64 / HOURS_PER_DAY as f64 * 1.5) as i64 + 10
        } else {
            (min_bars as f64 * 1.5) as i64 + 30
        };

        let end = Utc::now();
        let start = end - Duration::days(days);

        let bars = match self
            .client
            .get_historical_bars(symbol, self.timeframe, start, end)
            .await
        {
            Ok(b) => b,
            Err(e) => {
                tracing::warn!("{}: Failed to get historical data: {}", symbol, e);
                return None;
            }
        };

        if bars.len() < min_bars {
            tracing::warn!(
                "{}: Only {} bars, need {} for indicators",
                symbol,
                bars.len(),
                min_bars
            );
            return None;
        }

        let indicators = calculate_all_indicators(&bars, &self.strategy.params);

        // Both the latest and the preceding row are needed to generate a signal.
        if indicators.len() < 2 {
            return None;
        }

        let current = &indicators[indicators.len() - 1];
        let previous = &indicators[indicators.len() - 2];

        if current.rsi.is_nan() || current.macd.is_nan() {
            return None;
        }

        Some(generate_signal(symbol, current, previous))
    }

    // ── Trading cycle ────────────────────────────────────────────────

    /// Run one full trading cycle: analyze every symbol, process exits, then
    /// process entries for the top-momentum symbols, and finally persist state.
    async fn run_trading_cycle(&mut self) {
        self.trading_cycle_count += 1;
        self.new_positions_this_cycle = 0; // Reset counter for each cycle
        self.prune_old_day_trades();

        tracing::info!("{}", "=".repeat(60));
        tracing::info!("Starting trading cycle #{}...", self.trading_cycle_count);
        self.log_account_info().await;

        if self.current_portfolio_value >= 25_000.0 {
            tracing::info!(
                "PDT status: EXEMPT (portfolio ${:.2} >= $25,000)",
                self.current_portfolio_value
            );
        } else {
            tracing::info!(
                "PDT status: {}/{} day trades in rolling 5-business-day window (portfolio ${:.2} < $25,000)",
                self.day_trades_in_window(),
                PDT_MAX_DAY_TRADES,
                self.current_portfolio_value,
            );
        }

        // Increment bars_held once per trading cycle (matches backtester's per-bar increment)
        for meta in self.position_meta.values_mut() {
            meta.bars_held += 1;
        }

        let symbols = get_all_symbols();

        // Analyze all symbols first
        let mut signals: Vec<TradeSignal> = Vec::new();

        for symbol in &symbols {
            tracing::info!("\nAnalyzing {}...", symbol);

            let signal = match self.analyze_symbol(symbol).await {
                Some(s) => s,
                None => {
                    tracing::warn!("{}: Analysis failed, skipping", symbol);
                    continue;
                }
            };

            tracing::info!(
                "{}: Signal={}, RSI={:.1}, MACD Hist={:.3}, Momentum={:.2}%, \n ATR=${:.2}, Price=${:.2}, Confidence={:.2}",
                signal.symbol,
                signal.signal.as_str(),
                signal.rsi,
                signal.macd_histogram,
                signal.momentum,
                signal.atr,
                signal.current_price,
                signal.confidence
            );

            signals.push(signal);

            // Small delay between symbols for rate limiting
            sleep(TokioDuration::from_millis(500)).await;
        }

        // Phase 1: Process all sells (stop-loss, trailing stop, time exit, signals)
        for signal in &signals {
            let mut effective_signal = signal.clone();

            // Check stop-loss/take-profit/trailing stop/time exit
            if let Some(sl_tp) =
                self.check_stop_loss_take_profit(&signal.symbol, signal.current_price)
            {
                effective_signal.signal = sl_tp;
            }

            if effective_signal.signal.is_sell() {
                // Any `StrongSell` is treated as a stop-loss exit: it bypasses the
                // PDT block and starts a re-entry cooldown.
                let was_stop_loss = matches!(effective_signal.signal, Signal::StrongSell);
                self.execute_sell(&signal.symbol, &effective_signal, was_stop_loss)
                    .await;
            }
        }

        // Phase 2: Momentum ranking — only buy top N momentum stocks
        let mut ranked_signals: Vec<&TradeSignal> = signals
            .iter()
            .filter(|s| !s.momentum.is_nan())
            .collect();

        ranked_signals.sort_by(|a, b| {
            b.momentum
                .partial_cmp(&a.momentum)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let top_momentum_symbols: std::collections::HashSet<String> = ranked_signals
            .iter()
            .take(TOP_MOMENTUM_COUNT)
            .map(|s| s.symbol.clone())
            .collect();

        tracing::info!(
            "Top {} momentum stocks: {:?}",
            TOP_MOMENTUM_COUNT,
            top_momentum_symbols
        );

        // Phase 3: Process buys in momentum-ranked order (highest momentum first)
        for signal in &ranked_signals {
            if !top_momentum_symbols.contains(&signal.symbol) {
                continue;
            }

            if signal.signal.is_buy() {
                self.execute_buy(&signal.symbol, signal).await;
            }
        }

        // Save equity snapshot and persist metadata
        self.save_position_meta();

        if let Err(e) = self.save_equity_snapshot().await {
            tracing::error!("Failed to save equity snapshot: {}", e);
        }

        tracing::info!("Trading cycle complete");
        tracing::info!("{}", "=".repeat(60));
    }

    /// Main bot loop — runs continuously during market hours.
    ///
    /// Logs the configuration once, then loops forever: one trading cycle per
    /// check interval while the market is open, otherwise sleeping and polling
    /// again. The loop has no exit path, so this only returns if the task is
    /// cancelled.
    pub async fn run(&mut self) -> Result<()> {
        let symbols = get_all_symbols();

        tracing::info!("{}", "=".repeat(60));
        tracing::info!("TECH GIANTS TRADING BOT STARTED");
        tracing::info!("Timeframe: {:?} bars", self.timeframe);
        if self.timeframe == Timeframe::Hourly {
            tracing::info!(
                "Parameters scaled {}x (RSI: {}, EMA_TREND: {})",
                HOURS_PER_DAY,
                self.strategy.params.rsi_period,
                self.strategy.params.ema_trend
            );
        }
        tracing::info!("Symbols: {}", symbols.join(", "));
        tracing::info!(
            "Strategy: RSI({}) + MACD({},{},{}) + Momentum({})",
            self.strategy.params.rsi_period,
            self.strategy.params.macd_fast,
            self.strategy.params.macd_slow,
            self.strategy.params.macd_signal,
            self.strategy.params.momentum_period
        );
        tracing::info!(
            "Risk: ATR stops ({}x), trailing ({}x after {}x gain), max {}% position, {} max positions",
            ATR_STOP_MULTIPLIER,
            ATR_TRAIL_MULTIPLIER,
            ATR_TRAIL_ACTIVATION_MULTIPLIER,
            MAX_POSITION_SIZE * 100.0,
            MAX_CONCURRENT_POSITIONS
        );
        tracing::info!("Bot Check Interval: {} seconds", BOT_CHECK_INTERVAL_SECONDS);
        tracing::info!("{}", "=".repeat(60));

        loop {
            match self.client.is_market_open().await {
                Ok(true) => {
                    self.run_trading_cycle().await;

                    tracing::info!(
                        "Next signal check in {} seconds...",
                        BOT_CHECK_INTERVAL_SECONDS
                    );
                    sleep(TokioDuration::from_secs(BOT_CHECK_INTERVAL_SECONDS)).await;
                }
                Ok(false) => match self.client.get_next_market_open().await {
                    Ok(next_open) => {
                        let wait_seconds = (next_open - Utc::now()).num_seconds().max(0);
                        tracing::info!("Market closed. Next open: {}", next_open);
                        tracing::info!("Waiting {:.1} hours...", wait_seconds as f64 / 3600.0);

                        // Re-poll at least once a minute and at most every five
                        // minutes instead of sleeping until the open.
                        let sleep_time = (wait_seconds as u64).clamp(60, 300);
                        sleep(TokioDuration::from_secs(sleep_time)).await;
                    }
                    Err(e) => {
                        tracing::error!("Failed to get next market open: {}", e);
                        tracing::info!("Market closed. Checking again in 5 minutes...");
                        sleep(TokioDuration::from_secs(300)).await;
                    }
                },
                Err(e) => {
                    tracing::error!("Failed to check market status: {}", e);
                    tracing::info!("Retrying in 60 seconds...");
                    sleep(TokioDuration::from_secs(60)).await;
                }
            }
        }
    }
}