Datenfluss


In den vorherigen beiden Artikeln haben wir uns über den Aufbau eines Clusters und den unterschiedlichen Arten von Daten ausgetauscht, so dass wir nun eine Idee haben (sollten), was wir tun, wenn wir auf Daten treffen.

Aufgabe

Nehmen wir an, wir hätten die Aufgabe Daten verschiedener Systeme in unserem soeben aufgebauten Cluster zu speichern und so zur Verfügung zu stellen, dass wir diese Daten effizient analysieren können. Aufgrund der Analyse der Quellen und aus der vorherigen Diskussion über Daten haben wir uns entschieden, unsere Daten im Hadoop-Cluster im Parquet-Format abzulegen. Unter effizient nehmen wir hier exemplarisch an, dass wir darunter verstehen, dass einzelne Attribute aus der Gesamtmenge ausgelesen werden können und das alle Datumsfelder in demselben Datumsformat (hier exemplarisch UTC) gespeichert werden. Andere Transformationen sind erst einmal nicht vorgesehen.

Notwendige Funktionalitäten

Zu Beginn möchte ich mit Ihnen verschiedene Anforderungen an eine Datenintegration diskutieren. Hierbei gibt es natürlich mal wieder nicht nur funktionale Anforderungen, sondern auch nicht-funktionale.

Ich erwähne das hier so explizit, da ich häufig auf ein Umfeld treffe, dass skeptisch gegenüber der Verwendung von Frameworks oder Open Source Software ist und lieber vieles oder sogar alles selber entwickelt. Somit sollten wir uns zuvor überlegen, was wir genau benötigen.

Was zeichnet also ein Tool zur Datenintegration aus?

Der erste Teil unserer zuvor skizzierten Aufgabe besteht darin, dass wir Daten aus verschiedenen Quellen auslesen sollen. Somit müssen wir in der Lage sein, verschiedene Quellsystem anzusprechen: Datenbanken, Messaging-Systeme, Dateien via sFTP, etc..

Wenn wir diese Daten ausgelesen haben (und dauerhaft auslesen werden), wollen wir die Datumsfelder in ein einheitliches Format (UTC) tranformieren (zweiter Teil der Aufgabe), um die Ergebnisdaten dann in sinnvoller Größe als Parquet-Datei im Hadoop-Cluster zu speichern.

Wir benötigen somit eine Komponente, die in der Lage ist einzelne Attribute aus dem Datenstrom herauszulesen und diese in das gewünschte (UTC-)Format zu transformieren. Aber was passiert, wenn ein Fehler bei der Konvertierung auftritt, beispielsweise weil wir Daten erhalten haben, die in der Annahme eines Schaltjahres einen Datensatz vom 29.3.2019 beinhaltet? Oder als unbestimmtes Startdatum des Quellsystems das Datum 0000-00-00T00:00.00 ausgewählt wurde und somit häufig in den Datensätzen vorkommt? Ok, Problem erkannt, Maßnahme ergriffen: ein Exception Handling mit der Möglichkeit einer Wiedervorlage.

Als letzte Komponenten benötigen wir etwas, das in der Lage ist Parquet-Dateien in definierter Blockgröße (Chunk-Size) in ein verteiltes Dateisystem (HDFS) zu schreiben. Die definierte Blockgröße ist eine etwas ungenaue Angabe, da es immer die Entscheidung des Data Engineers ist, einen guten Kompromiss zwischen Blöckgröße des Clusters (per Default sind das bei HDFS 128 MB) und der Verfügbarkeit der Daten zu finden, denn für weniger breite Daten benötigen wir entsprechend mehr Datensätze, um 128 MB zu erhalten, was dazu führt, dass es länger dauert, bis wir die Parquet-Datei schreiben können. Der Kompromiss hierbei kann eben sein, dass wir zu Beginn nicht immer solange warten, bis wir 128 MB haben, sondern maximal 15 oder 30 Minuten. Und im Nachgang (vielleicht in einem Wartungsfenster am Wochenende) konsolidieren wir die Dateien, die nicht optimal aufgebaut sind.

Kommen wir wieder zum Fehlerfall: Nehmen wir an, dass aufgrund eines Hardware-Ausfalls der Hadoop-Cluster ausfällt und wir die Daten, die wir unentwegt aus den Quellsystemen auslesen, aktuell nicht wegschreiben können. Wir müssen somit mit einer stetig ansteigenden Menge an Daten umgehen können oder wir müssten unserer Komponente, die zum Auslesen aus den Quellsystemen entstanden ist, mitteilen, dass wir aktuell keine weiteren Daten benötigen können (Back Pressure) und später wieder Bescheid geben, wenn das Schreiben in das HDFS wieder funktioniert.

Mögliche Produkte zur Umsetzung

Um unsere zuvor genannten Anforderungen umzusetzen, können wir uns für verschiedene Implementierungen entscheiden.

  1. Verwendung eines Tools zur Datenintegration: Apache Nifi
  2. Verwendung eines Frameworks zur Datenintegration: Luigi (Python Framework)
  3. Implementierung einer eigenen Datenintegrationspipeline: Apache Spark


Fangen wir mit Apache Nifi an. Hierbei handelt es sich um eine Software, die speziell zur Integration von Datenflüssen entwickelt wurde und sogenannte Prozessoren zum Auslesen, Verarbeiten und Schreiben anbietet. Das Besondere an Apache Nifi ist, dass per Design eine Lieferung der Daten garantiert, die es mit den eigenen Prozessoren ausgelesen hat. Hierzu werden Daten direkt beim Auslesen in ein Repository gespeichert und bei der nachfolgenden Verarbeitung nur noch referenziert, was die Verarbeitungsgeschwindigkeit drastisch erhöht. Apache Nifi ist clusterfähig, das bedeutet, dass mehrere Instanzen parallel betrieben werden können, um eine größere Datenmenge verarbeiten zu können. Die Anwendung kann über ein WebUI (auch von Nicht-Entwicklern) bedient werden und verfügt über eine Benutzerverwaltung.

Luigi, ein von Spotify entwickeltes Python Framework, bietet ähnliche Funktionen wie Apache Nifi, allerdings mit einer deutlich anderen Zielgruppe: Entwickler, konkret: Python Entwickler. Die Anwendung ist auch darauf ausgelegt, dass Datenflüsse mal nicht funktionieren. Luigi schreibt hierzu: You want to chain many tasks, automate them, and failures will happen. Ein großer Vorteil von Luigi ist, dass es aufgrund seines Aufbaus - das Framework benötigt nicht zwingend einen dedizierten Server(-prozess) und lässt sich mit –local-scheduler auch sehr gut auf dem Laptop ausführen - sowohl für kleinere Datenintegrationen als auch für umfangreiche Implementierung verwendet werden kann. Zusätzlich kümmert es sich um die Koordination der Ausführung der Tasks, damit nicht zwei Tasks parallel aus einer Datenquelle lesen und es somit zu Überschneidungen oder Überholern bei der Verarbeitung kommt. Aber der größte Vorteil, insbesondere für die Entwicklung, ist seine Python-Basis. Wer weiß, wie man in Python ein Datum aus Format A in Format B konvertiert, kann dieses Code-Fragment einfach in einem Jupyter Notebook ausprobieren und testen und ohne Anpassungen in einen Luigi Task integrieren. Um den Start mit Luigi zu vereinfachen, setzt das System, ähnlich wie Apache Nifi, auf vordefinierte Komponenten, die hier nicht Prozessoren, sondern Targets heißen, womit man das Auslesen aus einer Datenbank, das Schreiben ins HDFS oder die Ausführung von Apache Spark Code nicht selbst implementieren muss. Sehr smart.

Kommen wir somit zum bereits zuvor genannten Apache Spark als dritte Implementierungsmöglichkeit. Apache Spark ist eine in Scala programmierte Software, welche besonders bei der Verarbeitung sehr großer Datenmengen aktuell zum Quasi-Standard geworden ist. Es bietet nicht nur die Möglichkeit in einem Hadoop-Cluster in Yarn verteilt ausgeführt zu werden, sondern kann sowohl Streaming- als auch Batch-Daten in Dataframes und Datasets speichern und kann hierbei auf ein großes Repertoire von Zusatzmodulen, wie beispielsweise Machine Learning Funktionen, zugreifen. Für die Implementierung von sogenannten Spark Jobs sei erwähnt, dass diese nicht nur in Scala, sondern ebenso gut in JAVA, aber auch in Python (PySpark) oder R (SparklyR) umgesetzt werden können. Im Falle von JAVA und Python sogar nahezu ohne (größere) Performancebeeinträchtigungen. Da Apache Spark zwar häufig für Datenintegrationen genutzt wird, aber nicht dediziert dazu gebaut wurde, müssen die zuvor genannten Komponenten und Funktionen selbst implementiert werden: lesende und schreibende Prozessoren, Queuing-Elemente zur Zwischenspeicherung der aktuellen Daten bei Ausfall der Verarbeitung, Koordination des Datenflusses, etc.. Der besondere Vorteil von Apache Spark ist sicherlich die Möglichkeit in unterschiedlichen Programmiersprachen die Datenflüsse und hier auch direkt die Transformationsprozesse zu realisieren. In großen Umgebungen ist die Ausführung in einem Hadoop-Cluster in Yarn im Besonderen nochmals zu erwähnen, da hier die Leistungsfähigkeit der Datenintegration direkt mit der Größe des Hadoop-Cluster skalieren kann. Ein unschätzbarer Wert, der auch den operativen Betrieb der Umgebung reduzieren kann.