As a full stack developer, being able to create distributed DataFrames and leverage Spark's parallel processing power allows you to scale data pipelines and applications to handle immense amounts of data. This guide will provide code-focused examples and visualizations tailored for a developer audience.

The Rise of Apache Spark

Apache Spark has seen massive adoption over the past 5 years based on surveys from Databricks. Here are some key stats:

  • 90% of data professionals working with big data have production applications using Spark
  • Spark powers data pipelines & analytics for 77% of surveyed organizations
  • Adoption grew over 350% from 2016 to 2020

[Image: Spark Adoption Statistics – Source: Datanami Spark Adoption Report 2020]

This growth is driven by the need to harness huge data volumes and by Spark's speed, scalability, and unified SQL/Python/Scala APIs. For developers, PySpark provides production-ready data engineering capabilities.

Now let's see how to leverage PySpark for your data applications.

DataFrames – Distributed Data Tables

The core Spark concept developers should understand is the DataFrame – a distributed tabular data set comprised of rows and columns, much like a SQL database table or a Pandas DataFrame.

[Image: Spark DataFrame Structure – Source: Spark By Examples]

DataFrames are comprised of partitions of data distributed across nodes in a cluster. They are:

  • Immutable – cannot be modified once created
  • Stored in an optimized columnar format
  • Able to leverage indexing, filter pushdown and partition pruning for performance
  • Convertible to pandas DataFrames via .toPandas()

Loading, filtering and slicing DataFrames generates execution plans rather than immediate data movement (see the sketch below). Understanding partitions and the difference between wide and narrow transformations is key for performance.
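Here is a minimal sketch of that lazy behaviour (assuming an active SparkSession named spark and a hypothetical /data/people.parquet file) – transformations only build a plan until an action runs:

df = spark.read.parquet("/data/people.parquet")

adults = df.filter(df.age >= 18).select("name")  # narrow transformations: only a plan is built
adults.explain()  # inspect the physical execution plan
adults.count()  # an action finally triggers distributed execution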

Now let's explore various methods developers can use to ingest data into DataFrames from different formats and data sources.

1. Load from JSON

JSON is used widely in web apps for transferring data. Spark can directly ingest JSON files or web API responses and infer the schema:

import requests

response = requests.get("http://api.example.com/data")

# spark.read.json expects a path or an RDD/Dataset of JSON strings,
# so wrap the response body in an RDD before reading
df = spark.read.json(spark.sparkContext.parallelize([response.text]))

This handles parsing into columns automatically – df.printSchema() shows the inferred schema:

root
|-- age: integer (nullable = true)
|-- id: string (nullable = true) 
|-- name: string (nullable = true)

For more control over the schemas:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

custom_schema = StructType([
  StructField("identifier", StringType(), True),
  StructField("user_name", StringType(), True),
  StructField("user_age", IntegerType(), True)
])

df = spark.read.schema(custom_schema).json(spark.sparkContext.parallelize([response.text]))

JSON is useful for developers needing to ingest web data.

2. Load from CSV

Comma-separated value files are ubiquitous:

id,name,age
1,John,35
2,Mike,50

However, CSVs lack an inherent schema unlike JSON. So we need to define the schema:

from pyspark.sql import types 

schema = types.StructType([
    types.StructField('id', types.IntegerType()),
    types.StructField('name', types.StringType()),
    types.StructField('age', types.IntegerType())
])

df = spark.read.csv("people.csv", schema=schema)  

It is important to define the data types properly based on the expected values.

For complex or changing schemas, we can leverage Spark's schema inference:

df = spark.read.csv("people.csv", header=True, inferSchema=True)

Spark will then automatically deduce the column names and types.
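A quick printSchema() call confirms what inference produced (output shown for the people.csv sample above):

df.printSchema()

root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)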

3. Load from Pandas DataFrames

Pandas is the leading data analysis toolkit for Python. Spark can convert Pandas DataFrames into Spark DataFrames via createDataFrame():

import pandas as pd

pandas_df = pd.read_csv("data.csv") 

spark_df = spark.createDataFrame(pandas_df)  

This leverages Pandas for ad-hoc data manipulation on small data while scaling to large production data via Spark conversion.
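If PyArrow is installed, enabling Arrow-based transfer usually makes this conversion much faster – a short sketch using the Spark 3.x config key:

# enable Arrow columnar transfer for pandas <-> Spark conversions (requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark_df = spark.createDataFrame(pandas_df)

# and back to pandas for local inspection of small results
sample_pdf = spark_df.limit(1000).toPandas()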

4. Load from Koalas DataFrames

Koalas provides the Pandas API directly on top of PySpark, enabling you to scale intensive workloads without API changes:

import databricks.koalas as ks

koalas_df = ks.read_csv("data.csv")

koalas_df["age"].mean() 

Koalas utilizes Spark under the hood while keeping Pandas syntax to simplify transition to distributed processing.
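Note that Koalas has since been folded into PySpark itself (Spark 3.2+) as the pandas API on Spark, so the equivalent code today looks like:

import pyspark.pandas as ps

psdf = ps.read_csv("data.csv")

psdf["age"].mean()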

5. Query External Databases

PySpark supports JDBC connections for querying external databases like PostgreSQL and MySQL directly into DataFrames:

jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:{database}://host") \ 
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "xxxx") \
    .load()

SQL databases can be used as data sources this way, providing a unified analytics layer without a separate bulk export step.
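As a concrete sketch (hypothetical PostgreSQL host, database and credentials, with the PostgreSQL JDBC driver on the classpath), the query option pushes a SQL query down to the database instead of pulling the whole table:

orders_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/shop") \
    .option("query", "SELECT id, user_id, total FROM orders WHERE created_at >= '2020-01-01'") \
    .option("user", "analyst") \
    .option("password", "xxxx") \
    .load()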

6. Read Partitioned Data

Large datasets are often stored partitioned across multiple files, for example /data/year=2020/month=1/part-0000.parquet.

Spark discovers these Hive-style partitions automatically when we read the base directory:

partitions_df = spark.read.parquet("/data")

This produces a partitioned DataFrame in which year and month appear as regular columns, without requiring any schema changes for the partition columns.
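Filtering on the partition columns then prunes which directories Spark scans, which is where the real performance benefit comes from:

# only the /data/year=2020/month=1 directory is read
jan_2020 = partitions_df.filter((partitions_df.year == 2020) & (partitions_df.month == 1))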

7. Handle Nested Data

Complex JSON or XML data can have nested structures like:

{
  "name": "John",
  "address": {
      "street": "123 Main St",
      "city": "Anytown",
      "state": "CA"  
  }
}

We can access nested fields in Spark with dot notation:

from pyspark.sql.functions import col

df = spark.read.json("data.json")

nested_df = df.withColumn("city", col("address.city"))

This pulls the nested address.city field up into a top-level column, allowing queries across the hierarchy.
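explode() itself applies to nested arrays. Assuming the records also carried a hypothetical orders array such as "orders": [{"sku": "A1"}, {"sku": "B2"}], it produces one row per array element:

from pyspark.sql.functions import explode

orders_df = df.withColumn("order", explode("orders")) \
              .select("name", "order.sku")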

8. Read Apache Parquet Files

Parquet is a common columnar format optimized for Spark workloads. It provides compression and encoding schemes for big data.

Reading Parquet files is simple:

users_df = spark.read.parquet("/data/users.parquet")

purchases_df = spark.read.parquet("/data/purchases.parquet") 

This handles schema inference and partition discovery automatically.

Parquet allows persisting DataFrames in a space-optimized format that supports complex types like maps, arrays and structs.
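Writing a DataFrame back out is equally simple – a sketch with a hypothetical output path and partition column, using the default snappy compression:

users_df.write \
    .mode("overwrite") \
    .partitionBy("country") \
    .option("compression", "snappy") \
    .parquet("/data/users_by_country")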

9. Read Data Streams with Structured Streaming

Spark Structured Streaming allows ingesting live streams of data from sources like Kafka and Kinesis as DataFrames:

stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

stream_df.writeStream.outputMode("append").format("console").start() 

This continually ingests data from the Kafka topic into a streaming DataFrame and writes each micro-batch to the console.

Stream output can also be written to databases or to file sinks such as Parquet for further analytics.
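For example, a file sink with checkpointing (hypothetical paths) persists the stream as Parquet files that downstream batch jobs can query:

query = stream_df.writeStream \
    .format("parquet") \
    .option("path", "/data/events") \
    .option("checkpointLocation", "/data/checkpoints/events") \
    .outputMode("append") \
    .start()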

10. Creating DataFrames in Spark Jobs

In Spark workloads, it's best practice to use the DataFrame reader APIs within a Spark session rather than custom RDD logic:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("data-pipeline") \ 
    .config("spark.sql.warehouse.dir", "hdfs:/temp") \
    .enableHiveSupport() \
    .getOrCreate() 

df = spark.read.json("/data/*.json")

This configures the Spark environment appropriately, setting the warehouse directory and enabling Hive support.

Overall, when ingesting data in Spark, prefer DataFrames over raw RDDs to enable richer query optimizations.

Now let's put some techniques together.

Building a Data Pipeline Example

Let's walk through a sample data pipeline that ingests user activity events from Kafka, joins them to user profile data from MySQL, and runs analytics queries before loading aggregated data to Parquet.

Kafka Source Stream -> JDBC Lookup -> Transform -> Parquet Sink

1. Ingest clickstream events from Kafka

from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType
from pyspark.sql.functions import from_json, col

event_schema = StructType([
   StructField("timestamp", TimestampType()),
   StructField("user_id", IntegerType()),
   StructField("action", StringType())   
])

click_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "events") \
    .load()  

# Kafka delivers the payload as binary, so cast it to a string before parsing the JSON
stream_df = click_stream \
    .select(from_json(col("value").cast("string"), event_schema).alias("event")) \
    .select("event.*")

Here we connect a Kafka streaming source and parse the JSON message payload into typed columns using the provided schema.

2. Enrich with user data from MySQL via JDBC

users_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost/users") \
    .option("dbtable", "profiles") \
    .option("user", "root") \ 
    .load()

joined_df = stream_df.join(users_jdbc, stream_df.user_id == users_jdbc.id)  

This is a stream-static join: the user profile data from the MySQL database enriches each streaming event with user attributes.

3. Aggregate and Transform

With profile data joined, we can now calculate aggregates:

from pyspark.sql import functions as F

aggregated_df = joined_df \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        F.window("timestamp", "10 minutes"),
        "user_country", "user_age"
    ) \
    .agg(F.count("action").alias("events"))

formatted_df = aggregated_df.select("window", "user_country", "user_age", "events")

This rolls up data into windowed aggregates by country and age bucket, transforming into the desired output format.

4. Sink Results to Parquet

Finally we write the results as Parquet files partitioned by date:

(formatted_df
    .withColumn("date", F.to_date("window.start"))  # partition columns must be atomic, so derive a date from the window
    .writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "s3a://bucket/events")
    .option("checkpointLocation", "s3a://bucket/checkpoints")
    .partitionBy("date")
    .start())

This achieves a batch-like architecture over streaming data, combining external lookups, transformations and open formats like Parquet for further analysis.

Conclusion

As we've explored, PySpark DataFrames provide a rich, code-friendly abstraction for manipulating distributed data sets at scale and unlocking the power of Apache Spark clusters.

With compute frameworks handling parallelization and fault tolerance under the hood, developers can focus on domain specific data flows using familiar APIs like SQL, data frames and streams.

The key abilities we need are:

  • Structuring unstructured data sources via schemas
  • Joining disparate databases/APIs into a unifying analytics layer
  • Building scaled data pipelines with code instead of individual scripts
  • Leveraging streaming data sources and sinking results for future re-use
  • Encapsulating complex business logic as transformations

Based on examples like batch and streaming pipelines, loading JSON/CSV formats and joining databases, we see how PySpark enables developers to be tremendously productive building robust data applications.

I hope you enjoyed the guide! Let me know in the comments if you have any other questions.
