Ingesting YouTube Data with Apache Airflow 3.x

3/11/2025

Here’s the deal: you want to pull YouTube video data—title, description, transcript, the works—into DuckDB using Airflow. Most tutorials overcomplicate this. Let’s cut the fluff and get you a pipeline that actually works, is secure, and doesn’t break when you upgrade Airflow.

What You Actually Need

Install the Only Packages That Matter

pip install duckdb youtube-transcript-api requests

Don’t Hardcode Secrets—Use Airflow Connections

If you’re still using os.environ for API keys, stop. Go to Airflow UI → Admin → Connections. Add:

  Conn Id: youtube_api
  Conn Type: Generic
  Password: your YouTube Data API v3 key
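
Prefer the terminal? The same connection can be created with the Airflow CLI (the key is a placeholder):

airflow connections add youtube_api \
    --conn-type generic \
    --conn-password 'YOUR_YOUTUBE_API_KEY'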

The Pipeline: What’s Worth Knowing

The flow is three helpers and one DAG: search YouTube for a topic, fetch each video’s transcript, and write everything to a local DuckDB file.

The Only Code Patterns You Need

Get the API Key (the right way):

from airflow.hooks.base import BaseHook

def get_youtube_api_key():
    # The key lives in the Password field of the 'youtube_api' connection
    return BaseHook.get_connection('youtube_api').password
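
If you manage connections via environment variables instead of the UI, Airflow also accepts a JSON-encoded connection; same idea, key is a placeholder:

export AIRFLOW_CONN_YOUTUBE_API='{"conn_type": "generic", "password": "YOUR_YOUTUBE_API_KEY"}'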

Search YouTube:

import requests

def search_youtube_videos(topic, max_results=10):
    api_key = get_youtube_api_key()
    resp = requests.get(
        "https://www.googleapis.com/youtube/v3/search",
        params={
            "part": "snippet",
            "type": "video",
            "maxResults": max_results,
            "q": topic,  # requests URL-encodes the query for you
            "key": api_key,
        },
        timeout=30,
    )
    resp.raise_for_status()
    return [
        {
            'youtube_video_url': f"https://www.youtube.com/watch?v={item['id']['videoId']}",
            'title': item['snippet']['title'],
            'description': item['snippet']['description'],
            'timestamp': item['snippet']['publishedAt'],
            'video_id': item['id']['videoId'],
        }
        for item in resp.json().get('items', [])
    ]
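
One thing the function above glosses over: the search endpoint caps maxResults at 50 per call, so larger pulls mean following nextPageToken. A minimal sketch (it returns raw API items rather than the mapped dicts above):

def search_youtube_videos_paged(topic, total=150):
    # Collect up to `total` raw results by following nextPageToken across pages
    api_key = get_youtube_api_key()
    videos, page_token = [], None
    while len(videos) < total:
        params = {
            "part": "snippet", "type": "video", "q": topic,
            "maxResults": min(50, total - len(videos)), "key": api_key,
        }
        if page_token:
            params["pageToken"] = page_token
        data = requests.get(
            "https://www.googleapis.com/youtube/v3/search",
            params=params, timeout=30,
        ).json()
        videos.extend(data.get("items", []))
        page_token = data.get("nextPageToken")
        if not page_token:
            break
    return videos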

Get the Transcript (handles missing/disabled):

from youtube_transcript_api import YouTubeTranscriptApi

def get_video_transcript(video_id):
    try:
        # fetch() returns an iterable of snippets (youtube-transcript-api >= 1.0)
        transcript = YouTubeTranscriptApi().fetch(video_id)
        return ' '.join(snippet.text for snippet in transcript)
    except Exception:
        # Transcripts can be disabled, missing, or region-blocked; treat all as absent
        return None
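
Need more control than “best effort or None”? The library also exposes the full transcript list. A sketch, assuming youtube-transcript-api 1.x, that prefers English but falls back to whatever language exists:

def get_transcript_with_fallback(video_id, preferred=("en", "en-US")):
    try:
        transcripts = YouTubeTranscriptApi().list(video_id)
        try:
            t = transcripts.find_transcript(list(preferred))
        except Exception:
            # No preferred language: take the first transcript in any language
            t = next(iter(transcripts))
        return ' '.join(s.text for s in t.fetch())
    except Exception:
        return None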

Ingest to DuckDB:

import duckdb

def ingest_to_duckdb(videos, db_path):
    con = duckdb.connect(db_path)
    con.execute('''
        CREATE TABLE IF NOT EXISTS youtube_videos (
            youtube_video_url TEXT,
            title TEXT,
            description TEXT,
            timestamp TEXT,
            chapters TEXT,
            transcript TEXT
        )
    ''')
    for v in videos:
        # .get() keeps missing fields as NULL instead of the string "None"
        con.execute(
            'INSERT INTO youtube_videos VALUES (?, ?, ?, ?, ?, ?)',
            [
                v['youtube_video_url'],
                v['title'],
                v['description'],
                v['timestamp'],
                v.get('chapters'),
                v.get('transcript'),
            ],
        )
    con.close()
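
Once the data is in, querying it is plain SQL. A quick sanity check (the path is whatever you passed to ingest_to_duckdb):

con = duckdb.connect('/tmp/youtube_videos.duckdb')
for title, chars in con.execute('''
    SELECT title, length(transcript) AS transcript_chars
    FROM youtube_videos
    WHERE transcript IS NOT NULL
    ORDER BY transcript_chars DESC
    LIMIT 5
''').fetchall():
    print(title, chars)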

Modern Airflow DAG (SDK style):

from airflow.decorators import dag
from airflow.providers.standard.operators.python import PythonOperator
import pendulum

@dag(
    dag_id="youtube_topic_to_duckdb",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    params={"topic": "python programming"},
    description="Search YouTube for a topic and ingest video data into DuckDB",
)
def youtube_topic_to_duckdb():
    def search_and_ingest_youtube_videos(topic, **context):
        # Wire the helpers above together: search, enrich with transcripts, ingest
        videos = search_youtube_videos(topic)
        for v in videos:
            v['transcript'] = get_video_transcript(v['video_id'])
        ingest_to_duckdb(videos, '/tmp/youtube_videos.duckdb')  # adjust path for your setup

    PythonOperator(
        task_id="search_and_ingest",
        python_callable=search_and_ingest_youtube_videos,
        op_args=["{{ params.topic }}"],
    )

youtube_topic_to_duckdb()
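
Trigger it from the CLI with whatever topic you want; the conf payload overrides the declared param:

airflow dags trigger youtube_topic_to_duckdb --conf '{"topic": "duckdb tutorials"}'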

Reality Check: What Sucks, What Doesn’t

What sucks: transcripts are disabled or missing more often than you’d expect, and the Data API quota burns fast (a search call costs 100 of your 10,000 default daily units). What doesn’t: DuckDB. Zero setup, one file, and SQL over transcripts just works.

Who Actually Needs This

Anyone who wants YouTube metadata and transcripts in a local analytics store without standing up a warehouse: content research, topic tracking, or just a structured archive you can query with SQL.

Actionable Advice

  1. Don’t hardcode secrets. Use Airflow Connections.

  2. Use the latest Airflow SDK and standard providers. Don’t copy-paste old DAG patterns.

  3. Handle API failures and missing transcripts. YouTube is flaky; see the retry sketch after this list.

  4. Use DuckDB for local analytics. It’s fast and easy.

  5. If you’re not running Airflow 3.x, upgrade. The new style is worth it.
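
On point 3: the cheapest insurance is task-level retries instead of sprinkling try/except everywhere. The same operator from the DAG above, with retries bolted on:

from datetime import timedelta

PythonOperator(
    task_id="search_and_ingest",
    python_callable=search_and_ingest_youtube_videos,
    op_args=["{{ params.topic }}"],
    retries=3,                          # rerun the whole task on transient failures
    retry_delay=timedelta(minutes=5),
)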


That’s it. You now have a real, modern pipeline for ingesting YouTube data with Airflow. No more excuses.