Implementing anomaly detection with PyTorch

To demonstrate anomaly detection with PyTorch, this example uses a synthetic time-series dataset, since time-series data is common in anomaly detection scenarios.

Step 1: Import Necessary Libraries

This step imports NumPy for numerical work, matplotlib for plotting, and PyTorch for building and training the neural network.

import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn

Step 2: Generating Synthetic Data

This code generates a noisy sine wave of 300 points and injects three spikes, at indices 50, 150, and 250, to act as known anomalies. Synthetic data with planted anomalies is a common way to test algorithms designed to detect unusual patterns or outliers.

# Seed for reproducibility
np.random.seed(0)

# Generate synthetic time-series data
data_length = 300
data = np.sin(np.linspace(0, 20, data_length)) + np.random.normal(scale=0.5, size=data_length)
# Introduce anomalies
data[50] += 6 # Anomaly 1
data[150] += 7 # Anomaly 2
data[250] += 8 # Anomaly 3

Step 3: Creating Sequences

The create_sequences function slices the time series into overlapping windows of window_size consecutive points. This prepares the data for models that require fixed-length input, such as the fully connected autoencoder used here.

def create_sequences(data, window_size):
    sequences = []
    for i in range(len(data) - window_size):
        sequences.append(data[i:i+window_size])
    return np.array(sequences)

window_size = 10
sequences = create_sequences(data, window_size)
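As a quick shape check, the sketch below runs the same function on a ten-point toy array (an assumption made purely for illustration) and compares it with NumPy's sliding_window_view. Because the loop uses range(len(data) - window_size), it produces len(data) - window_size windows and stops one window short of the end, while sliding_window_view returns one more.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def create_sequences(data, window_size):
    sequences = []
    for i in range(len(data) - window_size):
        sequences.append(data[i:i + window_size])
    return np.array(sequences)

toy = np.arange(10.0)                       # hypothetical toy series: 0.0 ... 9.0
loop_windows = create_sequences(toy, 4)     # windows built by the tutorial's loop
view_windows = sliding_window_view(toy, 4)  # vectorized alternative (NumPy >= 1.20)

print(loop_windows.shape)   # (6, 4)
print(view_windows.shape)   # (7, 4)
print(loop_windows[0])      # [0. 1. 2. 3.]
```

For the 300-point series with window_size = 10, the loop therefore yields 290 windows, and window i covers data[i] through data[i + 9].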

Step 4: Defining the Autoencoder model

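This step defines an Autoencoder class in PyTorch. An autoencoder is a neural network used for unsupervised tasks such as dimensionality reduction or, as here, anomaly detection: the encoder compresses each 10-point window to 2 values, and the decoder tries to reconstruct the original window from them.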

class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(window_size, 5),
            nn.ReLU(),
            nn.Linear(5, 2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(2, 5),
            nn.ReLU(),
            nn.Linear(5, window_size),
            nn.ReLU()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

model = Autoencoder()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
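One detail of the decoder is worth checking: because its last layer is followed by ReLU, the reconstruction can never be negative, whereas the noisy sine wave regularly is. The sketch below illustrates this with two untrained decoders fed random codes. The name decoder_linear_out is a hypothetical variant introduced here for comparison; it is not part of the original model, and whether it improves detection on this data is not something this example establishes.

```python
import torch
from torch import nn

torch.manual_seed(0)
window_size = 10

# Same layer layout as the tutorial's decoder, ending in ReLU
decoder_with_relu = nn.Sequential(
    nn.Linear(2, 5), nn.ReLU(), nn.Linear(5, window_size), nn.ReLU()
)
# Hypothetical variant: identical, but with no activation on the output layer
decoder_linear_out = nn.Sequential(
    nn.Linear(2, 5), nn.ReLU(), nn.Linear(5, window_size)
)

codes = torch.randn(64, 2)  # random stand-ins for encoder outputs
with torch.no_grad():
    out_relu = decoder_with_relu(codes)
    out_linear = decoder_linear_out(codes)

print("min with final ReLU:   ", out_relu.min().item())    # never below 0
print("min without final ReLU:", out_linear.min().item())  # unconstrained
```

Any negative sample in a window contributes at least its squared value to that window's reconstruction error when the output is clamped at zero, which raises the error level of normal windows as well as anomalous ones.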

Step 5: Converting Sequences to PyTorch Tensors

PyTorch models operate on tensors, so the NumPy array of sequences must be converted before training. Specifying dtype=torch.float32 matters because NumPy arrays default to float64, while the model's parameters are float32 by default.

sequences = torch.tensor(sequences, dtype=torch.float32)
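The effect of the dtype argument can be seen in a small sketch. The array name windows and its shape are placeholders chosen for illustration, and the dtypes shown assume PyTorch's default dtype has not been changed.

```python
import numpy as np
import torch

windows = np.zeros((3, 10))                      # NumPy arrays default to float64
as_is = torch.tensor(windows)                    # dtype is inherited: float64
converted = torch.tensor(windows, dtype=torch.float32)
layer = torch.nn.Linear(10, 5)                   # parameters are float32 by default

print(as_is.dtype)              # torch.float64
print(converted.dtype)          # torch.float32
print(layer.weight.dtype)       # torch.float32
print(layer(converted).shape)   # torch.Size([3, 5])
```

Passing the float64 tensor to the layer would raise a dtype mismatch error, which is why the conversion is done explicitly.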

Step 6: Training the model

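This is the training loop. In each of the 100 epochs, the model reconstructs all sequences in a single batch, the mean squared error between the reconstruction and the input is computed, and the Adam optimizer updates the weights. The loss is printed every 10 epochs.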

num_epochs = 100
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(sequences)
    loss = criterion(output, sequences)
    loss.backward()
    optimizer.step()

    if (epoch+1) % 10 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')
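The loop above passes every window through the model at once, which is fine for a few hundred windows. For larger datasets, a mini-batch variant is a common alternative. The following is a minimal sketch under stated assumptions: the random tensor stands in for the real windows, the small nn.Sequential model is a placeholder rather than the Autoencoder class, and the batch size of 32 and 5 epochs are arbitrary choices. It also seeds PyTorch, since np.random.seed does not affect PyTorch's weight initialization or shuffling.

```python
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(0)                        # seeds weight init and shuffling
window_size = 10
sequences = torch.randn(290, window_size)   # placeholder for the real windows

# Placeholder autoencoder, kept small for the sketch
model = nn.Sequential(
    nn.Linear(window_size, 5), nn.ReLU(), nn.Linear(5, window_size)
)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loader = DataLoader(TensorDataset(sequences), batch_size=32, shuffle=True)

epoch_losses = []
for epoch in range(5):
    running = 0.0
    for (batch,) in loader:                 # TensorDataset yields 1-tuples
        optimizer.zero_grad()
        loss = criterion(model(batch), batch)
        loss.backward()
        optimizer.step()
        running += loss.item() * batch.size(0)
    epoch_losses.append(running / len(sequences))
    print(f"Epoch {epoch + 1}, mean loss: {epoch_losses[-1]:.4f}")
```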

Step 7: Anomaly Detection

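This step uses the trained model to score every sequence by its reconstruction error, the mean squared difference between the sequence and its reconstruction. Sequences whose error exceeds the mean plus two standard deviations of all errors are flagged as anomalous.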

with torch.no_grad():
    predictions = model(sequences)
    losses = torch.mean((predictions - sequences)**2, dim=1)
    plt.hist(losses.numpy(), bins=50)
    plt.xlabel("Loss")
    plt.ylabel("Frequency")
    plt.show()

# Threshold for defining an anomaly
threshold = losses.mean() + 2 * losses.std()
print(f"Anomaly threshold: {threshold.item()}")

# Detecting anomalies
anomalies = losses > threshold
# Each position is the start index of a flagged window, not a single data point
anomaly_positions = np.where(anomalies.numpy())[0]
print(f"Anomalies found at positions: {anomaly_positions}")
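A threshold of mean plus two standard deviations is itself inflated by the very errors it is meant to catch, because large errors raise both the mean and the standard deviation. A robust alternative, shown here as a swapped-in technique rather than the method used in this article, is the median plus a multiple of the median absolute deviation (MAD). The error values below are simulated and hypothetical, and the multiplier 3 is an arbitrary choice.

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical reconstruction errors: 280 small values plus 10 large ones
errors = np.concatenate([
    rng.gamma(shape=2.0, scale=0.2, size=280),
    np.full(10, 5.0),
])

# Threshold used in this article
mean_std_threshold = errors.mean() + 2 * errors.std()

# Robust alternative: the median and MAD are barely moved by the large errors
median = np.median(errors)
mad = np.median(np.abs(errors - median))
robust_threshold = median + 3 * 1.4826 * mad  # 1.4826 scales MAD to a std under normality

print(f"mean + 2*std threshold: {mean_std_threshold:.3f}")
print(f"median + 3*MAD threshold: {robust_threshold:.3f}")
print("flagged by mean/std:", int((errors > mean_std_threshold).sum()))
print("flagged by median/MAD:", int((errors > robust_threshold).sum()))
```

The robust threshold sits lower, so it catches weaker anomalies at the cost of more false positives; which trade-off is right depends on the application.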

Step 8: Visualizing Anomalies

The flagged positions are visualized on top of the time series using matplotlib.

# Plotting anomalies on the time-series graph
plt.figure(figsize=(10, 6))
plt.plot(data, label='Data')
plt.scatter(anomaly_positions, data[anomaly_positions], color='r', label='Anomaly')
plt.title("Time Series Data with Detected Anomalies")
plt.xlabel("Time Steps")
plt.ylabel("Value")
plt.legend()
plt.show()

The complete code, combined into a single script:

Python
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn

# Seed for reproducibility
np.random.seed(0)

# Generate synthetic time-series data
data_length = 300
data = np.sin(np.linspace(0, 20, data_length)) + np.random.normal(scale=0.5, size=data_length)
# Introduce anomalies
data[50] += 6  # Anomaly 1
data[150] += 7 # Anomaly 2
data[250] += 8 # Anomaly 3

# Function to create sequences
def create_sequences(data, window_size):
    sequences = []
    for i in range(len(data) - window_size):
        sequences.append(data[i:i+window_size])
    return np.array(sequences)

window_size = 10
sequences = create_sequences(data, window_size)

# Define the Autoencoder model
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(window_size, 5),
            nn.ReLU(),
            nn.Linear(5, 2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(2, 5),
            nn.ReLU(),
            nn.Linear(5, window_size),
            nn.ReLU()
        )
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

model = Autoencoder()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Convert sequences to PyTorch tensors
sequences = torch.tensor(sequences, dtype=torch.float32)

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(sequences)
    loss = criterion(output, sequences)
    loss.backward()
    optimizer.step()
    
    if (epoch+1) % 10 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')

# Anomaly detection
with torch.no_grad():
    predictions = model(sequences)
    losses = torch.mean((predictions - sequences)**2, dim=1)
    plt.hist(losses.numpy(), bins=50)
    plt.xlabel("Loss")
    plt.ylabel("Frequency")
    plt.show()

# Threshold for defining an anomaly
threshold = losses.mean() + 2 * losses.std()
print(f"Anomaly threshold: {threshold.item()}")

# Detecting anomalies
anomalies = losses > threshold
anomaly_positions = np.where(anomalies.numpy())[0]
print(f"Anomalies found at positions: {anomaly_positions}")

# Plotting anomalies on the time-series graph
plt.figure(figsize=(10, 6))
plt.plot(data, label='Data')
plt.scatter(anomaly_positions, data[anomaly_positions], color='r', label='Anomaly')
plt.title("Time Series Data with Detected Anomalies")
plt.xlabel("Time Steps")
plt.ylabel("Value")
plt.legend()
plt.show()

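Output (exact values vary between runs, because np.random.seed fixes the data but not PyTorch's random weight initialization):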

Anomaly threshold: 3.9193027019500732
Anomalies found at positions: [141 142 143 144 146 147 148 149 150 241 242 243 244 245 246 247 248 249
250]

Data with anomalies


  1. Anomaly Threshold Defined: The computed anomaly threshold is approximately 3.92. It is the mean plus two standard deviations of the per-window reconstruction errors, and any window whose error exceeds it is considered anomalous.
  2. Identified Anomalies: The positions [141, 142, 143, 144, 146, 147, 148, 149, 150, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250] are the start indices of the flagged windows, each covering 10 consecutive data points. They are not indices of individual samples.
  3. Clusters Reflect Overlapping Windows: The spike injected at index 150 lies inside every window that starts between 141 and 150, and the spike at index 250 lies inside every window that starts between 241 and 250. The two clusters therefore correspond to Anomaly 2 and Anomaly 3 rather than to extended events. No window near index 50 was flagged, so the smallest injected spike (Anomaly 1) went undetected in this run.
  4. Implications for Monitoring and Maintenance: Findings like these can be crucial for applications such as system monitoring or predictive maintenance. Anomalies can trigger alerts or actions such as system checks or deeper analysis to determine the cause and mitigate potential risks.
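Since the reported positions index windows rather than samples, it can help to map them back to the data point most likely responsible. The sketch below is one simple way to do that, not part of the original code: it groups flagged windows that lie within one window length of each other, counts how many flagged windows cover each data index, and picks the most-covered index per group. The positions are copied from the output above.

```python
import numpy as np

window_size = 10
# Start indices of flagged windows, copied from the reported output
flagged = np.array([141, 142, 143, 144, 146, 147, 148, 149, 150,
                    241, 242, 243, 244, 245, 246, 247, 248, 249, 250])

# Split into groups wherever consecutive starts are more than one window apart
breaks = np.where(np.diff(flagged) > window_size)[0] + 1
clusters = np.split(flagged, breaks)

likely_points = []
for cluster in clusters:
    votes = np.zeros(cluster[-1] + window_size, dtype=int)
    for start in cluster:
        votes[start:start + window_size] += 1   # indices covered by this window
    likely_points.append(int(np.argmax(votes)))  # index covered by the most windows

print(likely_points)   # [150, 250]
```

The result matches the indices where Anomaly 2 and Anomaly 3 were injected.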

In conclusion, an autoencoder trained to reconstruct fixed-length windows can flag unusual segments of a time series through their reconstruction error. With a threshold set at the mean plus two standard deviations of those errors, this run flagged the windows containing two of the three injected spikes; the smallest spike did not push any window's error above the threshold.


