As a PySpark developer, being able to accurately and efficiently count the rows in a dataframe is an important skill. Whether you need to get a quick sense of the data size, validate input and output counts in a data pipeline, or produce accurate analytics, PySpark provides several simple yet powerful options for counting dataframe rows.
In this comprehensive guide, we will explore the various methods available for counting rows in PySpark dataframes and when to use each one:
- Using the count() Action
- count() vs countDistinct()
- Aggregating with count()
- Counting After Filtering
- Counting Groups with groupby
- Counting During Joins
- Fast Approximate Counts with countApprox()
- Performance Considerations
To demonstrate, we will use a simple dataframe containing website visit data:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("website_visits").getOrCreate()
data = [("Google", "2020-01-01", 10),
("Google", "2020-01-02", 12),
("Bing", "2020-01-01", 20),
("Bing", "2020-01-02", 5),
("Google", "2020-01-03", 8)]
df = spark.createDataFrame(data, ["site", "date", "visits"])
df.show()
+------+----------+------+
|  site|      date|visits|
+------+----------+------+
|Google|2020-01-01|    10|
|Google|2020-01-02|    12|
|  Bing|2020-01-01|    20|
|  Bing|2020-01-02|     5|
|Google|2020-01-03|     8|
+------+----------+------+
Now let's explore how to efficiently count rows in this sample dataframe.
Using the count() Action
The simplest way to get a count of rows in PySpark is to use the count() action:
df.count()
# 5
count() is an action: it triggers a Spark job that performs a full pass over the dataframe and returns the total number of rows.
The key thing to understand is that this job has to read the underlying data. If the dataframe is already cached in memory from earlier transformations and actions, the count comes back quickly. If count() is the only operation you run, Spark first has to read all the data from disk or external sources, so performance can vary considerably.
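For example, if you know you will count the same dataframe more than once, caching it first avoids re-reading the source data each time. A minimal sketch, assuming the data fits comfortably in executor memory:
# Cache the dataframe so repeated actions reuse the in-memory data
df.cache()
df.count()                          # first action reads the source and populates the cache
df.filter("visits > 10").count()    # later actions reuse the cached data
df.unpersist()                      # release the memory when finished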
count() vs countDistinct()
Spark also provides the countDistinct() function (in pyspark.sql.functions), which deduplicates values before counting:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("site")).first()[0]
# 2 - only counts Google and Bing once
This is useful when a column contains many duplicate values but you only need the number of distinct ones. An equivalent formulation is df.select("site").distinct().count(). Because deduplication involves extra processing (a shuffle), use it only when you specifically need distinct counts.
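A related check is counting fully distinct rows across all columns, which is a quick way to spot duplicate records. A small sketch using the sample dataframe from above:
total_rows = df.count()
distinct_rows = df.distinct().count()
# 5 and 5 here - the sample data contains no duplicate rows
print(total_rows, distinct_rows)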
Aggregating with count()
You can also use Spark's aggregation functions to count. For example, to count distinct sites:
from pyspark.sql.functions import count, countDistinct
df.agg(countDistinct("site")).show()
# +--------------------+
# |count(DISTINCT site)|
# +--------------------+
# |                   2|
# +--------------------+
This allows you to easily count multiple columns in one statement:
df.agg(
countDistinct("site"),
count("date")
).show()
# +--------------------+-----------+
# |count(DISTINCT site)|count(date)|
# +--------------------+-----------+
# |                   2|          5|
# +--------------------+-----------+
Just be aware that agg() itself is a lazy transformation: the Spark job that computes the counts only runs once you call an action such as show(), collect(), or first().
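If you want the aggregated counts as plain Python values rather than a printed table, collect the single result row. A brief sketch building on the imports above:
row = df.agg(
    countDistinct("site").alias("distinct_sites"),
    count("date").alias("date_count")
).first()
print(row["distinct_sites"], row["date_count"])
# 2 5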
Counting After Filtering
You can also filter data before counting to get row counts based on conditions:
df.filter("site = 'Google'").count()
# 3
This is very useful for quick analytics on subsets of data.
For the best performance, filter on columns the data source can prune efficiently, such as partition columns or columns covered by Parquet/ORC predicate pushdown. Filters on other columns still force a scan of the full dataset, although Spark only keeps the matching rows for the count.
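The same filtered count can also be written with column expressions, which makes it easy to combine conditions. A small sketch over the sample data:
from pyspark.sql.functions import col
# Count Google rows with more than 9 visits
df.filter((col("site") == "Google") & (col("visits") > 9)).count()
# 2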
Counting Groups with groupby
The groupBy() function is enormously helpful for counting subsets of dataframe rows:
df.groupBy("site").count().show()
# +------+-----+
# |  site|count|
# +------+-----+
# |Google|    3|
# |  Bing|    2|
# +------+-----+
You can also group and aggregate on multiple columns which provides powerful pivot table-like analytics:
df.groupBy("site", "date").count().show()
# +------+----------+-----+
# |  site|      date|count|
# +------+----------+-----+
# |Google|2020-01-03|    1|
# |Google|2020-01-01|    1|
# |  Bing|2020-01-02|    1|
# |Google|2020-01-02|    1|
# |  Bing|2020-01-01|    1|
# +------+----------+-----+
Just be aware that groupBy() triggers a shuffle, and grouping on high-cardinality columns produces a large number of groups, which can hurt performance or cause skew. Test and optimize carefully.
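If you want a named count column, or the groups ordered by size, combine groupBy() with agg(). A short sketch reusing the count import from earlier:
from pyspark.sql.functions import count, desc
df.groupBy("site") \
    .agg(count("*").alias("row_count")) \
    .orderBy(desc("row_count")) \
    .show()
# +------+---------+
# |  site|row_count|
# +------+---------+
# |Google|        3|
# |  Bing|        2|
# +------+---------+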
Counting During Joins
You can use joins to count rows from multiple data sources:
visits_by_day = df.groupBy("date").count()
# customer_records is assumed to be another dataframe that also has a "date" column
visits_by_day.join(customer_records, "date").show()
This allows you to enrich and count rows from disparate sources. Just watch out for one-to-many or many-to-many join keys: they multiply rows and can inflate both your counts and intermediate results.
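A useful sanity check is to compare counts before and after the join so unexpected row multiplication is caught early. A sketch, with customer_records again standing in for whatever dataframe you are joining against:
joined = visits_by_day.join(customer_records, "date")
before = visits_by_day.count()
after = joined.count()
if after > before:
    print(f"Join multiplied rows: {before} -> {after}; check for duplicate join keys")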
Fast Approximate Counts with countApprox()
For very large datasets, Spark lets you trade a little accuracy for a lot of speed. At the RDD level, countApprox() returns a row-count estimate within a given timeout, even if not every task has finished, and pyspark.sql.functions.approx_count_distinct() provides fast approximate distinct counts.
Instead of waiting for every partition to be counted, which can take a very long time on billion-row datasets, countApprox() returns whatever estimate is available when the timeout expires. On large data this can be orders of magnitude faster while typically staying within a few percentage points of the exact answer.
Here's an example usage:
df.rdd.getNumPartitions()
# 8 partitions
# Exact count
df.count()
# 5
# Approximate count: returns within roughly one second (the timeout is in milliseconds),
# even if not every partition has been counted yet
df.rdd.countApprox(timeout=1000, confidence=0.95)
# 5
On a tiny dataframe like this one every task finishes well within the timeout, so the approximate count matches the exact count; the speedup only becomes visible on large, many-partition datasets. Just be aware that countApprox() is meant for quick, rough answers, not for exact counts you would publish in analytics, and that it lives on the RDD API rather than directly on dataframes. Streaming dataframes do not support these batch-style count actions at all; counting on a stream has to be expressed as a streaming aggregation.
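For approximate distinct counts on a dataframe column, approx_count_distinct() is the usual tool; it is based on the HyperLogLog++ sketch and accepts a target relative error. A small sketch on the sample data:
from pyspark.sql.functions import approx_count_distinct
# rsd is the maximum allowed relative standard deviation of the estimate (here 5%)
df.select(approx_count_distinct("site", rsd=0.05)).first()[0]
# 2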
Performance Considerations
There are a few things to keep in mind when counting dataframe rows in PySpark:
- Leverage Actions You Already Use – If you are already running transformations and actions that process every row (such as writing aggregated output), reuse that dataframe for count(); once the data is cached in memory after the initial job, the count comes back quickly.
- Filter When Possible – Filtering first can drastically reduce the amount of data count() has to process; counting a subset is much faster than counting all rows.
- Use Approximations When They Are Enough – Functions like countApprox() and approx_count_distinct() can return rough but useful counts from very large datasets in seconds instead of hours.
- Test Complex Logic On Samples – When building complex aggregations, develop the logic against a sample of the data first; iterating on a small subset is far faster, and you can then run the finished code on the full dataframe (see the sketch after this list).
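For instance, sample() lets you prototype counting logic on a fraction of the data before running it at full scale. A rough sketch; the 10% fraction and the seed are arbitrary choices:
# Develop against roughly 10% of the rows
sample_df = df.sample(fraction=0.1, seed=42)
sample_counts = sample_df.groupBy("site").count()
sample_counts.show()
# Once the logic looks right, run the same code on the full dataframe
full_counts = df.groupBy("site").count()
full_counts.show()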
Counting rows in PySpark dataframes is easy but proper usage requires understanding the various methods and when to apply each one appropriately. By following the best practices outlined here, you can efficiently produce accurate counts to power your Spark analytics and data pipelines.