Machine Learning and Stochastic Models for Predicting FOMC Meetings' Impact
In this article, we will use stochastic financial modeling and machine learning to estimated the impact of the next (31st July 2024) Federal Open Market Committee (FOMC) announcement on the market.
Specifically, we will implement a Heston Stochastic Volatility Model and a Ornstein-Uhlenbeck model, to simulate market movements and analyze the fictitious outcomes.
To give us an edge with the simulations, we utilize machine learning techniques in the form of Random Forests and other ensembles to supply probability distributions to our financial models.
Heston Stochastic Volatility Model
The Heston model is a popular stochastic volatility model used to describe the evolution of an underlying asset with stochastic volatility. The model assumes that the underlying asset price St and its variance Vt follow the stochastic differential equations:
where:
These 2 stochastic differential equations (SDE) describe how the price and variance evolve continuously over time, though it needs to be discretized to be applied in a numerical solution.
Discretized Heston Model
Using the Euler-Maruyama method to discretize the model:
Where:
Same for the variance equation:
Where:
in both equations dt is the timestep, calculated as:
Where:
The random variables Zt1 and Zt2 are correlated with correlation coefficient ρ:
Where:
Jumps with Poisson Process
We are simulating market impact, therefore there will be a shock in the Heston process which we should simulate using Poisson. The Poisson process is used to model the occurrence of random events over time:
The number of events N(t) occurring up to time t follows the Poisson distribution:
where k represents the number of events (or jumps) that occur up to time t, and λ is the frequency of jumps
In our Heston model, we include 2 new parameters:
def heston_model(X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N, pos_jump_proba=0.05, neg_jump_proba=0.05, jump_mean=0.005, jump_std=0.015):
dt = T / N
X = np.zeros(N+1)
v = np.zeros(N+1)
X[0] = X0
v[0] = v0
pos_lambda = pos_jump_proba / dt
neg_lambda = neg_jump_proba / dt
for t in range(1, N+1):
Gt1 = np.random.normal(0, 1)
Gt2 = np.random.normal(0, 1)
Zt1 = Gt1
Zt2 = rho * Gt1 + np.sqrt(1 - rho**2) * Gt2
v[t] = v[t-1] + kappa * (gamma - v[t-1]) * dt + sigma_variance * np.sqrt(v[t-1]) * np.sqrt(dt) * Zt2
v[t] = max(v[t], 0) # Ensure variance stays non-negative
X[t] = X[t-1] * np.exp((mu - 0.5 * v[t-1]) * dt + np.sqrt(v[t-1] * dt) * Zt1)
# Convert into rates per unit time interval
pos_jump_occurs = np.random.poisson(pos_lambda) > 0
neg_jump_occurs = np.random.poisson(neg_lambda) > 0
if pos_jump_occurs:
pos_jump_size = np.random.normal(jump_mean, jump_std)
X[t] *= np.exp(pos_jump_size)
elif neg_jump_occurs:
neg_jump_size = np.random.normal(jump_mean, jump_std)
X[t] *= np.exp(-neg_jump_size)
# Don't return X0 and V0
return X[1:], v[1:]
Let's do some tests to see that our Heston gives sensible results:
X0 = 100 # Initial stock price
variance0 = 0.04 # Initial variance (square of initial volatility)
mu = 0.05 # Drift of stock price
kappa = 1.0 # Speed of mean reversion of variance
theta = 0.04 # Long run average variance
sigma_variance = 0.2 # Volatility of variance
rho = -0.7 # Correlation between the two Wiener processes
T = 5 / 252 # Time horizon (1 week, represented as 5/252 years)
N = 5 # Number of time steps (5 trading days)
# Simulate without shocks
X, variance = heston_model(
X0, variance0, mu, kappa, theta, sigma_variance, rho, T, N
)
X, variance
(array([100.71199004, 100.34698581, 101.12159392, 103.03590835,
106.83560336]),
array([0.04037841, 0.04207356, 0.04693393, 0.03937224, 0.03522911]))
FOMC and Macro Datasets
Note the probabilities for jumps in the Heston parameters. We need to supply those through historic data. Our aim is to use machine learning (ML) to output such probabilities.
Through various data repositories, we built a dataset for our simulations which you can download.
We did feature engineering to create various interaction features in the python below:
macro_df = pd.read_csv(f"{DATA_PATH}/FOMC_dataset.csv")
macro_df['Date'] = pd.to_datetime(macro_df['Date'])
macro_df['Year'] = macro_df['Date'].dt.year
macro_df['Month'] = macro_df['Date'].dt.month
macro_df['Quarter'] = macro_df['Date'].dt.quarter
macro_df['Month_Sin'] = np.sin(2 * np.pi * macro_df['Month'] / 12)
macro_df['Month_Cos'] = np.cos(2 * np.pi * macro_df['Month'] / 12)
macro_df['YoY Industry Production'] = macro_df['Industry Production'].pct_change(periods=12).bfill()
macro_df['YoY PCE'] = macro_df['PCE'].pct_change(periods=12).bfill()
macro_df['YoY Retail Sales'] = macro_df['Retail Sales'].pct_change(periods=12).bfill()
macro_df['YoY Unemployment'] = macro_df['Unemployment'].pct_change(periods=12).bfill()
macro_df['YoY Wage Increase'] = macro_df['Wage Increase'].pct_change(periods=12).bfill()
macro_df['YoY Home Sales'] = macro_df['Home Sales'].pct_change(periods=12).bfill()
macro_df['YoY Retail Trade'] = macro_df['Retail Trade'].pct_change(periods=12).bfill()
macro_df['YoY Real GDP'] = macro_df['Real GDP'].pct_change(periods=12).bfill()
macro_df['MoM Industry Production'] = macro_df['Industry Production'].pct_change(periods=1).bfill()
macro_df['MoM PCE'] = macro_df['PCE'].pct_change(periods=1).bfill()
macro_df['MoM Retail Sales'] = macro_df['Retail Sales'].pct_change(periods=1).bfill()
macro_df['MoM Unemployment'] = macro_df['Unemployment'].pct_change(periods=1).bfill()
macro_df['MoM Wage Increase'] = macro_df['Wage Increase'].pct_change(periods=1).bfill()
macro_df['MoM Home Sales'] = macro_df['Home Sales'].pct_change(periods=1).bfill()
macro_df['MoM Retail Trade'] = macro_df['Retail Trade'].pct_change(periods=1).bfill()
macro_df['MoM Real GDP'] = macro_df['Real GDP'].pct_change(periods=1).bfill()
macro_df['CPI_Unemployment'] = macro_df['CPI'] * macro_df['Unemployment']
macro_df['CPI_Industry Production'] = macro_df['CPI'] * macro_df['Industry Production']
lags = [1, 3, 6, 12]
for lag in lags:
macro_df[f'Industry Production Lag_{lag}'] = macro_df['Industry Production'].shift(lag).bfill()
macro_df[f'PCE Lag_{lag}'] = macro_df['PCE'].shift(lag).bfill()
macro_df[f'Retail Sales Lag_{lag}'] = macro_df['Retail Sales'].shift(lag).bfill()
macro_df[f'Unemployment Lag_{lag}'] = macro_df['Unemployment'].shift(lag).bfill()
macro_df[f'Wage Increase Lag_{lag}'] = macro_df['Wage Increase'].shift(lag).bfill()
macro_df[f'Home Sales Lag_{lag}'] = macro_df['Home Sales'].shift(lag).bfill()
macro_df[f'Retail Trade Lag_{lag}'] = macro_df['Retail Trade'].shift(lag).bfill()
macro_df[f'Real GDP Lag_{lag}'] = macro_df['Real GDP'].shift(lag).bfill()
macro_df.sort_values(by="Date", ascending=True, inplace=True)
FEATURES = [
"CPI", "Industry Production", "PCE", "Retail Sales", "Unemployment",
"Wage Increase", "Prior Fed Rate", "Home Sales", "Retail Trade",
"Real GDP", "Month", "Month_Sin", "Month_Cos", "Prior CPI", "Inflation",
"YoY Industry Production", "YoY PCE", "YoY Retail Sales", "YoY Unemployment",
"YoY Wage Increase", "YoY Home Sales", "YoY Retail Trade", "YoY Real GDP",
"MoM Industry Production", "MoM PCE", "MoM Retail Sales", "MoM Unemployment",
"MoM Wage Increase", "MoM Home Sales", "MoM Retail Trade", "MoM Real GDP",
"CPI_Unemployment", "CPI_Industry Production",
"Industry Production_Quarterly_Avg", "PCE_Quarterly_Avg", "Retail Sales_Quarterly_Avg",
"Unemployment_Quarterly_Avg", "Wage Increase_Quarterly_Avg", "Home Sales_Quarterly_Avg",
"Retail Trade_Quarterly_Avg", "Real GDP_Quarterly_Avg"
]
print(f"Shape: {macro_df.shape}")
macro_df[FEATURES].tail(5)
Shape: (2492, 105)
Machine Learning for Probabilities
We have a binary classification problem, with 2 classes:
From our macro data we will engineer our label by comparing the current Fed Rate with the next time step's Fed Rate:
macro_df['Fed Rate Increase'] = ((macro_df['Fed Rate'] - macro_df['Prior Fed Rate']) > 0).astype(int) # our label
counts = macro_df['Fed Rate Increase'].value_counts()
counts
Fed Rate Increase
0 1270
1 1222
Name: count, dtype: int64
Note there is a slight imbalance between meetings that led to increases and keeping the status quo, for this we will use Synthetic Minority Oversampling Technique (SMOTE) to balance them out again.
The dataset has 2492 rows and 105, statistical or tree classifiers perform well on such small datasets. We will run some trails with a subset of such models, using out of the box parameters and stratified cross validation.
The models are:
Below is the python code to scale the data, cross validate and test these:
import json
import os
import pandas as pd
import joblib
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
from sklearn.utils.class_weight import compute_sample_weight
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
X = macro_df[FEATURES]
y = macro_df['Fed Rate Increase']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
smote = SMOTE(random_state=42)
X_train, y_train = smote.fit_resample(X_train, y_train)
sample_weights = compute_sample_weight(class_weight='balanced', y=y_train)
models = {
"Naive Bayes": GaussianNB(),
"SVM": SVC(probability=True),
"Logistic Regression": LogisticRegression(max_iter=10000),
"Decision Tree": DecisionTreeClassifier(),
"Random Forest": RandomForestClassifier(),
"Gradient Boosting": GradientBoostingClassifier()
}
best_models = {}
metrics = []
if os.path.exists(f"{MODEL_PATH}/fomc_all_metrics.json"):
metrics_df = pd.read_json(f"{MODEL_PATH}/fomc_all_metrics.json")
else:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
for model_name, model in tqdm(models.items(), desc="Training Models..."):
model.fit(X_train_scaled, y_train, sample_weight=sample_weights)
best_models[model_name] = model
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
X_scaled = scaler.fit_transform(X) # Scale the entire dataset for cross-validation
cv_scores = cross_val_score(model, X_scaled, y, cv=cv, scoring='precision_weighted')
cv_mean = cv_scores.mean()
metrics.append({
"Model": model_name,
"Accuracy": accuracy,
"Precision": precision,
"Recall": recall,
"F1 Score": f1,
"CV Score": cv_mean
})
metrics_df = pd.DataFrame(metrics)
metrics_df.to_json(f"{MODEL_PATH}/fomc_all_metrics.json")
metrics_df = metrics_df.sort_values(by=["CV Score", "Precision"], ascending=False)
metrics_df
The top performing algo was decision trees, no surprise there, this algo are excellent for tabular data.
Fine-Tune the Classifier
Now we will sampling the models hyperparameters using the bayes optimization library to find the right parameters.
The library will create gaussian distribution to maximize the function we will provide it, and using exploration and exploitation will find the upper limits of the parameters. The problem of this library is that it only works with continuous parameters, and discrete or categorical parameters will need the more traditional gridsearch.
It's also fast!
from bayes_opt import BayesianOptimization
from bayes_opt.logger import JSONLogger
from bayes_opt.event import Events
from bayes_opt.util import load_logs
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.metrics import accuracy_score, recall_score, f1_score, roc_auc_score, precision_score, roc_curve
import os
import joblib
import json
from sklearn.preprocessing import StandardScaler
def max_function(n_estimators, max_depth, min_samples_split, min_samples_leaf, max_features, sample_weights=None):
model = RandomForestClassifier(
n_estimators=int(n_estimators),
max_depth=int(max_depth),
min_samples_split=int(min_samples_split),
min_samples_leaf=int(min_samples_leaf),
max_features=max_features,
random_state=42
)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='precision')
return scores.mean()
param_bounds = {
'n_estimators': (20, 1000),
'max_depth': (2, 30),
'min_samples_split': (2, 30),
'min_samples_leaf': (1, 20),
'max_features': (0.1, 0.9)
}
if os.path.exists(f"{MODEL_PATH}/fomc_model.joblib"):
best_model = joblib.load(f"{MODEL_PATH}/fomc_model.joblib")
scaler = joblib.load(f"{MODEL_PATH}/fomc_model_scaler.joblib")
with open(f"{MODEL_PATH}/fomc_model_params.json", 'r') as file:
best_params = json.load(file)
metrics_df = pd.read_json(f"{MODEL_PATH}/fomc_model_metrics.json")
y_pred = best_model.predict(X_test)
y_pred_proba = best_model.predict_proba(X_test)[:, 1]
else:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
optimizer = BayesianOptimization(
f=max_function,
pbounds=param_bounds,
random_state=42,
verbose=2
)
if os.path.exists(f"{MODEL_PATH}/cv_logs.log"):
load_logs(optimizer, logs=[f"{MODEL_PATH}/cv_logs.log"])
optimizer.maximize(
init_points=8,
n_iter=15,
)
logger = JSONLogger(path=f"{MODEL_PATH}/cv_logs.log")
optimizer.subscribe(Events.OPTIMIZATION_STEP, logger)
best_params = optimizer.max['params']
best_params['n_estimators'] = int(best_params['n_estimators'])
best_params['max_depth'] = int(best_params['max_depth'])
best_params['min_samples_split'] = int(best_params['min_samples_split'])
best_params['min_samples_leaf'] = int(best_params['min_samples_leaf'])
best_model = RandomForestClassifier(
**best_params,
random_state=42
)
best_model.fit(X_train_scaled, y_train, sample_weight=sample_weights)
y_pred = best_model.predict(X_test_scaled)
y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
accuracy = accuracy_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_pred_proba)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(best_model, X_train_scaled, y_train, cv=cv, scoring='precision')
cv_mean = cv_scores.mean()
metrics = {
"Model": "Random Forest Classifier",
"Accuracy": accuracy,
"Recall": recall,
"Precision": precision,
"F1 Score": f1,
"AUC": auc,
"CV Score": cv_mean
}
metrics_df = pd.DataFrame([metrics])
with open(f"{MODEL_PATH}/fomc_model_params.json", 'w') as file:
json.dump(best_params, file)
metrics_df.to_json(f"{MODEL_PATH}/fomc_model_metrics.json")
joblib.dump(best_model, f"{MODEL_PATH}/fomc_model.joblib")
joblib.dump(scaler, f"{MODEL_PATH}/fomc_model_scaler.joblib")
print("Best model parameters:", best_params)
metrics_df
Best model parameters: {'max_depth': 28, 'max_features': 0.9, 'min_samples_leaf': 3, 'min_samples_split': 25, 'n_estimators': 447}
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve
def classification_plots(X_test, y_test, best_model, scaler, metrics_df):
X_test_scaled = scaler.transform(X_test)
y_pred = best_model.predict(X_test_scaled)
y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
fig, ax = plt.subplots(1, 2, figsize=(12, 6))
# ROC Curve
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
auc = metrics_df['AUC'].iloc[-1]
ax[0].plot(fpr, tpr, label=f"Classifier (AUC={auc:.2f})")
ax[0].plot([0, 1], [0, 1], 'k--')
ax[0].set_xlabel('False Positive Rate')
ax[0].set_ylabel('True Positive Rate')
ax[0].set_title('ROC Curve')
ax[0].legend(loc='best')
# Confusion Matrix
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Status Quo', 'Rates Increase'])
disp.plot(cmap='Blues', ax=ax[1])
ax[1].set_title('Confusion Matrix')
plt.tight_layout()
plt.show()
classification_plots(X_test, y_test, best_model, scaler, metrics_df)
The confusion matrix and ROC curves show a worrisome story, there are quite a number of false positives and negatives. Our classifier has a 60% precision rate though, this is better than random chance but far from perfect.
FOMC outcomes involve large amount of data taken from many economic domains, so we are doing a best effort to estimate the outcome here.
Below we test the robustness of the classifier by perturbing it with gaussian noise:
def perturb_gaussiannoise(X, noise_level=0.01):
sigma = noise_level * np.std(X)
noise = np.random.normal(0, sigma, X.shape)
return X + noise
def test_robustness():
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_train_pert = perturb_gaussiannoise(X_train_scaled)
X_test_pert = perturb_gaussiannoise(X_test_scaled)
y_pred = best_model.predict(X_test_pert)
accuracy = accuracy_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_pred_proba)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(best_model, X_train_pert, y_train, cv=cv, scoring='precision', verbose=1)
cv_mean = cv_scores.mean()
perturb_metrics = {
"Model": "Gradient Boosting Classifier (Perturbed)",
"Accuracy": accuracy,
"Recall": recall,
"Precision": precision,
"F1 Score": f1,
"AUC": auc,
"CV Score": cv_mean
}
new_metrics_df = pd.concat([metrics_df, pd.DataFrame([perturb_metrics])], ignore_index=True)
return new_metrics_df
test_robustness()
The model is robust, with minor gaussian noise it had small degradation in the classification scores, but still better than random chance. What's even more worrying is that some scores improved, though slightly.
Finally, we have a look at what the model deems important: PCE, inflation and unemployment are economically sound, as this is what the Fed has been focusing during these times of inflation. Retail and GDP not so much, which might be relevant for the next FOMC (at the time of writing 31st July 2024) as there was a GDP surprise of 2.8 when the expected was 2.1.
importances = best_model.feature_importances_
indices = np.argsort(importances)[::-1]
feature_ranking = pd.DataFrame({
'Feature': [FEATURES[i] for i in indices],
'Importance': importances[indices]
})
sorted_feature_ranking = feature_ranking.sort_values(by="Importance", ascending=True)
top_features = sorted_feature_ranking.tail(5)
bottom_features = sorted_feature_ranking.head(5)
combined_df = pd.concat([bottom_features.reset_index(drop=True), top_features.reset_index(drop=True)], axis=1)
combined_df
Market Data
Now we get the market data, we will use the ETF SPY as proxy for the market, we won't use use the SnP index itself due to its large numbers which might skew the simulations.
import yfinance as yf
SPY_TICKER = "SPY" # SnP 500 Index
macro_df['FOMC Meeting'] = pd.to_datetime(macro_df['FOMC Meeting'], errors='coerce')
macro_df['FOMC Meeting'] = macro_df['FOMC Meeting'].dt.tz_localize(None)
filtered_meetings = macro_df[(macro_df['FOMC Meeting'] >= '2023-06-01') & (macro_df['FOMC Meeting'] <= '2024-08-01')]
filtered_meetings = filtered_meetings.dropna().drop_duplicates()
market_df = yf.download(SPY_TICKER, start='2020-01-01', end='2024-07-01', interval='1d')
market_df.index = market_df.index.tz_localize(None)
market_df.head(5)
Going back to the Heston, most of its parameters can sensibly be estimated from our data. With the exception of Kappa, Kappa is the speed of mean reversion and that needs its own model to estimate. For this we will use the Ornstein-Uhlenbeck.
Ornstein-Uhlenbeck (OU) Process
An Ornstein–Uhlenbeck process is defined by this SDE:
With the help of the It? lemma, we can find a continuous-time solution:
领英推荐
Its first two moments of its equilibrium distribution become:
As t→∞, E[xt] approaches μ, showing that the process reverts to the mean μ over time.
As t→∞,Var[xt] approaches σ22γ, indicating the long-term variance of the process.
Its half-life is defined as the (expected) time to return half way to its mean, or:
Substituting the expectation at time κ with the discretized form, its left-hand-side becomes:
Solving it with right-hand-side gives the half-life κ:
A large γ is a short half life, having strong mean-reversion effect.
We still have to find γ. Using the Euler-Maruyama method, for a small discrete time step Δt:
Rewriting for xt+1:
Simplifying the expression:
We can rewrite this equation in a linear regression form for 1 time step Δt=1:
where:
Using ordinary least squares (OLS) or any linear regression model, we can estimate the parameters α and β from the data. We ignore the error term ?. Once we have these estimates, we can solve for γ as follows:
python code below does that:
import scipy.stats as ss
from statsmodels.tsa.stattools import adfuller
def estimate_ou(ts_df):
# Check for stationarity
adf_test = adfuller(ts_df)
if adf_test[1] > 0.05:
print(f"Warning: Time series is not stationary. ADF p-value: {adf_test[1]}")
X = ts_df.values
XX = X[:-1]
YY = X[1:]
res = ss.linregress(XX, YY)
alpha = res.intercept
beta = res.slope
gamma = 1 - beta
mu = alpha / gamma
kappa = -np.log(2) / gamma
return kappa, gamma, mu
kappa, gamma, mu = estimate_ou(market_df['Close'])
print(f"Estimated kappa: {kappa}")
print(f"Estimated gamma: {gamma}")
print(f"Estimated mu: ${mu}")
print(f"Origin range ${market_df['Close'].iloc[0]} to ${market_df['Close'].iloc[-1]}")
Warning: Time series is not stationary. ADF p-value: 0.8332882090319528
Estimated kappa: -400.13097171342804
Estimated gamma: 0.0017323007454078665
Estimated mu: $520.4572418593111
Origin range $324.8699951171875 to $544.219970703125
The results don't make sense, because the market is not stationary and does not mean revert in the short term, and as we saw these past 4 years, can explode and trend up or down. Still we have our kappa of -400.
Mining Trading Days Around FOMC
To run simulations, we need the market data around the FOMC meetings, a week before and a week after when the shock down will wear out:
results = []
for i, row in filtered_meetings.iterrows():
meeting_date = row['FOMC Meeting']
start_date = meeting_date - pd.tseries.offsets.BDay(5)
end_date = meeting_date + pd.tseries.offsets.BDay(6)
spx_after_meeting = market_df.loc[start_date:end_date]
rets = spx_after_meeting['Close'].pct_change().bfill()
mean = rets.mean()
var = rets.rolling(window=2).var()
theta = var.mean()
sigma_variance = var.std()
rho = rets.corr(var)
result = {
'FOMOC Meeting Date': meeting_date,
'mean rets': mean,
'prices': spx_after_meeting['Close'].values,
'variance': var.mean(),
'theta': theta,
'sigma_variance': sigma_variance,
'rho': rho,
'shock': row['Fed Rate Increase']
}
# Add all features dynamically
for feature in FEATURES:
result[feature] = row[feature]
results.append(result)
all_fomc_data_df = pd.DataFrame(results)
all_fomc_data_df[["FOMOC Meeting Date", "mean rets", "variance", "theta", "sigma_variance", "rho", "shock"]].head(10)
Testing with Market Shock
let's select a FOMC where they last increased the rates, the one in 2023-06-14:
train_fomc = all_fomc_data_df.iloc[1:2]
train_fomc[["FOMOC Meeting Date", "mean rets", "variance", "theta", "sigma_variance", "rho", "shock"]]
test_fomc = all_fomc_data_df.iloc[2:3]
test_fomc[["FOMOC Meeting Date", "mean rets", "variance", "theta", "sigma_variance", "rho", "shock"]]
We dryrun the Heston, to see if the numbers make sense:
def get_params(market_df, test_fomc):
date_end = pd.to_datetime(test_fomc['FOMOC Meeting Date'].iloc[-1])
historic_df = market_df.loc[:date_end]
kappa, _, _ = estimate_ou(historic_df['Close'])
rets = historic_df['Close'].pct_change().bfill()
mean = rets.mean()
var = rets.rolling(window=20).var()
X0 = test_fomc['prices'].iloc[-1][0] if len(test_fomc['prices']) > 0 else None
v0 = var.iloc[-1]
mu = rets.iloc[-1]
gamma = var.mean()
sigma_variance = var.std()
rho = rets.corr(var)
T = 1 / 252
N = len(test_fomc['prices'].iloc[-1]) if len(test_fomc['prices']) > 0 else None
return X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N
X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N= get_params(market_df, test_fomc)
print(X0, v0, mu, kappa, theta, sigma_variance, rho, T)
X, variance = heston_model(X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N)
comparison_df = pd.DataFrame([X, test_fomc['prices'].iloc[-1]], index=['X', 'Prices'])
comparison_df
Warning: Time series is not stationary. ADF p-value: 0.5662781779205335
455.20001220703125 2.3669774517605694e-05 0.00015371360408278356 -143.6544538111282 nan 0.0004657217433631542 0.03872615826319792 0.003968253968253968
Without hinting the Heston that there was a shock, it will random walk sideways.
To give the correct parameters for the shocks, we need to collect it from the historical data:
def calculate_fomcstats(df, target_date):
filtered_df = df[df['FOMOC Meeting Date'] < target_date]
mean_prices = []
std_prices = []
for prices in filtered_df['prices']:
returns = np.diff(prices) / prices[:-1]
mean_prices.append(np.mean(returns))
std_prices.append(np.std(returns))
mean = np.mean(mean_prices)
sigma = np.mean(std_prices)
return mean, sigma
calculate_fomcstats(all_fomc_data_df, pd.to_datetime(test_fomc['FOMOC Meeting Date'].iloc[-1]))
(0.0023299501398404, 0.0061881761869394605)
Monte Carlo Simulations
Here is where it will get interesting.
We have the Heston, we have the jumps mean and standard deviation throughout history and we have a classifier that can supply probabilities for the status quo (no or slight positive shocks) and a rate increase (negative shocks). Now we get a normal distribution of what might happen to SPY's price, and therefore the market by running a million different paths with a Monte Carlo.
The python code below simulates that:
from scipy.stats import norm, skew, kurtosis, shapiro
import seaborn as sns
def monte_carlo(X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N , df = None, historic_df=None, model = None, scaler=None, features = FEATURES, num_simulations=100000):
final_prices = np.zeros(num_simulations)
final_variances = np.zeros(num_simulations)
jump_probas = None
jummp_mean = None
jump_std = None
if df is not None and model is not None and scaler is not None and historic_df is not None:
scaled_features = scaler.transform(df[features].head(1))
jump_probas = best_model.predict_proba(scaled_features)[0]
jummp_mean, jump_std = calculate_fomcstats(historic_df, pd.to_datetime(df['FOMOC Meeting Date'].iloc[-1]))
print(jump_probas) # 0: No change or less, 1: increase
for i in range(num_simulations):
if jump_probas is not None:
X, variance = heston_model(X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N, pos_jump_proba=jump_probas[0], neg_jump_proba=jump_probas[1], jump_mean=jummp_mean, jump_std=jump_std)
else:
X, variance = heston_model(X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N )
final_prices[i] = X[-1]
final_variances[i] = variance[-1]
results = pd.DataFrame({
'final_prices': final_prices,
'final_variances': final_variances
})
return results.describe(), results
def plot_results(results, start_price=None, end_price=None, fomc_date=None, high=None, low=None):
skew_credit = skew(results['final_prices'])
kurtosis_credit = kurtosis(results['final_prices'])
stat, p_value = shapiro(results['final_prices'])
normality_test_result = f'Stat={stat:.3f}, p-value={p_value:.3f}, skew={skew_credit:.2f}, kurt={kurtosis_credit:.2f}'
plt.figure(figsize=(12, 4))
sns.histplot(results['final_prices'], bins=50, kde=True, color='blue', edgecolor='black', alpha=0.2, stat='density', label='KDE')
if start_price is not None:
plt.axvline(start_price, color='green', linestyle='dashed', linewidth=2, label='Start Price', alpha=0.5)
if end_price is not None:
plt.axvline(end_price, color='red', linestyle='dashed', linewidth=2, label='End Price', alpha=0.5)
if high is not None:
plt.axvline(high, color='orange', linewidth=2, label='High Price', alpha=0.5)
if low is not None:
plt.axvline(low, color='cyan', linewidth=2, label='Low Price', alpha=0.5)
plt.axvline(results['final_prices'].mean(), color='yellow', linestyle='dashed', linewidth=2, label='Mean Price', alpha=0.5)
n_mu, n_std = norm.fit(results['final_prices'])
xmin, xmax = plt.xlim()
x = np.linspace(xmin, xmax, 100)
p = norm.pdf(x, n_mu, n_std)
plt.plot(x, p, 'k', linewidth=2, label='Normal PDF', alpha=0.5)
plt.title(f'{fomc_date} Distribution ({normality_test_result})')
plt.xlabel('Price')
plt.ylabel('Density')
plt.legend()
plt.show()
train_fomc = all_fomc_data_df.iloc[1:2]
test_fomc = all_fomc_data_df.iloc[2:3]
X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N= get_params(market_df, test_fomc)
summary_statistics, results = monte_carlo(X0, variance0, mu, kappa, gamma, sigma_variance, rho, T, N, df=test_fomc, historic_df=all_fomc_data_df, model=best_model, scaler=scaler)
plot_results(results,
start_price=test_fomc['prices'].iloc[-1][0],
end_price=test_fomc['prices'].iloc[-1][-1],
high=test_fomc['prices'].iloc[-1].max(),
low=test_fomc['prices'].iloc[-1].min(),
fomc_date= pd.to_datetime(test_fomc['FOMOC Meeting Date'].iloc[-1]))
Warning: Time series is not stationary. ADF p-value: 0.5662781779205335
[0.71451023 0.28548977]
First simulation for the meeting on 2023-07-26 was a failure, mainly because the classifier signalled weakly the status quo (probabilities: [0.71451023 0.28548977]).
No Shock FOMC
The next FOMC meeting to be simulated is a recent one on the 2024-06-12.
In?[20]:
train_fomc = all_fomc_data_df.iloc[-3:-2]
test_fomc = all_fomc_data_df.iloc[-2:-1]
test_fomc
X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N= get_params(market_df, test_fomc)
summary_statistics, results = monte_carlo(X0, variance0, mu, kappa, gamma, sigma_variance, rho, T, N, df=test_fomc, historic_df=all_fomc_data_df, model=best_model, scaler=scaler)
plot_results(results,
start_price=test_fomc['prices'].iloc[-1][0],
end_price=test_fomc['prices'].iloc[-1][-1],
high=test_fomc['prices'].iloc[-1].max(),
low=test_fomc['prices'].iloc[-1].min(),
fomc_date= pd.to_datetime(test_fomc['FOMOC Meeting Date'].iloc[-1]))
Warning: Time series is not stationary. ADF p-value: 0.8332121128508799
[0.91894777 0.08105223]
This simulation was perfect, with all price ranges being central to the historic.
FOMC 31st July 2024 Predictions
next_fomc = all_fomc_data_df.iloc[-1:]
X0, v0, mu, kappa, gamma, sigma_variance, rho, T, N= get_params(market_df, test_fomc)
X0 = 544.44 # SPY as 29th July 20024
N = 11
summary_statistics, results = monte_carlo(X0, variance0, mu, kappa, gamma, sigma_variance, rho, T, N, df=next_fomc, historic_df=all_fomc_data_df, model=best_model, scaler=scaler)
plot_results(results,
start_price=X0,
fomc_date= pd.to_datetime(next_fomc['FOMOC Meeting Date'].iloc[-1]))
Warning: Time series is not stationary. ADF p-value: 0.8332121128508799
[0.82887592 0.17112408]
summary_statistics
This is the distribution for this FOMC on the 31st July 2024. The classifier's predictions lean strongly to maintaining the status quo, with probabilities: [0.98792802 0.01207198] and the distribution has a slightly stronger positive skew, meaning the market should trend green on average around these 2 weeks, with SPY clustering around 550.92,andmightrangefrom540.21 (25%) to $561.38 (75%).
Conclusion
In this article we used a Heston stochastic process and a Random Forest Classifier to create distributions for what the next FOMC impact on the market might be, using the SPY ETF as a proxy.
We attempted to guess what will happen for the next meeting, on the 31st July 2024. Future readers are welcome to comment if we were close or far!
References
Github
Article and code available on Github
Kaggle notebook available here
Google Collab available here
Media
All media used (in the form of code or images) are either solely owned by me, acquired through licensing, or part of the Public Domain and granted use through Creative Commons License.
CC Licensing and Use
This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.