Episode 4: Developing Anomaly Detection on AWS
Training, Deploying, and Integrating a Real-Time Anomaly Detection System
In this final episode of our anomaly detection series, David and I take the last step in building a fully operational real-time anomaly detection pipeline. We focus on developing the machine learning model, deploying it within our pipeline, and integrating it with our Kafka-based log ingestion system. To complete the system, we will set up CloudWatch dashboards for real-time anomaly visualization and alerts, ensuring seamless monitoring and rapid response to unusual patterns.
Let’s get started!
Train Anomaly Detection Models
As explained in the previous episodes, we trained three models to detect anomalies in log data and response times: Random Cut Forest (RCF), Seasonal Autoregressive Integrated Moving Average (SARIMA), and Support Vector Machine (SVM). Each model takes a unique approach to identifying outliers, making it useful in different scenarios.
In this article, we’ll focus mainly on the RCF model.
Before diving into model training, we must first prepare our dataset properly. This involves two key steps:
Feature Engineering
To build effective anomaly detection models, we must extract meaningful features that capture the underlying patterns in our log data. The key interactions between response time, log level, HTTP method, and request path can provide valuable insights into potential anomalies.
Since Random Cut Forest (RCF) operates on purely numerical data, we must transform categorical features like log_level, method, and path into numerical representations while preserving their impact on anomaly detection. Below is our approach to feature engineering, with a short encoding sketch after the list:
1. Response Time (response_time)
2. Log Level (log_level)
3. HTTP Method (method)
4. Request Path (path)
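To make this concrete, here's a minimal sketch of how these fields could be encoded with pandas. The severity ranks, method ordering, and path hashing below are illustrative choices, not our exact production mappings:
import pandas as pd

def encode_features(df: pd.DataFrame) -> pd.DataFrame:
    """Turn raw log fields into numeric features for RCF (illustrative mappings)."""
    encoded = df.copy()

    # Log level: ordinal encoding so higher values mean higher severity.
    level_rank = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}
    encoded["log_level"] = encoded["log_level"].map(level_rank).fillna(1)

    # HTTP method: simple ordinal encoding (one-hot encoding is an alternative).
    method_rank = {"GET": 0, "POST": 1, "PUT": 2, "DELETE": 3}
    encoded["method"] = encoded["method"].map(method_rank).fillna(0)

    # Request path: hash high-cardinality paths into a bounded numeric bucket
    # (a stable hash would be preferable in production).
    encoded["path"] = encoded["path"].apply(lambda p: hash(p) % 1000)

    # Response time is already numeric; just ensure a float dtype.
    encoded["response_time"] = encoded["response_time"].astype(float)

    return encoded[["response_time", "log_level", "method", "path"]]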
By transforming these categorical features into numerical values and ensuring they capture meaningful relationships, we prepare the dataset for effective anomaly detection using RCF, SARIMA, and SVM.
Time-Based Features
Since log data contains timestamps, we can extract temporal patterns that may influence response times and anomaly detection. Network traffic, user activity, and system load often vary across different times of the day and days of the week. To capture these variations, we engineer the following features (a short extraction sketch follows the list):
Hour of the Day (hour)
Day of the Week (day_of_week)
Weekend Indicator (is_weekend)
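A minimal sketch of deriving these features with pandas, assuming the log timestamp is stored in a timestamp column:
import pandas as pd

def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive hour, day_of_week, and is_weekend from the log timestamp."""
    df = df.copy()
    ts = pd.to_datetime(df["timestamp"])

    df["hour"] = ts.dt.hour                     # 0-23
    df["day_of_week"] = ts.dt.dayofweek         # Monday=0 ... Sunday=6
    df["is_weekend"] = (df["day_of_week"] >= 5).astype(int)  # 1 for Saturday/Sunday

    return df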
By integrating time-based features into our dataset, we enable our anomaly detection models to recognize cyclical patterns and contextual anomalies that might otherwise go unnoticed.
Data Standardization
To improve Random Cut Forest (RCF) anomaly detection, it's crucial to ensure that features are on a consistent scale. This prevents certain features with larger numerical ranges from dominating the model and distorting anomaly detection.
When detecting outliers, it's recommended to use Z-score standardization rather than Min-Max scaling, because Min-Max compresses extreme values into a fixed range and dilutes exactly the signal we want to detect. Both approaches are illustrated in the sketch below.
Min-Max Scaling
Z-Score Standardization (Preferred for RCF)
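Here's a short sketch of both techniques using scikit-learn, assuming the encoded features (including the time-based ones) are collected in a feature matrix X:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Min-Max scaling squeezes every feature into [0, 1]; a single extreme value
# compresses the rest of the range, which makes it less suited to outlier detection.
minmax_scaled = MinMaxScaler().fit_transform(X)

# Z-score standardization centers each feature at 0 with unit variance, so
# extreme values stay extreme relative to the distribution (preferred for RCF).
prepared_data = StandardScaler().fit_transform(X)
The standardized array prepared_data is what we split and feed to the model in the next step.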
Split Data into Training and Testing Subsets
Before training the anomaly detection models, we need to split the dataset into training and testing subsets. The model learns patterns from training data and is tested on unseen data, which helps to evaluate how well the model generalizes to new log data.
# Split
split_index = int(len(prepared_data) * 0.8) # 80% for training
X_train = prepared_data[:split_index] # First 80% of the data
X_test = prepared_data[split_index:] # Last 20% of the data
Train Random Cut Forest Model
Now that we have preprocessed and split our dataset, we can train the Random Cut Forest (RCF) model using Amazon SageMaker.
import sagemaker
from sagemaker import RandomCutForest

session = sagemaker.Session()

# Specify general training job information
rcf = RandomCutForest(
    role=execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    data_location=f"s3://{bucket}/{prefix}/",
    output_path=f"s3://{bucket}/{prefix}/output",
    num_samples_per_tree=512,
    num_trees=50,
    sagemaker_session=session,
)

rcf.fit(rcf.record_set(X_train))
Inference
Now that we have trained the Random Cut Forest (RCF) model, the next step is to deploy an inference endpoint and run predictions on the test dataset.
rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")
initial_instance_count=1: Starts with one instance for inference.
instance_type="ml.m5.xlarge": This instance provides a good balance of memory and compute for real-time anomaly detection.
Configure Serialization & Deserialization
To interact with the inference endpoint, we set up:
CSVSerializer: Converts input data to a CSV format.
JSONDeserializer: Converts the model's JSON response back to a Python object.
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()
Now, we make predictions on X_test:
results = rcf_inference.predict(X_test)
Each result contains an anomaly score for a given input row.
Calculate Anomaly Threshold using Three-Sigma Rule
Now that we have the anomaly scores from the Random Cut Forest (RCF) model, we need to determine a threshold to classify data points as anomalies.
Understanding the Three-Sigma Rule
The Three-Sigma Rule states that, for approximately normally distributed data, about 99.7% of values fall within three standard deviations (σ) of the mean (μ). Values outside this band are rare enough to be treated as outliers.
Thus, the anomaly threshold is:
threshold = μ + 3σ
Calculate the Threshold
We compute the mean (μ) and standard deviation (σ) of anomaly scores and set the threshold at three standard deviations above the mean.
from statistics import mean, stdev

def calc_threshold(scores):
    """Return the three-sigma threshold along with the mean and standard deviation."""
    score_mean = mean(scores)
    score_std = stdev(scores)
    threshold = score_mean + 3 * score_std
    return threshold, score_mean, score_std

# Extract the raw anomaly scores from the endpoint response and derive the threshold.
scores = [datum["score"] for datum in results["scores"]]
threshold, score_mean, score_std = calc_threshold(scores)
Identify Anomalous Data Points
Now, we classify data points as anomalies if their score exceeds the threshold.
anomalies = test_df[test_df["score"] > threshold]
Visualize Anomalies
Now that we have classified anomalies based on the Three-Sigma Rule, let's visualize how the response time and anomaly scores correlate.
We'll use Matplotlib to plot the response time on one axis and anomaly scores on the other. Detected anomalies will be highlighted in black.
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# Plot the full test window (adjust start/end to zoom into a subset).
start, end = 0, len(test_df)
dataset_subset = test_df[start:end]

ax1.plot(dataset_subset["response_time"], color="C0", alpha=0.8)
ax2.plot(dataset_subset["score"], color="C1")

ax1.grid(which="major", axis="both")
ax1.set_ylabel("Response Time", color="C0")
ax2.set_ylabel("Anomaly Score", color="C1")
ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")

ax1.set_ylim(0, max(dataset_subset["response_time"]))
ax2.set_ylim(min(scores), 1.4 * max(scores))
fig.set_figwidth(20)

# Highlight detected anomalies in black.
ax2.plot(anomalies.index, anomalies.score, "ko")
plt.show()
Interpretation
As we can see, spikes in response time often coincide with high anomaly scores, confirming that the model successfully identifies outliers.
With the trained model deployed as an endpoint, we can now integrate it into a real-time pipeline, where incoming log data is continuously processed, and the model is invoked to detect anomalies in real time.
Develop Real-Time Pipeline for Anomaly Detection
After setting up our infrastructure and training the Random Cut Forest model, we need to implement the complete data flow pipeline to enable real-time anomaly detection. This involves connecting all components to create a seamless flow from data ingestion to alerts.
The journey of our data begins with Kafka, where log events containing metrics like response time, log level, and API paths are published. Our EKS-deployed consumer pod continuously polls the Kafka topic using a defined consumer group ID. The design intentionally separates consumption from processing – the pod doesn't analyze the data itself but acts as an intelligent router.
When a message arrives, the pod leverages its IAM role (attached via the ServiceAccount) to securely invoke our primary Lambda function. This setup eliminates credential management within the cluster while maintaining a secure authentication chain between Kubernetes and AWS services.
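In condensed form, the router logic could look roughly like this, using kafka-python and boto3; the topic, broker, consumer group, and function names are placeholders rather than our actual configuration:
import json

import boto3
from kafka import KafkaConsumer

# Credentials come from the pod's IAM role (via the ServiceAccount), so no keys are configured here.
lambda_client = boto3.client("lambda")

consumer = KafkaConsumer(
    "app-logs",                        # placeholder topic name
    bootstrap_servers=["kafka:9092"],  # placeholder broker address
    group_id="anomaly-detector",       # placeholder consumer group ID
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    # The pod does no analysis itself; it routes each log event to the processing Lambda.
    lambda_client.invoke(
        FunctionName="anomaly-processor",  # placeholder function name
        InvocationType="Event",            # asynchronous invocation
        Payload=json.dumps(message.value),
    )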
The Lambda function serves as the processing nexus of our system. It first transforms the incoming log data into the feature format expected by our model, applying the same feature engineering techniques used during training – normalizing response times, encoding log levels, and transforming categorical values. Once prepared, the function invokes our SageMaker endpoint:
import json
import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=sagemaker_endpoint,
    ContentType="text/csv",
    Body=transformed_data,
)
The SageMaker endpoint runs our deployed RCF model, which returns an anomaly score for each data point. The Lambda function then applies our three-sigma threshold to classify results:
# The endpoint returns a JSON body of the form {"scores": [{"score": ...}]}.
anomaly_score = json.loads(response["Body"].read())["scores"][0]["score"]

if anomaly_score > threshold:
    result = "High Outlier"
else:
    result = "Normal"
After classification, the function follows a multi-destination pattern: it emits a standardized log line that CloudWatch metric filters pick up, and it forwards the scored event to an SQS queue for downstream batch processing.
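A rough sketch of that fan-out step inside the Lambda handler; the queue URL environment variable and the log-line format are assumptions for illustration:
import json
import os

import boto3

sqs = boto3.client("sqs")

def publish_result(log_event, anomaly_score, result):
    # Standardized log line that the CloudWatch metric filter can match on.
    print(json.dumps({"anomaly_result": result, "score": anomaly_score}))

    # Forward the scored event to SQS for the hourly batch Lambda.
    sqs.send_message(
        QueueUrl=os.environ["RESULTS_QUEUE_URL"],
        MessageBody=json.dumps({"event": log_event, "score": anomaly_score, "result": result}),
    )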
On an hourly schedule, our secondary Lambda processes messages from the SQS queue, performing batch analysis that generates trend reports and aggregated statistics. This two-tier approach optimizes for both real-time detection and cost-efficient batch processing.
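The batch side could look roughly like this, with the Lambda triggered by an hourly EventBridge schedule; the queue URL variable and the aggregation logic are illustrative assumptions:
import json
import os

import boto3

sqs = boto3.client("sqs")

def handler(event, context):
    """Drain the results queue and log simple hourly aggregates."""
    queue_url = os.environ["RESULTS_QUEUE_URL"]
    scores, anomaly_count = [], 0

    while True:
        batch = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=1)
        messages = batch.get("Messages", [])
        if not messages:
            break
        for msg in messages:
            record = json.loads(msg["Body"])
            scores.append(record["score"])
            anomaly_count += int(record["result"] == "High Outlier")
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])

    if scores:
        print(json.dumps({"hourly_count": len(scores), "anomalies": anomaly_count,
                          "mean_score": sum(scores) / len(scores)}))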
The entire pipeline operates as a responsive detection system, capable of processing thousands of log entries per minute while maintaining low latency for critical anomaly identification.
Anomaly Detection in Action
When our system detects anomalies, it triggers a cascade of visibility and notification mechanisms designed for operational awareness and rapid response.
CloudWatch serves as our central monitoring hub, collecting metrics from both Lambda functions through custom metric filters. These filters convert the standardized log patterns into time-series metrics, such as a running anomaly count.
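For example, a metric filter counting high outliers could be defined with boto3 roughly as follows; the log group name, filter pattern, and namespace are placeholders that would need to match the Lambda's actual log output:
import boto3

logs = boto3.client("logs")

logs.put_metric_filter(
    logGroupName="/aws/lambda/anomaly-processor",           # placeholder log group
    filterName="anomaly-count",
    filterPattern='{ $.anomaly_result = "High Outlier" }',  # matches the standardized log line
    metricTransformations=[{
        "metricName": "AnomalyCount",
        "metricNamespace": "AnomalyDetection",              # placeholder namespace
        "metricValue": "1",
        "defaultValue": 0,
    }],
)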
Our CloudWatch dashboard presents these metrics through multiple complementary visualizations.
The anomaly alarm triggers when the system detects more than five anomalies within a 15-minute window. This threshold was determined through analysis of historical patterns, balancing sensitivity with alert fatigue prevention. When triggered, SNS delivers notifications to the operations team via email.
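Expressed with boto3, the alarm could be configured roughly like this; the names and the sns_topic_arn variable are placeholders, while the period and threshold mirror the values described above:
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="HighAnomalyRate",           # placeholder alarm name
    Namespace="AnomalyDetection",          # must match the metric filter's namespace
    MetricName="AnomalyCount",
    Statistic="Sum",
    Period=900,                            # 15-minute window
    EvaluationPeriods=1,
    Threshold=5,                           # more than five anomalies triggers the alarm
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=[sns_topic_arn],          # SNS topic that notifies the operations team by email
)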
During a recent test, the system successfully identified several anomaly patterns.
In each case, our anomaly detection system identified the issue before users reported problems, enabling proactive remediation. The combination of RCF's sensitivity to multivariate anomalies and our cloud-native pipeline architecture proved particularly effective at detecting subtle patterns that traditional threshold-based monitoring would miss.
Conclusion
By integrating machine learning with event-driven processing and comprehensive monitoring, we've created a system that not only detects anomalies but provides the operational context needed to resolve them quickly. The complete data flow from Kafka through Lambda, SageMaker, and CloudWatch forms a resilient, scalable pipeline capable of protecting system health across our entire application landscape.
That wraps up our series on anomaly detection! We truly enjoyed the collaborative effort and the opportunity to share our experiences. We look forward to working together on future projects and bringing you more insights and lessons learned. Stay tuned!
Authors
Noor Sabahi and David Van Ginneken
#AWS #MachineLearning #AnomalyDetection #InfrastructureAsCode #EDA #Terraform #CloudMonitoring #DataScience