## Computational Analysis of Sound and Music

# ESA 4 - Acoustic Anomaly Detection

Dr.-Ing. Jakob Abe√üer, jakob.abesser@idmt.fraunhofer.de

**Last update:** 03.06.2024

**Outline**

In this notebook, we use a small dataset for acoustic anomaly detection.
We will implement two methods:
- a **traditional machine learning approach** using **distribution modeling** to detect anomalies as **outliers**
- a **deep learning-based method** using an **autoencoder** and the **reconstruction error** to detect anomalies

In [None]:
!pip install wget

In [None]:
import numpy as np
import sklearn as skl
import os
import matplotlib
import librosa
import matplotlib.pyplot as pl
import platform
import IPython.display as ipd
import wget
import zipfile
import glob

from sklearn.mixture import GaussianMixture
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, auc
import tensorflow as tf

## Dataset download & pre-processing

The **anomaly_mini** dataset includes audio recordings from the MIMII dataset (https://zenodo.org/records/3384388):

*Purohit, H., Tanabe, R., Ichige, K., Endo, T., Nikaido, Y., Suefusa, K., & Kawaguchi, Y. (2019). MIMII Dataset: Sound Dataset for Malfunctioning Industrial Machine Investigation and Inspection (public 1.0) [Data set]. 4th Workshop on Detection and Classification of Acoustic Scenes and Events (DCASE 2019 Workshop), New York, USA.*

Here's a part of the official description: *"This dataset is a sound dataset for malfunctioning industrial machine investigation and inspection (MIMII dataset). It contains the sounds generated from four types of industrial machines, i.e. valves, pumps, fans, and slide rails. Each type of machine includes seven individual product models, and the data for each model contains normal sounds (from 5000 seconds to 10000 seconds) and anomalous sounds (about 1000 seconds)"*

Concetely, the **anomaly_mini** dataset includes from the **6_db_valve.zip** file.
It has **20 2.5s audio clips** for training (representing the **normal state**) and **5 2.5s clips** each as test data for the normal and anomaly classes, respectively.

In [None]:
if not os.path.isfile('anomaly_mini.zip'):
    print('Please wait a couple of seconds ...')
    wget.download('https://github.com/machinelistening/machinelistening.github.io/blob/master/anomaly_mini.zip?raw=true', 
                      out='anomaly_mini.zip', bar=None)
    print('anomaly_mini.zip downloaded successfully ...')
else:
    print('Files already exist!')
    
if not os.path.isdir('anomaly_mini'):
    print("Let's unzip the file ... ")
    assert os.path.isfile('anomaly_mini.zip')
    with zipfile.ZipFile('anomaly_mini.zip', 'r') as f:
        f.extractall('.')
    assert os.path.isdir('anomaly_mini')
    print("All done :)")


In [None]:
# sample rate
fs = 44100

In [None]:
# load dataset
dir_dataset = 'anomaly_mini'
fn_wav_list = glob.glob(os.path.join(dir_dataset, '*.wav'))

is_train = []
class_id = []

for fn_wav in fn_wav_list:
    class_id.append(int('_anomaly_' in fn_wav))
    is_train.append(int('train' in fn_wav))

n_files = len(fn_wav_list)

# let's check
for i in range(n_files):
    print(f"file {i+1}: {fn_wav_list[i]} (class = {class_id[i]}, is_train={is_train[i]}")


Let's listen to some examples

In [None]:
idx = (0, 3,6,9,12,15,17)
for i in idx:
    x, fs = librosa.load(fn_wav_list[i])
    print(os.path.basename(fn_wav_list[i]))
    ipd.display(ipd.Audio(data=x, rate=fs))

## Feature Extraction

In [None]:
def compute_melspec(fn_wav, n_bins=128):
    """ Compute Mel spectrogram with logarithmic magnitude scaling 
    Args:
        fn_wav (str): WAV file name
        n_bins (int): Number of Mel frequency bins
    Returns:
        mel_spec (2d np.ndarray): Mel spectrogram (n_bins x n_frames)
    """
    x, fs = librosa.load(fn_wav, mono=True, sr=44100)
    S = librosa.feature.melspectrogram(y=x, sr=fs, n_mels=n_bins, fmax=fs/2)
    S_dB = librosa.power_to_db(S, ref=np.max)
    return S_dB

In [None]:
feat = []
for fn_wav in fn_wav_list:
    feat.append(compute_melspec(fn_wav))
feat = np.array(feat)
feat = np.expand_dims(feat, axis=-1)

In [None]:
print(f"Feature matrix shape: {feat.shape}")

For the machine learning based approach, we represent each audio clip not as a 2D spectrogram but as a 1D feature vector. In particular, we use the time-average over the entire spectrogram as feature vector.

In [None]:
print(feat.shape)

# here, we can remove the channel dimension
feat_1d = (np.squeeze(np.mean(feat, axis=2)))
print(feat_1d.shape)

**Observation**: This is a shape we already know from the first lectures on Machine learning: **number of data instances x number of features**

## Train-Test-Split

In [None]:
is_train = np.array(is_train, dtype=bool)
is_test = np.logical_not(is_train)

In [None]:
X_1d_train = feat_1d[is_train, :]
X_1d_test = feat_1d[is_test, :]
X_train = feat[is_train, :, :, :]
X_test = feat[is_test, :, :, :]

class_id = np.array(class_id)

y_train = class_id[is_train]
y_test = class_id[is_test]

# Data standardization
scaler = StandardScaler().fit(X_1d_train)
X_1d_train = scaler.transform(X_1d_train)
X_1d_test = scaler.transform(X_1d_test)


print(f"X_1d_train shape {X_1d_train.shape}")
print(f"X_1d_test shape {X_1d_test.shape}")
print(f"X_train shape {X_train.shape}")
print(f"X_test shape {X_test.shape}")

print(y_test)

In [None]:
batch_size=8 
n_epochs=50

## Approach 1: Distribution Modeling & Outlier Detection

In the first appraoch, we use a Gaussian mixture model to model the distribution of our **normal training examples**.

In [None]:
gmm = GaussianMixture(n_components=2, random_state=42)
gmm.fit(X_1d_train)

As a second step, we'll compute the likelihood for each test sample, that it was drawn from the learnt distribution.

In [None]:
scores = gmm.score_samples(X_1d_test)

# at this point, we computed the log-likelihood score, it is higher for 
# samples drawn from the learnt distribution (normal examples) and lower
# for samples outside of this distribution (anomaly examples)

# in order to use it as detection function (where values above a threshold are assigned 
# to the anomaly class), we need to inverse it
scores -= np.min(scores)
scores /= np.max(scores)
scores = 1 - scores

In [None]:
# Feature space visualization after dimensionality reduction using 
# Principal Component Analysis (PCA)
pca = PCA().fit(X_1d_train)
X_1d_train_pca = pca.transform(X_1d_train)
X_1d_test_pca = pca.transform(X_1d_test)

pl.figure(figsize=(6,3))
pl.plot(X_1d_train_pca[:, 0], X_1d_train_pca[:, 1], 'ok', label='Training (normal)')
pl.plot(X_1d_test_pca[y_test==0, 0], X_1d_test_pca[y_test==0, 1], 'og', label='Test (normal)')
pl.plot(X_1d_test_pca[y_test==1, 0], X_1d_test_pca[y_test==1, 1], 'dr', label='Test (anomaly)')
pl.legend()
pl.show()

In [None]:
pl.figure(figsize=(4,3))
pl.title('Distribution modeling approach')
pl.plot(y_test, scores, 'ko')
pl.xlabel('True label (0 = normal, 1 = anomaly)')
pl.ylabel('Anomaly score')

In [None]:
fpr, tpr, _ = roc_curve(y_test, scores)
roc_auc = roc1 = roc_auc_score(y_test, scores)

In [None]:
pl.figure(figsize=(4,4))
pl.plot(fpr, tpr, 
    color="darkorange",
    lw=2,
    label="ROC curve",
)
pl.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
pl.xlim([0.0, 1.0])
pl.ylim([0.0, 1.05])
pl.xlabel("False Positive Rate")
pl.ylabel("True Positive Rate")
pl.title(f"Receiver operating characteristic, AUC = {roc_auc}")
pl.legend(loc="lower right")
pl.show()

In [None]:
# compute AUC ROC metric
print(y_test)
roc1 = roc_auc_score(y_test, scores)
print(f"AUC ROC = {roc1}")

**Observation**: Compute the static average over all spectrogram frames does not seem to be a good feature vector for anomaly detection. The main reason is that we lose all information about the temporal dynamics of the signal.

## Approach 2: Autoencoder + Reconstruction Error

### Neural Network Architecture

We'll create a **convolutional autoencoder** which has a **encoder** that compresses the input to a latent (bottleneck) representation and a **decoder** which maps this representation back to the original signal.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam


def create_autoencoder(input_shape):
    # Encoder
    input_img = Input(shape=input_shape)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    #x = MaxPooling2D((2, 2), padding='same')(x)
    
    # Decoder
    #x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    #x = UpSampling2D((2, 2))(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(input_shape[2], (3, 3), activation='sigmoid', padding='same')(x)

    # Autoencoder
    autoencoder = Model(input_img, decoded)
    autoencoder.compile(optimizer=Adam(), loss='mean_squared_error')
    
    return autoencoder

In [None]:
ae_model = create_autoencoder(input_shape=X_train.shape[1:])
ae_model.summary()

For training, we only need to provide the training data (no labels!) as **both input and output**

In [None]:
# normalize data to [0, 1] due to the sigmoid layer
min_ = np.min(X_train)
range_ = np.max(X_train) - np.min(X_train)

X_train -= min_
X_train /= range_
X_test -= min_
X_test /= range_

In [None]:
ae_model.fit(X_train, X_train, batch_size=2, epochs=100, verbose=2)

### Evaluation

In [None]:
X_test_pred = ae_model.predict(X_test)

# compute mean squared error (MSE) as reconstruction error for all test samples
ae_score = np.mean(np.square(X_test - X_test_pred), axis=(1, 2, 3)) 

print(y_test, ae_score)


In [None]:
pl.figure(figsize=(4,3))
pl.title('Autoencoder approach')
pl.plot(y_test, ae_score, 'ko')
pl.xlabel('True label (0 = normal, 1 = anomaly)')
pl.ylabel('Anomaly score (reconstruction error)')

In [None]:
# compute false positive rate (FPR) and true positive rate (TPR) for different 
# decision thresholds to be applied on the "scores"
fpr, tpr, _ = roc_curve(y_test, ae_score)
roc_auc = roc1 = roc_auc_score(y_test, ae_score)
print(f"ROC AUC = {roc_auc}")

In [None]:
pl.figure(figsize=(4,4))
pl.plot(fpr, tpr, 
    color="darkorange",
    lw=2,
    label="ROC curve",
)
pl.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
pl.xlim([0.0, 1.0])
pl.ylim([0.0, 1.05])
pl.xlabel("False Positive Rate")
pl.ylabel("True Positive Rate")
pl.title(f"Receiver operating characteristic, AUC = {roc_auc}")
pl.legend(loc="lower right")
pl.show()