import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.model_selection import TimeSeriesSplit, cross_val_score, RandomizedSearchCV from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error, r2_score from tensorflow.keras.models import Sequential, clone_model as keras_clone_model from tensorflow.keras.layers import LSTM, Dense, GRU, Dropout from tensorflow.keras.callbacks import Callback, EarlyStopping from datetime import datetime, timedelta from tqdm.auto import tqdm import yfinance as yf import ta from sklearn.ensemble import RandomForestRegressor from xgboost import XGBRegressor import warnings import os import tensorflow as tf from tabulate import tabulate from scipy.stats import randint, uniform import sklearn.base import argparse from sklearn.feature_selection import SelectKBest, f_regression, RFE from tensorflow.keras.regularizers import l1_l2 # Suppress warnings and TensorFlow logging def suppress_warnings_method(): # Filter out warnings warnings.filterwarnings('ignore') # Suppress TensorFlow logging os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Suppress TensorFlow verbose logging tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # Custom progress bar for Keras model training class TqdmProgressCallback(Callback): def __init__(self, epochs, description): super().__init__() # Initialize progress bar self.progress_bar = tqdm(total=epochs, desc=description, leave=False) def on_epoch_end(self, epoch, logs=None): # Update progress bar at the end of each epoch self.progress_bar.update(1) self.progress_bar.set_postfix(loss=f"{logs['loss']:.4f}", val_loss=f"{logs['val_loss']:.4f}") def on_train_end(self, logs=None): # Close progress bar at the end of training self.progress_bar.close() # Fetch historical stock data from Yahoo Finance def fetch_stock_data(symbol, start_date, end_date): """ Fetch stock data from Yahoo Finance. """ data = yf.download(symbol, start=start_date, end=end_date) return data # Add technical indicators to the stock data def add_technical_indicators(data): """ Add technical indicators to the dataset. """ data['SMA_20'] = ta.trend.sma_indicator(data['Close'], window=20) data['SMA_50'] = ta.trend.sma_indicator(data['Close'], window=50) data['RSI'] = ta.momentum.rsi(data['Close'], window=14) data['MACD'] = ta.trend.macd_diff(data['Close']) data['BB_upper'], data['BB_middle'], data['BB_lower'] = ta.volatility.bollinger_hband_indicator(data['Close']), ta.volatility.bollinger_mavg(data['Close']), ta.volatility.bollinger_lband_indicator(data['Close']) # Advanced indicators data['EMA_20'] = ta.trend.ema_indicator(data['Close'], window=20) data['ATR'] = ta.volatility.average_true_range(data['High'], data['Low'], data['Close']) data['ADX'] = ta.trend.adx(data['High'], data['Low'], data['Close']) data['Stoch_K'] = ta.momentum.stoch(data['High'], data['Low'], data['Close']) data['Volatility'] = data['Close'].rolling(window=20).std() data['Price_Change'] = data['Close'].pct_change() data['Volume_Change'] = data['Volume'].pct_change() data['High_Low_Range'] = (data['High'] - data['Low']) / data['Close'] return data # Prepare data for model training by scaling and creating sequences def prepare_data(data, look_back=60): """ Prepare data for model training. """ scaler = MinMaxScaler(feature_range=(0, 1)) scaled_data = scaler.fit_transform(data) X, y = [], [] for i in range(look_back, len(scaled_data)-1): # Note the -1 here X.append(scaled_data[i-look_back:i]) y.append(scaled_data[i+1, 0]) # Predicting the next 'Close' price return np.array(X), np.array(y), scaler # Create an LSTM model for time series prediction def create_lstm_model(input_shape): model = Sequential([ LSTM(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), # Add dropout layer LSTM(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), # Add dropout layer Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dense(units=1) ]) model.compile(optimizer='adam', loss='mean_squared_error') return model # Create a GRU model for time series prediction def create_gru_model(input_shape): model = Sequential([ GRU(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), # Add dropout layer GRU(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), # Add dropout layer Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dense(units=1) ]) model.compile(optimizer='adam', loss='mean_squared_error') return model # Train and evaluate a model using time series cross-validation def train_and_evaluate_model(model, X, y, n_splits=5, model_name="Model"): tscv = TimeSeriesSplit(n_splits=n_splits) all_predictions = [] all_true_values = [] with tqdm(total=n_splits, desc=f"Cross-validation for {model_name}", leave=False) as pbar: for train_index, test_index in tscv.split(X): X_train, X_test = X[train_index], X[test_index] y_train, y_test = y[train_index], y[test_index] if isinstance(model, (RandomForestRegressor, XGBRegressor)): X_train_2d = X_train.reshape(X_train.shape[0], -1) X_test_2d = X_test.reshape(X_test.shape[0], -1) model.fit(X_train_2d, y_train) predictions = model.predict(X_test_2d) elif isinstance(model, Sequential): early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True) model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0, validation_split=0.2, callbacks=[early_stopping]) predictions = model.predict(X_test).flatten() all_predictions.extend(predictions) all_true_values.extend(y_test) pbar.update(1) score = r2_score(all_true_values, all_predictions) return score, 0, score, np.array(all_predictions) # Make predictions using an ensemble of models def ensemble_predict(models, X): predictions = [] for model in models: if isinstance(model, (RandomForestRegressor, XGBRegressor)): pred = model.predict(X.reshape(X.shape[0], -1)) else: pred = model.predict(X) predictions.append(pred.flatten()) # Flatten the predictions return np.mean(predictions, axis=0) def weighted_ensemble_predict(models, X, weights): predictions = [] for model, weight in zip(models, weights): if isinstance(model, (RandomForestRegressor, XGBRegressor)): pred = model.predict(X.reshape(X.shape[0], -1)) else: pred = np.array([model.predict(X[i:i+1])[0][0] for i in range(len(X))]) predictions.append(weight * pred) return np.sum(predictions, axis=0) # Calculate risk metrics (Sharpe ratio and max drawdown) def calculate_risk_metrics(returns): sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252) # Assuming daily returns max_drawdown = np.max(np.maximum.accumulate(returns) - returns) return sharpe_ratio, max_drawdown # Predict future stock prices using a trained model def predict_future(model, last_sequence, scaler, days): future_predictions = [] current_sequence = last_sequence.copy() for _ in range(days): if isinstance(model, (RandomForestRegressor, XGBRegressor)): prediction = model.predict(current_sequence.reshape(1, -1)) future_predictions.append(prediction[0]) # prediction is already a scalar else: prediction = model.predict(current_sequence.reshape(1, current_sequence.shape[0], current_sequence.shape[1])) future_predictions.append(prediction[0][0]) # Take only the first (and only) element # Update the sequence for the next prediction current_sequence = np.roll(current_sequence, -1, axis=0) current_sequence[-1] = prediction # Use the full prediction for updating return np.array(future_predictions) # Split data into training and testing sets, respecting temporal order def time_based_train_test_split(X, y, test_size=0.2): """ Split the data into training and testing sets, respecting the temporal order. """ split_idx = int(len(X) * (1 - test_size)) X_train, X_test = X[:split_idx], X[split_idx:] y_train, y_test = y[:split_idx], y[split_idx:] return X_train, X_test, y_train, y_test # Tune hyperparameters for Random Forest model def tune_random_forest(X, y, quick_test=False): # Define parameter distribution based on quick_test flag if quick_test: print("Quick test mode: Performing simplified Random Forest tuning...") param_dist = { 'n_estimators': randint(10, 50), 'max_depth': randint(3, 10) } n_iter = 5 else: print("Full analysis mode: Performing comprehensive Random Forest tuning...") param_dist = { 'n_estimators': randint(100, 500), 'max_depth': randint(5, 50), 'min_samples_split': randint(2, 20), 'min_samples_leaf': randint(1, 10), 'max_features': ['auto', 'sqrt', 'log2'], 'bootstrap': [True, False] } n_iter = 20 # Initialize Random Forest model rf = RandomForestRegressor(random_state=42) # Perform randomized search for best parameters tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5) rf_random = RandomizedSearchCV(estimator=rf, param_distributions=param_dist, n_iter=n_iter, cv=tscv, scoring='neg_mean_squared_error', # Change to MSE verbose=2, random_state=42, n_jobs=-1) rf_random.fit(X.reshape(X.shape[0], -1), y) print(f"Best Random Forest parameters: {rf_random.best_params_}") return rf_random.best_estimator_ # Tune hyperparameters for XGBoost model def tune_xgboost(X, y, quick_test=False): # Define parameter distribution based on quick_test flag if quick_test: print("Quick test mode: Performing simplified XGBoost tuning...") param_dist = { 'n_estimators': randint(10, 50), 'max_depth': randint(3, 6) } n_iter = 5 else: print("Full analysis mode: Performing comprehensive XGBoost tuning...") param_dist = { 'n_estimators': randint(100, 500), 'max_depth': randint(3, 10), 'learning_rate': uniform(0.01, 0.3), 'subsample': uniform(0.6, 1.0), 'colsample_bytree': uniform(0.6, 1.0), 'gamma': uniform(0, 5) } n_iter = 20 # Initialize XGBoost model xgb = XGBRegressor(random_state=42) # Perform randomized search for best parameters tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5) xgb_random = RandomizedSearchCV(estimator=xgb, param_distributions=param_dist, n_iter=n_iter, cv=tscv, scoring='neg_mean_squared_error', # Change to MSE verbose=2, random_state=42, n_jobs=-1) xgb_random.fit(X.reshape(X.shape[0], -1), y) print(f"Best XGBoost parameters: {xgb_random.best_params_}") return xgb_random.best_estimator_ def implement_trading_strategy(actual_prices, predicted_prices, threshold=0.01): returns = [] position = 0 # -1: short, 0: neutral, 1: long for i in range(1, len(actual_prices)): predicted_return = (predicted_prices[i] - actual_prices[i-1]) / actual_prices[i-1] if predicted_return > threshold and position <= 0: position = 1 # Buy elif predicted_return < -threshold and position >= 0: position = -1 # Sell actual_return = (actual_prices[i] - actual_prices[i-1]) / actual_prices[i-1] returns.append(position * actual_return) return np.array(returns) def select_features_rfe(X, y, n_features_to_select=10): if isinstance(X, np.ndarray) and len(X.shape) == 3: X_2d = X.reshape(X.shape[0], -1) else: X_2d = X rfe = RFE(estimator=RandomForestRegressor(random_state=42), n_features_to_select=n_features_to_select) X_selected = rfe.fit_transform(X_2d, y) selected_features = rfe.support_ return X_selected, selected_features def calculate_ensemble_weights(models, X, y): weights = [] for name, model in models: _, _, score, _ = train_and_evaluate_model(model, X, y, n_splits=5, model_name=name) weights.append(max(score, 0)) # Ensure non-negative weights if sum(weights) == 0: # If all weights are zero, use equal weights return [1/len(weights)] * len(weights) else: return [w / sum(weights) for w in weights] # Normalize weights def augment_data(X, y, noise_level=0.01): X_aug = X.copy() y_aug = y.copy() noise = np.random.normal(0, noise_level, X.shape) X_aug += noise return X_aug, y_aug # Main function to analyze stock data and make predictions def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, suppress_warnings=False, quick_test=False, models_to_run=['LSTM', 'GRU', 'Random Forest', 'XGBoost']): # Suppress warnings if flag is set if suppress_warnings: suppress_warnings_method() print(f"Starting analysis for {symbol}...") print(f"Fetching stock data for {symbol}...") data = fetch_stock_data(symbol, start_date, end_date) print(f"Adding technical indicators...") data = add_technical_indicators(data) data.dropna(inplace=True) if quick_test: print("Quick test mode: Using only the last 100 data points.") data = data.tail(100) print("Preparing data for model training...") features = ['Close', 'Volume', 'SMA_20', 'SMA_50', 'RSI', 'MACD', 'BB_upper', 'BB_middle', 'BB_lower', 'Volatility', 'Price_Change', 'Volume_Change', 'High_Low_Range'] X, y, scaler = prepare_data(data[features]) print("Augmenting data...") X_aug, y_aug = augment_data(X, y) X = np.concatenate((X, X_aug), axis=0) y = np.concatenate((y, y_aug), axis=0) print("Splitting data into training and testing sets...") X_train, X_test, y_train, y_test = time_based_train_test_split(X, y, test_size=0.2) print("\nStarting model training and hyperparameter tuning...") models = [] if 'LSTM' in models_to_run: models.append(("LSTM", create_lstm_model((X.shape[1], X.shape[2])))) if 'GRU' in models_to_run: models.append(("GRU", create_gru_model((X.shape[1], X.shape[2])))) if 'Random Forest' in models_to_run: models.append(("Random Forest", tune_random_forest(X, y, quick_test))) if 'XGBoost' in models_to_run: models.append(("XGBoost", tune_xgboost(X, y, quick_test))) results = {} oof_predictions = {} model_stats = [] with tqdm(total=len(models), desc="Overall Progress", position=0) as pbar: for name, model in models: print(f"\nTraining and evaluating {name} model...") cv_score, cv_std, overall_score, oof_pred = train_and_evaluate_model(model, X, y, n_splits=3 if quick_test else 5, model_name=name) print(f" {name} model results:") print(f" Cross-validation R² score: {cv_score:.4f} (±{cv_std:.4f})") print(f" Overall out-of-fold R² score: {overall_score:.4f}") print(f"Retraining {name} model on full dataset...") if isinstance(model, (RandomForestRegressor, XGBRegressor)): model.fit(X.reshape(X.shape[0], -1), y) train_score = model.score(X.reshape(X.shape[0], -1), y) else: history = model.fit(X, y, epochs=100, batch_size=32, verbose=1) train_score = 1 - history.history['loss'][-1] # Use final training loss as a proxy for R² results[name] = model oof_predictions[name] = oof_pred overfitting_score = train_score - overall_score model_stats.append({ 'Model': name, 'CV R² Score': cv_score, 'CV R² Std': cv_std, 'OOF R² Score': overall_score, 'Train R² Score': train_score, 'Overfitting Score': overfitting_score }) pbar.update(1) # Create a DataFrame with model statistics stats_df = pd.DataFrame(model_stats) stats_df = stats_df.sort_values('OOF R² Score', ascending=False).reset_index(drop=True) # Add overfitting indicator stats_df['Overfit'] = stats_df['Overfitting Score'].apply(lambda x: 'Yes' if x > 0.05 else 'No') # Print the table print("\nModel Performance Summary:") print(tabulate(stats_df, headers='keys', tablefmt='pretty', floatfmt='.4f')) print("\nCalculating ensemble weights...") ensemble_weights = calculate_ensemble_weights(models, X_test, y_test) print(f"Ensemble weights: {ensemble_weights}") print("Making ensemble predictions...") ensemble_predictions = weighted_ensemble_predict([model for _, model in models], X, ensemble_weights) print(f"Predicting future data for the next {future_days} days...") future_predictions = [] for name, model in models: print(f" Making future predictions with {name} model...") future_pred = predict_future(model, X[-1], scaler, future_days) future_predictions.append(future_pred) future_predictions = np.mean(future_predictions, axis=0) print("Inverse transforming predictions...") close_price_scaler = MinMaxScaler(feature_range=(0, 1)) close_price_scaler.fit(data['Close'].values.reshape(-1, 1)) ensemble_predictions = close_price_scaler.inverse_transform(ensemble_predictions.reshape(-1, 1)) future_predictions = close_price_scaler.inverse_transform(future_predictions.reshape(-1, 1)) # Ensure ensemble_predictions matches the length of the actual data ensemble_predictions = ensemble_predictions[-len(data):] print("Plotting results...") plt.figure(figsize=(20, 24)) # Increased figure height # Price prediction plot plt.subplot(3, 1, 1) plot_data = data.iloc[-len(ensemble_predictions):] future_dates = pd.date_range(start=plot_data.index[-1] + pd.Timedelta(days=1), periods=future_days) plt.plot(plot_data.index, plot_data['Close'], label='Actual Price', color='blue') plt.plot(plot_data.index, ensemble_predictions, label='Predicted Price', color='red', linestyle='--') plt.plot(future_dates, future_predictions, label='Future Predictions', color='green', linestyle='--') plt.title(f'{symbol} Stock Price Prediction') plt.xlabel('Date') plt.ylabel('Price') plt.legend() # Model performance summary table plt.subplot(3, 1, 2) plt.axis('off') table = plt.table(cellText=stats_df.values, colLabels=stats_df.columns, cellLoc='center', loc='center') table.auto_set_font_size(False) table.set_fontsize(9) table.scale(1, 1.5) # Lower the title and add more space between plot and table plt.title('Model Performance Summary', pad=60) # Implement trading strategy strategy_returns = implement_trading_strategy(plot_data['Close'].values, ensemble_predictions.flatten()) strategy_sharpe_ratio = np.mean(strategy_returns) / np.std(strategy_returns) * np.sqrt(252) print(f"Trading Strategy Sharpe Ratio: {strategy_sharpe_ratio:.4f}") # Calculate cumulative returns of the trading strategy cumulative_returns = (1 + strategy_returns).cumprod() - 1 # Add new subplot for trading strategy performance plt.subplot(3, 1, 3) plt.plot(plot_data.index[-len(cumulative_returns):], cumulative_returns, label='Strategy Cumulative Returns', color='purple') plt.title(f'{symbol} Trading Strategy Performance') plt.xlabel('Date') plt.ylabel('Cumulative Returns') plt.legend() # Add strategy Sharpe ratio as text on the plot plt.text(0.05, 0.95, f'Strategy Sharpe Ratio: {strategy_sharpe_ratio:.4f}', transform=plt.gca().transAxes, verticalalignment='top') plt.tight_layout() plt.savefig(f'{symbol}_prediction_with_stats_and_strategy.png', dpi=300, bbox_inches='tight') print(f"Plot with statistics and strategy performance saved as '{symbol}_prediction_with_stats_and_strategy.png'") plt.show() print(f"\nFuture predictions for the next {future_days} days:") for date, price in zip(future_dates, future_predictions): print(f"{date.date()}: ${price[0]:.2f}") print("\nAnalysis and prediction completed successfully.") # Parse command-line arguments def parse_arguments(): parser = argparse.ArgumentParser(description='Stock Price Prediction and Analysis Tool') parser.add_argument('-s', '--symbol', type=str, default='MSFT', help='Stock symbol to analyze (default: MSFT)') parser.add_argument('-sd', '--start_date', type=str, default='2018-01-01', help='Start date for historical data (default: 2018-01-01)') parser.add_argument('-fd', '--future_days', type=int, default=30, help='Number of days to predict into the future (default: 30)') parser.add_argument('-q', '--quick_test', action='store_true', help='Run in quick test mode (default: False)') parser.add_argument('-sw', '--suppress_warnings', action='store_true', help='Suppress warnings (default: False)') args = parser.parse_args() # Validate start_date try: datetime.strptime(args.start_date, '%Y-%m-%d') except ValueError: parser.error("Incorrect start date format, should be YYYY-MM-DD") return args # Main execution block if __name__ == "__main__": # Parse command-line arguments args = parse_arguments() symbol = args.symbol start_date = args.start_date end_date = datetime.now().strftime("%Y-%m-%d") future_days = args.future_days quick_test_flag = args.quick_test suppress_warnings_flag = args.suppress_warnings # Print analysis parameters print(f"Analyzing {symbol} from {start_date} to {end_date}") print(f"Predicting {future_days} days into the future") print(f"Quick test mode: {'Enabled' if quick_test_flag else 'Disabled'}") print(f"Warnings suppressed: {'Yes' if suppress_warnings_flag else 'No'}") # Run the stock analysis and prediction analyze_and_predict_stock(symbol, start_date, end_date, future_days, suppress_warnings=suppress_warnings_flag, quick_test=quick_test_flag)