
MLFlow in Docker

Part 2 of MLFlow in Docker.

Hyperopt

Hyperopt provides distributed, asynchronous hyper-parameter optimization. Install it and import what we need:

pip install hyperopt

from hyperopt import fmin, tpe, hp, Trials
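
Hyperopt works by minimizing an objective function over a search space. As a minimal sketch of the workflow (a toy objective, unrelated to the EMNIST model below):

from hyperopt import fmin, tpe, hp

# toy objective: find the x that minimizes (x - 3)^2
best = fmin(
    fn=lambda x: (x - 3) ** 2,
    space=hp.uniform("x", -5, 5),
    algo=tpe.suggest,
    max_evals=50
)

print(best)  # e.g. {'x': 2.97...}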

Define the parameters you want to tune:

search_space = {
    "lr": hp.loguniform("lr", -10, -8),
    "l1": hp.choice("l1", [32, 64, 128]),
    "l2": hp.choice("l2", [64, 128, 256])
}
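
Note that the learning rate is sampled on a log scale: hp.loguniform("lr", -10, -8) draws exp(uniform(-10, -8)), i.e. values roughly between 4.5e-05 and 3.4e-04. To sanity-check the ranges you can draw a few random configurations with the sampler Hyperopt uses internally:

from hyperopt.pyll.stochastic import sample

# draw a few random configurations from the search space
for _ in range(3):
    print(sample(search_space))
# e.g. {'l1': 64, 'l2': 256, 'lr': 9.13e-05}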

Then replace the corresponding hard-coded values inside the Lightning model with the hyper-parameters:

import pytorch_lightning as pl
import torch.nn as nn
import torch.optim as optim
from torchmetrics.functional import accuracy

class EmnistModel(pl.LightningModule):
    def __init__(self, hparams):
        super().__init__()

        self.save_hyperparameters(hparams)

        self.criterion = nn.CrossEntropyLoss()

        self.network = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # 64*14*14

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # 256*7*7

            nn.Flatten(),
            nn.Linear(256*7*7, self.hparams["l1"]),
            nn.ReLU(),

            nn.Linear(self.hparams["l1"], self.hparams["l2"]),
            nn.ReLU(),

            nn.Linear(self.hparams["l2"], 26)
        )

    def forward(self, xb):
        return self.network(xb.reshape(-1, 1, 28, 28))

    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=self.hparams["lr"])

    def training_step(self, batch, batch_idx):
        # each batch consists of images and labels
        x, y = batch
        # EMNIST letter labels start at 1, but class indices start at 0
        y -= 1

        y_hat = self(x)
        loss = self.criterion(y_hat, y.long())
        pred = y_hat.argmax(dim=1)

        acc = accuracy(pred, y, task="multiclass", num_classes=26)

        self.log("train_loss", loss, on_epoch=True, prog_bar=True)
        self.log("train_acc", acc, on_epoch=True, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y -= 1

        y_hat = self(x)
        loss = self.criterion(y_hat, y.long())
        pred = y_hat.argmax(dim=1)

        acc = accuracy(pred, y, task="multiclass", num_classes=26)

        self.log("val_loss", loss, on_epoch=True, prog_bar=True)
        self.log("val_acc", acc, on_epoch=True, prog_bar=True)

        return acc

    def test_step(self, batch, batch_idx):
        x, y = batch
        y -= 1

        y_hat = self(x)
        loss = self.criterion(y_hat, y.long())
        pred = y_hat.argmax(dim=1)

        acc = accuracy(pred, y, task="multiclass", num_classes=26)

        self.log("test_loss", loss, on_epoch=True, prog_bar=True)
        self.log("test_acc", acc, on_epoch=True, prog_bar=True)

        return acc

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        x, y = batch

        return self(x)
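
Before launching the full search, a quick smoke test with a hand-picked configuration (the values here are arbitrary) confirms that the tensor shapes line up:

import torch

model = EmnistModel({"lr": 1e-4, "l1": 64, "l2": 128})
dummy = torch.randn(4, 1, 28, 28)

print(model(dummy).shape)  # torch.Size([4, 26])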

MLFlow

The model can now be trained and tracked with MLFlow:

import mlflow
import numpy as np
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, TensorSpec

def train_emnist(params):
    # create a nested mlflow run so that all trials are grouped under one parent run
    with mlflow.start_run(nested=True):
        model = EmnistModel(params)

        trainer = pl.Trainer(max_epochs=10, accelerator="gpu")
        trainer.fit(model, train_dataloaders=train_dl, val_dataloaders=val_dl)

        train_loss = trainer.callback_metrics["train_loss"].item()
        train_acc = trainer.callback_metrics["train_acc"].item()
        valid_loss = trainer.callback_metrics["val_loss"].item()
        valid_acc = trainer.callback_metrics["val_acc"].item()

        mlflow.log_params(params)

        mlflow.log_metrics({
            "train_loss_avg": train_loss, "train_acc_avg": train_acc,
            "val_loss_avg": valid_loss, "val_acc_avg": valid_acc
        })

        input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 1, 28, 28))])
        output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 26))])

        signature = ModelSignature(inputs=input_schema, outputs=output_schema)

        mlflow.pytorch.log_model(model, "emnist_classifier_hyper__cnn", signature=signature)

        # hyperopt minimizes, so return the negative validation accuracy
        return -valid_acc

with mlflow.start_run():
    best_result = fmin(
        fn=train_emnist,
        space=search_space,
        algo=tpe.suggest,
        max_evals=10
    )
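
Note that the Trials import has not been used yet: passing a Trials object to fmin records the result of every evaluation, which is useful for inspecting the search afterwards. A sketch of the same call with recording enabled:

trials = Trials()

with mlflow.start_run():
    best_result = fmin(
        fn=train_emnist,
        space=search_space,
        algo=tpe.suggest,
        max_evals=10,
        trials=trials
    )

# each entry holds the loss (the negative validation accuracy) of one trial
print(min(t["loss"] for t in trials.results))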


Best Model Evaluation

The hyper-parameter combination with the best validation accuracy is:

import hyperopt

print(hyperopt.space_eval(search_space, best_result))
# {'l1': 128, 'l2': 256, 'lr': 0.00013761616014749492}
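
space_eval is needed because fmin returns the raw search-space encoding: for hp.choice parameters it reports the index into the options list rather than the chosen value. Printing the raw result makes the difference visible (the indices shown are illustrative):

print(best_result)
# e.g. {'l1': 2, 'l2': 2, 'lr': 0.00013761616014749492}  <- indices into the hp.choice lists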


# get run ID for the above model from the MLFlow dashboard
logged_model = 'runs:/80f82f97d942473aab55f5ba47df8db2/emnist_classifier_hyper__cnn'

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(logged_model)

predictions = loaded_model.predict(test_images.numpy().reshape(-1,1,28,28))

print("Predicted Class:", classes[np.argmax(predictions[0])])
import matplotlib.pyplot as plt

test_samples = np.random.randint(0, len(test_images), 16)

fig = plt.figure(figsize=(8, 8))

for i, idx in enumerate(test_samples):
    true_label = classes[int(test_labels[idx].item()) - 1]
    pred_label = classes[np.argmax(predictions[idx])]

    plt.subplot(4, 4, i+1)
    plt.imshow(test_images[idx].numpy().reshape(28, 28) / 255.0, cmap="gray")
    plt.title(f"True: {true_label} || Pred: {pred_label}")
    plt.axis('off')

plt.tight_layout()
plt.show()


from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

y_pred = []
y_true = []

for inputs, labels in test_dl:
    output = loaded_model.predict(inputs.numpy().reshape(-1, 1, 28, 28))

    output = np.argmax(output, axis=1).astype('float64').tolist()
    y_pred.extend(output)

    labels = [x - 1 for x in labels.tolist()]
    y_true.extend(labels)

cm = confusion_matrix(y_true, y_pred)
confusion = ConfusionMatrixDisplay(cm, display_labels=classes)

fig, ax = plt.subplots(figsize=(12, 8))
confusion.plot(ax=ax)
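
Besides the confusion matrix, scikit-learn's classification_report gives per-class precision, recall, and F1 from the same y_true/y_pred lists (assuming classes holds the 26 letter labels as strings):

from sklearn.metrics import classification_report

# per-class precision / recall / f1 for all 26 letters
print(classification_report(y_true, y_pred, target_names=classes))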


Model Registry

Once you have picked the best model for your task, you can register it in the MLFlow Model Registry:
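
Registration can also be done from code instead of the dashboard. A sketch using mlflow.register_model with the run from above (the registry name "EmnistClassifier" is made up for illustration):

# the model URI is the same runs:/ path used when loading the model
result = mlflow.register_model(
    "runs:/80f82f97d942473aab55f5ba47df8db2/emnist_classifier_hyper__cnn",
    "EmnistClassifier"
)

print(result.name, result.version)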
