Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Pick the best available accelerator: Apple MPS first, then CUDA, else CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
class TrueMambaBlock(nn.Module):
    """Simplified S4/Mamba-style state-space block.

    The transition matrix A is parameterized as diagonal + low-rank
    (HiPPO-inspired), the SSM is applied as a depthwise causal
    convolution with kernel K[j] = (I + dt*A)^j @ B, and the output is
    modulated by a sigmoid gate computed from the input.
    """

    def __init__(self, d_model, rank=1):
        super().__init__()
        self.d_model = d_model
        # Structured A: negative diagonal (for stability) + low-rank term.
        self.D = nn.Parameter(-torch.abs(torch.randn(d_model)))
        self.low_rank_U = nn.Parameter(torch.randn(d_model, rank))
        self.low_rank_V = nn.Parameter(torch.randn(rank, d_model))
        # Input/output mixing vectors (B, C), applied elementwise per channel.
        self.B = nn.Parameter(torch.randn(d_model))
        self.C = nn.Parameter(torch.randn(d_model))
        # Gating mechanism.
        self.input_proj = nn.Linear(d_model, d_model)
        self.gate_proj = nn.Linear(d_model, d_model)
        self.activation = nn.Sigmoid()

    def compute_A(self):
        """Materialize the dense (d_model, d_model) transition matrix."""
        return torch.diag(self.D) + self.low_rank_U @ self.low_rank_V

    def compute_kernel(self, A, L):
        """Return the length-L SSM impulse response, shape (L, d_model).

        kernel[j] = (I + dt*A)^j @ B — forward-Euler discretization of
        the continuous transition A with step dt = 1.
        """
        dt = 1.0  # Euler step size
        identity = torch.eye(self.d_model, device=A.device)
        A_bar = identity + dt * A  # discretized transition (hoisted out of loop)
        kernel = []
        Ak = identity
        for _ in range(L):
            kernel.append((Ak @ self.B).unsqueeze(0))
            Ak = Ak @ A_bar
        return torch.cat(kernel, dim=0)

    def forward(self, x):
        """Apply the gated SSM convolution.

        Args:
            x: tensor of shape (batch, seq_len, d_model).

        Returns:
            Tensor of shape (batch, seq_len, d_model).
        """
        batch_size, seq_len, d_model = x.shape
        u = self.input_proj(x)
        gate = self.activation(self.gate_proj(x))
        A = self.compute_A()
        kernel = self.compute_kernel(A, seq_len)  # (seq_len, d_model)
        u = u.permute(0, 2, 1)  # (batch, d_model, seq_len) for conv1d
        # F.conv1d computes cross-correlation, not convolution. With
        # padding=seq_len-1 and keeping the first seq_len outputs, the
        # kernel must be flipped along time so that
        #   y_t = sum_m kernel[m] * u_{t-m}   (causal SSM convolution).
        # BUG FIX: the original omitted the flip, applying the impulse
        # response in reversed order (identical only when seq_len == 1).
        k = kernel.flip(0).permute(1, 0).unsqueeze(1)  # (d_model, 1, seq_len)
        y = F.conv1d(u, k, padding=seq_len - 1, groups=self.d_model)
        y = y[:, :, :seq_len]  # keep the causal part
        y = y.permute(0, 2, 1)  # back to (batch, seq_len, d_model)
        y = gate * y
        y = self.C * y  # elementwise output projection
        return y
class MambaTabularModel(nn.Module):
    """Regression head that routes tabular features through a Mamba block.

    Each row is embedded to `hidden_dim`, treated as a length-1 sequence
    for the SSM block, and projected down to `output_dim` outputs.
    """

    def __init__(self, input_dim, hidden_dim=64, output_dim=1):
        super().__init__()
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.mamba_block = TrueMambaBlock(d_model=hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        hidden = F.relu(self.input_layer(x))
        # The SSM block expects (batch, seq, features); use seq_len == 1.
        seq_out = self.mamba_block(hidden.unsqueeze(1))
        return self.output_layer(seq_out.squeeze(1))
def analyze_portfolio_mamba_backtest(df, window_years=5, test_years=1):
    """Rolling-window portfolio backtest driven by Mamba expected returns.

    For each rolling training window of `window_years` (in monthly trade
    dates), a small MambaTabularModel is fit per asset column on the
    factor features, expected returns are estimated, a portfolio is
    optimized on the training covariance, and it is evaluated over the
    following `test_years` of data.

    Args:
        df: DataFrame with a 'TRADEDATE' column, the factor columns in
            `independent_vars`, calendar helper columns, and one column
            per asset's returns (everything not excluded is a target).
        window_years: training window length in years (12 dates/year).
        test_years: evaluation window length in years.

    Returns:
        DataFrame with one row per window: train/test date ranges, risk
        metrics, and the optimized weight of each asset.
    """
    independent_vars = ["MARKET_RETURN_ADJ", "SMB", "HML", "MOM",
        'MARKET_RETURN_ADJ_lag1', 'SMB_lag1', 'HML_lag1', 'MOM_lag1',
        'MARKET_RETURN_ADJ_lag2', 'SMB_lag2', 'HML_lag2', 'MOM_lag2',
        'MARKET_RETURN_ADJ_rollmean_3', 'MARKET_RETURN_ADJ_rollstd_3',
        'SMB_rollmean_3', 'SMB_rollstd_3', 'HML_rollmean_3', 'HML_rollstd_3',
        'MOM_rollmean_3', 'MOM_rollstd_3', 'MARKET_RETURN_ADJ_rollmean_6',
        'MARKET_RETURN_ADJ_rollstd_6', 'SMB_rollmean_6', 'SMB_rollstd_6',
        'HML_rollmean_6', 'HML_rollstd_6', 'MOM_rollmean_6', 'MOM_rollstd_6',
        'MARKET_RETURN_ADJ_x_SMB', 'MARKET_RETURN_ADJ_x_HML',
        'MARKET_RETURN_ADJ_x_MOM', 'SMB_x_HML', 'SMB_x_MOM', 'HML_x_MOM',
        'MARKET_RETURN_ADJ_squared', 'MARKET_RETURN_ADJ_cubed',
        'SMB_squared', 'SMB_cubed', 'HML_squared', 'HML_cubed',
        'MOM_squared', 'MOM_cubed', 'Month_sin', 'Month_cos', 'Quarter_sin', 'Quarter_cos']
    exclude = set(independent_vars + ["TRADEDATE", 'Year', 'Month', 'Quarter'])
    df = df.copy()
    df['TRADEDATE'] = pd.to_datetime(df['TRADEDATE'])
    df = df.sort_values('TRADEDATE')
    tradedates = df['TRADEDATE'].drop_duplicates().sort_values().tolist()
    window_months = window_years * 12
    test_months = test_years * 12
    # Target (asset) columns never change across windows; compute once
    # instead of once per loop iteration as before.
    target_cols = [col for col in df.columns if col not in exclude]
    results = []
    for start_idx in range(0, len(tradedates) - window_months - test_months + 1):
        train_start = tradedates[start_idx]
        train_end = tradedates[start_idx + window_months - 1]
        test_start = tradedates[start_idx + window_months]
        test_end = tradedates[start_idx + window_months + test_months - 1]
        train_df = df[(df['TRADEDATE'] >= train_start) & (df['TRADEDATE'] <= train_end)]
        test_df = df[(df['TRADEDATE'] >= test_start) & (df['TRADEDATE'] <= test_end)]
        expected_returns = {}
        for target in target_cols:
            mean_pred = _fit_expected_return(train_df, target, independent_vars)
            if mean_pred is not None:
                expected_returns[target] = mean_pred
        if not expected_returns:
            continue
        all_stocks = list(expected_returns.keys())
        mu = np.array([expected_returns[s] for s in all_stocks])
        train_returns_df = train_df[all_stocks].dropna()
        cov_matrix = train_returns_df.cov().values
        optimal_weights = optimize_portfolio(mu, cov_matrix)
        test_returns_df = test_df[all_stocks].dropna()
        if test_returns_df.empty:
            continue
        portfolio_returns = backtest_portfolio(test_returns_df, optimal_weights, all_stocks)
        # NOTE(review): beta is hard-coded to 1.0 — confirm whether
        # compute_risk_metrics should instead receive an estimated beta.
        portfolio_beta = 1.0
        risk_metrics = compute_risk_metrics(portfolio_returns, portfolio_beta)
        result_row = {
            'Train Start': train_start,
            'Train End': train_end,
            'Test Start': test_start,
            'Test End': test_end,
        }
        result_row.update(risk_metrics)
        for stock, weight in zip(all_stocks, optimal_weights):
            result_row[f'Weight_{stock}'] = weight
        results.append(result_row)
    return pd.DataFrame(results)


def _fit_expected_return(train_df, target, independent_vars):
    """Fit one MambaTabularModel on `target`; return its mean in-sample prediction, or None if no usable rows."""
    df_temp = pd.concat([train_df[independent_vars], train_df[target]], axis=1).dropna()
    if df_temp.empty:
        return None
    X_scaled = StandardScaler().fit_transform(df_temp[independent_vars])
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32, device=device)
    y_tensor = torch.tensor(df_temp[target].values.reshape(-1, 1),
                            dtype=torch.float32, device=device)
    model = MambaTabularModel(input_dim=X_tensor.shape[1]).to(device)
    # torch.compile is skipped on MPS, where it is not supported.
    if hasattr(torch, "compile") and device.type != "mps":
        model = torch.compile(model)
    optimizer = optim.Adam(model.parameters(), lr=0.05)
    loss_fn = nn.MSELoss()
    for _ in range(5):  # a few quick epochs per window/target
        model.train()
        optimizer.zero_grad()
        loss = loss_fn(model(X_tensor), y_tensor)
        loss.backward()
        optimizer.step()
    model.eval()
    # NOTE(review): predictions are made on the *training* features, so the
    # expected return is the mean in-sample fit — confirm this is intended
    # rather than predicting on the test window.
    with torch.no_grad():
        predictions = model(X_tensor).cpu().numpy()
    return predictions.mean()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement