Scaling Airflow Dataset Scheduling: Lessons from Common Crawl
Since joining Common Crawl, I’ve been involved with a number of activities revolving around indexing our data and cataloging its integrity. To coordinate these activities, and to be able to re-run them when datasets change, I decided to give Airflow a try. The introduction of dataset-based scheduling in Airflow 2.4 seemed attractive, but it comes with interesting challenges at our scale.
Data Flow Architecture
We have two main categories of data sources – crawls, which are created roughly monthly, and news, which is created daily. Note that I say “created” and not “updated” – each dataset is distinct and usable on its own in many cases. So a new crawl isn’t really an “update” of our crawl dataset; it’s a specific, one-off instance of a crawl-type dataset.
Here’s a visualization of how our datasets flow through various processing stages - each box represents a DatasetAlias in Airflow.
This architecture requires careful coordination of dataset dependencies, especially since the integrity dataset depends on ANY upstream dataset changing – in Airflow terms, an OR over several datasets rather than the default AND you get by scheduling on a list.
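To make that concrete, here’s a minimal sketch using Airflow’s conditional dataset expressions (available since 2.9). The DAG id is a placeholder, and the three URIs are just the example paths files from the naming section below; which inputs the real integrity checks watch is more involved than this.

# Sketch: a DAG that runs when ANY of its upstream datasets gets a new event.
# Requires Airflow 2.9+ for the "|" (OR) dataset expression.
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset

warcs = Dataset("s3://commoncrawl/crawl-data/CC-MAIN-2014-41/warc.paths.gz")
wets = Dataset("s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wet.paths.gz")
wats = Dataset("s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wat.paths.gz")

with DAG(
    dag_id="integrity_checks",          # placeholder name
    schedule=(warcs | wets | wats),     # OR: any one update triggers a run
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    ...  # tasks that launch the integrity jobs go here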
Resource Management
We follow a critical Airflow best practice: using it purely as an orchestrator, not for processing. Even though our dataset is huge (petabytes), Airflow just coordinates the work:
- Our data lives in AWS S3
- Processing happens on Hadoop clusters or EMR Spark jobs in other clusters
- Airflow handles starting clusters and submitting tasks, but does no heavy lifting itself (see the sketch below)
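Here’s roughly what that pattern looks like with the Amazon provider’s EMR operators – a sketch only: the cluster config, step definition, and job script path are illustrative placeholders, not our actual pipeline code.

# Orchestration-only sketch: Airflow starts an EMR cluster, submits a Spark
# step, waits for it, and tears the cluster down; the processing itself runs
# on EMR. Names and configs below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

SPARK_STEPS = [{
    "Name": "example-spark-step",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        # The heavy lifting happens inside this Spark job, not in Airflow.
        "Args": ["spark-submit", "--deploy-mode", "cluster",
                 "s3://example-bucket/jobs/example_job.py"],
    },
}]

with DAG("emr_orchestration_example", start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False):
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        # Instance types, release label, applications, etc. omitted here.
        job_flow_overrides={"Name": "example-cluster"},
    )
    submit_job = EmrAddStepsOperator(
        task_id="submit_spark_job",
        job_flow_id=create_cluster.output,  # cluster id passed via XCom
        steps=SPARK_STEPS,
    )
    wait_for_job = EmrStepSensor(
        task_id="wait_for_spark_job",
        job_flow_id=create_cluster.output,
        step_id="{{ task_instance.xcom_pull(task_ids='submit_spark_job')[0] }}",
    )
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id=create_cluster.output,
    )
    create_cluster >> submit_job >> wait_for_job >> terminate_cluster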
Naming and Organization
Our dataset URI convention is pretty straightforward – we use the actual S3 path where the final list of dataset files lives. These paths.gz files are simply lists of S3 prefixes in the bucket. Since a paths file is the last thing we write out on success, it also serves as the main input file for most of our downstream jobs.
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/warc.paths.gz (alias "warcs")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wet.paths.gz (alias "wets")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wat.paths.gz (alias "wats")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/cc-index.paths.gz (alias "warc-cdx")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/cc-index-table.paths.gz (alias "warc-tabular")
s3://commoncrawl-private-redaction/cc-index/collections/CC-MAIN-2018-26-WET/indexes/cluster.idx (alias "wet-cdx")
s3://commoncrawl-private-redaction/cc-index/collections/CC-MAIN-2018-26-WAT/indexes/cluster.idx (alias "wat-cdx")
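For illustration, here’s what consuming one of these paths files looks like – a hypothetical helper, not code from our repos, assuming boto3 credentials with access to the bucket:

# Hypothetical helper: download a paths.gz file and return the S3 prefixes it
# lists, one per output file of the crawl.
import gzip
import io

import boto3


def read_paths_file(bucket: str, key: str) -> list[str]:
    obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
    with gzip.open(io.BytesIO(obj["Body"].read()), mode="rt") as fh:
        return [line.strip() for line in fh if line.strip()]


# e.g. every WARC file prefix in the 2014-41 crawl
warc_prefixes = read_paths_file("commoncrawl", "crawl-data/CC-MAIN-2014-41/warc.paths.gz")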
Balancing Time-Based and Dataset-Based Scheduling
We’ve found a hybrid approach works best. We use “sensor” DAGs that run on a time-based schedule to check for the existence of new datasets. Once detected, we let dataset-based scheduling handle the downstream processing.
# Example sensor DAG
from datetime import datetime

from airflow import DAG
from airflow.datasets import DatasetAlias  # requires Airflow 2.10+
from airflow.operators.python import PythonOperator

with DAG(
    'check_for_new_crawl_data',
    schedule='0 * * * *',  # hourly check for new crawl output
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    check_task = PythonOperator(
        task_id='check_for_new_data',
        python_callable=check_and_announce_new_datasets,
        # We declare a dataset alias as the outlet; the callable attaches the
        # concrete per-crawl paths.gz Dataset to it at runtime (see the
        # "first run" example below).
        outlets=[DatasetAlias('crawls')],
    )
The beauty of dataset scheduling is that it naturally handles dependencies – if the source data a DAG needs is ready, it can run, even if other parts of other DAGs have failed. We only need to care about our specific input datasets.
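The consuming side is just as small: a downstream DAG declares the datasets (or aliases) it needs as its schedule and nothing else. A minimal sketch – the DAG id and task are placeholders, and DatasetAlias scheduling needs Airflow 2.10+:

# Sketch of a downstream DAG scheduled purely on data availability: it runs
# whenever the 'crawls' alias receives a new dataset event from the sensor
# above, no matter what any other DAG is doing.
from datetime import datetime

from airflow import DAG
from airflow.datasets import DatasetAlias
from airflow.operators.python import PythonOperator


def submit_index_job(**context):
    """Placeholder: in reality this would submit an EMR/Hadoop indexing job."""


with DAG(
    'build_indexes_for_new_crawl',      # placeholder DAG id
    schedule=[DatasetAlias('crawls')],  # no cron expression at all
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(
        task_id='submit_index_job',
        python_callable=submit_index_job,
    )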
The “First Run” Problem
One of the first hurdles I faced was the “first run” problem. We already had hundreds of existing datasets in S3 – how do we get Airflow to recognize these without triggering an avalanche of processing every time the sensor runs?
My initial approach was pragmatic:
# Example of handling existing datasets: the python_callable for a sensor task
# that declares outlets=[DatasetAlias('crawls')] (DatasetAlias needs Airflow 2.10+)
from airflow.datasets import Dataset, DatasetAlias
from airflow.models import Variable


def check_and_announce_existing_datasets(*, outlet_events, **context):
    # Get the list of already-announced datasets from an Airflow Variable
    announced_datasets = Variable.get(
        "announced_datasets", deserialize_json=True, default_var=[]
    )

    dataset_path = "s3://commoncrawl/crawl-data/CC-MAIN-2025-04/paths.txt.gz"

    if dataset_path in announced_datasets:
        # Announced on an earlier sensor run: do nothing, so existing datasets
        # don't re-trigger an avalanche of processing every hour
        return

    if s3_file_exists(dataset_path):  # helper that checks S3 for the paths file
        # Announce it once by attaching a dataset event to the 'crawls' alias
        outlet_events[DatasetAlias("crawls")].add(Dataset(dataset_path))

        # Update our tracking variable so later sensor runs skip this dataset
        announced_datasets.append(dataset_path)
        Variable.set("announced_datasets", announced_datasets, serialize_json=True)
This works, but it’s probably not ideal under concurrency – two sensor runs could race on reading and writing the Variable. There are better alternatives we will likely move to later:
Using XCom?: Storing dataset announcement history in XCom provides better task-run context and integrates with Airflow’s cleanup mechanisms.
External Metadata Stores?: For our scale, a dedicated database table or integration with data catalog or lineage systems provides better robustness.
Redis or Similar?: Using atomic operations in a key-value store can prevent race conditions when multiple processes try to announce datasets – see the sketch below.
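To make the last option concrete, here’s a rough sketch of an atomic announce-once check in Redis – not something we run today, and the key name and connection details are placeholders:

# Hypothetical announce-once check backed by Redis. SADD is atomic and returns
# 1 only for the first caller to add a given member, so concurrent sensor runs
# can't both announce the same dataset.
import redis

r = redis.Redis(host="localhost", port=6379)  # placeholder connection


def announce_once(dataset_path: str) -> bool:
    """Return True only for the first caller to claim this dataset path."""
    return r.sadd("announced_datasets", dataset_path) == 1


if announce_once("s3://commoncrawl/crawl-data/CC-MAIN-2025-04/paths.txt.gz"):
    ...  # safe to emit the dataset event; every other concurrent run gets False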
Challenges and Limitations
The Airflow UI breaks down pretty quickly when you have hundreds or thousands of datasets under each alias. The dataset graph becomes an unreadable mess of lines – we need better visualization options that allow collapsing or filtering parts of the graph.
I’m not just complaining here… when I have some extra time, I’ll see if I can work up a patch to fix this in Airflow; I have a few ideas.
Side note: the conceptual model for datasets in Airflow feels unnecessarily complex. There’s Dataset, DatasetAlias, Metadata, extra parameters, etc. I’d prefer a single approach that works for small and large deployments alike.
Advice for Implementing Dataset Scheduling
If you’re just getting started with dataset-based scheduling in Airflow, here’s my advice:
Test locally first - It’s REALLY easy to make a mess of the Airflow database, so experiment on your local machine or dev server before touching production.
Plan for complexity - Don’t start too simply with your DAGs. Evolving them over time gets tricky and messy, so think through future requirements upfront.
Watch for UI limitations - The current dataset visualization doesn’t scale well beyond a few dozen datasets.
Keep it simple - Despite the power of dataset scheduling, the simplest approach that works is usually best.
Dataset-based scheduling is transforming how we handle data dependencies at Common Crawl, making our pipelines more resilient and more responsive to data availability, retries, and the like. Despite some growing pains, it has become an essential part of our data processing architecture.