From db5cf0b2a824b2f83cf96560e1503eea66a6b202 Mon Sep 17 00:00:00 2001
From: tcsenpai
Date: Wed, 11 Sep 2024 10:12:56 +0200
Subject: [PATCH] Better visualizations

---
 goldigger.py | 668 +++++++++++++++++++++++++++++++++++----------------
 1 file changed, 467 insertions(+), 201 deletions(-)

diff --git a/goldigger.py b/goldigger.py
index 5771c5c..53d45cb 100644
--- a/goldigger.py
+++ b/goldigger.py
@@ -22,31 +22,18 @@ import sklearn.base
 import argparse
 from sklearn.feature_selection import SelectKBest, f_regression, RFE
 from tensorflow.keras.regularizers import l1_l2
+from matplotlib.dates import num2date
+
 # Suppress warnings and TensorFlow logging
 def suppress_warnings_method():
     # Filter out warnings
-    warnings.filterwarnings('ignore')
+    warnings.filterwarnings("ignore")
     # Suppress TensorFlow logging
-    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
+    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
     # Suppress TensorFlow verbose logging
     tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
 
-# Custom progress bar for Keras model training
-class TqdmProgressCallback(Callback):
-    def __init__(self, epochs, description):
-        super().__init__()
-        # Initialize progress bar
-        self.progress_bar = tqdm(total=epochs, desc=description, leave=False)
-
-    def on_epoch_end(self, epoch, logs=None):
-        # Update progress bar at the end of each epoch
-        self.progress_bar.update(1)
-        self.progress_bar.set_postfix(loss=f"{logs['loss']:.4f}", val_loss=f"{logs['val_loss']:.4f}")
-
-    def on_train_end(self, logs=None):
-        # Close progress bar at the end of training
-        self.progress_bar.close()
 
 # Fetch historical stock data from Yahoo Finance
 def fetch_stock_data(symbol, start_date, end_date):
@@ -56,27 +43,35 @@ def fetch_stock_data(symbol, start_date, end_date):
     data = yf.download(symbol, start=start_date, end=end_date)
     return data
 
+
 # Add technical indicators to the stock data
 def add_technical_indicators(data):
     """
     Add technical indicators to the dataset.
""" - data['SMA_20'] = ta.trend.sma_indicator(data['Close'], window=20) - data['SMA_50'] = ta.trend.sma_indicator(data['Close'], window=50) - data['RSI'] = ta.momentum.rsi(data['Close'], window=14) - data['MACD'] = ta.trend.macd_diff(data['Close']) - data['BB_upper'], data['BB_middle'], data['BB_lower'] = ta.volatility.bollinger_hband_indicator(data['Close']), ta.volatility.bollinger_mavg(data['Close']), ta.volatility.bollinger_lband_indicator(data['Close']) + data["SMA_20"] = ta.trend.sma_indicator(data["Close"], window=20) + data["SMA_50"] = ta.trend.sma_indicator(data["Close"], window=50) + data["RSI"] = ta.momentum.rsi(data["Close"], window=14) + data["MACD"] = ta.trend.macd_diff(data["Close"]) + data["BB_upper"], data["BB_middle"], data["BB_lower"] = ( + ta.volatility.bollinger_hband_indicator(data["Close"]), + ta.volatility.bollinger_mavg(data["Close"]), + ta.volatility.bollinger_lband_indicator(data["Close"]), + ) # Advanced indicators - data['EMA_20'] = ta.trend.ema_indicator(data['Close'], window=20) - data['ATR'] = ta.volatility.average_true_range(data['High'], data['Low'], data['Close']) - data['ADX'] = ta.trend.adx(data['High'], data['Low'], data['Close']) - data['Stoch_K'] = ta.momentum.stoch(data['High'], data['Low'], data['Close']) - data['Volatility'] = data['Close'].rolling(window=20).std() - data['Price_Change'] = data['Close'].pct_change() - data['Volume_Change'] = data['Volume'].pct_change() - data['High_Low_Range'] = (data['High'] - data['Low']) / data['Close'] + data["EMA_20"] = ta.trend.ema_indicator(data["Close"], window=20) + data["ATR"] = ta.volatility.average_true_range( + data["High"], data["Low"], data["Close"] + ) + data["ADX"] = ta.trend.adx(data["High"], data["Low"], data["Close"]) + data["Stoch_K"] = ta.momentum.stoch(data["High"], data["Low"], data["Close"]) + data["Volatility"] = data["Close"].rolling(window=20).std() + data["Price_Change"] = data["Close"].pct_change() + data["Volume_Change"] = data["Volume"].pct_change() + data["High_Low_Range"] = (data["High"] - data["Low"]) / data["Close"] return data + # Prepare data for model training by scaling and creating sequences def prepare_data(data, look_back=60): """ @@ -84,69 +79,106 @@ def prepare_data(data, look_back=60): """ scaler = MinMaxScaler(feature_range=(0, 1)) scaled_data = scaler.fit_transform(data) - + X, y = [], [] - for i in range(look_back, len(scaled_data)-1): # Note the -1 here - X.append(scaled_data[i-look_back:i]) - y.append(scaled_data[i+1, 0]) # Predicting the next 'Close' price - + for i in range(look_back, len(scaled_data) - 1): # Note the -1 here + X.append(scaled_data[i - look_back : i]) + y.append(scaled_data[i + 1, 0]) # Predicting the next 'Close' price + return np.array(X), np.array(y), scaler + # Create an LSTM model for time series prediction def create_lstm_model(input_shape): - model = Sequential([ - LSTM(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), - Dropout(0.2), # Add dropout layer - LSTM(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), - Dropout(0.2), # Add dropout layer - Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), - Dense(units=1) - ]) - model.compile(optimizer='adam', loss='mean_squared_error') + model = Sequential( + [ + LSTM( + units=64, + return_sequences=True, + input_shape=input_shape, + kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4), + ), + Dropout(0.2), # Add dropout layer + LSTM(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), + Dropout(0.2), # Add dropout 
+            Dense(
+                units=16, activation="relu", kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)
+            ),
+            Dense(units=1),
+        ]
+    )
+    model.compile(optimizer="adam", loss="mean_squared_error")
     return model
 
+
 # Create a GRU model for time series prediction
 def create_gru_model(input_shape):
-    model = Sequential([
-        GRU(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
-        Dropout(0.2),  # Add dropout layer
-        GRU(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
-        Dropout(0.2),  # Add dropout layer
-        Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
-        Dense(units=1)
-    ])
-    model.compile(optimizer='adam', loss='mean_squared_error')
-    return model
+    model = Sequential(
+        [
+            GRU(
+                units=64,
+                return_sequences=True,
+                input_shape=input_shape,
+                kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4),
+            ),
+            Dropout(0.2),  # Add dropout layer
+            GRU(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
+            Dropout(0.2),  # Add dropout layer
+            Dense(
+                units=16, activation="relu", kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)
+            ),
+            Dense(units=1),
+        ]
+    )
+    model.compile(optimizer="adam", loss="mean_squared_error")
+    return model
+
 
 # Train and evaluate a model using time series cross-validation
 def train_and_evaluate_model(model, X, y, n_splits=5, model_name="Model"):
     tscv = TimeSeriesSplit(n_splits=n_splits)
     all_predictions = []
     all_true_values = []
-
-    with tqdm(total=n_splits, desc=f"Cross-validation for {model_name}", leave=False) as pbar:
+
+    with tqdm(total=n_splits, desc=f"Training {model_name}", leave=False) as pbar:
         for train_index, test_index in tscv.split(X):
             X_train, X_test = X[train_index], X[test_index]
             y_train, y_test = y[train_index], y[test_index]
-
+
             if isinstance(model, (RandomForestRegressor, XGBRegressor)):
                 X_train_2d = X_train.reshape(X_train.shape[0], -1)
                 X_test_2d = X_test.reshape(X_test.shape[0], -1)
                 model.fit(X_train_2d, y_train)
                 predictions = model.predict(X_test_2d)
             elif isinstance(model, Sequential):
-                early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
-                model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0,
-                          validation_split=0.2, callbacks=[early_stopping])
-                predictions = model.predict(X_test).flatten()
-
+                early_stopping = EarlyStopping(
+                    monitor="val_loss", patience=10, restore_best_weights=True
+                )
+                with tqdm(total=100, desc="Epochs", leave=False) as epoch_pbar:
+
+                    class EpochProgressCallback(Callback):
+                        def on_epoch_end(self, epoch, logs=None):
+                            epoch_pbar.update(1)
+
+                    model.fit(
+                        X_train,
+                        y_train,
+                        epochs=100,
+                        batch_size=32,
+                        verbose=0,
+                        validation_split=0.2,
+                        callbacks=[early_stopping, EpochProgressCallback()],
+                    )
+                    predictions = model.predict(X_test, verbose=0).flatten()
+
             all_predictions.extend(predictions)
             all_true_values.extend(y_test)
             pbar.update(1)
-
+
     score = r2_score(all_true_values, all_predictions)
     return score, 0, score, np.array(all_predictions)
 
+
 # Make predictions using an ensemble of models
 def ensemble_predict(models, X):
     predictions = []
@@ -158,41 +190,60 @@ def ensemble_predict(models, X):
             predictions.append(pred.flatten())  # Flatten the predictions
     return np.mean(predictions, axis=0)
 
+
 def weighted_ensemble_predict(models, X, weights):
     predictions = []
     for model, weight in zip(models, weights):
         if isinstance(model, (RandomForestRegressor, XGBRegressor)):
             pred = model.predict(X.reshape(X.shape[0], -1))
         else:
-            pred = np.array([model.predict(X[i:i+1])[0][0] for i in range(len(X))])
+            pred = np.array(
+                [model.predict(X[i : i + 1], verbose=0)[0][0] for i in range(len(X))]
+            )
         predictions.append(weight * pred)
     return np.sum(predictions, axis=0)
 
+
 # Calculate risk metrics (Sharpe ratio and max drawdown)
 def calculate_risk_metrics(returns):
-    sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252)  # Assuming daily returns
+    sharpe_ratio = (
+        np.mean(returns) / np.std(returns) * np.sqrt(252)
+    )  # Assuming daily returns
     max_drawdown = np.max(np.maximum.accumulate(returns) - returns)
     return sharpe_ratio, max_drawdown
 
+
 # Predict future stock prices using a trained model
 def predict_future(model, last_sequence, scaler, days):
    future_predictions = []
    current_sequence = last_sequence.copy()
-
-    for _ in range(days):
-        if isinstance(model, (RandomForestRegressor, XGBRegressor)):
-            prediction = model.predict(current_sequence.reshape(1, -1))
-            future_predictions.append(prediction[0])  # prediction is already a scalar
-        else:
-            prediction = model.predict(current_sequence.reshape(1, current_sequence.shape[0], current_sequence.shape[1]))
-            future_predictions.append(prediction[0][0])  # Take only the first (and only) element
-
-        # Update the sequence for the next prediction
-        current_sequence = np.roll(current_sequence, -1, axis=0)
-        current_sequence[-1] = prediction  # Use the full prediction for updating
-
+
+    with tqdm(total=days, desc="Predicting future", leave=False) as pbar:
+        for _ in range(days):
+            if isinstance(model, (RandomForestRegressor, XGBRegressor)):
+                prediction = model.predict(current_sequence.reshape(1, -1))
+                future_predictions.append(
+                    prediction[0]
+                )  # prediction is already a scalar
+            else:
+                prediction = model.predict(
+                    current_sequence.reshape(
+                        1, current_sequence.shape[0], current_sequence.shape[1]
+                    ),
+                    verbose=0,
+                )
+                future_predictions.append(
+                    prediction[0][0]
+                )  # Take only the first (and only) element
+
+            # Update the sequence for the next prediction
+            current_sequence = np.roll(current_sequence, -1, axis=0)
+            current_sequence[-1] = prediction  # Use the full prediction for updating
+            pbar.update(1)
+
     return np.array(future_predictions)
 
+
 # Split data into training and testing sets, respecting temporal order
 def time_based_train_test_split(X, y, test_size=0.2):
     """
@@ -203,25 +254,23 @@ def time_based_train_test_split(X, y, test_size=0.2):
     y_train, y_test = y[:split_idx], y[split_idx:]
     return X_train, X_test, y_train, y_test
 
+
 # Tune hyperparameters for Random Forest model
 def tune_random_forest(X, y, quick_test=False):
     # Define parameter distribution based on quick_test flag
     if quick_test:
         print("Quick test mode: Performing simplified Random Forest tuning...")
-        param_dist = {
-            'n_estimators': randint(10, 50),
-            'max_depth': randint(3, 10)
-        }
+        param_dist = {"n_estimators": randint(10, 50), "max_depth": randint(3, 10)}
         n_iter = 5
     else:
         print("Full analysis mode: Performing comprehensive Random Forest tuning...")
         param_dist = {
-            'n_estimators': randint(100, 500),
-            'max_depth': randint(5, 50),
-            'min_samples_split': randint(2, 20),
-            'min_samples_leaf': randint(1, 10),
-            'max_features': ['auto', 'sqrt', 'log2'],
-            'bootstrap': [True, False]
+            "n_estimators": randint(100, 500),
+            "max_depth": randint(5, 50),
+            "min_samples_split": randint(2, 20),
+            "min_samples_leaf": randint(1, 10),
+            "max_features": ["auto", "sqrt", "log2"],
+            "bootstrap": [True, False],
         }
         n_iter = 20
 
@@ -229,33 +278,37 @@ def tune_random_forest(X, y, quick_test=False):
     rf = RandomForestRegressor(random_state=42)
     # Perform randomized search for best parameters
     tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5)
-    rf_random = RandomizedSearchCV(estimator=rf, param_distributions=param_dist,
-                                   n_iter=n_iter, cv=tscv,
-                                   scoring='neg_mean_squared_error',  # Change to MSE
-                                   verbose=2, random_state=42, n_jobs=-1)
+    rf_random = RandomizedSearchCV(
+        estimator=rf,
+        param_distributions=param_dist,
+        n_iter=n_iter,
+        cv=tscv,
+        scoring="neg_mean_squared_error",  # Change to MSE
+        verbose=2,
+        random_state=42,
+        n_jobs=-1,
+    )
     rf_random.fit(X.reshape(X.shape[0], -1), y)
     print(f"Best Random Forest parameters: {rf_random.best_params_}")
     return rf_random.best_estimator_
 
+
 # Tune hyperparameters for XGBoost model
 def tune_xgboost(X, y, quick_test=False):
     # Define parameter distribution based on quick_test flag
     if quick_test:
         print("Quick test mode: Performing simplified XGBoost tuning...")
-        param_dist = {
-            'n_estimators': randint(10, 50),
-            'max_depth': randint(3, 6)
-        }
+        param_dist = {"n_estimators": randint(10, 50), "max_depth": randint(3, 6)}
         n_iter = 5
     else:
         print("Full analysis mode: Performing comprehensive XGBoost tuning...")
         param_dist = {
-            'n_estimators': randint(100, 500),
-            'max_depth': randint(3, 10),
-            'learning_rate': uniform(0.01, 0.3),
-            'subsample': uniform(0.6, 1.0),
-            'colsample_bytree': uniform(0.6, 1.0),
-            'gamma': uniform(0, 5)
+            "n_estimators": randint(100, 500),
+            "max_depth": randint(3, 10),
+            "learning_rate": uniform(0.01, 0.3),
+            "subsample": uniform(0.6, 1.0),
+            "colsample_bytree": uniform(0.6, 1.0),
+            "gamma": uniform(0, 5),
         }
         n_iter = 20
 
@@ -263,50 +316,67 @@ def tune_xgboost(X, y, quick_test=False):
     xgb = XGBRegressor(random_state=42)
     # Perform randomized search for best parameters
     tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5)
-    xgb_random = RandomizedSearchCV(estimator=xgb, param_distributions=param_dist,
-                                    n_iter=n_iter, cv=tscv,
-                                    scoring='neg_mean_squared_error',  # Change to MSE
-                                    verbose=2, random_state=42, n_jobs=-1)
+    xgb_random = RandomizedSearchCV(
+        estimator=xgb,
+        param_distributions=param_dist,
+        n_iter=n_iter,
+        cv=tscv,
+        scoring="neg_mean_squared_error",  # Change to MSE
+        verbose=2,
+        random_state=42,
+        n_jobs=-1,
+    )
     xgb_random.fit(X.reshape(X.shape[0], -1), y)
     print(f"Best XGBoost parameters: {xgb_random.best_params_}")
     return xgb_random.best_estimator_
 
+
 def implement_trading_strategy(actual_prices, predicted_prices, threshold=0.01):
     returns = []
     position = 0  # -1: short, 0: neutral, 1: long
     for i in range(1, len(actual_prices)):
-        predicted_return = (predicted_prices[i] - actual_prices[i-1]) / actual_prices[i-1]
+        predicted_return = (predicted_prices[i] - actual_prices[i - 1]) / actual_prices[
+            i - 1
+        ]
         if predicted_return > threshold and position <= 0:
             position = 1  # Buy
         elif predicted_return < -threshold and position >= 0:
             position = -1  # Sell
-        actual_return = (actual_prices[i] - actual_prices[i-1]) / actual_prices[i-1]
+        actual_return = (actual_prices[i] - actual_prices[i - 1]) / actual_prices[i - 1]
         returns.append(position * actual_return)
     return np.array(returns)
 
+
 def select_features_rfe(X, y, n_features_to_select=10):
     if isinstance(X, np.ndarray) and len(X.shape) == 3:
         X_2d = X.reshape(X.shape[0], -1)
     else:
         X_2d = X
-
-    rfe = RFE(estimator=RandomForestRegressor(random_state=42), n_features_to_select=n_features_to_select)
+
+    rfe = RFE(
+        estimator=RandomForestRegressor(random_state=42),
+        n_features_to_select=n_features_to_select,
+    )
     X_selected = rfe.fit_transform(X_2d, y)
     selected_features = rfe.support_
     return X_selected, selected_features
 
+
 def calculate_ensemble_weights(models, X, y):
     weights = []
     for name, model in models:
-        _, _, score, _ = train_and_evaluate_model(model, X, y, n_splits=5, model_name=name)
+        _, _, score, _ = train_and_evaluate_model(
+            model, X, y, n_splits=5, model_name=name
+        )
         weights.append(max(score, 0))  # Ensure non-negative weights
-
+
     if sum(weights) == 0:
         # If all weights are zero, use equal weights
-        return [1/len(weights)] * len(weights)
+        return [1 / len(weights)] * len(weights)
     else:
         return [w / sum(weights) for w in weights]  # Normalize weights
 
+
 def augment_data(X, y, noise_level=0.01):
     X_aug = X.copy()
     y_aug = y.copy()
@@ -314,8 +384,17 @@ def augment_data(X, y, noise_level=0.01):
     X_aug += noise
     return X_aug, y_aug
 
+
 # Main function to analyze stock data and make predictions
-def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, suppress_warnings=False, quick_test=False, models_to_run=['LSTM', 'GRU', 'Random Forest', 'XGBoost']):
+def analyze_and_predict_stock(
+    symbol,
+    start_date,
+    end_date,
+    future_days=30,
+    suppress_warnings=False,
+    quick_test=False,
+    models_to_run=["LSTM", "GRU", "Random Forest", "XGBoost"],
+):
     # Suppress warnings if flag is set
     if suppress_warnings:
         suppress_warnings_method()
@@ -333,7 +412,21 @@ def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, supp
     data = data.tail(100)
 
     print("Preparing data for model training...")
-    features = ['Close', 'Volume', 'SMA_20', 'SMA_50', 'RSI', 'MACD', 'BB_upper', 'BB_middle', 'BB_lower', 'Volatility', 'Price_Change', 'Volume_Change', 'High_Low_Range']
+    features = [
+        "Close",
+        "Volume",
+        "SMA_20",
+        "SMA_50",
+        "RSI",
+        "MACD",
+        "BB_upper",
+        "BB_middle",
+        "BB_lower",
+        "Volatility",
+        "Price_Change",
+        "Volume_Change",
+        "High_Low_Range",
+    ]
     X, y, scaler = prepare_data(data[features])
 
     print("Augmenting data...")
@@ -346,68 +439,93 @@ def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, supp
     print("\nStarting model training and hyperparameter tuning...")
     models = []
-    if 'LSTM' in models_to_run:
+    if "LSTM" in models_to_run:
         models.append(("LSTM", create_lstm_model((X.shape[1], X.shape[2]))))
-    if 'GRU' in models_to_run:
+    if "GRU" in models_to_run:
         models.append(("GRU", create_gru_model((X.shape[1], X.shape[2]))))
-    if 'Random Forest' in models_to_run:
+    if "Random Forest" in models_to_run:
         models.append(("Random Forest", tune_random_forest(X, y, quick_test)))
-    if 'XGBoost' in models_to_run:
+    if "XGBoost" in models_to_run:
         models.append(("XGBoost", tune_xgboost(X, y, quick_test)))
 
     results = {}
     oof_predictions = {}
     model_stats = []
 
-    with tqdm(total=len(models), desc="Overall Progress", position=0) as pbar:
+    with tqdm(total=len(models), desc="Training Models", position=0) as pbar:
         for name, model in models:
             print(f"\nTraining and evaluating {name} model...")
-            cv_score, cv_std, overall_score, oof_pred = train_and_evaluate_model(model, X, y, n_splits=3 if quick_test else 5, model_name=name)
+            cv_score, cv_std, overall_score, oof_pred = train_and_evaluate_model(
+                model, X, y, n_splits=3 if quick_test else 5, model_name=name
+            )
             print(f"  {name} model results:")
             print(f"    Cross-validation R² score: {cv_score:.4f} (±{cv_std:.4f})")
             print(f"    Overall out-of-fold R² score: {overall_score:.4f}")
-
+
             print(f"Retraining {name} model on full dataset...")
             if isinstance(model, (RandomForestRegressor, XGBRegressor)):
                 model.fit(X.reshape(X.shape[0], -1), y)
                 train_score = model.score(X.reshape(X.shape[0], -1), y)
             else:
-                history = model.fit(X, y, epochs=100, batch_size=32, verbose=1)
-                train_score = 1 - history.history['loss'][-1]  # Use final training loss as a proxy for R²
-
+                with tqdm(total=100, desc="Epochs", leave=False) as epoch_pbar:
+
+                    class EpochProgressCallback(Callback):
+                        def on_epoch_end(self, epoch, logs=None):
+                            epoch_pbar.update(1)
+
+                    history = model.fit(
+                        X,
+                        y,
+                        epochs=100,
+                        batch_size=32,
+                        verbose=0,
+                        callbacks=[EpochProgressCallback()],
+                    )
+                    train_score = (
+                        1 - history.history["loss"][-1]
+                    )  # Use final training loss as a proxy for R²
+
             results[name] = model
             oof_predictions[name] = oof_pred
-
+
             overfitting_score = train_score - overall_score
-
-            model_stats.append({
-                'Model': name,
-                'CV R² Score': cv_score,
-                'CV R² Std': cv_std,
-                'OOF R² Score': overall_score,
-                'Train R² Score': train_score,
-                'Overfitting Score': overfitting_score
-            })
-
+
+            model_stats.append(
+                {
+                    "Model": name,
+                    "CV R² Score": cv_score,
+                    "CV R² Std": cv_std,
+                    "OOF R² Score": overall_score,
+                    "Train R² Score": train_score,
+                    "Overfitting Score": overfitting_score,
+                }
+            )
+
             pbar.update(1)
 
     # Create a DataFrame with model statistics
     stats_df = pd.DataFrame(model_stats)
-    stats_df = stats_df.sort_values('OOF R² Score', ascending=False).reset_index(drop=True)
-
+    stats_df = stats_df.sort_values("OOF R² Score", ascending=False).reset_index(
+        drop=True
+    )
+
     # Add overfitting indicator
-    stats_df['Overfit'] = stats_df['Overfitting Score'].apply(lambda x: 'Yes' if x > 0.05 else 'No')
+    stats_df["Overfit"] = stats_df["Overfitting Score"].apply(
+        lambda x: "Yes" if x > 0.05 else "No"
+    )
 
     # Print the table
     print("\nModel Performance Summary:")
-    print(tabulate(stats_df, headers='keys', tablefmt='pretty', floatfmt='.4f'))
+    print(tabulate(stats_df, headers="keys", tablefmt="pretty", floatfmt=".4f"))
 
     print("\nCalculating ensemble weights...")
     ensemble_weights = calculate_ensemble_weights(models, X_test, y_test)
     print(f"Ensemble weights: {ensemble_weights}")
 
     print("Making ensemble predictions...")
-    ensemble_predictions = weighted_ensemble_predict([model for _, model in models], X, ensemble_weights)
-
+    ensemble_predictions = weighted_ensemble_predict(
+        [model for _, model in models], X, ensemble_weights
+    )
+
     print(f"Predicting future data for the next {future_days} days...")
     future_predictions = []
     for name, model in models:
@@ -415,70 +533,186 @@ def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, supp
         future_pred = predict_future(model, X[-1], scaler, future_days)
         future_predictions.append(future_pred)
     future_predictions = np.mean(future_predictions, axis=0)
-
+
     print("Inverse transforming predictions...")
     close_price_scaler = MinMaxScaler(feature_range=(0, 1))
-    close_price_scaler.fit(data['Close'].values.reshape(-1, 1))
-    ensemble_predictions = close_price_scaler.inverse_transform(ensemble_predictions.reshape(-1, 1))
-    future_predictions = close_price_scaler.inverse_transform(future_predictions.reshape(-1, 1))
+    close_price_scaler.fit(data["Close"].values.reshape(-1, 1))
+    ensemble_predictions = close_price_scaler.inverse_transform(
+        ensemble_predictions.reshape(-1, 1)
+    )
+    future_predictions = close_price_scaler.inverse_transform(
+        future_predictions.reshape(-1, 1)
+    )
 
     # Ensure ensemble_predictions matches the length of the actual data
-    ensemble_predictions = ensemble_predictions[-len(data):]
+    ensemble_predictions = ensemble_predictions[-len(data) :]
 
     print("Plotting results...")
-    plt.figure(figsize=(20, 24))  # Increased figure height
-
+    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(20, 24))
+
     # Price prediction plot
-    plt.subplot(3, 1, 1)
-    plot_data = data.iloc[-len(ensemble_predictions):]
-    future_dates = pd.date_range(start=plot_data.index[-1] + pd.Timedelta(days=1), periods=future_days)
-
-    plt.plot(plot_data.index, plot_data['Close'], label='Actual Price', color='blue')
-    plt.plot(plot_data.index, ensemble_predictions, label='Predicted Price', color='red', linestyle='--')
-    plt.plot(future_dates, future_predictions, label='Future Predictions', color='green', linestyle='--')
-
-    plt.title(f'{symbol} Stock Price Prediction')
-    plt.xlabel('Date')
-    plt.ylabel('Price')
-    plt.legend()
+    plot_data = data.iloc[-len(ensemble_predictions) :]
+    future_dates = pd.date_range(
+        start=plot_data.index[-1] + pd.Timedelta(days=1), periods=future_days
+    )
+
+    ax1.plot(plot_data.index, plot_data["Close"], label="Actual Price", color="blue")
+    ax1.plot(
+        plot_data.index,
+        ensemble_predictions,
+        label="Predicted Price",
+        color="red",
+        linestyle="--",
+    )
+    ax1.plot(
+        future_dates,
+        future_predictions,
+        label="Future Predictions",
+        color="green",
+        linestyle="--",
+    )
+
+    # Add price indications for every day (initially invisible)
+    annotations = []
+    for i, (date, price) in enumerate(zip(plot_data.index, ensemble_predictions)):
+        ann = ax1.annotate(
+            f"${price[0]:.2f}",
+            (date, price[0]),
+            xytext=(0, 10),
+            textcoords="offset points",
+            ha="center",
+            va="bottom",
+            fontsize=8,
+            alpha=0.7,
+            visible=False,
+        )
+        annotations.append(ann)
+
+    for i, (date, price) in enumerate(zip(future_dates, future_predictions)):
+        ann = ax1.annotate(
+            f"${price[0]:.2f}",
+            (date, price[0]),
+            xytext=(0, -10),
+            textcoords="offset points",
+            ha="center",
+            va="top",
+            fontsize=8,
+            alpha=0.7,
+            visible=False,
+        )
+        annotations.append(ann)
+
+    ax1.set_title(f"{symbol} Stock Price Prediction")
+    ax1.set_xlabel("Date")
+    ax1.set_ylabel("Price")
+    ax1.legend()
+
+    # Add hover annotation
+    hover_annot = ax1.annotate(
+        "",
+        xy=(0, 0),
+        xytext=(10, 10),
+        textcoords="offset points",
+        bbox=dict(boxstyle="round", fc="w"),
+        arrowprops=dict(arrowstyle="->"),
+    )
+    hover_annot.set_visible(False)
+
+    def update_hover_annot(event):
+        vis = hover_annot.get_visible()
+        if event.inaxes == ax1:
+            x, y = event.xdata, event.ydata
+            date = num2date(x).strftime("%Y-%m-%d")
+            hover_annot.xy = (x, y)
+            hover_annot.set_text(f"Date: {date}\nPrice: ${y:.2f}")
+            hover_annot.set_visible(True)
+            fig.canvas.draw_idle()
+        elif vis:
+            hover_annot.set_visible(False)
+            fig.canvas.draw_idle()
+
+    # Connect the hover event
+    fig.canvas.mpl_connect("motion_notify_event", update_hover_annot)
+
+    # Add zoom event handler
+    def on_zoom(event):
+        ax1 = event.inaxes
+        if ax1 is None:
+            return
+        xlim = ax1.get_xlim()
+        ylim = ax1.get_ylim()
+
+        # Calculate the zoom level based on the x-axis range
+        zoom_level = (plot_data.index[-1] - plot_data.index[0]).days / (
+            xlim[1] - xlim[0]
+        ).days
+
+        # Adjust annotation visibility based on zoom level
+        for ann in annotations:
+            ann.set_visible(
+                zoom_level > 5
+            )  # Show annotations when zoomed in more than 5x
+
+        fig.canvas.draw_idle()
+
+    # Connect the zoom event handler
+    fig.canvas.mpl_connect("motion_notify_event", on_zoom)
 
     # Model performance summary table
-    plt.subplot(3, 1, 2)
-    plt.axis('off')
-    table = plt.table(cellText=stats_df.values,
-                      colLabels=stats_df.columns,
-                      cellLoc='center',
-                      loc='center')
+    ax2.axis("off")
+    table = ax2.table(
+        cellText=stats_df.values,
+        colLabels=stats_df.columns,
+        cellLoc="center",
+        loc="center",
+    )
     table.auto_set_font_size(False)
     table.set_fontsize(9)
     table.scale(1, 1.5)
-
+
     # Lower the title and add more space between plot and table
-    plt.title('Model Performance Summary', pad=60)
+    ax2.set_title("Model Performance Summary", pad=60)
 
     # Implement trading strategy
-    strategy_returns = implement_trading_strategy(plot_data['Close'].values, ensemble_predictions.flatten())
-    strategy_sharpe_ratio = np.mean(strategy_returns) / np.std(strategy_returns) * np.sqrt(252)
+    strategy_returns = implement_trading_strategy(
+        plot_data["Close"].values, ensemble_predictions.flatten()
+    )
+    strategy_sharpe_ratio = (
+        np.mean(strategy_returns) / np.std(strategy_returns) * np.sqrt(252)
+    )
     print(f"Trading Strategy Sharpe Ratio: {strategy_sharpe_ratio:.4f}")
 
     # Calculate cumulative returns of the trading strategy
     cumulative_returns = (1 + strategy_returns).cumprod() - 1
 
     # Add new subplot for trading strategy performance
-    plt.subplot(3, 1, 3)
-    plt.plot(plot_data.index[-len(cumulative_returns):], cumulative_returns, label='Strategy Cumulative Returns', color='purple')
-    plt.title(f'{symbol} Trading Strategy Performance')
-    plt.xlabel('Date')
-    plt.ylabel('Cumulative Returns')
-    plt.legend()
+    ax3.plot(
+        plot_data.index[-len(cumulative_returns) :],
+        cumulative_returns,
+        label="Strategy Cumulative Returns",
+        color="purple",
+    )
+    ax3.set_title(f"{symbol} Trading Strategy Performance")
+    ax3.set_xlabel("Date")
+    ax3.set_ylabel("Cumulative Returns")
+    ax3.legend()
 
     # Add strategy Sharpe ratio as text on the plot
-    plt.text(0.05, 0.95, f'Strategy Sharpe Ratio: {strategy_sharpe_ratio:.4f}',
-             transform=plt.gca().transAxes, verticalalignment='top')
+    ax3.text(
+        0.05,
+        0.95,
+        f"Strategy Sharpe Ratio: {strategy_sharpe_ratio:.4f}",
+        transform=ax3.transAxes,
+        verticalalignment="top",
+    )
 
     plt.tight_layout()
-    plt.savefig(f'{symbol}_prediction_with_stats_and_strategy.png', dpi=300, bbox_inches='tight')
-    print(f"Plot with statistics and strategy performance saved as '{symbol}_prediction_with_stats_and_strategy.png'")
+    plt.savefig(
+        f"{symbol}_prediction_with_stats_and_strategy.png", dpi=300, bbox_inches="tight"
+    )
+    print(
+        f"Plot with statistics and strategy performance saved as '{symbol}_prediction_with_stats_and_strategy.png'"
+    )
     plt.show()
 
     print(f"\nFuture predictions for the next {future_days} days:")
@@ -487,35 +721,62 @@ def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, supp
 
     print("\nAnalysis and prediction completed successfully.")
 
+
 # Parse command-line arguments
 def parse_arguments():
-    parser = argparse.ArgumentParser(description='Stock Price Prediction and Analysis Tool')
-
-    parser.add_argument('-s', '--symbol', type=str, default='MSFT',
-                        help='Stock symbol to analyze (default: MSFT)')
-
-    parser.add_argument('-sd', '--start_date', type=str, default='2018-01-01',
-                        help='Start date for historical data (default: 2018-01-01)')
-
-    parser.add_argument('-fd', '--future_days', type=int, default=30,
-                        help='Number of days to predict into the future (default: 30)')
-
-    parser.add_argument('-q', '--quick_test', action='store_true',
-                        help='Run in quick test mode (default: False)')
-
-    parser.add_argument('-sw', '--suppress_warnings', action='store_true',
-                        help='Suppress warnings (default: False)')
+    parser = argparse.ArgumentParser(
+        description="Stock Price Prediction and Analysis Tool"
+    )
+
+    parser.add_argument(
+        "-s",
+        "--symbol",
+        type=str,
+        default="MSFT",
+        help="Stock symbol to analyze (default: MSFT)",
+    )
+
+    parser.add_argument(
+        "-sd",
+        "--start_date",
+        type=str,
+        default="2018-01-01",
+        help="Start date for historical data (default: 2018-01-01)",
+    )
+
+    parser.add_argument(
+        "-fd",
"--future_days", + type=int, + default=30, + help="Number of days to predict into the future (default: 30)", + ) + + parser.add_argument( + "-q", + "--quick_test", + action="store_true", + help="Run in quick test mode (default: False)", + ) + + parser.add_argument( + "-sw", + "--suppress_warnings", + action="store_true", + help="Suppress warnings (default: False)", + ) args = parser.parse_args() # Validate start_date try: - datetime.strptime(args.start_date, '%Y-%m-%d') + datetime.strptime(args.start_date, "%Y-%m-%d") except ValueError: parser.error("Incorrect start date format, should be YYYY-MM-DD") return args + # Main execution block if __name__ == "__main__": # Parse command-line arguments @@ -535,6 +796,11 @@ if __name__ == "__main__": print(f"Warnings suppressed: {'Yes' if suppress_warnings_flag else 'No'}") # Run the stock analysis and prediction - analyze_and_predict_stock(symbol, start_date, end_date, future_days, - suppress_warnings=suppress_warnings_flag, - quick_test=quick_test_flag) + analyze_and_predict_stock( + symbol, + start_date, + end_date, + future_days, + suppress_warnings=suppress_warnings_flag, + quick_test=quick_test_flag, + )