Intuition
So far, we've been training and evaluating our different baselines but haven't really been tracking these experiments. We'll fix this by defining a proper process for experiment tracking which we'll use for all future experiments (including hyperparameter optimization). Experiment tracking is the process of managing all the different experiments and their components, such as parameters, metrics, models and other artifacts, and it enables us to:
Organize all the necessary components of a specific experiment. It's important to have everything in one place and know where it is so you can use it later.
Reproduce past results (easily) using saved experiments.
Log iterative improvements across time, data, ideas, teams, etc.
Tools
There are many options for experiment tracking but we're going to use MLflow (100% free and open-source) because it has all the functionality we'll need (and growing integration support). We can run MLflow on our own servers and databases so there are no storage costs or limitations, which makes it one of the most popular options; it's used by Microsoft, Facebook, Databricks and others. You can also set up your own tracking servers to synchronize runs amongst multiple team members collaborating on the same task.
There are also several other popular options such as Comet ML (used by Google AI, HuggingFace, etc.) and Weights & Biases (used by OpenAI, Toyota Research, etc.). These are fantastic tools that provide features like dashboards, seamless integrations, hyperparameter search, reports and even debugging!
Note
Many platforms are leveraging their position as the source for experiment data to provide features that extend into other parts of the ML development pipeline such as versioning, debugging, monitoring, etc.
Application
We'll start by initializing all the required arguments for our experiment and setting up an empty directory where all of our experiments will be stored.
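A minimal sketch of that setup (the parameter values, the experiments directory name, and the use of a local file store are assumptions):

from argparse import Namespace
from pathlib import Path

import mlflow

# Hyperparameters for this experiment (values are illustrative)
params = Namespace(
    char_level=True, filter_sizes=list(range(1, 11)), batch_size=64,
    embedding_dim=128, num_filters=128, hidden_dim=128, dropout_p=0.5,
    lr=2e-4, num_epochs=200, patience=10,
)

# Store all runs under a local directory and point MLflow's tracking URI at it
EXPERIMENTS_DIR = Path("experiments")
EXPERIMENTS_DIR.mkdir(exist_ok=True)
mlflow.set_tracking_uri("file://" + str(EXPERIMENTS_DIR.absolute()))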
# Trainer (modified for experiment tracking)
class Trainer(object):
    def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):
        # Set params
        self.model = model
        self.device = device
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler

    def train_step(self, dataloader):
        """Train step."""
        # Set model to train mode
        self.model.train()
        loss = 0.0

        # Iterate over train batches
        for i, batch in enumerate(dataloader):
            # Step
            batch = [item.to(self.device) for item in batch]
            inputs, targets = batch[:-1], batch[-1]
            self.optimizer.zero_grad()  # Reset gradients
            z = self.model(inputs)  # Forward pass
            J = self.loss_fn(z, targets)  # Define loss
            J.backward()  # Backward pass
            self.optimizer.step()  # Update weights

            # Cumulative Metrics
            loss += (J.detach().item() - loss) / (i + 1)

        return loss

    def eval_step(self, dataloader):
        """Validation or test step."""
        # Set model to eval mode
        self.model.eval()
        loss = 0.0
        y_trues, y_probs = [], []

        # Iterate over val batches
        with torch.no_grad():
            for i, batch in enumerate(dataloader):
                # Step
                batch = [item.to(self.device) for item in batch]  # Set device
                inputs, y_true = batch[:-1], batch[-1]
                z = self.model(inputs)  # Forward pass
                J = self.loss_fn(z, y_true).item()

                # Cumulative Metrics
                loss += (J - loss) / (i + 1)

                # Store outputs
                y_prob = torch.sigmoid(z).cpu().numpy()
                y_probs.extend(y_prob)
                y_trues.extend(y_true.cpu().numpy())

        return loss, np.vstack(y_trues), np.vstack(y_probs)

    def predict_step(self, dataloader):
        """Prediction step."""
        # Set model to eval mode
        self.model.eval()
        y_probs = []

        # Iterate over batches
        with torch.no_grad():
            for i, batch in enumerate(dataloader):
                # Forward pass w/ inputs
                inputs, targets = batch[:-1], batch[-1]
                z = self.model(inputs)

                # Store outputs (convert logits to probabilities, as in eval_step)
                y_prob = torch.sigmoid(z).cpu().numpy()
                y_probs.extend(y_prob)

        return np.vstack(y_probs)

    def train(self, num_epochs, patience, train_dataloader, val_dataloader):
        best_val_loss = np.inf
        for epoch in range(num_epochs):
            # Steps
            train_loss = self.train_step(dataloader=train_dataloader)
            val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
            self.scheduler.step(val_loss)

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model = self.model
                _patience = patience  # reset _patience
            else:
                _patience -= 1
            if not _patience:  # 0
                print("Stopping early!")
                break

            # Tracking
            mlflow.log_metrics(
                {"train_loss": train_loss, "val_loss": val_loss}, step=epoch)

            # Logging
            print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.5f}, "
                f"val_loss: {val_loss:.5f}, "
                f"lr: {self.optimizer.param_groups[0]['lr']:.2E}, "
                f"_patience: {_patience}")

        return best_model, best_val_loss
And to make things simple, we'll encapsulate all the components for training into one function called train_cnn which returns all the artifacts we want to be able to track from our experiment.
def train_cnn(params, df):
    """Train a CNN using specific arguments."""
    ...
    return {
        "params": params,
        "tokenizer": tokenizer,
        "label_encoder": label_encoder,
        "model": best_model,
        "performance": performance,
        "best_val_loss": best_val_loss,
    }
The input argument params contains all the parameters needed, and it's nice to have them all organized under one variable so we can easily log it and tweak it for different experiments (we'll see this when we do hyperparameter optimization).
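Because everything lives under one object, we can log it in a single call inside a run (a sketch; assumes params is an argparse.Namespace like the one above):

# Log every hyperparameter for the current run at once (Namespace -> dict)
mlflow.log_params(vars(params))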
def train_cnn(params, df):
    """Train a CNN using specific arguments."""
    # Set seeds
    set_seeds()

    # Get data splits
    preprocessed_df = df.copy()
    preprocessed_df.text = preprocessed_df.text.apply(preprocess, lower=True)
    X_train, X_val, X_test, y_train, y_val, y_test, label_encoder = get_data_splits(preprocessed_df)
    X_test_raw = X_test
    num_classes = len(label_encoder)

    # Set device
    cuda = True
    device = torch.device('cuda' if (torch.cuda.is_available() and cuda) else 'cpu')
    torch.set_default_tensor_type('torch.FloatTensor')
    if device.type == 'cuda':
        torch.set_default_tensor_type('torch.cuda.FloatTensor')

    # Tokenize
    tokenizer = Tokenizer(char_level=params.char_level)
    tokenizer.fit_on_texts(texts=X_train)
    vocab_size = len(tokenizer)

    # Convert texts to sequences of indices
    X_train = np.array(tokenizer.texts_to_sequences(X_train))
    X_val = np.array(tokenizer.texts_to_sequences(X_val))
    X_test = np.array(tokenizer.texts_to_sequences(X_test))

    # Class weights
    counts = np.bincount([label_encoder.class_to_index[class_] for class_ in all_tags])
    class_weights = {i: 1.0/count for i, count in enumerate(counts)}

    # Create datasets
    train_dataset = CNNTextDataset(X=X_train, y=y_train, max_filter_size=max(params.filter_sizes))
    val_dataset = CNNTextDataset(X=X_val, y=y_val, max_filter_size=max(params.filter_sizes))
    test_dataset = CNNTextDataset(X=X_test, y=y_test, max_filter_size=max(params.filter_sizes))

    # Create dataloaders
    train_dataloader = train_dataset.create_dataloader(batch_size=params.batch_size)
    val_dataloader = val_dataset.create_dataloader(batch_size=params.batch_size)
    test_dataloader = test_dataset.create_dataloader(batch_size=params.batch_size)

    # Initialize model
    model = CNN(
        embedding_dim=params.embedding_dim, vocab_size=vocab_size,
        num_filters=params.num_filters, filter_sizes=params.filter_sizes,
        hidden_dim=params.hidden_dim, dropout_p=params.dropout_p,
        num_classes=num_classes)
    model = model.to(device)

    # Define loss
    class_weights_tensor = torch.Tensor(np.array(list(class_weights.values())))
    loss_fn = nn.BCEWithLogitsLoss(weight=class_weights_tensor)

    # Define optimizer & scheduler
    optimizer = torch.optim.Adam(model.parameters(), lr=params.lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

    # Trainer module
    trainer = Trainer(
        model=model, device=device, loss_fn=loss_fn,
        optimizer=optimizer, scheduler=scheduler)

    # Train
    best_model, best_val_loss = trainer.train(
        params.num_epochs, params.patience, train_dataloader, val_dataloader)

    # Best threshold for f1
    train_loss, y_true, y_prob = trainer.eval_step(dataloader=train_dataloader)
    precisions, recalls, thresholds = precision_recall_curve(y_true.ravel(), y_prob.ravel())
    threshold = find_best_threshold(y_true.ravel(), y_prob.ravel())

    # Determine predictions using threshold
    test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
    y_pred = np.array([np.where(prob >= threshold, 1, 0) for prob in y_prob])

    # Evaluate
    performance = get_metrics(y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)

    return {
        "params": params,
        "tokenizer": tokenizer,
        "label_encoder": label_encoder,
        "model": best_model,
        "performance": performance,
        "best_val_loss": best_val_loss,
    }
Tracking
With MLflow, we first need to initialize an experiment, and then we can create runs under that experiment.
import tempfile
# Set experiment
mlflow.set_experiment(experiment_name="baselines")
INFO: 'baselines' does not exist. Creating a new experiment
def save_dict(d, filepath):
    """Save dict to a json file."""
    with open(filepath, "w") as fp:
        json.dump(d, indent=2, sort_keys=False, fp=fp)
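Putting it together, a single run under the baselines experiment might look like the sketch below. The run name and artifact file names are assumptions, as are the Tokenizer.save and LabelEncoder.save methods (counterparts to the load methods used later); df and params are assumed to be in memory.

# Track a single run under the "baselines" experiment (a sketch)
with mlflow.start_run(run_name="cnn"):
    # Train and collect artifacts
    artifacts = train_cnn(params=params, df=df)

    # Log parameters and key metrics
    mlflow.log_params(vars(artifacts["params"]))
    mlflow.log_metrics({"best_val_loss": artifacts["best_val_loss"]})

    # Log artifacts (written to a temporary directory first)
    with tempfile.TemporaryDirectory() as dp:
        artifacts["tokenizer"].save(Path(dp, "tokenizer.json"))
        artifacts["label_encoder"].save(Path(dp, "label_encoder.json"))
        torch.save(artifacts["model"].state_dict(), Path(dp, "model.pt"))
        save_dict(artifacts["performance"], Path(dp, "performance.json"))
        mlflow.log_artifacts(dp)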
Let's view what we've tracked from our experiment. MLflow serves a dashboard for us to view and explore our experiments on a localhost port, but since we're inside a notebook, we're going to use a public tunnel (ngrok) to view it.
from pyngrok import ngrok
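A sketch of serving the dashboard in the background and tunneling to it from a notebook (the port, backend store path, and auth token handling are assumptions):

# Launch the MLflow dashboard in the background (port and store path are assumptions)
get_ipython().system_raw(
    "mlflow server -h 0.0.0.0 -p 8000 --backend-store-uri $PWD/experiments/ &")

# Expose the local port through a public ngrok tunnel
ngrok.kill()  # close any existing tunnels
ngrok.set_auth_token("")  # optional: add your ngrok auth token here
public_url = ngrok.connect(addr="8000")
print(f"MLflow dashboard: {public_url}")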
Note
You may need to rerun the cell below multiple times if the connection times out or is overloaded.
# Best run
device = torch.device("cpu")
best_run_id = all_runs.iloc[0].run_id
best_run = mlflow.get_run(run_id=best_run_id)
with tempfile.TemporaryDirectory() as fp:
    client.download_artifacts(run_id=best_run_id, path="", dst_path=fp)
    tokenizer = Tokenizer.load(fp=Path(fp, "tokenizer.json"))
    label_encoder = LabelEncoder.load(fp=Path(fp, "label_encoder.json"))
    model_state = torch.load(Path(fp, "model.pt"), map_location=device)
    performance = load_dict(filepath=Path(fp, "performance.json"))
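With the artifacts in memory, we can rebuild the best model (a sketch; it assumes the same params used for training are still available and mirrors the CNN constructor call in train_cnn above):

# Rebuild the architecture and load the best run's weights
model = CNN(
    embedding_dim=params.embedding_dim, vocab_size=len(tokenizer),
    num_filters=params.num_filters, filter_sizes=params.filter_sizes,
    hidden_dim=params.hidden_dim, dropout_p=params.dropout_p,
    num_classes=len(label_encoder))
model.load_state_dict(model_state)
model.to(device)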
# Dataloader
text = "Transfer learning with BERT for self-supervised learning"
X = np.array(tokenizer.texts_to_sequences([preprocess(text)]))
y_filler = label_encoder.encode([np.array([label_encoder.classes[0]]*len(X))])
dataset = CNNTextDataset(X=X, y=y_filler, max_filter_size=max(filter_sizes))
dataloader = dataset.create_dataloader(batch_size=batch_size)
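And a quick inference sketch with the loaded model (the 0.5 probability threshold is an assumption; the tuned threshold from the run could be used instead):

# Predict probabilities and decode the tags above a threshold
trainer = Trainer(model=model, device=device)
y_prob = trainer.predict_step(dataloader)
tags = [label_encoder.classes[i] for i, p in enumerate(y_prob[0]) if p >= 0.5]
print(tags)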