ML-Pipeline mit Kubeflow und MLflow
In diesem Blogpost führen wir die MLOps-Serie fort. Dazu werden die Themen ML-Pipeline, Tracking von Experimenten, Daten- und Modellversionierung sowie Reproduzierbarkeit und Versionierung der gesamten Pipeline adressiert und alles unter einen Hut gebracht. Zur Verwendung kommen dafür Kubeflow Pipelines in Kombination mit MLflow Tracking. Wir werden uns die besten Features von beiden Werkzeugen zunutze machen.
Warum Pipelines?
Eine Machine Learning (ML) Pipeline beschreibt einen Workflow eines Prozesses wie dem Modelltraining mithilfe eines DAGs (Directed Acyclic Graph). Gerichtete azyklische Graphen bestehen aus einzelnen Schritten oder auch Komponenten. Jede Komponente definiert genaue Inputs und Outputs und ist alleinstehend funktionsfähig sowie reproduzierbar. Damit legt das Konzept von Pipelines den Grundstein zur Automatisierung zwecks MLOps.
Was ist Kubeflow?
Kubeflow ist ein Open Source Projekt mit dem Ziel, dass Deployments von ML-Workflows auf Kubernetes einfach, portabel und skalierbar werden. Die Infrastruktur wird für Data Scientists gekapselt, sodass sich diese auf ihre Kernaufgaben konzentrieren können. Kubeflow ermöglicht das Ausrollen und Managen verschiedener Services durch eine Plattform mit mehreren großen Komponenten wie Kubeflow Pipelines oder KFServing. Mittels einer graphischen Oberfläche und der jeweiligen Python SDK können Data Scientists unterschiedliche Aufgaben bewältigen, die eine große Bandbreite der MLOps-Anforderungen abdecken. Kubeflow hat es sich zur Aufgabe gemacht, auf jedem Kubernetes lauffähig zu sein egal ob in der Cloud oder On-Premise.
Komponenten einer Kubeflow Pipeline werden mittels Dockercontainern umgesetzt. Grundsätzlich kann das über die sogenannte schwer- oder leichtgewichtige Funktion erreicht werden. Bei der schwergewichtigen Variante erstellt man außerhalb von Kubeflow einen Dockercontainer, in dem Pythoncode deponiert wurde und legt diesen in einer Registry ab. Im Gegensatz dazu erlaubt die leichtgewichtige Funktionalität, dass eine Pythonfunktion mittels einer SDK in einen Dockercontainer geladen wird. Aus diesem Grund wird sich dieser Beitrag auf die leichtgewichtige Option fokussieren, um eine gesamte Kubeflow Pipeline in einem Jupyter Notebook zu erstellen.
Beispiel einer Kubeflow Pipeline mit MLflow Tracking
Für die exemplarische Umsetzung der Kubeflow Pipeline bedienen wir uns dem bekannten Beispiel einer Zeitreihenvorhersage des letzten Blogposts über MLflow. Der dazugehörige Pythoncode wird sich nur wenig ändern. Wir werden daraus einzelne leichtgewichtige Komponenten formen und sie direkt in einen Dockercontainer einbetten, ohne ihn manuell bauen zu müssen. Zudem verwenden wir den Tracking-Mechanismus und die Modellregistry von MLflow, da MLflow aktuell mehr Features als die Metadata-Komponente von Kubeflow zur Verfügung stellt. Als MLflow Backend-Store wird eine MySQL Datenbank und als Artifact-Store ein AWS S3 Bucket benutzt, was ein sicheres und stabiles Setup darstellt.
Voraussetzungen
Im weiteren Verlauf setze ich grundlegende Kenntnisse von Kubernetes voraus. Als technische Voraussetzungen muss Kubeflow in einem Kubernetes Cluster installiert sein, MLflow darin ausgerollt werden und ein AWS S3 Bucket erstellt werden. Die hier verwendete Kubeflow-Version beträgt v1.2.0. Bei niedrigeren Versionen kann es zu Abweichungen im Code und den Secrets kommen. Wie schon angedeutet kann Kubeflow auf viele Weisen installiert werden. Ein schneller und einfacher Weg ist ein lokales Kubernetes Cluster in einer virtuellen Maschine mittels MiniKF. MiniKF installiert automatisch eine VM über Virtual Box und erstellt ein Minikube-Cluster zusammen mit Kubeflow.
Steht unser Kubernetes Cluster zusammen mit Kubeflow, so können wir MLflow deployen. Für diesen Zweck habe ich im zugehörigen Github Repository alle benötigten yaml-Dateien und ein Dockerfile abgelegt. Alle Installations-Befehle sind in der README zu finden. Das Deployment besteht im Wesentlichen aus 2 Teilen: dem MLflow-Trackingserver und der Backend-Datenbank MySQL. Für den Trackingserver bauen wir als erstes das entsprechende Dockerimage mit dem Namen mlflow/server. Danach muss ein eigener Namespace namens mlflow für unsere MLflow Komponenten eingerichtet werden. Zudem sind insgesamt 3 Secrets erforderlich: Benutzerdaten für die MySQL Datenbank und AWS Credentials sowohl im Namespace mlflow als auch im erstellten Namespace von Kubeflow, der standardmäßig anonymous heißt. Jetzt können die 3 yaml-Dateien in folgender Reihenfolge ausgerollt werden: 1. Persistent Volume Claim für MySQL-DB (mysql-pvc.yaml), 2. MySQL-DB Deployment (mysql-deployment.yaml) und 3. MLflow-Trackingserver Deployment (mlflow-deployment.yaml). Dabei ist darauf zu achten, dass das Persistent Volume Claim an ein Persistent Volume gebunden ist. Zum Abschluss erstellen wir uns ein AWS S3 Bucket, wo unsere Modelle gespeichert werden. Der Name des Buckets muss in der mlflow-deployment.yaml geändert werden.
Kubeflow ML-Pipeline
Falls noch nicht geschehen, starten wir einen Jupyter Notebook Server in der graphischen Oberfläche von Kubeflow. Von dort aus wird unsere Pipeline ausgerollt. Auf dem Server muss die Pythonbibliothek kfp über beispielsweise pip installiert werden. Unsere fertige Pipeline wird folgendermaßen aussehen:
Auf die einzelnen Schritte der Pipeline gehen wir im Laufe des Beitrags genauer ein. Der gesamte Code sowie die Dateien zum Ausrollen von MLflow sind im zugehörigen Github Repository abgelegt.
Die erste leichtgewichtige Komponente lädt den Datensatz aus dem MLflow Repository runter und übergibt ihn als csv-Datei an die nächste Komponente zur Weiterverarbeitung. Zur Umsetzung schreiben wir den gesamten Code in eine Funktion, die später in den Dockercontainer geladen wird. Diese Funktion muss alle erforderlichen Imports beinhalten. Betrachten wir die Funktion für die erste Komponente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from kfp.components import InputPath, OutputPath def load_raw_data(path: str, raw_data_path: OutputPath('CSV')): print(raw_data_path) import subprocess # downlaod the dataset from the mlflow repo def download_dataset(): subprocess.call(['apt-get', 'update']) subprocess.call(['apt-get', 'install', 'curl', '-y']) subprocess.call(['curl', '-o', 'household_power_consumption.txt', 'https://raw.githubusercontent.com/felix-exel/mlflow/master/household_power_consumption.txt']) download_dataset() import pandas as pd df = pd.read_csv(path, sep=';', parse_dates={'dt': ['Date', 'Time']}, infer_datetime_format=True, low_memory=False, na_values=['nan', '?'], index_col='dt') df.to_csv(raw_data_path) |
In der Funktionsbeschreibung der dritten Zeile ist eine Variable vom Typ kfp.components.OutputPath angegeben. Mit diesem Decorator geben wir Kubeflow an, dass die Komponente eine csv-Datei als Output weitergeben wird. Dafür wird Kubeflow eigenständig einen Pfad erstellen und diesen Pfad als String dem nächsten Schritt in der Pipeline übermitteln. Dieses Vorgehen ist bei größeren Dateien zu empfehlen. Die andere Variante wäre alle Variablen der csv-Datei als Parameter zu übergeben, was einen sehr hohen Traffic verursachen könnte. Kubeflow kümmert sich intern darum, wie die csv-Datei von einem Dockercontainer in den nächsten gemountet wird. In der Zeile 6 und 17 finden die Imports statt. Der restliche Code ist selbsterklärend: Es wird der Datensatz household_power_consumption.txt aus dem Github Repository runtergeladen und unter dem von Kubeflow generierten Pfad gespeichert.
Das Laden der load_raw_data()-Funktion in einen Dockercontainer sieht folgendermaßen aus:
1 2 3 4 5 |
import kfp.components as comp load_raw_data_op = comp.func_to_container_op(load_raw_data, base_image='python:3.7-slim', packages_to_install=['pandas==1.0.5']) |
Es wird die vorher definierte Funktion übergeben zusammen mit dem Dockerimage und den benötigten Paketen, die zusätzlich installiert werden sollen. Damit ist unsere leichtgewichtige Komponente Load Raw Data auch schon fertig!
Der nächste Schritt der Pipeline nimmt die csv-Datei der Load Raw Data Komponente entgegen, füllt fehlende Werte einer Spalte mit dem Mittelwert und übergibt das gespeicherte Ergebnis wieder als csv-Datei an die nächste Komponente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def filling_nans_with_mean(input_data_path: InputPath('CSV'), output_data_path: OutputPath('CSV')): import pandas as pd df = pd.read_csv(input_data_path, sep=',', header=0, infer_datetime_format=True, low_memory=False, parse_dates=True, index_col='dt') # filling nan with mean in any columns for j in range(0, df.shape[1]): df.iloc[:, j] = df.iloc[:, j].fillna(df.iloc[:, j].mean()) print(df.isnull().sum()) df.to_csv(output_data_path) |
In der ersten Zeile wird sowohl ein InputPath als auch ein OutputPath einer csv-Datei deklariert. Der InputPath wird mithilfe der Pandas Funktion read_csv() eingelesen und das Ergebnis unter dem OutputPath gespeichert. Das Laden dieser Funktion in den Dockercontainer sieht wie bei der vorherigen Komponente aus. Ein Blick auf die Komponente zeigt folgendes Bild:
Standardmäßig nutzt Kubeflow einen Minio Server als Artefakt-Store, der durch Persistent Volumes abgedeckt wird. Auf der graphischen Oberfläche sehen wir die Input- und Output-URI und den Anfang der Dateien. Unter der Artifacts-Seite sind die CSV-Dateien ebenfalls eingetragen:
Durch die Caching-Funktion von Kubeflow werden bei weiteren Ausführungen der Pipeline alle Komponenten übersprungen, die komplett identisch zur vorherigen Ausführung waren. Dies kann zu einer erheblichen Zeiteinsparung und Reduzierung des Rechenaufwands führen, wenn große Datenmengen und aufwändige Verarbeitungsschritte nicht wiederholt werden müssen.
Die weiteren Komponenten Split Data und Standardization betrachten wir nicht mehr im Detail. Die Namen spiegeln die Funktionalität wider und zusätzlich liegt der gesamte Code im Jupyter Notebook des Repositorys. Die Inputs und Outputs sind weiterhin csv-Dateien. Einzig die Anzahl der Dateien ändert sich nach dem Teilen der Daten in Trainings- und Testdaten. Dann werden entsprechend 2 csv-Dateien weitergegeben.
Spannend wird es beim parallelen Modelltraining. Sobald die Standardisierung der Trainings- und Testdaten abgeschlossen ist, wird das Training parallel gestartet. Um im Training das MLflow-Tracking zu nutzen, setzen wir zunächst die erforderlichen Einstellungen innerhalb der training()-Funktion:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def training(input_train_data_path: InputPath('CSV'), input_test_data_path: InputPath('CSV'), batch_size: int, window_length: int, future_length: int, dropout_fc: float, hidden_layer_size: int, n_output_features: int, mlpipeline_metrics_path: OutputPath('Metrics')): import mlflow import os def set_mlflow_settings(): os.environ["GIT_PYTHON_REFRESH"] = "quiet" registry_uri = 'mysql+pymysql://mlflow:mlflow@mysql.mlflow.svc.cluster.local:3306/mlflow' tracking_uri = 'http://mlflow-service.mlflow.svc.cluster.local:5001' mlflow.tracking.set_registry_uri(registry_uri) mlflow.tracking.set_tracking_uri(tracking_uri) set_mlflow_settings() |
In den Zeilen 17 und 18 finden sich die Adressen auf die Services der MySQL-DB und des MLflow-Trackingservers im Kubernetes Cluster. Nicht elegant an der Stelle ist natürlich, dass die Benutzerdaten der Datenbank im Klartext hart kodiert sind, was nur der Einfachheit geschuldet ist. Mit diesen Voraussetzungen sind wir bereit, Experimente und Runs in der Kubeflow Pipeline mit MLflow zu tracken!
Von hier an ist der Pythoncode für das Training fast identisch zu dem im vorherigen Blogpost.
- Die Inputdateien werden eingelesen.
- Das LSTM-basierte RNN zur Zeitreihenvorhersage wird erstellt.
- Es wird die TensorFlow Data API für Sliding Windows genutzt.
- Ein benutzerdefinierter TensorFlow Callback für das Metriktracking über MLflow wird definiert.
- Wir starten einen MLflow Run mit mlflow.start_run() und tracken Parameter wie die Batch-Größe, Dropoutrate, usw.
- Das RNN-Modell wird mit dem Datensatz über den Stromverbrauch von Haushalten trainiert.
- Am Ende wird das trainierte TensorFlow-Modell in der MLflow-Registry angemeldet und damit automatisch in das AWS S3 Bucket hochgeladen.
Da nun alle Komponenten der Pipeline definiert wurden, muss daraus noch ein Graph mithilfe der Python SDK gebaut werden:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import kfp.dsl as dsl @dsl.pipeline( name='Training pipeline', description='Training pipeline for time series forecasting on household power consumption dataset.' ) def training_pipeline( path='./household_power_consumption.txt' ): load_raw_data_task = load_raw_data_op( path).set_display_name('Load Raw Data') key_output = str(list(load_raw_data_task.outputs.keys())[0]) filling_nans_with_mean_op_task = filling_nans_with_mean_op(load_raw_data_task.outputs[key_output]).after( load_raw_data_task).set_display_name('Filling NaNs with Mean') |
Mit dem @dsl.pipeline()-Decorator geben wir die Kubeflow Pipeline an. Metadaten der Pipeline wie ein Name und eine Beschreibung können als Parameter gesetzt werden. Ab der Zeile 8 definieren wir unsere Pipeline-Funktion, die den Graphen sowie die Inputs und Outputs von allen Komponenten beschreibt. In die Pipeline-Funktion fügen wir die zuvor erstellten Komponenten wie beispielsweise load_raw_data_op() ein.
Um den Outputpfad des ersten Schrittes zu übergeben, extrahieren wir den Schlüssel aus dem Dictionary load_raw_data_task.outputs, damit dieser nicht hartkodiert wird. In der Zeile 17 wird die zweite Komponente Filling NaNs with Mean eingefügt. Als Eingabe erhält sie den Pfad der csv-Datei des vorherigen Schritts. Mit der .after()-Funktion geben wir die Abhängigkeit und damit den Fluss des Graphen an.
Ähnlich dazu sehen die weiteren Komponenten bis zum Modelltraining aus. Deswegen überspringen wir den Code dazu und konzentrieren uns auf das parallel ablaufende Training:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
key_output1 = str(list(standardization_task.outputs.keys())[0]) key_output2 = str(list(standardization_task.outputs.keys())[1]) grid_search_dic = {'hidden_layer_size': [20, 40], 'batch_size': [64], 'future_length': [5], 'window_length': [50], 'dropout_fc': [0.0, 0.2], 'n_output_features': [1]} # Cartesian product grid_search_param = [dict(zip(grid_search_dic, v)) for v in product(*grid_search_dic.values())] for i, params in enumerate(grid_search_param): batch_size = params['batch_size'] window_length = params['window_length'] future_length = params['future_length'] dropout_fc = params['dropout_fc'] hidden_layer_size = params['hidden_layer_size'] n_output_features = params['n_output_features'] training_task = training_op(standardization_task.outputs[key_output1], standardization_task.outputs[key_output2], batch_size, window_length, future_length, dropout_fc, hidden_layer_size, n_output_features).after(standardization_task).set_display_name('Model Training '+str(i+1))\ .apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')) |
Wie im MLflow-Blogpost werden die Hyperparameterkonfigurationen für ein Grid Search in den Zeilen 12 und 13 definiert. Daraus ergeben sich vier verschiedene Einstellungen, die parallel ausgeführt werden. Die unterschiedlichen Parameter werden durch eine Schleife über das kartesische Produkt an die training_op()-Komponente übergeben zusammen mit den Trainings- und Testdaten als Pfade. Da die Trainingskomponente die Modelle mit MLflow in das S3 Bucket hochlädt, benötigt sie die AWS Credentials, die als Kubernetes Secret im kubeflow Namespace gespeichert wurden. Damit ist unsere Pipeline fertig!
Eine Ausführung der definierten Pipeline kann ebenfalls mit der SDK in einem Befehl erreicht werden:
1 2 3 4 |
import kfp kfp.Client().create_run_from_pipeline_func( training_pipeline, arguments=arguments, namespace=namespace) |
Das Hochladen der Pipeline ist auch schnell bewerkstelligt:
1 2 3 |
kfp.compiler.Compiler().compile(training_pipeline, 'workflow.yaml') kfp.Client().upload_pipeline(pipeline_package_path='workflow.yaml', pipeline_name='Training Pipeline.') |
Damit kann jeder mit Zugangsrechten über die graphische Benutzeroberfläche von Kubeflow diese Pipeline ausführen oder sogar einen Zeitplan zum automatischen Triggern erstellen. Zudem können weitere neue Versionen unter dem selben Pipelinenamen hinzugefügt werden.
Für die Analyse der Trainingsdurchläufe nutzen wir das MLflow-Dashboard und vergleichen die vier registrierten Modelle:
Es zeigt sich, dass das Modell Nr. 4 bei unseren Testdaten am besten abgeschnitten hat, obwohl die Metriken der Trainingsdaten nicht die geringsten Werte aufweisen. Es kann wesentlich besser generalisieren als das Modell Nr. 3, was durch einen signifikant höheren Validation-Loss unterstrichen wird. Damit erzielt das Modell Nr. 4 gegeben durch die Hyperparameter mit der Droprate von 0.2 und 40 Neuronen der LSTM-Schichten das beste Ergebnis.
Fazit
In diesem Blogpost haben wir eine Kubeflow Pipeline durch leichtgewichtige Komponenten zusammengestellt und diese mit der MLflow-Tracking Funktionalität verbunden. Dadurch konnten wir die besten Vorteile beider Tools für uns nutzen. Kubeflow Pipelines ermöglichen das Automatisieren und Planen von Pipelines. Zudem können sie versioniert und von anderen durch das Bedienen der Kubeflow Oberfläche reproduziert werden. Die modularen Komponenten können für andere Pipelines wiederverwendet oder ausgetauscht werden. Die MLflow-Modellregistry ermöglicht es uns, dass die Modelle versioniert festgehalten werden. Sie liegen durch MLflow sicher auf einem AWS S3 Bucket und die dazugehörigen Metadaten wie Parameter und Metriken in einer MySQL Datenbank.
Der nächste Schritt dieser MLOps-Serie ist das Bereitstellen des besten Modells in der Produktion als Service durch z.B. KFServing und das Monitoring des Services.
Aktuelle Beiträge






Artikel kommentieren