## Computational Analysis of Sound and Music

# ESA 3 - Acoustic Scene Classification

Dr.-Ing. Jakob Abeßer, jakob.abesser@idmt.fraunhofer.de

**Last update:** 29.05.2024

**Outline**

In this notebook, we use a small dataset of **acoustic scene recordings**.
We will study how to 
- implement a convolutional recurrent neural network (CRNN) in comparison with our previous CNN
- implement the **mixup** and **spec masking** data augmentation in a generator

In [None]:
!pip install wget

In [None]:
import numpy as np
import sklearn as skl
import os
import matplotlib
import librosa
import matplotlib.pyplot as pl
import platform
import IPython.display as ipd
import wget
import zipfile
import glob

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import tensorflow as tf

## Dataset download & pre-processing

The **asc_mini** dataset includes audio recordings from the following **10 acoustic scene classes**:
- airport
- bus
- metro
- metro station
- park
- public square
- shopping mall
- street pedestrian
- street traffic
- tram

and **8 short 2.5s audio clips** from each ASC class.
The original audio samples were taken from the **TAU Urban Acoustic Scenes 2020 3Class dataset**, which is described here: https://dcase.community/challenge2020/task-acoustic-scene-classification (under "Development set")

In [None]:
# Download the dataset archive once; later runs re-use the local copy.
if not os.path.isfile('asc_mini.zip'):
    print('Please wait a couple of seconds ...')
    wget.download('https://github.com/machinelistening/machinelistening.github.io/blob/master/asc_mini.zip?raw=true',
                  out='asc_mini.zip', bar=None)
    print('asc_mini.zip downloaded successfully ...')
else:
    print('Files already exist!')

# Unpack the archive unless the dataset folder is already present.
if not os.path.isdir('asc_mini'):
    print("Let's unzip the file ... ")
    # Explicit exceptions instead of `assert`: asserts are removed when Python
    # runs with -O and carry no message telling the user what went wrong.
    if not os.path.isfile('asc_mini.zip'):
        raise FileNotFoundError("Archive 'asc_mini.zip' is missing - the download seems to have failed.")
    with zipfile.ZipFile('asc_mini.zip', 'r') as f:
        f.extractall('.')
    if not os.path.isdir('asc_mini'):
        raise FileNotFoundError("Folder 'asc_mini' was not created by unpacking 'asc_mini.zip'.")
    print("All done :)")


In [None]:
# sample rate (in Hz) used throughout the notebook
fs = 44100

In [None]:
# Collect all WAV files of the dataset. The class label and the running
# number of a file within its class are parsed from the file name.
dir_dataset = 'asc_mini'
# glob returns its matches in arbitrary (OS-dependent) order; sorting makes the
# file order - and thus the hard-coded example indices used later - reproducible.
fn_wav_list = sorted(glob.glob(os.path.join(dir_dataset, '*.wav')))

class_label = []
file_num_in_class = []
for fn_wav in fn_wav_list:
    fn_wav_base = os.path.basename(fn_wav)
    parts = fn_wav_base.split('_')
    # NOTE(review): only the text before the first underscore is used as class
    # label. If class names themselves contain underscores, several classes
    # would share one label - confirm against the actual file names.
    class_label.append(parts[0])
    # last token is the 1-based file number -> convert to a 0-based counter
    file_num_in_class.append(int(parts[-1].replace('.wav', ''))-1)

for i, fn_wav in enumerate(fn_wav_list):
    print(f"File {i+1}: {os.path.basename(fn_wav)}, class = {class_label[i]}, number in class = {file_num_in_class[i]}")
n_files = len(fn_wav_list)

# this vector includes a "counter" for each file within its class, we use it later ...
file_num_in_class = np.array(file_num_in_class)


In [None]:
# Alphabetically sorted list of the distinct class labels; the position of a
# label in this list serves as its integer class ID.
unique_classes = sorted(set(class_label))
class_id = np.array([unique_classes.index(label) for label in class_label])

Let's listen to some examples

In [None]:
# Play back a handful of files from different positions of the file list.
idx = (0, 12, 24, 36, 48)
for i in idx:
    # Load at the notebook's sample rate `fs`. Without `sr`, librosa resamples
    # to its own default rate, and unpacking the returned rate into `fs`
    # would silently overwrite the global setting.
    x = librosa.load(fn_wav_list[i], sr=fs)[0]
    print(class_label[i])
    ipd.display(ipd.Audio(data=x, rate=fs))

## Feature Extraction

In [None]:
def compute_melspec(fn_wav, n_bins=128, sr=44100):
    """ Compute Mel spectrogram with logarithmic magnitude scaling
    Args:
        fn_wav (str): WAV file name
        n_bins (int): Number of Mel frequency bins
        sr (int): Sample rate (in Hz) the audio is resampled to while loading
    Returns:
        mel_spec (2d np.ndarray): Mel spectrogram in dB relative to its
            maximum value (n_bins x n_frames)
    """
    # down-mix to mono and resample to the requested rate
    x, fs = librosa.load(fn_wav, mono=True, sr=sr)
    # let the Mel filterbank cover the full band up to the Nyquist frequency
    S = librosa.feature.melspectrogram(y=x, sr=fs, n_mels=n_bins, fmax=fs/2)
    # ref=np.max: the largest value of the spectrogram is mapped to 0 dB
    S_dB = librosa.power_to_db(S, ref=np.max)
    return S_dB

In [None]:
# One Mel spectrogram per file, stacked along a new leading axis; a trailing
# singleton axis is appended afterwards.
feat = np.array([compute_melspec(fn_wav) for fn_wav in fn_wav_list])[..., np.newaxis]

In [None]:
# axes: (file, Mel bin, frame, trailing singleton axis)
print(f"Feature matrix shape: {feat.shape}")

## Train-Test-Split

In [None]:
# File indices (not boolean masks) of both subsets, chosen by the 0-based
# counter of each file within its class.
is_train, = np.where(file_num_in_class < 5)   # counters 0..4 -> 5 training files per class
is_test, = np.where(file_num_in_class >= 5)   # all remaining files of each class -> test set

In [None]:
# Split features and class IDs into training and test portion. Indexing with
# an index array returns copies, so the in-place scaling below leaves `feat`
# untouched.
X_train = feat[is_train, :]
X_test = feat[is_test, :]

y_train = class_id[is_train]
y_test = class_id[is_test]

# one-hot-encoding
y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)

# Data standardization: mean and standard deviation are estimated on the
# training set only and re-used for the test set. This way both sets pass
# through the identical transformation and no test-set statistics enter the
# pre-processing.
train_mean = np.mean(X_train)
train_std = np.std(X_train)

X_train -= train_mean
X_train /= train_std

X_test -= train_mean
X_test /= train_std

print(f"X_train shape {X_train.shape}")
print(f"X_test shape {X_test.shape}")

print(X_train)


In [None]:
# Training hyperparameters shared by all experiments below.
batch_size = 8   # examples per gradient step
n_epochs = 50    # passes over the training set

## Neural Network Architecture

We use the same CNN model as in the previous seminar

In [None]:
def create_cnn_model(input_shape, num_output_dim):
    """ Build and compile the CNN classifier
    Args:
        input_shape (tuple): Shape of one input example (passed to the Keras
            Input layer)
        num_output_dim (int): Number of units of the softmax output layer
    Returns:
        model (tf.keras.models.Model): Model compiled with categorical
            cross-entropy loss, Adam optimizer and accuracy metric
    """
    layers = tf.keras.layers

    def conv_bn_relu(tensor, n_filters):
        # 3x3 convolution -> batch normalization -> ReLU
        tensor = layers.Conv2D(n_filters, kernel_size=(3, 3), padding='same')(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Activation(activation="relu")(tensor)

    inp = layers.Input(shape=input_shape)

    # stage 1: three conv blocks with 32 filters, then 2x2 max pooling
    x = inp
    for _ in range(3):
        x = conv_bn_relu(x, 32)
    x = layers.MaxPooling2D((2, 2))(x)

    # stage 2: two conv blocks with 64 filters, then 2x2 max pooling
    for _ in range(2):
        x = conv_bn_relu(x, 64)
    x = layers.MaxPooling2D((2, 2))(x)

    # stage 3: a single conv block with 128 filters
    x = conv_bn_relu(x, 128)

    # summarize each feature map by both its average and its maximum
    avg_pooled = layers.GlobalAveragePooling2D()(x)
    max_pooled = layers.GlobalMaxPooling2D()(x)
    x = layers.concatenate([avg_pooled, max_pooled])

    # classification head
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    out = layers.Dense(num_output_dim, activation="softmax")(x)

    model = tf.keras.models.Model(inputs=inp, outputs=out)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    return model


## Baseline

As a baseline experiment, we train our model on the clean training set first.

In [None]:
# Baseline: fit a freshly created CNN on the unaugmented training set.
input_shape = X_train.shape[1:]
model_s1 = create_cnn_model(input_shape=input_shape, num_output_dim=10)
model_s1.fit(x=X_train, y=y_train, epochs=n_epochs, batch_size=batch_size, verbose=2)

## Data Augmentation 

Let's implement the **Mixup** and **Spectrogram Masking** methods first by-hand to get a better understanding.

### (1) Spectrogram Masking

We want to create blocks in the spectrogram that we will mask with zeros. We distinguish between
- frequency blocks (they span the entire spectrogram duration but only a small frequency band)
- time blocks (they span only a short time segment but the entire frequency range)

For each spectrogram, we'll randomly mask 2 frequency blocks and 2 time blocks.

In [None]:
def augment_spectral_masking(orig_spec, n_time_blocks=2, n_freq_blocks=2, max_width=10, max_height=15,
                             mask_value=0):
    """ Mask randomly placed time and frequency blocks of a spectrogram
    Args:
        orig_spec (2d np.ndarray): Spectrogram (n_freq x n_time), it is not modified
        n_time_blocks (int): Number of time blocks (masked over all frequency bins)
        n_freq_blocks (int): Number of frequency blocks (masked over all frames)
        max_width (int): Maximum duration of a time block in frames
        max_height (int): Maximum range of a frequency block in bins
        mask_value (float): Value written into the masked blocks
    Returns:
        spec (2d np.ndarray): Masked copy of the spectrogram (n_freq x n_time)
    """
    spec = np.copy(orig_spec)
    n_freq, n_time = spec.shape

    # np.random.rand() (no size argument) returns a plain scalar. The former
    # rand(1) returned a 1-element array, and converting such an array with
    # int() is deprecated since NumPy 1.25.
    for _ in range(n_time_blocks):
        start_frame = int(np.floor(np.random.rand() * n_time))
        duration = int(np.ceil(np.random.rand() * max_width))
        # a block reaching beyond the last frame is clipped by the slice
        spec[:, start_frame:start_frame + duration] = mask_value

    for _ in range(n_freq_blocks):
        low_bin = int(np.floor(np.random.rand() * n_freq))
        range_ = int(np.ceil(np.random.rand() * max_height))
        # a block reaching beyond the highest bin is clipped by the slice
        spec[low_bin:low_bin + range_, :] = mask_value

    return spec

Let's try this for one of the spectrograms and create three randomly augmented versions!

In [None]:
# test
# Top-left panel: the unmodified spectrogram of the first file.
pl.figure()
pl.subplot(2, 2, 1)
pl.imshow(feat[0, :, :, 0], aspect="auto", origin="lower")
# Remaining three panels: independent random maskings of that spectrogram.
for i in range(3):
    pl.subplot(2, 2, i + 2)
    pl.imshow(augment_spectral_masking(feat[0, :, :, 0]), aspect="auto", origin="lower")
pl.show()
    

### (2) Mixup

**Mixup** was introduced in 
  - Zhang, Hongyi, Moustapha Cisse, Yann N. Dauphin, and David Lopez-Paz. "mixup: Beyond empirical risk minimization."  arXiv preprint arXiv:1710.09412 (2017).
  
Mixup is a data augmentation technique where training samples are created by combining pairs of inputs and their corresponding labels in a weighted manner. 

Mixup is controlled by only one parameter: $\alpha$. It controls the **strength of interpolation between pairs of examples and their labels**. 
It determines the **shape of the Beta distribution** used to sample the mixing coefficients, where a higher alpha leads to more mixing between the examples.

In [None]:
def augment_mixup(spec1, spec2, target1, target2, alpha):
    """ Blend two examples and their targets with one random mixing weight
    Args:
        spec1 (np.ndarray): First spectrogram
        spec2 (np.ndarray): Second spectrogram
        target1 (np.ndarray): Target of the first spectrogram
        target2 (np.ndarray): Target of the second spectrogram
        alpha (float): Both shape parameters of the Beta distribution the
            mixing weight is drawn from
    Returns:
        spec (np.ndarray): Weighted sum of both spectrograms
        target (np.ndarray): Weighted sum of both targets (same weight)
    """
    # a single weight is drawn and shared by spectrograms and targets
    weight = np.random.beta(alpha, alpha)

    def blend(first, second):
        return weight*first + (1-weight)*second

    return blend(spec1, spec2), blend(target1, target2)

Again, let's try this with two random spectrograms from our datasets.

In [None]:
# Demo: two training examples (top row) and two random mixtures of them
# (bottom row).
idx1 = 2
idx2 = 20

pl.figure()
for panel, idx in enumerate((idx1, idx2), start=1):
    pl.subplot(2, 2, panel)
    pl.imshow(X_train[idx, :, :, 0], origin="lower", aspect="auto")
    pl.title(f"Orig spec {panel}", fontsize=8)
    pl.axis("off")
    print(f"Target {panel} {y_train[idx, :]}")

# let's create two mixed versions thereof
for i in range(2):
    X, y = augment_mixup(X_train[idx1, :, :, 0],
                         X_train[idx2, :, :, 0],
                         y_train[idx1, :],
                         y_train[idx2, :],
                         0.7)
    pl.subplot(2, 2, 3 + i)
    pl.imshow(X, origin="lower", aspect="auto")
    pl.title("Mixup result", fontsize=8)
    print(f"Target (mix) {y}")
    pl.axis("off")



## Apply data augmentation to enhance training set

### (1) Spectral Masking

In [None]:
# Replace every training spectrogram by a randomly masked version of itself;
# the targets stay the same.
X_train_new = X_train.copy()
for i, spec in enumerate(X_train):
    X_train_new[i, :, :, 0] = augment_spectral_masking(spec[:, :, 0])

In [None]:
# Fit a freshly created CNN on the masked spectrograms (original targets).
input_shape = X_train_new.shape[1:]
model_s2 = create_cnn_model(input_shape=input_shape, num_output_dim=10)
model_s2.fit(x=X_train_new, y=y_train, epochs=n_epochs, batch_size=batch_size, verbose=2)

### (2) Mixup

Here, we mix random pairs of examples from our training dataset such that we end up with the same number of training examples.

In [None]:
# Build a mixup training set of the original size: every new example is a
# random blend of two different training examples.
n_train_examples = X_train.shape[0]
X_train_new = X_train.copy()
y_train_new = y_train.copy()
all_idx = np.arange(n_train_examples)
for i in range(n_train_examples):
    # after shuffling, the first two entries form a random pair of distinct indices
    np.random.shuffle(all_idx)
    first, second = all_idx[0], all_idx[1]
    mixed_spec, mixed_target = augment_mixup(X_train[first, :, :, 0],
                                             X_train[second, :, :, 0],
                                             y_train[first, :],
                                             y_train[second, :],
                                             0.3)
    X_train_new[i, :, :, 0] = mixed_spec
    y_train_new[i, :] = mixed_target

In [None]:
# Fit a freshly created CNN on the mixed spectrograms and their mixed targets.
input_shape = X_train_new.shape[1:]
model_s3 = create_cnn_model(input_shape=input_shape, num_output_dim=10)
model_s3.fit(x=X_train_new, y=y_train_new, epochs=n_epochs, batch_size=batch_size, verbose=2)

## Final Evaluation

In [None]:
# evaluate all three models on the test set
# (the reference class IDs are the same for every model, so decode them once)
class_id_test = np.argmax(y_test, axis=1)
acc = np.zeros(3)
for i, model in enumerate((model_s1, model_s2, model_s3)):
    # predicted class = output unit with the highest probability
    y_test_pred = model.predict(X_test)
    class_id_test_pred = np.argmax(y_test_pred, axis=1)
    acc[i] = accuracy_score(class_id_test, class_id_test_pred)
    

In [None]:
# Compare the test accuracy of the three training strategies.
pl.figure()
pl.plot(acc, 'o-')
pl.xticks(ticks=(0, 1, 2), labels=('no aug', 'spectral masking', 'mixup'))
pl.xlabel('Strategy')
pl.ylabel('Accuracy')
pl.show()

## Next steps

- combine spectral masking and mixup
- integrate them into a data generator to have random data augmentation results in each epoch
- create **more** augmented results