As a full stack developer, being able to create distributed DataFrames and tap into Spark's parallel processing power lets you scale data pipelines and applications to handle immense amounts of data. This guide provides code-focused examples and visualizations tailored for a developer audience.
The Rise of Apache Spark
Apache Spark has seen massive adoption over the past five years, according to surveys from Databricks. Here are some key stats:
- 90% of data professionals working with big data have production applications using Spark
- Spark powers data pipelines & analytics for 77% of surveyed organizations
- Adoption grew over 350% from 2016 to 2020
(Image source: Datanami Spark Adoption Report 2020)
This growth is driven by the need to harness huge data volumes, combined with Spark's speed, scalability, and unified SQL/Python/Scala APIs. For developers, PySpark provides production-ready data engineering capabilities.
Now let's see how to leverage PySpark for your data applications.
DataFrames – Distributed Data Tables
The core Spark concept developers should understand is the DataFrame – a distributed, tabular data set made up of rows and columns, much like a SQL database table or a pandas DataFrame.
(Image source: Spark By Examples)
DataFrames are made up of partitions of data distributed across the nodes of a cluster. They are:
- Immutable – they cannot be modified once created
- Stored in an optimized columnar format
- Able to leverage indexing and partition/filter pruning for performance
- Convertible to pandas DataFrames via .toPandas()
Loading, filtering and slicing DataFrames builds execution plans rather than moving data right away – Spark evaluates lazily for efficiency. Understanding partitions, and the difference between narrow and wide transformations, is key for performance. A quick illustration follows.
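A minimal sketch of that lazy behavior, assuming an existing SparkSession named spark and a hypothetical people.parquet file:

# Transformations are lazy – Spark only builds a query plan here
df = spark.read.parquet("/data/people.parquet")
adults = df.filter(df.age >= 18).select("id", "name")  # narrow transformations

# Inspect the plan Spark generated; no job has run yet
adults.explain()

# An action such as count() finally triggers execution across partitions
print(adults.count())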
Now let's explore various methods developers can use to ingest data into DataFrames from different formats and data sources.
1. Load from JSON
JSON is used widely in web apps for transferring data. Spark can ingest JSON files directly, or parse web API responses, and infer the schema:
import requests

response = requests.get("http://api.example.com/data")
# Parallelize the raw JSON text so Spark can parse it
df = spark.read.json(spark.sparkContext.parallelize([response.text]))
This handles parsing into columns automatically:
root
|-- age: integer (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
For more control over the schema, define it explicitly (the field names must match the JSON keys):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
custom_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.schema(custom_schema).json(spark.sparkContext.parallelize([response.text]))
JSON is useful for developers needing to ingest web data.
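One option worth knowing: by default Spark expects one JSON record per line. For pretty-printed files where a record spans several lines, the multiLine option handles it; a small sketch with a hypothetical file name:

# multiLine tells Spark that a single JSON record may span several lines
pretty_df = spark.read.option("multiLine", "true").json("/data/pretty_printed.json")
pretty_df.printSchema()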
2. Load from CSV
Comma-separated value files are ubiquitous:
id,name,age
1,John,35
2,Mike,50
However, unlike JSON, CSV files carry no inherent schema, so we need to define one:
from pyspark.sql import types

schema = types.StructType([
    types.StructField("id", types.IntegerType()),
    types.StructField("name", types.StringType()),
    types.StructField("age", types.IntegerType())
])

df = spark.read.csv("people.csv", header=True, schema=schema)
It is important to define the data types correctly based on the expected values.
For complex or changing schemas, we can leverage Spark's schema inference:
df = spark.read.csv("people.csv", header=True, inferSchema=True)
Spark will then deduce the column names from the header row and infer the types from a sample of the data.
3. Load from Pandas DataFrames
Pandas is the leading data analysis toolkit for Python. Spark can convert pandas DataFrames into Spark DataFrames via createDataFrame():
import pandas as pd
pandas_df = pd.read_csv("data.csv")
spark_df = spark.createDataFrame(pandas_df)
This lets you use pandas for ad-hoc manipulation of small data sets while scaling to large production data through the Spark conversion. The conversion also works in reverse, as sketched below.
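A minimal sketch of the round trip, assuming Spark 3.x and that pandas_df has an age column; enabling Arrow is optional but usually speeds up the transfer considerably:

# Arrow-based transfer between Spark and pandas (Spark 3.x configuration key)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark_df = spark.createDataFrame(pandas_df)              # pandas -> Spark
summary_pd = spark_df.groupBy("age").count().toPandas()  # Spark -> pandas (collects to the driver, so keep results small)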
4. Load from Koalas DataFrames
Koalas provides the pandas API directly on top of PySpark, letting you scale pandas-style workloads without API changes:
import databricks.koalas as ks
koalas_df = ks.read_csv("data.csv")
koalas_df["age"].mean()
Koalas uses Spark under the hood while keeping pandas syntax, which simplifies the transition to distributed processing.
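Note that from Spark 3.2 onwards, Koalas has been folded into PySpark itself as the pandas API on Spark; a sketch of the equivalent code, assuming the same data.csv file:

# The Koalas API now ships with PySpark (Spark 3.2+) as pyspark.pandas
import pyspark.pandas as ps

ps_df = ps.read_csv("data.csv")
print(ps_df["age"].mean())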
5. Query External Databases
PySpark supports JDBC connections for querying external databases like PostgreSQL and MySQL directly into DataFrames:
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:{database}://host") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "xxxx") \
    .load()
In this way SQL databases can serve as sources for unified analytics without a separate export step. For larger tables, the read can also be parallelized, as sketched below.
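A hedged sketch of a parallel JDBC read – the connection details, the id column and its bounds are placeholders you would replace with your own:

# Split the table into 8 parallel JDBC partitions on a numeric column
parallel_jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host/dbname") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "xxxx") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .load()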
6. Read Partitioned Data
Large datasets are often stored partitioned across multiple files, for example /data/year=2020/month=1/part-00000.parquet.

Note that partitionBy() is a writer method; when reading, Spark discovers partition columns automatically from the directory layout:

partitions_df = spark.read.parquet("/data")

This produces a DataFrame that exposes year and month as ordinary columns without requiring any schema changes, and filters on those columns prune whole directories instead of scanning them.
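A small sketch of that pruning in action, assuming the /data layout above:

# Partition columns (year, month) are discovered from the directory names
events_df = spark.read.parquet("/data")

# Filtering on partition columns skips non-matching directories entirely
jan_2020 = events_df.where("year = 2020 AND month = 1")
jan_2020.explain()  # the physical plan should list the partition filters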
7. Handle Nested Data
Complex JSON or XML data can have nested structures like:
{
"name": "John",
"address": {
"street": "123 Main St",
"city": "Anytown",
"state": "CA"
}
}
We can reach nested struct fields in Spark with dot notation:

from pyspark.sql.functions import col

df = spark.read.json("data.json")
nested_df = df.withColumn("city", col("address.city"))

Dot notation flattens struct fields into regular columns. For array fields, explode() creates a row for each element, allowing queries across the hierarchy – see the sketch below.
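A sketch of explode() with a hypothetical orders.json file in which each user carries an array of order structs:

from pyspark.sql.functions import explode, col

orders_df = spark.read.json("orders.json")

# explode() emits one row per array element so individual orders can be queried
flat_orders = orders_df.select(
    col("name"),
    explode(col("orders")).alias("order")
).select("name", "order.amount", "order.date")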
8. Read Apache Parquet Files
Parquet is a common columnar format optimized for Spark workloads. It provides compression and encoding schemes for big data.
Reading Parquet files is simple:
users_df = spark.read.parquet("/data/users.parquet")
purchases_df = spark.read.parquet("/data/purchases.parquet")
This handles schema inference and partition discovery automatically.

Parquet lets you persist DataFrames in a space-efficient format that supports complex types like maps, arrays and structs. Writing is just as simple, as sketched below.
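A minimal sketch of writing partitioned Parquet output – the country column and the output path are hypothetical:

# Persist the DataFrame as Parquet, partitioned on disk by country
users_df.write \
    .mode("overwrite") \
    .partitionBy("country") \
    .parquet("/data/users_by_country")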
9. Read Data Streams with Structured Streaming
Spark Structured Streaming allows ingesting live streams of data from sources such as Kafka or Kinesis as DataFrames:
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

stream_df.writeStream.outputMode("append").format("console").start()
This continually ingests data from the Kafka topic into a streaming DataFrame and writes each micro-batch to the console.

Streams can also be written to databases or to file sinks such as Parquet for further analytics, as sketched below.
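A sketch of a Parquet sink with placeholder paths – the checkpoint location is what lets the query restart safely:

# Write each micro-batch to Parquet files (the raw Kafka columns, including the binary value)
query = stream_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/stream_output") \
    .option("checkpointLocation", "/data/stream_checkpoints") \
    .start()

query.awaitTermination()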
10. Creating DataFrames in Spark Jobs
In Spark workloads, it's best practice to use the DataFrame reader APIs within a Spark session rather than custom RDD logic:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("data-pipeline") \
    .config("spark.sql.warehouse.dir", "hdfs:/temp") \
    .enableHiveSupport() \
    .getOrCreate()
df = spark.read.json("/data/*.json")
This configures the Spark environment appropriately, setting the warehouse directory and enabling Hive support.

Overall, when ingesting data in Spark, prefer DataFrames over raw RDDs so the Catalyst optimizer can do its work – including letting you run plain SQL over the data, as sketched below.
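A short sketch of that SQL angle, continuing from the df loaded above (the action column is illustrative; substitute fields that exist in your JSON):

# Register the DataFrame so it can be queried with Spark SQL
df.createOrReplaceTempView("events")

action_counts = spark.sql("""
    SELECT action, COUNT(*) AS events
    FROM events
    GROUP BY action
""")
action_counts.show()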
Now let's put some techniques together.
Building a Data Pipeline Example
Let's walk through a sample data pipeline that ingests user activity events from Kafka, joins them to user profile data from MySQL, executes analytics queries, and then loads the aggregates to Parquet.
Kafka Source Stream -> JDBC Lookup -> Transform -> Parquet Sink
1. Ingest clickstream events from Kafka
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col

event_schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("user_id", IntegerType()),
    StructField("action", StringType())
])

click_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "events") \
    .load()

# Kafka delivers the payload as binary, so cast it to a string before parsing the JSON
stream_df = click_stream \
    .select(from_json(col("value").cast("string"), event_schema).alias("event")) \
    .select("event.*")
Here we connect to the Kafka source, cast the binary message payload to a string, and parse the JSON into typed columns using the provided schema.
2. Enrich with user data from MySQL via JDBC
users_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost/users") \
    .option("dbtable", "profiles") \
    .option("user", "root") \
    .load()
joined_df = stream_df.join(users_jdbc, stream_df.user_id == users_jdbc.id)
This stream-static join enriches each streaming event with the user attributes held in the MySQL profile table.
3. Aggregate and Transform
With profile data joined, we can now calculate aggregates:
from pyspark.sql import functions as F

aggregated_df = joined_df \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        F.window("timestamp", "10 minutes"),
        "user_country", "user_age"
    ) \
    .agg(F.count("action").alias("events"))

# Pull the window start out as a date column so the sink can partition by it
formatted_df = aggregated_df.select(
    F.col("window.start").alias("window_start"),
    F.to_date("window.start").alias("event_date"),
    "user_country", "user_age", "events"
)
This rolls up the data into windowed aggregates by country and age, and extracts the window start into a date column used for partitioning the output.
4. Sink Results to Parquet
Finally we write the results as Parquet files partitioned by date:
(formatted_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "s3a://bucket/events")
    .option("checkpointLocation", "s3a://bucket/checkpoints")
    .partitionBy("event_date")
    .start())
This gives us a streaming pipeline combining a Kafka source, an external lookup, transformations and an open format like Parquet that downstream batch jobs can analyze further, as sketched below.
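A minimal sketch of such a downstream batch read, reusing the same bucket path:

# Batch analytics over the aggregates the streaming pipeline wrote out
events = spark.read.parquet("s3a://bucket/events")
events.groupBy("user_country").sum("events").show()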
Conclusion
As we've explored, PySpark DataFrames provide a very rich, code-friendly abstraction for manipulating distributed data sets at scale and unlocking the power of Apache Spark clusters.
With the compute framework handling parallelization and fault tolerance under the hood, developers can focus on domain-specific data flows using familiar APIs like SQL, DataFrames and streams.
The key abilities we need are:
- Structuring unstructured data sources via schemas
- Joining disparate databases and APIs into a unified analytics layer
- Building scaled data pipelines with code instead of one-off scripts
- Leveraging streaming data sources and sinking results for future re-use
- Encapsulating complex business logic as transformations
Based on examples like batch and streaming pipelines, loading JSON/CSV formats and joining databases, we see how PySpark enables developers to be tremendously productive building robust data applications.
I hope you enjoyed the guide! Let me know in the comments if you have any other questions.