| | |
| | |
| | |
| | |
| |
|
| | import os |
| |
|
| | import pandas as pd |
| | import pytest |
| | from sklearn.datasets import make_classification |
| | from sklearn.metrics import accuracy_score, f1_score |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.preprocessing import LabelEncoder |
| | from tensorflow.keras.layers import BatchNormalization, Concatenate, Dense, Dropout |
| | from tensorflow.keras.losses import CategoricalCrossentropy |
| | from tensorflow.keras.models import Model |
| | from tensorflow.keras.optimizers import SGD, Adam |
| |
|
| | from src.classifiers_mlp import MultimodalDataset, create_early_fusion_model, train_mlp |
| |
|
| | |
| | |
| | |
| |
|
| |
|
@pytest.fixture
def correlated_sample_data():
    """
    Build a small synthetic multimodal dataset and split it into train/test.

    ``make_classification`` produces 20 samples with 8 features (6 of them
    informative) spread over 3 classes. The first four columns are named
    ``text_0`` .. ``text_3`` and the last four ``image_4`` .. ``image_7``
    (the image suffix continues the column index rather than restarting
    at 0). The class label is stored in the ``class_id`` column.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(train_df, test_df)`` produced by
        ``train_test_split`` with ``test_size=0.3`` and a fixed random seed.
    """
    features, labels = make_classification(
        n_samples=20, n_features=8, n_informative=6, n_classes=3, random_state=42
    )

    # Four "text" columns followed by four "image" columns (8 features total).
    text_names = [f"text_{i}" for i in range(4)]
    image_names = [f"image_{i}" for i in range(4, 8)]

    df = pd.DataFrame(features, columns=text_names + image_names)
    df["class_id"] = labels

    train_df, test_df = train_test_split(df, test_size=0.3, random_state=42)
    return train_df, test_df
| |
|
| |
|
@pytest.fixture
def label_encoder(correlated_sample_data):
    """
    Provide a ``LabelEncoder`` fitted on the ``class_id`` column of the
    training split.
    """
    train_split, _ = correlated_sample_data
    # ``fit`` returns the encoder itself, so the fitted instance is returned.
    return LabelEncoder().fit(train_split["class_id"])
| |
|
| |
|
def test_multimodal_dataset_image_only(correlated_sample_data, label_encoder):
    """
    A MultimodalDataset built from image columns alone exposes image data only.
    """
    train_df, _ = correlated_sample_data
    image_columns = [f"image_{idx}" for idx in range(4, 8)]

    dataset = MultimodalDataset(
        train_df,
        text_cols=None,
        image_cols=image_columns,
        label_col="class_id",
        encoder=label_encoder,
    )

    # Only the image modality should have been materialised.
    assert dataset.image_data is not None, "Image data should be instantiated"
    assert dataset.text_data is None, "Text data should be None"

    # Indexing the dataset yields an (inputs, labels) pair for the first batch.
    inputs, labels = dataset[0]

    assert "image" in inputs, "Batch should contain image data"
    assert "text" not in inputs, "Batch should not contain text data"

    image_batch = inputs["image"]
    assert image_batch.shape[1] == len(image_columns), "Image data shape is incorrect"
    assert labels is not None, "Batch should contain labels"
    assert labels.shape[0] == image_batch.shape[0], "Labels should match the batch size"
| |
|
| |
|
def test_multimodal_dataset_text_only(correlated_sample_data, label_encoder):
    """
    A MultimodalDataset built from text columns alone exposes text data only.
    """
    train_df, _ = correlated_sample_data
    text_columns = [f"text_{idx}" for idx in range(4)]

    dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=None,
        label_col="class_id",
        encoder=label_encoder,
    )

    # Only the text modality should have been materialised.
    assert dataset.text_data is not None, "Text data should be instantiated"
    assert dataset.image_data is None, "Image data should be None"

    # Indexing the dataset yields an (inputs, labels) pair for the first batch.
    inputs, labels = dataset[0]

    assert "text" in inputs, "Batch should contain text data"
    assert "image" not in inputs, "Batch should not contain image data"

    text_batch = inputs["text"]
    assert text_batch.shape[1] == len(text_columns), "Text data shape is incorrect"
    assert labels is not None, "Batch should contain labels"
    assert labels.shape[0] == text_batch.shape[0], "Labels should match the batch size"
| |
|
| |
|
def test_multimodal_dataset_multimodal(correlated_sample_data, label_encoder):
    """
    A MultimodalDataset built from both column groups exposes text and image
    data with matching batch sizes.
    """
    train_df, _ = correlated_sample_data
    text_columns = [f"text_{idx}" for idx in range(4)]
    image_columns = [f"image_{idx}" for idx in range(4, 8)]

    dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col="class_id",
        encoder=label_encoder,
    )

    # Both modalities should have been materialised.
    assert dataset.text_data is not None, "Text data should be instantiated"
    assert dataset.image_data is not None, "Image data should be instantiated"

    # Indexing the dataset yields an (inputs, labels) pair for the first batch.
    inputs, labels = dataset[0]
    assert "text" in inputs, "Batch should contain text data"
    assert "image" in inputs, "Batch should contain image data"

    text_batch = inputs["text"]
    image_batch = inputs["image"]
    assert text_batch.shape[1] == len(text_columns), "Text data shape is incorrect"
    assert image_batch.shape[1] == len(image_columns), "Image data shape is incorrect"
    assert labels is not None, "Batch should contain labels"
    assert labels.shape[0] == text_batch.shape[0] == image_batch.shape[0], (
        "Labels should match the batch size"
    )
| |
|
| |
|
def test_create_early_fusion_model_single_modality_image():
    """
    Build the model with an image input only (text size ``None``) and check
    its input/output shapes and layer make-up.
    """
    image_input_size = 4
    output_size = 3

    model = create_early_fusion_model(
        None, image_input_size, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # A single-input model reports one shape tuple rather than a list.
    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    def layers_of(layer_type):
        # Collect the model's layers that are instances of ``layer_type``.
        return [layer for layer in model.layers if isinstance(layer, layer_type)]

    dense = layers_of(Dense)
    dropout = layers_of(Dropout)
    batchnorm = layers_of(BatchNormalization)

    assert len(dense) == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert len(dropout) > 0, "There should be at least 1 Dropout layers"
    assert len(batchnorm) > 0, "There should be at least 1 BatchNormalization layer"
| |
|
| |
|
def test_create_early_fusion_model_single_modality_text():
    """
    Build the model with a text input only (image size ``None``) and check
    its input/output shapes and layer make-up.
    """
    text_input_size = 4
    output_size = 3

    model = create_early_fusion_model(
        text_input_size, None, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # A single-input model reports one shape tuple rather than a list.
    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    def layers_of(layer_type):
        # Collect the model's layers that are instances of ``layer_type``.
        return [layer for layer in model.layers if isinstance(layer, layer_type)]

    dense = layers_of(Dense)
    dropout = layers_of(Dropout)
    batchnorm = layers_of(BatchNormalization)

    assert len(dense) == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert len(dropout) > 0, "There should be at least 1 Dropout layers"
    assert len(batchnorm) > 0, "There should be at least 1 BatchNormalization layer"
| |
|
| |
|
def test_create_early_fusion_model_multimodal():
    """
    Build the model with both text and image inputs and check its shapes,
    the presence of a Concatenate layer, and the layer make-up.
    """
    text_input_size = 4
    image_input_size = 4
    output_size = 3

    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # With two inputs the model reports a list of shapes, text first.
    expected_inputs = [(None, text_input_size), (None, image_input_size)]
    assert model.input_shape == expected_inputs, (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    def layers_of(layer_type):
        # Collect the model's layers that are instances of ``layer_type``.
        return [layer for layer in model.layers if isinstance(layer, layer_type)]

    assert any(isinstance(layer, Concatenate) for layer in model.layers), (
        "There should be a Concatenate layer for text and image inputs"
    )

    dense = layers_of(Dense)
    dropout = layers_of(Dropout)
    batchnorm = layers_of(BatchNormalization)

    assert len(dense) == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert len(dropout) > 0, "There should be at least 1 Dropout layers"
    assert len(batchnorm) > 0, "There should be at least 1 BatchNormalization layer"
| |
|
| |
|
def test_train_mlp_single_modality_image(correlated_sample_data, label_encoder):
    """
    Run ``train_mlp`` on image features only and check the returned model's
    loss, input/output shapes, and optimizer.

    ``train_mlp`` is called with ``train_model=False`` and
    ``test_mlp_model=False``; presumably that skips fitting and evaluation
    (confirm against ``train_mlp``), so only configuration is asserted here.
    """
    train_df, test_df = correlated_sample_data
    image_columns = [f"image_{idx}" for idx in range(4, 8)]

    # Both splits share the same dataset configuration.
    dataset_kwargs = dict(
        text_cols=None,
        image_cols=image_columns,
        label_col="class_id",
        encoder=label_encoder,
    )
    train_dataset = MultimodalDataset(train_df, **dataset_kwargs)
    test_dataset = MultimodalDataset(test_df, **dataset_kwargs)

    image_input_size = len(image_columns)
    output_size = len(label_encoder.classes_)

    # The returned metrics are not inspected by this test.
    model, _accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=None,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    # The loss may be exposed either as an object or as its string alias.
    loss_is_cce = (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    )
    assert loss_is_cce, (
        f"Loss function should be categorical crossentropy, but got {model.loss}"
    )

    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
| |
|
| |
|
def test_train_mlp_single_modality_text(correlated_sample_data, label_encoder):
    """
    Run ``train_mlp`` on text features only and check the returned model's
    loss, input/output shapes, and optimizer.

    ``train_mlp`` is called with ``train_model=False`` and
    ``test_mlp_model=False``; presumably that skips fitting and evaluation
    (confirm against ``train_mlp``), so only configuration is asserted here.
    """
    train_df, test_df = correlated_sample_data
    text_columns = [f"text_{idx}" for idx in range(4)]

    # Both splits share the same dataset configuration.
    dataset_kwargs = dict(
        text_cols=text_columns,
        image_cols=None,
        label_col="class_id",
        encoder=label_encoder,
    )
    train_dataset = MultimodalDataset(train_df, **dataset_kwargs)
    test_dataset = MultimodalDataset(test_df, **dataset_kwargs)

    text_input_size = len(text_columns)
    output_size = len(label_encoder.classes_)

    # The returned metrics are not inspected by this test.
    model, _accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=None,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    # The loss may be exposed either as an object or as its string alias.
    loss_is_cce = (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    )
    assert loss_is_cce, (
        f"Loss function should be categorical crossentropy, but got {model.loss}"
    )

    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
| |
|
| |
|
def test_train_mlp_multimodal(correlated_sample_data, label_encoder):
    """
    Run ``train_mlp`` with both text and image features and check the
    returned model's loss, two-input shape, output shape, and optimizer.

    ``set_weights=True`` and ``patience=10`` are passed through, but this test
    does not assert anything about class weights or early stopping. The call
    also uses ``train_model=False`` and ``test_mlp_model=False``; presumably
    that skips fitting and evaluation (confirm against ``train_mlp``), so only
    the model configuration is verified.
    """
    train_df, test_df = correlated_sample_data

    text_columns = [f"text_{i}" for i in range(4)]
    image_columns = [f"image_{i}" for i in range(4, 8)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )
    test_dataset = MultimodalDataset(
        test_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )

    text_input_size = len(text_columns)
    image_input_size = len(image_columns)
    output_size = len(label_encoder.classes_)

    # The returned metrics are intentionally ignored; only the model is checked.
    model, _test_accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    # The loss may be exposed either as an object or as its string alias.
    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"

    # With two inputs the model reports a list of shapes, text first.
    assert model.input_shape == [(None, text_input_size), (None, image_input_size)], (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
| |
|
| |
|
| | |
def test_result_files():
    """
    Check that the three per-modality result CSVs exist next to the tests
    (under ``../results``), are non-empty, and carry the ``Predictions`` and
    ``True Labels`` columns.
    """
    here = os.path.dirname(os.path.abspath(__file__))

    # (path relative to this file, message used when the file is absent)
    expected = [
        ("../results/multimodal_results.csv", "Multimodal result file is missing!"),
        ("../results/text_results.csv", "Text result file is missing!"),
        ("../results/image_results.csv", "Image result file is missing!"),
    ]
    resolved = [(os.path.join(here, relative), message) for relative, message in expected]

    # Existence of every file is checked before any of them is parsed.
    for file_path, missing_message in resolved:
        assert os.path.exists(file_path), missing_message

    for file_path, _ in resolved:
        frame = pd.read_csv(file_path)
        assert not frame.empty, f"{file_path} is empty!"
        has_required_columns = (
            "Predictions" in frame.columns and "True Labels" in frame.columns
        )
        assert has_required_columns, f"{file_path} is not in the correct format!"
| |
|
| |
|
| | |
def test_model_performance():
    """
    Score the saved predictions of each modality (accuracy and macro F1) and
    require both metrics to be strictly above that modality's threshold.
    """
    here = os.path.dirname(os.path.abspath(__file__))

    # (label used in messages, results file, accuracy threshold, F1 threshold)
    specs = [
        ("Multimodal", "../results/multimodal_results.csv", 0.85, 0.80),
        ("Text", "../results/text_results.csv", 0.85, 0.80),
        ("Image", "../results/image_results.csv", 0.75, 0.70),
    ]

    # Load every file first, then score every file, then assert, in that
    # order, so failures surface in the same sequence for all modalities.
    frames = [
        pd.read_csv(os.path.join(here, relative)) for _, relative, _, _ in specs
    ]

    scores = []
    for frame in frames:
        truth = frame["True Labels"]
        predicted = frame["Predictions"]
        scores.append(
            (
                accuracy_score(truth, predicted),
                f1_score(truth, predicted, average="macro"),
            )
        )

    for (label, _, min_accuracy, min_f1), (accuracy, macro_f1) in zip(specs, scores):
        assert accuracy > min_accuracy, f"{label} accuracy is below {min_accuracy}"
        assert macro_f1 > min_f1, f"{label} F1 score is below {min_f1}"
| |
|
| |
|
if __name__ == "__main__":
    # Propagate pytest's exit status so a failing run is visible to the
    # shell/CI instead of always exiting with 0.
    raise SystemExit(pytest.main())
| |
|