Anomaly Detection in Python: Best Practices and Techniques
After writing previous articles on detailed data analytics, such as this one, I often receive questions about detecting and removing outliers.
For a quick-look analysis, excluding the top and bottom percentiles of a certain label (similar to what was done in the above article) may be a valid approach. For a more detailed analysis, however, other methods of anomaly detection should be used. Here, I describe a few commonly used methods of anomaly detection and demonstrate how they work on a specific example, the Weight and Height dataset available on Kaggle. Full details of the analysis can be found in this public Kaggle notebook.
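To make the quick-look option concrete, here is a minimal pandas sketch of percentile-based trimming (the column name 'Weight' and the 1%/99% cut-offs are assumptions for illustration; df is the dataframe loaded from the dataset):

import pandas as pd

# Keep only the rows between the 1st and 99th percentiles of a label
# (the column name and cut-offs are illustrative assumptions)
lower, upper = df['Weight'].quantile([0.01, 0.99])
df_trimmed = df[df['Weight'].between(lower, upper)]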
Interquartile range (IQR) method
This is probably the most common method of outlier detection. For our dataset, it gives the following results (a note for metric-system users: here, height is measured in inches and weight in pounds):
The clear drawback of the method is that it does not take into account potential correlations among the features (people’s height and weight, in this example), thus potentially discarding fully valid points.
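For reference, a minimal sketch of how the IQR rule can be applied per feature (the helper name and column names are assumptions, and k = 1.5 is the conventional multiplier; the exact code is in the linked notebook):

import pandas as pd

def detect_outliers_iqr(dataframe, columns, k=1.5):
    # Flag a point if any feature falls outside [Q1 - k*IQR, Q3 + k*IQR]
    mask = pd.Series(False, index=dataframe.index)
    for col in columns:
        q1, q3 = dataframe[col].quantile([0.25, 0.75])
        iqr = q3 - q1
        mask |= (dataframe[col] < q1 - k * iqr) | (dataframe[col] > q3 + k * iqr)
    dataframe['outliers_iqr'] = mask.astype(int)
    return dataframe

df = detect_outliers_iqr(df, ['Height', 'Weight'])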
Isolation forest method
Another common method uses the isolation forest algorithm to find the outliers. Here is an example of its Python (sklearn's) implementation:
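(The snippet below is a minimal sketch of such an implementation; the contamination level and column names are assumptions, and the exact code is in the linked notebook.)

from sklearn.ensemble import IsolationForest

# contamination sets the expected share of outliers; the value here,
# like the column names, is an illustrative assumption
iso_forest = IsolationForest(contamination=0.01, random_state=42)
# fit_predict() returns -1 for outliers and 1 for inliers
df['outliers_iforest'] = (iso_forest.fit_predict(df[['Height', 'Weight']]) == -1).astype(int)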
Similar to the above example, this algorithm considers short/light and tall/heavy people as potential outliers and does not specifically capture the correlation between height and weight.
Local Outlier Factor (LOF) method
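A minimal scikit-learn sketch of applying LOF (the n_neighbors value, contamination level, and column names are assumptions; see the notebook for the exact code):

from sklearn.neighbors import LocalOutlierFactor

# LOF flags points whose local density is much lower than that of
# their neighbours (parameter values are illustrative assumptions)
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.01)
df['outliers_lof'] = (lof.fit_predict(df[['Height', 'Weight']]) == -1).astype(int)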
The results of applying this method look very different from the previous two plots:
One-class Support Vector Machine (SVM) method
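One possible scikit-learn sketch of this method (the nu value and column names are assumptions; the exact code is in the linked notebook):

from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

# Standard-scale the features first, since SVMs are sensitive to
# feature scales; nu bounds the fraction of points treated as
# outliers (both are illustrative assumptions)
X = StandardScaler().fit_transform(df[['Height', 'Weight']])
one_class_svm = OneClassSVM(kernel='rbf', nu=0.01, gamma='auto')
df['outliers_svm'] = (one_class_svm.fit_predict(X) == -1).astype(int)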
Here is what we see by applying the One-class SVM method:
Notably, this method also identified some of the middle-weight persons as potential outliers.
Autoencoder method
Finally, let us use the power of a deep-learning-based method (autoencoders). Here is the sample PyTorch implementation:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler

class Autoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(Autoencoder, self).__init__()
        # Encoder compresses the input down to hidden_dim dimensions
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 2 * hidden_dim),
            nn.ReLU(),
            nn.Linear(2 * hidden_dim, hidden_dim),
            nn.ReLU()
        )
        # Decoder reconstructs the input from the compressed representation
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim, 2 * hidden_dim),
            nn.ReLU(),
            nn.Linear(2 * hidden_dim, input_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

class OutlierDataset(Dataset):
    def __init__(self, data):
        self.data = torch.tensor(data, dtype=torch.float32)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

def detect_outliers_autoencoder(dataframe, hidden_dim=16, num_epochs=10, batch_size=32):
    # Convert the dataframe to a standard-scaled numpy array
    # (FEATURE_COLUMNS is defined elsewhere in the notebook)
    scaler = StandardScaler()
    data = scaler.fit_transform(dataframe[FEATURE_COLUMNS])
    # Create an outlier dataset
    dataset = OutlierDataset(data)
    # Split the data into training and validation sets
    val_split = int(0.2 * len(dataset))
    train_set, val_set = torch.utils.data.random_split(dataset, [len(dataset) - val_split, val_split])
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size)
    # Initialize the autoencoder model
    input_dim = data.shape[1]
    model = Autoencoder(input_dim, hidden_dim)
    # Set the loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=3e-2, weight_decay=1e-8)
    # Train the autoencoder
    for epoch in range(num_epochs):
        train_loss = 0.0
        val_loss = 0.0
        # Training
        model.train()
        for batch in train_loader:
            # Zero the gradients
            optimizer.zero_grad()
            # Forward pass
            outputs = model(batch)
            # Compute the reconstruction loss
            loss = criterion(outputs, batch)
            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * batch.size(0)
        # Validation
        model.eval()
        with torch.no_grad():
            for batch in val_loader:
                outputs = model(batch)
                loss = criterion(outputs, batch)
                val_loss += loss.item() * batch.size(0)
        train_loss /= len(train_set)
        val_loss /= len(val_set)
        print(f'Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    # Calculate the reconstruction error for each data point
    model.eval()
    with torch.no_grad():
        reconstructed = model(dataset.data)
        mse_loss = nn.MSELoss(reduction='none')
        error = torch.mean(mse_loss(reconstructed, dataset.data), dim=1)
    # Define a threshold for outlier detection (99th percentile of the error)
    threshold = np.percentile(error.numpy(), 99)
    # Create a boolean mask for the outliers
    outliers_mask = (error > threshold)
    # Mark the outliers
    dataframe['outliers_autoenc'] = outliers_mask.numpy().astype(int)
    return dataframe

# Detect outliers using the autoencoder method
df = detect_outliers_autoencoder(df)
resulting in the following outliers:
To conclude, let us not forget about the No Free Lunch theorem and stay “hungry” for knowledge of the data collection method and the domain. Indeed, even if your favourite algorithm does the job well on the datasets of your choice, there is no guarantee it will work equally well on a new dataset.
I hope these results are useful for you. In case of questions or comments, do not hesitate to write in the comments below.