HappyWhale 🐳: PyTorch Training from scratch Lite ⚡️
Learn how to write a custom training loop in pure PyTorch, create a custom torch Dataset class, compute metrics for model performance, and scale the training on any hardware like GPU, TPU, or IPU, or run distributed training, with LightningLite.
kaggle
Published
April 3, 2022
Keywords
PyTorch, deep learning, kaggle, Lightning
This notebook article teaches you how to write a custom training loop in pure PyTorch, create a custom torch Dataset class, compute metrics for model performance, and scale the training on any hardware like GPU, TPU, or IPU, or across multiple devices with distributed training, using LightningLite.
A DataLoader is an iterable object which contains your input image data and the target label. To create a DataLoader, we first need to implement a torch Dataset class. We define a MyDataset class which inherits from Dataset and implements the __len__ and __getitem__ methods.
label_to_idx = {e: i for i, e in enumerate(df_train.species.unique())}


class MyDataset(Dataset):
    def __init__(self, df, transforms=None):
        super().__init__()
        self.df = df
        self.root = os.path.join(PATH_DATASET, "train_images")
        self.transforms = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        data = self.df.iloc[idx]
        image = Image.open(self.root + f"/{data.image}").convert("RGB")
        label = label_to_idx[data.species]
        if self.transforms:
            image = self.transforms(image)
        return image, label


def random_split_dataset(data: Dataset, pct=0.9):
    """
    Randomly splits dataset into two sets. Length of first split is len(data) * pct.
    Source: https://github.com/gradsflow/gradsflow/blob/main/gradsflow/data/common.py#L20

    Args:
        data: pytorch Dataset object with `__len__` implementation.
        pct: percentage of split.
    """
    n = len(data)
    split_1 = int(n * pct)
    split_2 = n - split_1
    return random_split(data, (split_1, split_2))
We define image augmentations to make our classifier robust. We will use the random_split_dataset function to split the dataset into training and validation sets. Once we have our Dataset object, we can create a DataLoader like this: dataloader = DataLoader(dataset, batch_size=8)
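A minimal sketch of how these pieces could be wired together is shown below; the specific augmentations, image size, and batch size are illustrative assumptions rather than the notebook's exact settings (the 90/10 split comes from random_split_dataset's default pct=0.9).

from torchvision import transforms as T
from torch.utils.data import DataLoader

# Assumed augmentation pipeline for illustration; the resize dimensions and
# transform choices are placeholders, not the notebook's original values.
train_transforms = T.Compose([
    T.Resize((224, 224)),
    T.RandomHorizontalFlip(),
    T.ToTensor(),
])

dataset = MyDataset(df_train, transforms=train_transforms)
train_ds, val_ds = random_split_dataset(dataset, pct=0.9)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=8)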
Now that our dataloader is ready, we can create our classifier and write the training loop. A training loop consists of model prediction, loss computation, backward propagation, and model weight updates by the optimizer.
We will start with a basic training loop and then use LightningLite to enable multiple hardware accelerators, mixed precision, and distributed training.
First, we will create the model, optimizer, loss function, and metric.
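A minimal sketch of this setup, reusing the same choices that appear later in the LightningLite version (EfficientNet-B0 from timm, AdamW with learning rate 1e-4, cross-entropy loss, and a TorchMetrics F1 object); the explicit device handling here is an assumption for the plain-PyTorch run:

import timm
import torch
from torchmetrics import F1  # renamed F1Score in newer torchmetrics releases

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# EfficientNet-B0 backbone with one output unit per species class
model = timm.create_model(
    "efficientnet_b0", pretrained=True, num_classes=len(label_to_idx)
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), 1e-4)
criterion = torch.nn.CrossEntropyLoss()
metric = F1().to(device)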
Downloading: "https://github.com/rwightman/pytorch-image-models/releases/download/v0.1-weights/efficientnet_b0_ra-3dd342df.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_ra-3dd342df.pth
Training Loop
To write the training loop, we run a for loop for the given number of epochs num_epochs. We set the model to training mode with model.train() and iterate through the dataloader. We pass the data to the model and calculate the cross-entropy loss. We call loss.backward() to compute gradients, followed by optimizer.step(), which updates the model weights.
For model evaluation, we define a validation loop which calculates the F1 score on the validation dataset. For validation, we set our model to eval mode with the model.eval() method. For calculating the F1 score, we use TorchMetrics, which contains a collection of machine learning metrics for distributed, scalable PyTorch models and an easy-to-use API to create custom metrics.
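As a tiny, self-contained illustration of the TorchMetrics update/compute/reset cycle used in the loop below (the toy tensors and the three-class setup are made up for this example):

import torch
from torchmetrics import F1

metric = F1(num_classes=3)  # toy 3-class problem, illustrative only

preds = torch.tensor([[0.8, 0.1, 0.1], [0.2, 0.7, 0.1]])  # per-class scores
target = torch.tensor([0, 2])                             # ground-truth labels

metric(preds, target)      # update internal state with this batch
print(metric.compute())    # aggregate the score over all batches seen so far
metric.reset()             # clear state before the next epoch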
# EPOCH LOOP
for epoch in tqdm(range(1, num_epochs + 1)):

    # TRAINING LOOP
    model.train()
    for batch_idx, (data, target) in tqdm(
        enumerate(train_loader), total=len(train_ds) // batch_size
    ):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if (batch_idx == 0) or ((batch_idx + 1) % log_interval == 0):
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
        if dry_run:
            break

    # TESTING LOOP
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for data, target in val_loader:
            data = data.to(device)
            target = target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()

            # WITH TorchMetrics
            metric(output, target)
            if dry_run:
                break

    test_loss = test_loss / len(val_loader.dataset)

    print(
        f"\nTest set: Average loss: {test_loss:.4f}, Accuracy: ({metric.compute():.0f}%)\n"
    )
    metric.reset()

    if dry_run:
        break
Train Epoch: 1 [0/45929 (0%)] Loss: 3.646841
Test set: Average loss: 0.0008, Accuracy: (0%)
👷 Scale Model Training
Our dry run was successful 🎉! Now, let’s scale our training on a hardware accelerator like a GPU or TPU. We can also use distributed training if multiple devices are available. For this purpose we use LightningLite, which scales the PyTorch model training loop with minimal changes. That means we retain full control of our training loop! It also enables mixed-precision support and DDP training.
To use LightningLite, we import it from the PyTorch Lightning library. We subclass LightningLite and override the run method. We can copy-paste our whole training loop inside the run method and then make just these three changes:
1. Set up the model and optimizer: model, optimizer = self.setup(model, optimizer)
2. Set up the dataloaders: dataloader = self.setup_dataloaders(dataloader)
3. Replace loss.backward() with self.backward(loss)
from pytorch_lightning.lite import LightningLite


class CustomTrainer(LightningLite):
    def run(
        self,
        num_epochs,
        batch_size,
        gamma=0.7,
        dry_run: bool = False,
        save_model=True,
        log_interval=10,
    ):
        model = timm.create_model(
            "efficientnet_b0", pretrained=True, num_classes=len(label_to_idx)
        )
        optimizer = torch.optim.AdamW(model.parameters(), 1e-4)
        criterion = torch.nn.CrossEntropyLoss()
        metric = F1().to(self.device)
        print(self.device)

        # don't forget to call `setup` to prepare the model / optimizer for distributed training.
        # the model is moved automatically to the right device.
        model, optimizer = self.setup(model, optimizer)

        pin_memory = "cuda" in self.device.type
        train_loader, val_loader = self.setup_dataloaders(
            DataLoader(train_ds, batch_size=batch_size, pin_memory=pin_memory),
            DataLoader(val_ds, batch_size=batch_size, pin_memory=pin_memory),
        )

        scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

        # EPOCH LOOP
        for epoch in tqdm(range(1, num_epochs + 1)):

            # TRAINING LOOP
            model.train()
            for batch_idx, (data, target) in tqdm(
                enumerate(train_loader), total=len(train_ds) // batch_size
            ):
                # NOTE: no need to call `.to(device)` on the data, target
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                self.backward(loss)  # instead of loss.backward()
                optimizer.step()
                if (batch_idx == 0) or ((batch_idx + 1) % log_interval == 0):
                    print(
                        "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                            epoch,
                            batch_idx * len(data),
                            len(train_loader.dataset),
                            100.0 * batch_idx / len(train_loader),
                            loss.item(),
                        )
                    )
                if dry_run:
                    break

            scheduler.step()

            # TESTING LOOP
            model.eval()
            test_loss = 0
            with torch.no_grad():
                for data, target in val_loader:
                    # NOTE: no need to call `.to(device)` on the data, target
                    output = model(data)
                    test_loss += criterion(output, target).item()

                    # WITH TorchMetrics
                    metric(output, target)
                    if dry_run:
                        break

            # all_gather is used to aggregate the value across processes
            test_loss = self.all_gather(test_loss).sum() / len(val_loader.dataset)

            print(
                f"\nTest set: Average loss: {test_loss:.4f}, Accuracy: ({metric.compute():.0f}%)\n"
            )
            metric.reset()

            if dry_run:
                break

        # When using distributed training, use `self.save`
        # to ensure the current process is allowed to save a checkpoint
        if save_model:
            self.save(model.state_dict(), "model.pt")
That’s all we need to do. Now we can select any supported hardware, precision type, number of devices, or training strategy.
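For example, the same run method can be launched on different hardware just by changing the constructor arguments; the accelerator, precision, epoch, and batch-size values below are placeholders rather than the notebook's final settings:

# Single GPU with 16-bit mixed precision
trainer = CustomTrainer(accelerator="gpu", devices=1, precision=16)
trainer.run(num_epochs=5, batch_size=32)

# Multi-GPU DDP or TPU runs work without touching the training loop itself:
# CustomTrainer(accelerator="gpu", devices=2, strategy="ddp").run(num_epochs=5, batch_size=32)
# CustomTrainer(accelerator="tpu", devices=8).run(num_epochs=5, batch_size=32)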