This post walks through a serverless ETL pipeline I built on Google Cloud that automatically processes CSV files from a specific Google Drive folder and loads them into a BigQuery table for analysis. It’s a secure, configuration-driven, and event-based system that requires zero manual intervention once deployed.

Architecture Overview

The solution is built around a chain of three distinct, serverless Cloud Functions, each with a specific job. This separation of concerns makes the system robust and easy to maintain.

The workflow is a chain of events connecting several Google Cloud services:

Pub/Sub -> Cloud Function #1 (Setup Watch) -> Firestore

Google Drive -> Cloud Function #2 (Download File) -> Cloud Storage -> Cloud Function #3 (Process File) -> BigQuery

  1. drive_setup_watch (Pub/Sub Trigger): This function acts as the control plane for the pipeline. Triggered by a simple Pub/Sub message, it communicates with the Google Drive API to set up (or renew) a “watch” on a specific folder. It then stores the details of this notification channel in Firestore, creating a persistent state that the other functions can use.

  2. drive_file_downloader (HTTP Trigger): This is a secure webhook that receives the push notifications from the Google Drive API whenever a file is added or updated. It uses the notification’s unique ID to look up the correct folder details from Firestore, downloads the new file from Drive, and uploads it to a landing zone in Google Cloud Storage.

  3. drive_process_csv_to_bigquery (Cloud Storage Trigger): The final link in the chain. This function is triggered automatically when a new file appears in the Cloud Storage bucket. It reads the file, uses a flexible configuration to parse and transform the data, and loads the clean, structured result into the final BigQuery table.

A Configuration-Driven Approach

The entire pipeline is configured by a single config.json file. This is the key to making the system flexible and easy to manage without deploying new code for every change.

Here’s how the configuration looks:

{
    "project_id": "james-gcp-project",
    "secret_id": "drive-webhook-secret",
    "bucket_name": "jb-g-drive-to-bq",
    "lookback_minutes": 5,
    "firestore_collection": "google-drive-watch-channels",
    "folder_ids": {
        "monefy": "1bnhOMSVgO5akYeJjxZjVbg5cAK0Fsx8R"
    },
    "parsers": {
        "monefy": {
            "filename_pattern": "^Monefy.Data.*\\.csv$",
            "file_type": "csv",
            "project_id": "james-gcp-project",
            "dataset_id": "personal_finance",
            "table_id": "monefy",
            "write_disposition": "WRITE_TRUNCATE",
            "csv_options": {
                "delimiter": ",",
                "date_format": "%d/%m/%Y"
            },
            "schema": [
                {"name": "date", "type": "DATE", "source_column": "date"},
                {"name": "account", "type": "STRING", "source_column": "account"},
                {"name": "category", "type": "STRING", "source_column": "category"},
                {"name": "amount", "type": "FLOAT", "source_column": "amount"},
                {"name": "description", "type": "STRING", "source_column": "description"}
            ]
        }
    }
}

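The first few fields are fairly self-explanatory: they name the GCP resources the pipeline uses (the project, the Secret Manager secret, the Cloud Storage bucket and the Firestore collection).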

lookback_minutes defines how far back to search for recently added files. The Google Drive notification API does not say which file was added (that would be far too helpful), so we must search the folder for files added within a lookback period. I look for files added in the past 5 minutes.
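As a rough illustration of the lookback idea, the sketch below builds a search string for the Drive API's files.list `q` parameter. The function name `build_recent_files_query` is hypothetical, and filtering on `createdTime` (rather than `modifiedTime`) is an assumption about what "recently added" should mean here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_recent_files_query(folder_id: str,
                             lookback_minutes: int,
                             now: Optional[datetime] = None) -> str:
    """Build a Drive search query for files added to a folder recently.

    Hypothetical helper: the choice of createdTime is an assumption.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=lookback_minutes)
    # Drive search expects RFC 3339 timestamps; UTC is the default timezone.
    cutoff_str = cutoff.strftime("%Y-%m-%dT%H:%M:%S")
    return (
        f"'{folder_id}' in parents "
        f"and createdTime > '{cutoff_str}' "
        f"and trashed = false"
    )
```

The resulting string would be passed as the `q` argument when listing files. A short lookback keeps the result set small, but a notification that arrives late could miss a file, so the value is a trade-off.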

The folder_ids section contains the IDs of the folders I want to monitor. The IDs can be read directly from the URL when opening the folder in the browser, which is handy!

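The parsers section is where the logic for extracting and loading the CSV data is configured. It allows me to define the filename pattern that selects a parser, the file type and CSV options (delimiter and date format), the destination BigQuery project, dataset and table, the write disposition, and the schema that maps source columns to typed BigQuery columns.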

This means I can add a new data source just by adding a new entry to the folder_ids and parsers sections of the config.
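Since a new source is only a config change, a small sanity check before deploying can catch a typo early. The sketch below is hypothetical and not taken from the project: `validate_config` and the list of required keys are assumptions drawn from the example config above.

```python
import re
from typing import List

# Assumed to be required, based on the keys in the example parser entry.
REQUIRED_PARSER_KEYS = {
    "filename_pattern", "file_type", "project_id", "dataset_id",
    "table_id", "write_disposition", "schema",
}


def validate_config(config: dict) -> List[str]:
    """Return a list of human-readable problems; empty means no issues found."""
    problems = []
    for name, parser in config.get("parsers", {}).items():
        missing = REQUIRED_PARSER_KEYS - parser.keys()
        if missing:
            problems.append(f"parser '{name}' is missing keys: {sorted(missing)}")
        try:
            # A pattern that does not compile would never match any file.
            re.compile(parser.get("filename_pattern", ""))
        except re.error as exc:
            problems.append(f"parser '{name}' has an invalid filename_pattern: {exc}")
    return problems
```

Running this against config.json in a test or a pre-deploy step would surface a broken entry before any file is dropped into Drive.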

Automating the Drive Connection with Firestore

The most challenging part of integrating with the Google Drive API is managing the notification channels. These channels can only live up to one week (for some ridiculous reason), so they need to be renewed if they are to be of any use. Furthermore, the notification message from the API doesn’t include the folder ID, only an opaque resourceId associated with the watch. My solution automates this entire process.

The drive_setup_watch function is the brain of the operation. When I want to start watching a folder, I publish a simple message to a Pub/Sub topic:

gcloud pubsub topics publish drive-setup-watch --message '{"folder_key": "monefy"}'

This triggers the function, which performs a critical sequence of actions:

  1. It looks up the folder_id from the config.json using the provided folder_key.
  2. It stops any existing watch channel for that folder to prevent duplicates.
  3. It creates a new webhook URL that embeds a secret fetched from Secret Manager. The drive_file_downloader function can then reject any request that does not carry the secret, so only callers that were given the URL (in practice, Google Drive) get through.
  4. It requests a new watch channel from the Google Drive API.
  5. It saves the new channel_id and resource_id into a Firestore document, using the folder_key as the document ID.
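Steps 3 to 5 can be sketched as two small pure functions: one builds the request body for a Drive watch call, the other builds the document to store in Firestore. The function names, the `secret` query parameter name and the stored field names are assumptions for illustration; the body keys (`id`, `type`, `address`, `expiration`) and the response keys (`id`, `resourceId`, `expiration`) are those of the Drive API's channel resource.

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlencode


def build_watch_request(base_url: str, secret: str,
                        now: Optional[datetime] = None,
                        ttl_days: int = 7) -> dict:
    """Build the body of a Drive watch request (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    expires_at = now + timedelta(days=ttl_days)
    return {
        "id": str(uuid.uuid4()),   # channel ID chosen by the caller
        "type": "web_hook",
        # Assumption: the secret travels as a query parameter named "secret".
        "address": f"{base_url}?{urlencode({'secret': secret})}",
        # Requested expiry as a Unix timestamp in milliseconds.
        "expiration": str(int(expires_at.timestamp() * 1000)),
    }


def build_channel_document(folder_id: str, watch_response: dict) -> dict:
    """Build the Firestore document from the API response (field names assumed)."""
    return {
        "folder_id": folder_id,
        "channel_id": watch_response["id"],
        "resource_id": watch_response["resourceId"],
        # Store what the API actually granted, not what was requested.
        "expiration": watch_response.get("expiration"),
    }
```

Keeping these as pure functions makes them easy to unit test; the actual API call and the Firestore write then become thin wrappers around them.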

Firestore acts as the persistent state store, creating a reliable link between the abstract resource_id sent by the notification and the specific folder configuration it relates to.

Creating a Cloud Scheduler job that performs this step periodically is a good idea. As I like to update my data once a month, I set this up to run on the first of each month, giving me a week to upload the latest data to Google Drive.

Processing The Data

All the core logic for the three Cloud Functions is contained within main.py.

Function 1: drive_file_downloader

This function is triggered when Google Drive sends a push notification. First, it validates the secret in the URL to ensure the request is legitimate. Then, it inspects the request headers to get the X-Goog-Resource-ID.
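A minimal sketch of those first checks follows, written against plain values so it runs without a web framework. The function names are hypothetical. The `X-Goog-Resource-State` header and its `sync` value are part of Drive's push notification protocol: a `sync` message is sent when a channel is created and does not indicate a new file.

```python
import hmac
from typing import Mapping, Optional


def secret_is_valid(provided: Optional[str], expected: Optional[str]) -> bool:
    """Compare the secret from the URL with the expected one in constant time."""
    if not provided or not expected:
        return False
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def should_process(headers: Mapping[str, str]) -> bool:
    """Skip the initial 'sync' notification; it carries no file change."""
    return headers.get("X-Goog-Resource-State") != "sync"
```

Using `hmac.compare_digest` instead of `==` avoids leaking information through comparison timing. In a real handler the headers object is usually case-insensitive; with a plain dict, as here, the header name must match exactly.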

# From main.py
resource_id = request.headers.get('X-Goog-Resource-ID')

# Query Firestore to find the matching folder configuration
docs = FIRESTORE_CLIENT.collection(FIRESTORE_COLLECTION).where('resource_id', '==', resource_id).limit(1).stream()

Using this resource_id, it queries Firestore to find the corresponding document. This document gives the function the folder_id it needs to query the Drive API for recent files. For each new file found, the function downloads it and uploads it to Google Cloud Storage, triggering the next step in the pipeline.

Function 2: drive_process_csv_to_bigquery

This second function is triggered automatically when the first function uploads a new file to the GCS bucket. The event payload tells the function the bucket and file_name.

The first step is to determine how to parse this file. It iterates through the parsers defined in config.json and uses the filename_pattern (a regular expression) to find a match.

# From main.py
file_name = cloud_event.data["name"]
parser_config = next((p for p in PARSERS.values() if re.match(p['filename_pattern'], file_name)), None)
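To see the lookup in isolation, here is a self-contained variant that can be run locally. `find_parser` is a hypothetical name, and stripping any folder prefix from the object name before matching is an addition not present in the snippet above; it matters only if files are uploaded under a prefix such as `landing/`.

```python
import posixpath
import re
from typing import Optional, Tuple

# Trimmed copy of the parsers section from config.json.
PARSERS = {
    "monefy": {"filename_pattern": r"^Monefy.Data.*\.csv$"},
}


def find_parser(object_name: str, parsers: dict) -> Optional[Tuple[str, dict]]:
    """Return (key, parser_config) for the first matching pattern, else None."""
    # GCS object names may contain '/' separated prefixes; match the last part.
    base_name = posixpath.basename(object_name)
    for key, parser in parsers.items():
        if re.match(parser["filename_pattern"], base_name):
            return key, parser
    return None
```

Returning the key alongside the config makes log messages more useful when several parsers are defined.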

If a matching parser is found, the function downloads the file from GCS. Since the config specifies the file_type as csv, it uses Python’s built-in csv module to read the data row by row. The real transformation happens next, as it iterates through the schema defined in the parser’s configuration, converting data types and structuring the data for BigQuery.
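The schema-driven transformation could look roughly like the sketch below. It is not the code from main.py: the function names, the handling of empty cells as `None`, and the fallback date format are assumptions, and only the three types used in the example config are covered.

```python
import csv
import io
from datetime import datetime
from typing import List, Optional


def convert_value(raw: str, field_type: str, date_format: str) -> Optional[object]:
    """Convert one CSV cell to a JSON-friendly value for BigQuery."""
    raw = raw.strip()
    if raw == "":
        return None  # assumption: empty cells become NULL
    if field_type == "DATE":
        # BigQuery expects DATE values as YYYY-MM-DD strings.
        return datetime.strptime(raw, date_format).date().isoformat()
    if field_type == "FLOAT":
        return float(raw)
    return raw  # STRING and anything not handled above


def transform_csv(text: str, parser_config: dict) -> List[dict]:
    """Read CSV text and shape each row according to the configured schema."""
    options = parser_config.get("csv_options", {})
    date_format = options.get("date_format", "%Y-%m-%d")
    reader = csv.DictReader(io.StringIO(text),
                            delimiter=options.get("delimiter", ","))
    return [
        {
            field["name"]: convert_value(record[field["source_column"]],
                                         field["type"], date_format)
            for field in parser_config["schema"]
        }
        for record in reader
    ]
```

Because the schema lists both `name` and `source_column`, renaming a column on the way into BigQuery needs no code change, only a config edit.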

Finally, with a list of cleaned rows, it uses the BigQuery client library to load the data. The write_disposition from the config determines whether to append the data or, in this case, WRITE_TRUNCATE the table before loading.
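For reference, a load step along these lines can be written with the google-cloud-bigquery client. This is a sketch under assumptions rather than the project's code: the function names are hypothetical, and the library is imported inside the function so the helper for the table ID can be used without it installed.

```python
from typing import List


def full_table_id(parser_config: dict) -> str:
    """Build the 'project.dataset.table' identifier from the parser config."""
    return (f"{parser_config['project_id']}."
            f"{parser_config['dataset_id']}."
            f"{parser_config['table_id']}")


def load_rows(rows: List[dict], parser_config: dict) -> int:
    """Load JSON-style rows into BigQuery and return the number of rows written."""
    from google.cloud import bigquery  # requires google-cloud-bigquery

    client = bigquery.Client(project=parser_config["project_id"])
    job_config = bigquery.LoadJobConfig(
        schema=[bigquery.SchemaField(field["name"], field["type"])
                for field in parser_config["schema"]],
        # e.g. "WRITE_TRUNCATE" replaces the table, "WRITE_APPEND" adds to it.
        write_disposition=parser_config["write_disposition"],
    )
    job = client.load_table_from_json(rows, full_table_id(parser_config),
                                      job_config=job_config)
    job.result()  # wait for completion; raises if the load failed
    return job.output_rows
```

WRITE_TRUNCATE suits a source like this one where each export contains the full history; a source that delivers only new rows would more likely use WRITE_APPEND.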

Conclusion & Dashboard

This project was something I wanted to do for a while, so it was great to get it completed in a couple of evenings. I love building event-driven, serverless architecture on Google Cloud and this project was a perfect example of its power. By using a configuration-driven design, the pipeline is robust and adaptable for future data integration needs too.

With the data now flowing automatically into BigQuery, I have created a Looker Studio dashboard to visualise my spending habits over time. I’m going to experiment with connecting this data to LLMs via MCP Servers to provide some interesting insights next.

The full source code for this project is available on my GitHub.