Implement anti-overfitting measures for stock prediction model

- Add walk-forward optimization for more realistic model evaluation
- Introduce robust feature selection via recursive feature elimination (RFE)
- Implement time series cross-validation for hyperparameter tuning
- Weight the ensemble by each model's out-of-sample performance
- Add L1/L2 regularization to the LSTM and GRU models
- Implement simple noise-based data augmentation for time series
- Enhance feature engineering with volatility, price/volume change, and high-low range indicators

These changes aim to reduce overfitting and improve the model's
generalization to unseen data, which should make its predictions more reliable.
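
For readers skimming the diff below, here is a minimal sketch of the time-ordered splitting that both the walk-forward evaluation and the cross-validated tuning rely on (assumes scikit-learn; illustrative, not part of this commit):

    import numpy as np
    from sklearn.model_selection import TimeSeriesSplit

    # Each fold trains on a prefix of the series and validates on the
    # block that immediately follows it, so the model never sees the future.
    tscv = TimeSeriesSplit(n_splits=3)
    for train_idx, val_idx in tscv.split(np.arange(12)):
        print(train_idx, val_idx)
    # [0 1 2] [3 4 5]
    # [0 1 2 3 4 5] [6 7 8]
    # [0 1 2 3 4 5 6 7 8] [9 10 11]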
Author: tcsenpai
Date: 2024-09-11 00:14:30 +02:00
Parent: 40ff61ae11
Commit: 9bac92702f


@@ -20,7 +20,8 @@ from tabulate import tabulate
 from scipy.stats import randint, uniform
 import sklearn.base
 import argparse
-from sklearn.feature_selection import SelectKBest, f_regression
+from sklearn.feature_selection import SelectKBest, f_regression, RFE
+from tensorflow.keras.regularizers import l1_l2
 # Suppress warnings and TensorFlow logging
 def suppress_warnings_method():
@@ -70,6 +71,10 @@ def add_technical_indicators(data):
     data['ATR'] = ta.volatility.average_true_range(data['High'], data['Low'], data['Close'])
     data['ADX'] = ta.trend.adx(data['High'], data['Low'], data['Close'])
     data['Stoch_K'] = ta.momentum.stoch(data['High'], data['Low'], data['Close'])
+    data['Volatility'] = data['Close'].rolling(window=20).std()
+    data['Price_Change'] = data['Close'].pct_change()
+    data['Volume_Change'] = data['Volume'].pct_change()
+    data['High_Low_Range'] = (data['High'] - data['Low']) / data['Close']
     return data
 # Prepare data for model training by scaling and creating sequences
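
One caveat with the new indicators: rolling(window=20).std() and pct_change() leave NaN in the earliest rows. A minimal guard before scaling (illustrative; this call site is not part of the diff):

    data = add_technical_indicators(data)
    data = data.dropna()  # drop warm-up rows produced by rolling windows and pct_change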
@@ -90,9 +95,9 @@ def prepare_data(data, look_back=60):
 # Create an LSTM model for time series prediction
 def create_lstm_model(input_shape):
     model = Sequential([
-        LSTM(units=64, return_sequences=True, input_shape=input_shape),
-        LSTM(units=32),
-        Dense(units=16, activation='relu'),
+        LSTM(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
+        LSTM(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
+        Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
         Dense(units=1)
     ])
     model.compile(optimizer='adam', loss='mean_squared_error')
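
As a standalone sanity check of what kernel_regularizer adds to the training loss (assumes TensorFlow 2.x; the toy weight matrix is invented for illustration):

    import tensorflow as tf
    from tensorflow.keras.regularizers import l1_l2

    # l1_l2 contributes l1 * sum(|w|) + l2 * sum(w**2) over the layer's kernel
    reg = l1_l2(l1=1e-5, l2=1e-4)
    w = tf.constant([[0.5, -1.0], [2.0, 0.0]])
    print(float(reg(w)))  # 1e-5 * 3.5 + 1e-4 * 5.25 = 0.00056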
@@ -101,9 +106,9 @@ def create_lstm_model(input_shape):
 # Create a GRU model for time series prediction
 def create_gru_model(input_shape):
     model = Sequential([
-        GRU(units=64, return_sequences=True, input_shape=input_shape),
-        GRU(units=32),
-        Dense(units=16, activation='relu'),
+        GRU(units=64, return_sequences=True, input_shape=input_shape, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
+        GRU(units=32, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
+        Dense(units=16, activation='relu', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
         Dense(units=1)
     ])
     model.compile(optimizer='adam', loss='mean_squared_error')
@@ -111,47 +116,32 @@ def create_gru_model(input_shape):
 # Train and evaluate a model using time series cross-validation
 def train_and_evaluate_model(model, X, y, n_splits=5, model_name="Model"):
+    predictions, true_values = walk_forward_validation(X, y, model, n_splits)
+    score = r2_score(true_values, predictions)
+    return score, 0, score, predictions  # Return the same score for consistency
+def walk_forward_validation(X, y, model, n_splits=5):
     tscv = TimeSeriesSplit(n_splits=n_splits)
-    scores = []
-    oof_predictions = np.zeros_like(y)
+    all_predictions = []
+    all_true_values = []
-    with tqdm(total=n_splits, desc=f"Training {model_name}", leave=False) as pbar:
-        for fold, (train_index, val_index) in enumerate(tscv.split(X), 1):
-            X_train, X_val = X[train_index], X[val_index]
-            y_train, y_val = y[train_index], y[val_index]
-            if isinstance(model, (RandomForestRegressor, XGBRegressor)):
-                X_train_2d = X_train.reshape(X_train.shape[0], -1)
-                X_val_2d = X_val.reshape(X_val.shape[0], -1)
-                cloned_model = sklearn.base.clone(model)
-                cloned_model.fit(X_train_2d, y_train)
-                val_pred = []
-                for i in range(len(X_val_2d)):
-                    pred = cloned_model.predict(X_val_2d[i:i+1])
-                    val_pred.append(pred[0])
-                oof_predictions[val_index] = val_pred
-            elif isinstance(model, Sequential):
-                cloned_model = keras_clone_model(model)
-                cloned_model.compile(optimizer='adam', loss='mean_squared_error')
-                early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
-                cloned_model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_val, y_val), verbose=0,
-                                 callbacks=[TqdmProgressCallback(100, f"{model_name} Epoch {fold}/{n_splits}"), early_stopping])
-                # Predict one step ahead for each sample in the validation set
-                val_pred = []
-                for i in range(len(X_val)):
-                    pred = cloned_model.predict(X_val[i:i+1])
-                    val_pred.append(pred[0][0])
-                oof_predictions[val_index] = val_pred
-            else:
-                raise ValueError(f"Unsupported model type: {type(model)}")
-            score = r2_score(y_val, val_pred)
-            scores.append(score)
-            pbar.update(1)
+    for train_index, test_index in tscv.split(X):
+        X_train, X_test = X[train_index], X[test_index]
+        y_train, y_test = y[train_index], y[test_index]
+        if isinstance(model, (RandomForestRegressor, XGBRegressor)):
+            X_train_2d = X_train.reshape(X_train.shape[0], -1)
+            X_test_2d = X_test.reshape(X_test.shape[0], -1)
+            model.fit(X_train_2d, y_train)
+            predictions = model.predict(X_test_2d)
+        elif isinstance(model, Sequential):
+            model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0)
+            predictions = model.predict(X_test).flatten()
+        all_predictions.extend(predictions)
+        all_true_values.extend(y_test)
-    overall_score = r2_score(y, oof_predictions)
-    return np.mean(scores), np.std(scores), overall_score, oof_predictions
+    return np.array(all_predictions), np.array(all_true_values)
 # Make predictions using an ensemble of models
 def ensemble_predict(models, X):
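
A hypothetical call to the rewritten walk_forward_validation, with X and y as produced by prepare_data (illustrative; the function names are taken from this file):

    rf = RandomForestRegressor(n_estimators=100, random_state=42)
    preds, actuals = walk_forward_validation(X, y, rf, n_splits=5)
    print("out-of-sample R^2:", r2_score(actuals, preds))

Worth noting as a trade-off: unlike the removed version, the new loop does not clone the model between folds, so a Sequential model keeps training cumulatively across folds, and an unsupported model type now leaves predictions undefined instead of raising ValueError.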
@@ -234,9 +224,10 @@ def tune_random_forest(X, y, quick_test=False):
     # Initialize Random Forest model
     rf = RandomForestRegressor(random_state=42)
     # Perform randomized search for best parameters
-    tscv = TimeSeriesSplit(n_splits=5)
+    tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5)
     rf_random = RandomizedSearchCV(estimator=rf, param_distributions=param_dist,
                                    n_iter=n_iter, cv=tscv,
+                                   scoring='neg_mean_squared_error',  # Change to MSE
                                    verbose=2, random_state=42, n_jobs=-1)
     rf_random.fit(X.reshape(X.shape[0], -1), y)
     print(f"Best Random Forest parameters: {rf_random.best_params_}")
@@ -267,9 +258,10 @@ def tune_xgboost(X, y, quick_test=False):
     # Initialize XGBoost model
     xgb = XGBRegressor(random_state=42)
     # Perform randomized search for best parameters
-    tscv = TimeSeriesSplit(n_splits=5)
+    tscv = TimeSeriesSplit(n_splits=3 if quick_test else 5)
     xgb_random = RandomizedSearchCV(estimator=xgb, param_distributions=param_dist,
                                     n_iter=n_iter, cv=tscv,
+                                    scoring='neg_mean_squared_error',  # Change to MSE
                                     verbose=2, random_state=42, n_jobs=-1)
     xgb_random.fit(X.reshape(X.shape[0], -1), y)
     print(f"Best XGBoost parameters: {xgb_random.best_params_}")
@@ -288,25 +280,31 @@ def implement_trading_strategy(actual_prices, predicted_prices, threshold=0.01):
         returns.append(position * actual_return)
     return np.array(returns)
-def select_features(X, y, k=10):
-    selector = SelectKBest(score_func=f_regression, k=k)
-    X_selected = selector.fit_transform(X.reshape(X.shape[0], -1), y)
-    selected_features = selector.get_support(indices=True)
+def select_features_rfe(X, y, n_features_to_select=10):
+    if isinstance(X, np.ndarray) and len(X.shape) == 3:
+        X_2d = X.reshape(X.shape[0], -1)
+    else:
+        X_2d = X
+    rfe = RFE(estimator=RandomForestRegressor(random_state=42), n_features_to_select=n_features_to_select)
+    X_selected = rfe.fit_transform(X_2d, y)
+    selected_features = rfe.support_
     return X_selected, selected_features
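
rfe.support_ is a boolean mask over the flattened (look_back x n_features) columns, so reading it requires undoing the reshape. A minimal sketch of such a helper (describe_selected and feature_names are hypothetical names, not from this file):

    def describe_selected(selected_mask, feature_names):
        # With a C-order reshape of (samples, look_back, n_features), column j
        # corresponds to time lag j // n_features and feature j % n_features.
        n = len(feature_names)
        return [(j // n, feature_names[j % n])
                for j, keep in enumerate(selected_mask) if keep]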
-def calculate_ensemble_weights(models, X_val, y_val):
+def calculate_ensemble_weights(models, X, y):
     weights = []
     for name, model in models:
-        if isinstance(model, (RandomForestRegressor, XGBRegressor)):
-            pred = model.predict(X_val.reshape(X_val.shape[0], -1))
-        elif isinstance(model, Sequential):
-            pred = model.predict(X_val)
-        else:
-            raise ValueError(f"Unsupported model type: {type(model)}")
-        score = r2_score(y_val, pred)
+        _, _, score, _ = train_and_evaluate_model(model, X, y, n_splits=5, model_name=name)
         weights.append(max(score, 0))  # Ensure non-negative weights
     return [w / sum(weights) for w in weights]  # Normalize weights
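
A worked toy example of the weighting rule (scores invented for illustration):

    scores = [0.6, 0.3, -0.2]                      # toy out-of-sample R^2 values
    weights = [max(s, 0) for s in scores]          # clip: [0.6, 0.3, 0.0]
    weights = [w / sum(weights) for w in weights]  # normalize: [0.667, 0.333, 0.0]

A model that scores below zero out of sample is dropped entirely; note that if every score were non-positive, sum(weights) would be 0 and the normalization would raise ZeroDivisionError.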
+def augment_data(X, y, noise_level=0.01):
+    X_aug = X.copy()
+    y_aug = y.copy()
+    noise = np.random.normal(0, noise_level, X.shape)
+    X_aug += noise
+    return X_aug, y_aug
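
One plausible way to use augment_data is to stack a jittered copy on top of the original training set (illustrative; this call site is not part of the diff). Only the inputs get noise; the targets are copied unchanged:

    X_aug, y_aug = augment_data(X_train, y_train, noise_level=0.01)
    X_train_combined = np.concatenate([X_train, X_aug], axis=0)
    y_train_combined = np.concatenate([y_train, y_aug], axis=0)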
 # Main function to analyze stock data and make predictions
 def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, suppress_warnings=False, quick_test=False):
     # Suppress warnings if flag is set
@@ -324,7 +322,7 @@ def analyze_and_predict_stock(symbol, start_date, end_date, future_days=30, suppress_warnings=False, quick_test=False):
         # Use only the last 100 data points for quick testing
         data = data.tail(100)
-    features = ['Close', 'Volume', 'SMA_20', 'SMA_50', 'RSI', 'MACD', 'BB_upper', 'BB_middle', 'BB_lower']
+    features = ['Close', 'Volume', 'SMA_20', 'SMA_50', 'RSI', 'MACD', 'BB_upper', 'BB_middle', 'BB_lower', 'Volatility', 'Price_Change', 'Volume_Change', 'High_Low_Range']
     X, y, scaler = prepare_data(data[features])
     # Split data into training and testing sets