Scaling Airflow Dataset Scheduling: Lessons from Common Crawl

Since joining Common Crawl, I’ve been involved in a number of activities revolving around indexing our data and cataloging its integrity. To coordinate these activities, and to be able to re-run them when datasets change, I decided to give Airflow a try. The dataset-based scheduling introduced in Airflow 2.4 seemed attractive, but it also comes with interesting challenges when working at our scale.

Data Flow Architecture

We have two main categories of data source: crawls, which are created roughly monthly, and news, which is created daily. Note: I’m saying “created” and not “updated”. Each dataset is distinct and, in many cases, usable on its own. So it’s not really an “update” of our crawl dataset; it’s a specific, one-off instance of a crawl-type dataset.

Here’s a visualization of how our datasets flow through various processing stages - each box represents a DatasetAlias in Airflow.

graph LR
    A[Crawls]
    B[News]
    A --> C
    B --> D
    subgraph "Airflow"
        C[WARC]
        D[Monthly News]
        C --> F[WAT]
        C --> G[WET]
        C --> C1[Graph]
        C --> C2[Host]
        C --> E[WARC CDX]
        F --> K[WAT CDX]
        G --> L[WET CDX]
        E & K & L --> I[Columnar]
        D --> H[News CDX]
        I --> Q[Columnar Integrity]
        E --> O[WARC Integrity]
        H --> P[NEWS Integrity]
        K --> M[WAT Integrity]
        L --> N[WET Integrity]
        M & N & O & P & Q --> Z[Integrity Dataset]
    end
    style A font-size:30px
    style B font-size:30px
    style C font-size:30px
    style C1 font-size:30px
    style C2 font-size:30px
    style D font-size:30px
    style E font-size:30px
    style F font-size:30px
    style G font-size:30px
    style H font-size:30px
    style I font-size:30px
    style K font-size:30px
    style L font-size:30px
    style M font-size:30px
    style N font-size:30px
    style O font-size:30px
    style P font-size:30px
    style Q font-size:30px
    style Z font-size:30px

This architecture requires careful coordination of dataset dependencies, especially since the integrity dataset depends on ANY upstream dataset changing.
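To make the "any upstream change" dependency concrete, here is a minimal sketch in plain Python (it is not Airflow code). It encodes the edges inside the Airflow box of the diagram as a dictionary and walks them to find everything downstream of a changed dataset. The node names are copied from the diagram; the function name `downstream_of` is made up for illustration.

```python
from collections import deque

# Edges copied from the diagram: producer -> direct consumers.
EDGES = {
    "WARC": ["WAT", "WET", "Graph", "Host", "WARC CDX"],
    "Monthly News": ["News CDX"],
    "WAT": ["WAT CDX"],
    "WET": ["WET CDX"],
    "WARC CDX": ["Columnar", "WARC Integrity"],
    "WAT CDX": ["Columnar", "WAT Integrity"],
    "WET CDX": ["Columnar", "WET Integrity"],
    "News CDX": ["NEWS Integrity"],
    "Columnar": ["Columnar Integrity"],
    "WARC Integrity": ["Integrity Dataset"],
    "WAT Integrity": ["Integrity Dataset"],
    "WET Integrity": ["Integrity Dataset"],
    "NEWS Integrity": ["Integrity Dataset"],
    "Columnar Integrity": ["Integrity Dataset"],
}


def downstream_of(changed: str) -> set[str]:
    """Return every dataset reachable from `changed` (breadth-first walk)."""
    seen: set[str] = set()
    queue = deque([changed])
    while queue:
        node = queue.popleft()
        for child in EDGES.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen
```

Calling `downstream_of("Monthly News")` or `downstream_of("WET")` both end up including "Integrity Dataset", which is why that final stage has to react to a change anywhere upstream.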

Resource Management

We follow a critical Airflow best practice: using it purely as an orchestrator, not for processing. Even though our dataset is huge (petabytes), Airflow just coordinates the work:

  1. Our data lives in AWS S3
  2. Processing happens on Hadoop clusters or EMR Spark jobs in other clusters
  3. Airflow handles starting clusters and submitting tasks, but does no heavy lifting itself

Naming and Organization

Our dataset URI convention is pretty straightforward: we use the actual S3 path of the file that lists the dataset’s final files. These paths.gz files are simple lists of S3 prefixes in the bucket. Because the list is the last thing a job writes when it succeeds, it marks the dataset as complete, and it also serves as the main input file for most of our jobs.

s3://commoncrawl/crawl-data/CC-MAIN-2014-41/warc.paths.gz (alias "warcs")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wet.paths.gz (alias "wets")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/wat.paths.gz (alias "wats")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/cc-index.paths.gz ("warc-cdx")
s3://commoncrawl/crawl-data/CC-MAIN-2014-41/cc-index-table.paths.gz ("warc-tabular")
s3://commoncrawl-private-redaction/cc-index/collections/CC-MAIN-2018-26-WET/indexes/cluster.idx ("wet-cdx")
s3://commoncrawl-private-redaction/cc-index/collections/CC-MAIN-2018-26-WAT/indexes/cluster.idx ("wat-cdx")
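As a rough illustration of this convention, the sketch below builds a dataset URI for the public-bucket examples and decodes a paths.gz payload into its list of prefixes. The layout is inferred only from the examples listed here, and the helper names `dataset_uri` and `read_paths` are hypothetical, not part of our codebase or of Airflow.

```python
import gzip
import io

# Assumed layout, inferred from the public-bucket examples in this post.
BUCKET = "commoncrawl"
KINDS = {"warc", "wet", "wat", "cc-index", "cc-index-table"}


def dataset_uri(crawl_id: str, kind: str) -> str:
    """Build the paths.gz URI that identifies one dataset of a crawl."""
    if kind not in KINDS:
        raise ValueError(f"unknown dataset kind: {kind!r}")
    return f"s3://{BUCKET}/crawl-data/{crawl_id}/{kind}.paths.gz"


def read_paths(gz_bytes: bytes) -> list[str]:
    """Decode a paths.gz payload into its list of S3 prefixes."""
    with gzip.open(io.BytesIO(gz_bytes), "rt", encoding="utf-8") as fh:
        # One prefix per line; skip blank lines.
        return [line.strip() for line in fh if line.strip()]
```

Keeping URI construction in one place means the sensor and every downstream DAG agree on the exact string, which matters because a dataset is matched by its URI.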

Balancing Time-Based and Dataset-Based Scheduling

We’ve found a hybrid approach works best. We use “sensor” DAGs that run on a time-based schedule to check for the existence of new datasets. Once detected, we let dataset-based scheduling handle the downstream processing.

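# Example sensor DAG
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

with DAG(
    'check_for_new_crawl_data',
    start_date=datetime(2025, 1, 1),  # Placeholder start date
    schedule='0 * * * *',  # Hourly check
    catchup=False,
) as dag:

    # check_and_announce_new_datasets is defined elsewhere
    check_task = PythonOperator(
        task_id='check_for_new_data',
        python_callable=check_and_announce_new_datasets,
        outlets=[Dataset('s3://commoncrawl/crawl-data/CC-MAIN-{{ ds_nodash }}/paths.txt.gz')]
    )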

The beauty of dataset scheduling is that it naturally handles dependencies – if the source data a DAG needs is ready, it can run, even if other parts of other DAGs have failed. We only need to care about our specific input datasets.

The “First Run” Problem

One of the first hurdles I faced was the “first run” problem. We already had hundreds of existing datasets in S3 – how do we get Airflow to recognize these without triggering an avalanche of processing every time the sensor runs?

My initial approach was pragmatic:

# Example pseudocode for handling existing datasets
# (s3_file_exists and DatasetEvent(...) are placeholders, not real helpers)
from airflow.datasets import Dataset
from airflow.models import Variable

def check_and_announce_existing_datasets(ti):
    # Get the list of already announced datasets
    announced_datasets = Variable.get("announced_datasets", deserialize_json=True, default_var=[])

    # Check if our dataset already exists
    dataset_path = "s3://commoncrawl/crawl-data/CC-MAIN-2025-04/paths.txt.gz"
    if s3_file_exists(dataset_path) and dataset_path not in announced_datasets:
        # Announce it once (stands in for emitting the dataset event)
        dataset = Dataset(dataset_path)
        dataset_event = DatasetEvent(dataset)

        # Update our tracking variable
        announced_datasets.append(dataset_path)
        Variable.set("announced_datasets", announced_datasets, serialize_json=True)
    else:
        # ... process it ...
        # ... also announce and track here ...
        pass

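This works, but it probably isn’t safe under concurrency: reading the Variable, appending to the list, and writing it back is not atomic, so two runs can overwrite each other’s updates. There are likely better alternatives we will move to later: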

  1. Using XCom?: Storing dataset announcement history in XCom provides better task-run context and integrates with Airflow’s cleanup mechanisms.

  2. External Metadata Stores?: For our scale, a dedicated database table or integration with data catalog or lineage systems provides better robustness.

  3. Redis or Similar?: Using atomic operations in a key-value store can prevent race conditions when multiple processes try to announce datasets.
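As one possible shape for the second and third options, here is a minimal sketch of an "announce once" check backed by a table with a primary key. SQLite stands in for whatever external store we end up using, and the table name `announced` and the helpers `open_store` and `claim_announcement` are hypothetical. The point is only that a single `INSERT OR IGNORE` decides the winner, instead of a read-modify-write on a shared list.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open the tracking store and create the table if it is missing."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS announced (uri TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def claim_announcement(conn: sqlite3.Connection, uri: str) -> bool:
    """Return True only for the first caller to claim `uri`."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO announced (uri) VALUES (?)", (uri,)
        )
    # rowcount is 1 when the row was inserted, 0 when it already existed.
    return cur.rowcount == 1
```

A sensor task would call `claim_announcement` before emitting the dataset event and skip the announcement when it returns `False`. A production version would point at a shared database rather than an in-memory one.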

Challenges and Limitations

The Airflow UI breaks down pretty quickly when you have hundreds or thousands of datasets under each alias. The dataset graph becomes an unreadable mess of lines – we need better visualization options that allow collapsing or filtering parts of the graph.

I’m not just complaining here… when I have some extra time, I will see if I can work up a patch to fix this in Airflow. I have a few ideas.

[Screenshot of the Airflow dataset graph: so many lines…]

Side note: the conceptual model for datasets in Airflow feels unnecessarily complex. There’s Dataset, DatasetAlias, Metadata, extra parameters, etc. I’d prefer a single approach that works for both small and large situations.

Advice for Implementing Dataset Scheduling

If you’re just getting started with dataset-based scheduling in Airflow, here’s my advice:

  1. Test locally first - It’s REALLY easy to make a mess of the Airflow database, so experiment on your local machine or dev server before touching production.

  2. Plan for complexity - Don’t start too simply with your DAGs. Evolving them over time gets tricky and messy, so think through future requirements upfront.

  3. Watch for UI limitations - The current dataset visualization doesn’t scale well beyond a few dozen datasets.

  4. Keep it simple - Despite the power of dataset scheduling, the simplest approach that works is usually best.

Dataset-based scheduling will transform how we handle data dependencies at Common Crawl, making our pipelines more resilient and responsive to data availability, retries, etc. Despite some growing pains, it’s become an essential part of our data processing architecture.

This post is actually the notes for my internal team demo - I'll update it if we decide to use Airflow long term, and with any improved techniques we find. I'd love to hear how others are implementing this in their organizations, especially if you have a dataset like ours - lots of partitions that can be treated as one.