Episode 4: Developing Anomaly Detection on AWS

Training, Deploying, and Integrating a Real-Time Anomaly Detection System

In this final episode of our anomaly detection series, David and I take the last step in building a fully operational real-time anomaly detection pipeline. We focus on developing the machine learning model, deploying it within our pipeline, and integrating it with our Kafka-based log ingestion system. To complete the system, we will set up CloudWatch dashboards for real-time anomaly visualization and alerts, ensuring seamless monitoring and rapid response to unusual patterns.

Let’s get started!

Contents:

  • Train Anomaly Detection Models
  • Develop Real-Time Pipeline for Anomaly Detection
  • Anomaly Detection in Action
  • Conclusion

Train Anomaly Detection Models

As we explained in the previous episodes, we trained three models: Random Cut Forest (RCF), Seasonal Autoregressive Integrated Moving Average (SARIMA), and Support Vector Machine (SVM) to detect anomalies in log data and response times. Each of these models has a unique approach to identifying outliers, making them useful in different scenarios.

In this article, we’ll focus mainly on the RCF model.

Before diving into model training, we must first prepare our dataset properly. This involves two key steps:

  1. Data Cleaning – Ensuring that the dataset is free from inconsistencies and missing values that could introduce bias.
  2. Feature Engineering – Extracting meaningful features that enhance the model's ability to detect anomalies effectively.


Feature Engineering

To build effective anomaly detection models, we must extract meaningful features that capture the underlying patterns in our log data. The key interactions between response time, log level, HTTP method, and request path can provide valuable insights into potential anomalies.

Since Random Cut Forest (RCF) operates on purely numerical data, we must transform categorical features like log_level, method, and path into numerical representations while preserving their impact on anomaly detection. Below is our approach to feature engineering:


1. Response Time (response_time)

  • This is our primary feature for anomaly detection.
  • Higher response times often indicate anomalies, but context matters. For instance, a response time of 500ms might be normal for one API path but anomalous for another.
  • We will analyze response time distribution per log level, method, and path to identify deviations from expected behavior.
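
As a quick illustration, here is a minimal sketch of that per-group analysis, assuming the parsed logs already live in a pandas DataFrame (the sample values and column names are illustrative):

import pandas as pd

# Hypothetical parsed log sample
df = pd.DataFrame({
    "log_level": ["INFO", "ERROR", "INFO"],
    "method": ["GET", "POST", "GET"],
    "path": ["/orders", "/orders", "/users"],
    "response_time": [120, 2400, 95],
})

# Response-time statistics per (log level, method, path) group, to spot
# combinations whose typical latency deviates from the rest
stats = df.groupby(["log_level", "method", "path"])["response_time"].describe()
print(stats)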


2. Log Level (log_level)

  • Log levels provide insights into system state and errors.
  • A higher severity log level (e.g., ERROR) may correlate with higher response times and system issues.
  • We use ordinal encoding to represent log levels numerically (a sketch follows this list).
  • This encoding reflects increasing severity while maintaining numerical order, which helps models like RCF detect patterns related to critical log events.
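
A minimal sketch of this encoding, assuming a pandas DataFrame and a typical DEBUG-to-CRITICAL level set (the exact levels present in our logs may differ):

import pandas as pd

# Hypothetical sample; in the pipeline this column comes from the parsed logs
df = pd.DataFrame({"log_level": ["INFO", "ERROR", "WARNING", "INFO"]})

# Ordinal encoding: larger value = higher severity (the level set shown is an assumption)
log_level_map = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}
df["log_level_encoded"] = df["log_level"].map(log_level_map)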


3. HTTP Method (method)

  • Different HTTP methods can have different performance characteristics. For instance, POST requests that write data are often slower than lightweight GET requests.
  • We use one-hot encoding to convert the method into numerical form, avoiding an artificial ordering (a sketch follows this list).
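
A minimal sketch of the one-hot step with pandas (sample values are illustrative):

import pandas as pd

# Hypothetical sample of HTTP methods from the parsed logs
df = pd.DataFrame({"method": ["GET", "POST", "GET", "DELETE"]})

# One binary column per method; no artificial ordering is implied
method_dummies = pd.get_dummies(df["method"], prefix="method")
df = pd.concat([df, method_dummies], axis=1)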


4. Request Path (path)

  • Some API paths inherently take longer to process due to data volume, query complexity, or backend dependencies.
  • To prevent excessive dimensionality when there are many unique paths, we use frequency encoding, replacing each path with its relative frequency in the dataset (a sketch follows this list).
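
A minimal sketch of frequency encoding with pandas (the paths shown are hypothetical):

import pandas as pd

# Hypothetical sample of request paths
df = pd.DataFrame({"path": ["/orders", "/orders", "/users", "/reports/monthly"]})

# Replace each path with its relative frequency in the dataset
path_freq = df["path"].value_counts(normalize=True)
df["path_encoded"] = df["path"].map(path_freq)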

By transforming these categorical features into numerical values and ensuring they capture meaningful relationships, we prepare the dataset for effective anomaly detection using RCF, SARIMA, and SVM.


Time-Based Features

Since log data contains timestamps, we can extract temporal patterns that may influence response times and anomaly detection. Network traffic, user activity, and system load often vary across different times of the day and days of the week. To capture these variations, we engineer the following features:


Hour of the Day (hour)

  • System performance and traffic fluctuate throughout the day.
  • For example, an API might experience peak load during business hours (9 AM–5 PM) and reduced traffic at night.
  • Extract the hour from the timestamp (0–23).


Day of the Week (day_of_week)

  • API response times may differ between weekdays and weekends.
  • For example, weekday traffic (Monday–Friday) may be heavier than weekends, leading to different response characteristics.
  • Convert the timestamp into an integer representing the day of the week (0–6).


Weekend Indicator (is_weekend)

  • To further simplify analysis, introduce a binary feature, is_weekend, set to 1 on Saturdays and Sundays and 0 otherwise (see the sketch after this list).
  • This helps models differentiate between normal and unusual response patterns on weekends.
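
A minimal sketch of extracting all three time-based features with pandas (the timestamps are illustrative):

import pandas as pd

# Hypothetical timestamps; in the pipeline these come from the log records
df = pd.DataFrame({"timestamp": pd.to_datetime(["2024-03-04 09:15:00", "2024-03-09 23:40:00"])})

df["hour"] = df["timestamp"].dt.hour                      # 0-23
df["day_of_week"] = df["timestamp"].dt.dayofweek          # 0 = Monday ... 6 = Sunday
df["is_weekend"] = (df["day_of_week"] >= 5).astype(int)   # 1 for Saturday/Sunday, 0 otherwise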

By integrating time-based features into our dataset, we enable our anomaly detection models to recognize cyclical patterns and contextual anomalies that might otherwise go unnoticed.


Data Standardization

To improve Random Cut Forest (RCF) anomaly detection, it's crucial to ensure that features are on a consistent scale. This prevents certain features with larger numerical ranges from dominating the model and distorting anomaly detection.

When detecting outliers, it's recommended to use standardization techniques like Z-score scaling instead of Min-Max scaling.


Min-Max Scaling

  • Normalizes values to a range between 0 and 1.
  • Compresses normal values into a narrow range. Anomalous values may be squashed near the upper limit (1), making them harder to distinguish.

Z-Score Standardization (Preferred for RCF)

  • Centers the data around zero by subtracting the mean and scaling by standard deviation.
  • Ensures all features have zero mean and a standard deviation of one.
  • Makes outliers stand out because they have much higher Z-scores.
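
One way to apply this is with scikit-learn's StandardScaler; this is a hedged sketch (any equivalent Z-score implementation works, and the sample matrix is purely illustrative):

import numpy as np
from sklearn.preprocessing import StandardScaler

# Hypothetical numeric feature matrix: response_time, encoded log level, path frequency
X = np.array([
    [120.0, 1, 0.50],
    [135.0, 1, 0.50],
    [2400.0, 3, 0.05],
])

# Z-score standardization: each column gets zero mean and unit standard deviation
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)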


Split Data into Training and Testing Subsets

Before training the anomaly detection models, we need to split the dataset into training and testing subsets. The model learns patterns from training data and is tested on unseen data, which helps to evaluate how well the model generalizes to new log data.

# Split
split_index = int(len(prepared_data) * 0.8)  # 80% for training
X_train = prepared_data[:split_index]  # First 80% of the data
X_test = prepared_data[split_index:]     # Last 20% of the data        

Train Random Cut Forest Model

Now that we have preprocessed and split our dataset, we can train the Random Cut Forest (RCF) model using Amazon SageMaker.

import sagemaker
from sagemaker import RandomCutForest

session = sagemaker.Session()

# Specify general training job information
rcf = RandomCutForest(
    role=execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    data_location=f"s3://{bucket}/{prefix}/",
    output_path=f"s3://{bucket}/{prefix}/output",
    num_samples_per_tree=512,
    num_trees=50,
)

# Convert the training data to the RecordIO protobuf format and launch the training job
rcf.fit(rcf.record_set(X_train))

Inference

Now that we have trained the Random Cut Forest (RCF) model, the next step is to deploy an inference endpoint and run predictions on the test dataset.

rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")        

initial_instance_count=1: Starts with one instance for inference.

instance_type="ml.m5.xlarge": This instance provides a good balance of memory and compute for real-time anomaly detection.


Configure Serialization & Deserialization

To interact with the inference endpoint, we set up:

CSVSerializer: Converts input data to a CSV format.

JSONDeserializer: Converts the model's JSON response back to a Python object.

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()        

Now, we make predictions on X_test:

results = rcf_inference.predict(X_test)        

Each result contains an anomaly score for a given input row.


Calculate Anomaly Threshold using Three-Sigma Rule

Now that we have the anomaly scores from the Random Cut Forest (RCF) model, we need to determine a threshold to classify data points as anomalies.


Understanding the Three-Sigma Rule

The Three-Sigma Rule states that, for normally distributed data:

  • About 95.4% of data points fall within two standard deviations of the mean.
  • About 99.7% of data points fall within three standard deviations of the mean.
  • Data points beyond three standard deviations (σ) from the mean (μ) are therefore likely anomalies.

Thus, the anomaly threshold is:

Threshold = μ + 3σ


Calculate the Threshold

We compute the mean (μ) and standard deviation (σ) of anomaly scores and set the threshold at three standard deviations above the mean.

from statistics import mean, stdev

def calc_threshold(scores):
    # Anomaly threshold = mean + 3 * standard deviation of the scores
    score_mean = mean(scores)
    score_std = stdev(scores)
    threshold = score_mean + 3 * score_std
    return threshold, score_mean, score_std

# Extract the raw anomaly scores from the inference results
scores = [datum["score"] for datum in results["scores"]]

threshold, score_mean, score_std = calc_threshold(scores)


Identify Anomalous Data Points

Now, we classify data points as anomalies if their score exceeds the threshold.

# Rows whose anomaly score exceeds the three-sigma threshold
anomalies = test_df[test_df["score"] > threshold]

Visualize Anomalies

Now that we have classified anomalies based on the Three-Sigma Rule, let's visualize how the response time and anomaly scores correlate.

We'll use Matplotlib to plot the response time on one axis and anomaly scores on the other. Detected anomalies will be highlighted in black.


import matplotlib.pyplot as plt

fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# Plot the full test set (adjust start/end to zoom into a smaller window)
start, end = 0, len(test_df)
dataset_subset = test_df[start:end]

ax1.plot(dataset_subset["response_time"], color="C0", alpha=0.8)
ax2.plot(dataset_subset["score"], color="C1")
ax1.grid(which="major", axis="both")

ax1.set_ylabel("Response Time", color="C0")
ax2.set_ylabel("Anomaly Score", color="C1")

ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")

ax1.set_ylim(0, max(dataset_subset["response_time"]))
ax2.set_ylim(min(scores), 1.4 * max(scores))
fig.set_figwidth(20)


# Overlay the detected anomalies as black dots
ax2.plot(anomalies.index, anomalies.score, "ko")


Interpretation

  • Blue Line (C0) → Represents response time over the dataset.
  • Orange Line (C1) → Represents the anomaly scores assigned by the model.
  • Black Dots → Highlight detected anomalies, where the anomaly score exceeds the threshold.

As we can see, spikes in response time often coincide with high anomaly scores, confirming that the model successfully identifies outliers.

With the trained model deployed as an endpoint, we can now integrate it into a real-time pipeline, where incoming log data is continuously processed, and the model is invoked to detect anomalies in real time.


Develop Real-Time Pipeline for Anomaly Detection

After setting up our infrastructure and training the Random Cut Forest model, we need to implement the complete data flow pipeline to enable real-time anomaly detection. This involves connecting all components to create a seamless flow from data ingestion to alerts.

The journey of our data begins with Kafka, where log events containing metrics like response time, log level, and API paths are published. Our EKS-deployed consumer pod continuously polls the Kafka topic using a defined consumer group ID. The design intentionally separates consumption from processing – the pod doesn't analyze the data itself but acts as an intelligent router.


When a message arrives, the pod leverages its IAM role (attached via the ServiceAccount) to securely invoke our primary Lambda function. This setup eliminates credential management within the cluster while maintaining a secure authentication chain between Kubernetes and AWS services.

The Lambda function serves as the processing nexus of our system. It first transforms the incoming log data into the feature format expected by our model, applying the same feature engineering techniques used during training – normalizing response times, encoding log levels, and transforming categorical values. Once prepared, the function invokes our SageMaker endpoint:

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=sagemaker_endpoint,
    ContentType='text/csv',
    Body=transformed_data
)        

The SageMaker endpoint runs our deployed RCF model, which returns an anomaly score for each data point. The Lambda function then applies our three-sigma threshold to classify results:

# Parse the endpoint's JSON response; RCF returns scores as {"scores": [{"score": ...}]}
anomaly_score = json.loads(response["Body"].read())["scores"][0]["score"]

if anomaly_score > threshold:
    result = "High Outlier"
else:
    result = "Normal"

After classification, the function follows a multi-destination pattern (sketched after the list):

  1. Logs the result with standard patterns that CloudWatch metric filters can capture
  2. Stores the raw data and results in S3 for historical analysis
  3. Sends a message to SQS containing the anomaly score and classification
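
A hedged sketch of this fan-out step: the boto3 calls are standard, but the helper name, bucket, queue URL, log format, and payload fields are assumptions rather than the exact production code:

import json
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

def fan_out(record, anomaly_score, result, bucket, queue_url):
    # 1. Standardized log line that a CloudWatch metric filter can match on
    print(f"Result: {result} | score={anomaly_score:.4f}")

    # 2. Store the raw record and its score in S3 for historical analysis
    s3.put_object(
        Bucket=bucket,
        Key=f"anomaly-results/{record['timestamp']}.json",
        Body=json.dumps({**record, "score": anomaly_score, "result": result}),
    )

    # 3. Push the score and classification to SQS for the hourly batch Lambda
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"score": anomaly_score, "result": result}),
    )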

On an hourly schedule, our secondary Lambda processes messages from the SQS queue, performing batch analysis that generates trend reports and aggregated statistics. This two-tier approach optimizes for both real-time detection and cost-efficient batch processing.

The entire pipeline operates as a responsive detection system, capable of processing thousands of log entries per minute while maintaining low latency for critical anomaly identification.


Anomaly Detection in Action

When our system detects anomalies, it triggers a cascade of visibility and notification mechanisms designed for operational awareness and rapid response.

CloudWatch serves as our central monitoring hub, collecting metrics from both Lambda functions through custom metric filters. These filters convert the standardized log patterns into time-series metrics:

  • ResultNormalCount tracks the volume of normal events
  • ResultHighOutlierCount tracks detected anomalies
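
One way to create such a filter is with boto3; the log group name, filter pattern, and metric namespace below are assumptions (in practice these filters may equally be defined in Terraform):

import boto3

logs = boto3.client("logs")

# Count every "Result: High Outlier" log line as +1 on a custom metric (names are illustrative)
logs.put_metric_filter(
    logGroupName="/aws/lambda/anomaly-detector",   # assumed log group of the primary Lambda
    filterName="HighOutlierFilter",
    filterPattern='"Result: High Outlier"',
    metricTransformations=[
        {
            "metricName": "ResultHighOutlierCount",
            "metricNamespace": "AnomalyDetection",
            "metricValue": "1",
            "defaultValue": 0,
        }
    ],
)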


Our CloudWatch dashboard presents these metrics through multiple complementary visualizations:

  1. A time-series chart showing both normal and anomalous events, making pattern shifts immediately visible
  2. Gauge visualizations that provide at-a-glance status of current anomaly levels
  3. Detailed log tables that display the raw events leading to anomaly classifications


The anomaly alarm triggers when the system detects more than five anomalies within a 15-minute window. This threshold was determined through analysis of historical patterns, balancing sensitivity with alert fatigue prevention. When triggered, SNS delivers notifications to the operations team via email.
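
A sketch of how such an alarm could be defined with boto3; the alarm name, namespace, and SNS topic environment variable are assumptions:

import os
import boto3

cloudwatch = boto3.client("cloudwatch")
sns_topic_arn = os.environ["ALERT_TOPIC_ARN"]  # assumed env var holding the alerting SNS topic ARN

# Alarm when more than five anomalies are recorded within a 15-minute window
cloudwatch.put_metric_alarm(
    AlarmName="HighOutlierRate",
    Namespace="AnomalyDetection",
    MetricName="ResultHighOutlierCount",
    Statistic="Sum",
    Period=900,                # 15 minutes
    EvaluationPeriods=1,
    Threshold=5,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[sns_topic_arn],
)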


During a recent test, the system successfully identified several anomaly patterns:

  1. A gradual increase in response times for a specific API endpoint that eventually crossed our threshold, indicating a memory leak in the service
  2. A sudden spike in ERROR log levels during off-hours, revealing an unauthorized access attempt
  3. Cyclical performance degradation that corresponded with batch job scheduling conflicts

In each case, our anomaly detection system identified the issue before users reported problems, enabling proactive remediation. The combination of RCF's sensitivity to multivariate anomalies and our cloud-native pipeline architecture proved particularly effective at detecting subtle patterns that traditional threshold-based monitoring would miss.


Conclusion

By integrating machine learning with event-driven processing and comprehensive monitoring, we've created a system that not only detects anomalies but provides the operational context needed to resolve them quickly. The complete data flow from Kafka through Lambda, SageMaker, and CloudWatch forms a resilient, scalable pipeline capable of protecting system health across our entire application landscape.

That wraps up our series on anomaly detection! We truly enjoyed the collaborative effort and the opportunity to share our experiences. We look forward to working together on future projects and bringing you more insights and lessons learned. Stay tuned!


Authors

Noor Sabahi and David Van Ginneken

#AWS #MachineLearning #AnomalyDetection #InfrastructureAsCode #EDA #Terraform #CloudMonitoring #DataScience
