In unserem letzten Blogeintrag, Datenanalyse - Intro, haben wir eine Umgebung zur Datenanalyse aufgebaut, auf die wir nun mit ersten Datenselektionen gegen unseren Analyse-Cluster aufbauen möchten.
Wenn Sie beim Aufbau Ihrer Umgebung diesem Blog gefolgt sind, dann besteht unser Cluster bisher “nur” aus einem HDFS, dem Hadoop Distributed File System. Mit diesem haben wir die Möglichkeit große Datenmengen, verteilt auf unseren Cluster-Membern, abzulegen, was wir ansatzweise mit unserem Datenfluss gezeigt haben.
Um Datensätze aus dem HDFS auswerten zu können, müssen wir diese in unsere Analyseumgebung laden. Mit dem zuvor zur Verfügung gestellten Pandas funktioniert das mit einer einzelnen (CSV-)Datei wie folgt:
import pandas as pd
df = pd.read_csv('hdfs://name-ihrer-umgebung/ordner-name/name-der-datei.csv')
# Anzeige der ersten 5 Datensätze
df.head(5)
Sollten wir hingegen mehr als eine Datei auslesen wollen, so können wir hier mit ein wenig ergänzendem Code, z.B. mit dem glob-Modul, eine Liste von Dateien erzeugen und diese über eine Schleife einlesen. Da wir hier allerdings auf ein entferntes Dateisystem (HDFS) zugreifen, ist das nicht der beste und vor allem einfachste Weg, da glob ein Auslesen aus HDFS nicht direkt unterstützt. Hier könnte uns ein anderes Modul, python-hdfs unterstützen, allerdings bleibt es bei der zu programmierenden Iteration über die einzelnen Dateien.
Einfacher geht das “eigentlich” mit dem Python-Framework Dask, da wir hier den zuvor genannten Code fast nur im Import ändern müssen, da viele Methoden von Dask denen von Pandas im Aufbau ähnlich sind.
Die Änderungen wären wie folgt. Zu Beginn aktivieren wir die Umgebung und installieren Dask:
conda activate analytics
conda install dask
Im Anschluss lesen wir alle (CSV-)Dateien Ihres Datenordners:
import dask.dataframe as dd
df = dd.read_csv('hdfs://name-ihrer-umgebung/ordner-name/*.csv')
# Anzeige der ersten 5 Datensätze
df.head(5)
Das Besondere an Dask ist das sogenannte Lazy Computation, ähnlich wie es auch in Apache Spark umgesetzt ist, wobei Daten nicht nach jeder Zeile Code verarbeitet werden, sondern en bloc bei Aufruf einer Funktion, die dieses Lazy Coputation ausführt. Im Falle von Dask ist dies beispielsweise .compute(). Auch ein .head() verarbeitet aktiv Daten.
Das Problem mit Dask ist allerdings, dass es für die Kommunikation mit HDFS auf pyarrow aufsetzt, welches wieder libhdfs benötigt. Sollte die Installation dieser abhängigen Frameworks in naher Zukunft mal auf verschiedenen Plattformen gut funktionen, dann passe ich gerne diesen Artikel an. Im Moment kann ich diese Kombination allerdings aufgrund der fehlenden Stabilität bei der Installation nicht empfehlen.
Besser, viel besser, geht das allerdings mit pyspark. Das liegt sicherlich zum einen daran, dass Spark nicht mehr so enorm neu ist, aber auch daran, dass es sehr stark mit dem Hadoop Ökosystem in Verbindung steht. Voraussetzung zur Nutzung von Spark ist allerdings (aktuell noch) die Installation des JAVA JDK in Version 8 (Version 11 geht nicht vollständig) und eine verfügbare HADOOP Installation (hier reicht allerdings ein entsprechender Download von Apache Hadoop und das setzen der HADOOP_HOME-Umgebungsvariablen).
Um mit pyspark arbeiten zu können, müssen wir dieses zu Beginn allerdings in unserer Analytics-Umgebung installieren:
conda activate analytics
conda install -c conda-forge pyspark
Die Verwendung von pyspark unterscheidet sich leicht von der von Dask. Um CSV-Dateien aus einem dedizierten HDFS-Ordner auszulesen, nutzen wir folgenden Code:
# Import der relevanten Frameworks/Module
from pyspark.sql import SparkSession
import pandas as pd
# Erzeugen einer SparkSession mit der Bezeichnung 'myAnalysis'
spark = SparkSession.builder.appName('myAnalysis').getOrCreate()
# Erzeugung eines Spark-Dataframes mit den Daten aus dem HDFS
df = spark.read.csv('hdfs://name-ihrer-umgebung/ordner-name/*.csv', header=True, sep=';')
# Konvertierung des Spark-Dataframes in ein Pandas-Dataframe
pandas_df = df.toPandas()
# Anzeige der ersten 5 Datensätze des Pandas-Dataframes
pandas_df.head(5)
Sie werden feststellen, dass das Erzeugen der SparkSession etwas Zeit in Anspruch nimmt. Danach ist dann das Laufzeitverhalten natürlich abhängig von der Menge und Größe der CSV-Dateien und der Geschwindigkeit der Umgebung, wo Sie diesen Code ausführen und von wo Sie diese Daten laden.
Nachdem wir die Daten nun im (Pandas-)Dataframe vorliegen haben, können wir diese filtern, aggregieren, mit anderen Datensätze in Relation setzen oder was auch immer man mit Daten sinnvolles tun kann.
Aber eines muss uns soeben klar sein: wir haben aus den CSV-Dateien alle (!) Informationen ausgelesen, sowohl alle Zeilen, wie auch alle Spalten und halten uns diese soeben im Arbeitsspeicher.
Folgender Code zeigt uns das derzeitige Datenvolumen des Pandas-Dataframes an:
pandas_df.info(memory_usage='deep')
Dieses gezeigte Vorgehen ist in der Praxis kein ganz ungewöhnliches (und ich sehe es auch sehr häufig in Code-Fragmenten verschiedener Projekte), da es zwar nicht effizient ist, aber doch sehr große Spielräume zum Experimentieren gibt.
Wir können allerdings durch einfache Wege die Größe des Dataframes reduzieren. Schauen wir uns zu Beginn vielleicht kurz an, warum das Dataframe so viel Speicher einnimmt.
Die einfachste Möglichkeit ist die Anzeige der Datentypen über:
pandas_df.info()
Oder nutzen Sie gerne erneut die Anzeige der Speichernutzung über:
pandas_df.info(memory_usage='deep')
In beiden Fällen werden Ihnen die interpretierten Datentypen des Dataframes angezeigt. Ist Pandas der Meinung, dass es sich bei dem Feld um eine Zahl handelt, so wird int16 angezeigt, auch wenn nur sehr kleine Zahlen vorkommen; Bei anderen Datentypen ist es ähnlich, bei Zeichenketten werden die object finden, was in den meisten, wenn nicht allen Fällen vorkommen wird.
Eine automatische Optimierung der Daten findet hier nicht statt, da das Schema des Dataframes bereits sehr früh bei der Erstellung des Dataframes erzeugt wird und das Framework nicht abschätzen kann, ob noch große Zahlen kommen oder ob nicht vielleicht doch noch ein Buchstabe, wie beispielsweise bei einer Hausnummer 15b, vorkommt.
Genau hier setzt die erste Optimierung an.
Nehmen wir an, wie hätten Postleitzahl, Hausnummer und Hausnummernzusatz als Spalten in unseren Daten und würden gerne den Datentype für Postleitzahl und Hausnummer von object auf int4 reduzieren, so können wir das wie folgt machen:
pandas_df[['Postleitzahl', 'Hausnummer']] = pandas_df[['Postleitzahl', 'Hausnummer']].astype(int4)
Wenn wir uns nun nach der Ausführung erneut den Speicherverbrauch anschauen, werden wir eine (starke) Reduzierung feststellen:
pandas_df.info(memory_usage='deep')
Noch drastischer lässt sich das Volumen durch Reduzierung der Datensätze verkleinern. Und das, ohne Datensätze löschen zu müssen.
Nehmen wir an, wir hätten Daten, die beispielsweise das Schaltverhalten einer Ampel über den Tag abbildet. Somit hätten wir währscheinlich Spalten wie: Uhrzeit, Ort, Signal.
Wenn Sie sich mit pandas_df.describe() eine einfache Analyse der Daten anzeigen lassen, werden Sie sehen, dass die Spalte Signal (das ist hoffentlich keine Überraschung) nur drei Ausprägungen hat: rot, gelb und grün. Diese Art von Daten nennen wir Kategorie und wir können das auch Pandas mitteilen:
pandas_df[['Signal']] = pandas_df[['Signal']].astype('category')
Durch dieses Vorgehen wurden die genannten konkreten Werte rot, gelb und grün durch Referenzen auf eben diese Kategoriewerte ersetzt. Wenn wir uns nun erneut die Speichernutzung der Daten anschauen, werden Sie erneut eine (enorme) Reduzierung festellen:
pandas_df.info(memory_usage='deep')
Mit diesen beiden einfache Tricks, der Festlegung der Datentypen und der Kategorisierung einiger Spalten, nutzen wir das System schon viel effizienter.
Im nächsten Artikel (der demnächst hier erscheinen wird) gehen wir diesen Schritt durch eine sinnvollen Reduzierung der Datenmengen noch einen kleinen, aber wichtigen weiter.