## RapidFire.AI Tutorial Notebook: IMDB Use Case

In [None]:
from os import getenv
from typing import Any, Dict

import pandas as pd
import torch
from torch import nn

from rapidfire import  ETLSpec, MLSpec, Experiment

### Implement ETLSpec API for Data Example Preprocessing

In [None]:
class ImdbETLSpec(ETLSpec):
    """
    ETLSpec implementation for the IMDB use case.

    Rows are passed through untokenized by row_prep(); tokenization and padding
    happen per batch in collate_fn() using the tokenizer loaded in initialize_worker().
    """

    def initialize_worker(self, misc_path: str) -> None:
        """
        Function to initialize objects on every machine that are reused (if any) across functions below.

        Loads the "lvwerra/distilbert-imdb" tokenizer and stores it on self.tokenizer
        for use by collate_fn(). The misc_path argument is not used here.
        """
        # Imported locally, as in the rest of this class, rather than at module level.
        from transformers import AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained("lvwerra/distilbert-imdb")

    def download_columns(self) -> list:
        """
        Function to specify (subset of) column names from Example Structure File with objects to download.

        Returns an empty list, i.e. no columns are named for download.
        """
        return []

    def row_prep(self, row, is_predict: bool) -> Dict[str, Any]:
        """
        Function to preprocess each injected row from Example Structure File.

        Returns a dict with the raw "text" value of the row and, unless is_predict
        is true, the row's "label" value stored under the key "labels".
        """
        # Carry the raw strings forward; tokenization is deferred to collate_fn().
        out = {"text": row["text"]}
        if not is_predict:
            out["labels"] = row["label"]
        return out

    def collate_fn(self, batch, is_predict):
        """
        Function to collate variable-sized examples in a custom way to get uniform-sized batch for GPU later

        batch is an iterable of the dicts produced by row_prep(). Returns a dict with
        "input_ids" and "attention_mask" tensors and, unless is_predict is true, a
        "labels" tensor built from the per-row "labels" values.
        """
        import torch

        texts = [row["text"] for row in batch]

        # One tokenizer pass: truncate to the model max length and pad only up to the
        # longest example in this batch. This yields the same tensors as tokenizing
        # unpadded first to measure the batch maximum and then tokenizing again.
        encoded = self.tokenizer(texts, padding="longest", truncation=True, return_tensors="pt")
        out = {"input_ids": encoded["input_ids"],
               "attention_mask": encoded["attention_mask"]}

        if not is_predict:
            out["labels"] = torch.tensor([row["labels"] for row in batch])
        return out

### Implement MLSpec API for Model Creation, Forward Pass, and (Optional) Custom Metrics

In [None]:
class ImdbTrainingSpec(MLSpec):
    """
    MLSpec implementation for the IMDB use case.

    Builds the "lvwerra/distilbert-imdb" sequence classifier, runs the forward pass,
    and reports per-minibatch correct/total counts that are aggregated into accuracy.
    """

    def initialize_worker(self, misc_path: str) -> None:
        """
        Function to initialize objects on every machine that are reused (if any) across functions below.

        Nothing is initialized for this spec; misc_path is not used.
        """

    def create_model(self, cfg: Dict[str, Any]) -> nn.Module:
        """
        Function to create an instance of the model.

        Always loads "lvwerra/distilbert-imdb"; cfg is not consulted.
        """
        from transformers import AutoModelForSequenceClassification

        model = AutoModelForSequenceClassification.from_pretrained("lvwerra/distilbert-imdb")
        return model

    def compute_forward(self, model, minibatch, cfg: Dict[str, Any], is_predict: bool):
        """
        Function to compute loss (only for train/val/test) and outputs using the injected model on injected minibatch.

        Returns the outputs dict alone when is_predict is true, otherwise a
        (loss, outputs dict) tuple. The outputs dict holds "predictions" (argmax of
        the logits along dim 1) and, when not predicting, "targets" (the minibatch
        labels). cfg is not used.
        """
        import torch

        # The whole minibatch dict is unpacked into the model call, so any "labels"
        # entry is forwarded too; outputs.loss below is read only in that case.
        outputs = model(**minibatch)
        logits = outputs.logits
        outdict = {"predictions": torch.argmax(logits, dim=1)}

        if not is_predict:
            outdict["targets"] = minibatch["labels"]
            loss = outputs.loss
            return loss, outdict
        return outdict

    def compute_metrics(self, loss: torch.Tensor, outputs: Any, minibatch, cfg: Dict[str, Any]) -> Dict[str, torch.Tensor]:
        """
        Function to compute train/val/test metrics using outputs from compute_forward() and corresponding minibatch.

        Returns {"correct": number of predictions equal to the labels,
        "total": number of labels in the minibatch}, both as tensors.
        loss and cfg are accepted but not used.
        """
        import torch

        labels = minibatch["labels"]
        correct = (outputs["predictions"] == labels).sum()
        total = torch.tensor(labels.size(0))
        return {"correct": correct, "total": total}

    def aggregate_metrics(self, metrics: pd.DataFrame, cfg: Dict[str, Any]) -> Dict[str, Any]:
        """
        Function to aggregate metrics returned by compute_metrics() across all minibatches in an epoch.

        Sums the "correct" and "total" columns of metrics and returns
        {"accuracy": correct / total}. cfg is not used.
        """
        correct = sum(metrics["correct"])
        total = sum(metrics["total"])
        return {"accuracy": correct / total}

### Specify Data Locators for Inputs and Outputs

In [None]:
# Region and cluster name are injected from environment variables. getenv() returns
# None for an unset variable, which would be interpolated literally into the URIs below.
region, cluster_name = getenv("AWS_REGION"), getenv("CLUSTER_NAME")

# Shared URI prefixes: the input CSVs, and the per-cluster location holding
# etl_dir and output_dir (both of which are writeable).
_data_root = f"s3://rapidfire-datasets-{region}/imdb/Data"
_output_root = f"s3://rapidfire-datasets-{region}/outputs/{cluster_name}/imdb"

# NOTE(review): validation_main and test_main point at the same file - confirm this is intended.
ImdbLocators = {
    "train_main": f"{_data_root}/imdb-train.csv",
    "validation_main": f"{_data_root}/imdb-test.csv",
    "test_main": f"{_data_root}/imdb-test.csv",
    "predict_main": f"{_data_root}/imdb-pred.csv",

    "etl_dir": f"{_output_root}/SavedETLData",
    "output_dir": f"{_output_root}/SavedArtifacts",
}

### Initialize Experiment

In [None]:
# Every experiment instance must be uniquely named and associated with one set of instances of the three inputs.
# Here the name is "exp1-imdb"; the two spec classes themselves (not instances) and the locators dict are passed in.
experiment = Experiment("exp1-imdb", ImdbETLSpec, ImdbTrainingSpec, ImdbLocators)

### Run Data Preprocessing

In [None]:
# The fraction argument controls how much data is ingested and processed; here it is 100% (fraction=1.0).
experiment.run_etl(fraction=1.0)

### Define Config Knobs and Config-Group Generation with Search/AutoML Method

In [None]:
# NOTE: List here is rapidfire.automl.List (marks a knob's candidate values), not typing.List.
from rapidfire.automl import GridSearch, List

# Grid search over 2 hyperparameters with 2 values each = 4 configs in group:
# batch_size in [16, 32] crossed with lr in [2e-5, 1e-5]; every other knob is a single fixed value.
config_group = GridSearch({
    'user_knobs': {
        # NOTE(review): "distillbert" looks like a misspelling of "distilbert". ImdbTrainingSpec.create_model()
        # does not read cfg, so this value does not select the model - confirm before renaming.
        'model_type': "distillbert",
    },
    'train': {
        'epochs': 5,
        'batch_size': List([16, 32]),
        'optimizer': {
            'name': "Adam",
            'args': {
                'lr': List([2e-5, 1e-5]),
                'weight_decay': 1e-4,
            },
        },
    },
    # NOTE(review): ImdbTrainingSpec.aggregate_metrics() reports a metric named "accuracy"; how the
    # "top1_accuracy" named metric relates to it is not visible in this notebook - confirm.
    "named_metrics": ["top1_accuracy"],
})

### Run Training

In [None]:
# Launch training with interdimensional parallelism for all configs in the config_group.
# A fixed seed (42) is passed to run_fit.
experiment.run_fit(config_group, seed=42)

### Run Testing

In [None]:
# Compute test accuracy of model with run_id 2.
# NOTE(review): run_id 2 is hard-coded; presumably it refers to one of the runs launched by run_fit - confirm.
experiment.run_test(2)

### Run Prediction

In [None]:
# Compute predictions using model with run_id 2 and output them to file.
# NOTE(review): run_id 2 is hard-coded, matching the run_test call - confirm it is the intended run.
experiment.run_predict(2)

### End Experiment

In [None]:
# End the current experiment and persist its artifacts if needed; here save_artifacts=False is passed.
experiment.end_experiment(save_artifacts=False)