This post walks through the serverless ETL pipeline I built with Google Cloud Functions to take a Monefy CSV export from Google Drive and load it into a BigQuery table for analysis. It’s a configuration-driven, event-based system: when a file lands in a watched Google Drive folder, it’s automatically picked up, parsed and loaded into BigQuery.

Architecture Overview

The solution consists of two separate Google Cloud Functions that form a chain of events, triggered by a file being added to a Google Drive folder.

[Google Drive] -> [Cloud Function #1] -> [Google Cloud Storage] -> [Cloud Function #2] -> [BigQuery]

  1. drive_file_downloader (HTTP Trigger): This function is the entry point. It’s triggered by a Google Drive Push Notification when a change occurs in the monitored folder. Its job is to download the new file from Google Drive and upload it to a “landing zone” bucket in Google Cloud Storage. It can also handle exporting Google Workspace files (Docs, Sheets) into standard formats like .docx or .xlsx.

  2. process_csv_to_bigquery (CloudEvent Trigger): This function is triggered when a new file appears in the GCS bucket. It reads the file, identifies the correct parser based on its filename, transforms the data according to a predefined schema, and loads it into the target BigQuery table.
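
To make the two trigger types concrete, here’s a minimal sketch of how the two entry points could be declared with the Functions Framework. The decorator names are the framework’s own; the function bodies below are placeholders rather than the actual main.py code.

# Minimal sketch of the two entry points, assuming the Functions Framework.
import functions_framework

@functions_framework.http
def drive_file_downloader(request):
    # Handle the Drive push notification, download the file, upload it to GCS.
    ...
    return 'OK', 200

@functions_framework.cloud_event
def process_csv_to_bigquery(cloud_event):
    # Triggered by a new object in the GCS landing bucket; parse and load into BigQuery.
    ...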

The Power of a Configuration-Driven Approach

The entire pipeline is controlled by a single config.json file. This is the key to making the system flexible and easy to manage without deploying new code for every change.

Here’s a snippet from the configuration:

{
    "bucket_name": "jb-g-drive-to-bq",
    "folders_to_watch": {
        "monefy": {
            "folder_id": "<FOLDER ID FROM GOOGLE DRIVE URL>",
            "resource_id": "<RESOURCE ID OUTPUT FROM NOTIFICATION API>"
        }
    },
    "parsers": {
        "monefy": {
            "filename_pattern": "^Monefy.Data.*\\.csv$",
            "file_type": "csv",
            "project_id": "james-gcp-project",
            "dataset_id": "personal_finance",
            "table_id": "monefy",
            "write_disposition": "WRITE_TRUNCATE",
            "csv_options": {
                "delimiter": ",",
                "date_format": "%d/%m/%Y"
            },
            "schema": [
                {"name": "date", "type": "DATE", "source_column": "date"},
                {"name": "account", "type": "STRING", "source_column": "account"},
                {"name": "category", "type": "STRING", "source_column": "category"},
                {"name": "amount", "type": "FLOAT", "source_column": "amount"},
                {"name": "description", "type": "STRING", "source_column": "description"}
            ]
        }
    }
}

The parsers section is where the magic happens. For each data source it lets me define the filename pattern to match, the file type, the target BigQuery project, dataset and table, the write disposition, any CSV parsing options (delimiter, date format) and a schema that maps each source column to a BigQuery column name and type.

This means I can add a new data source just by adding a new entry to the folders_to_watch and parsers sections of the config.
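
Both functions read config.json once at cold start into module-level constants. The snippet below is a sketch of what that loading might look like, using the constant names referenced later in this post; the lookback_minutes default here is made up for illustration.

# Sketch of how main.py might load config.json at cold start (not verbatim).
import json

with open('config.json') as f:
    CONFIG = json.load(f)

BUCKET_NAME = CONFIG['bucket_name']
FOLDERS_TO_WATCH = CONFIG['folders_to_watch']
PARSERS = CONFIG['parsers']
LOOKBACK_MINUTES = CONFIG.get('lookback_minutes', 10)  # assumed default for illustration

# Drive notifications only carry a resource id, so build a reverse lookup
# from resource_id back to the folder key defined in config.json.
RESOURCE_ID_TO_FOLDER_KEY = {
    folder['resource_id']: key
    for key, folder in FOLDERS_TO_WATCH.items()
}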

Setting Up the Google Drive Connection

Setting up the initial notification from Google Drive was simple enough, but properly parsing the notification message took longer than expected. The Google Drive API allows you to “watch” a folder for changes. When a change occurs, it sends a POST request to a webhook URL you provide.

To handle this, I wrote a helper script, setup_channel.py. This script uses the Google API Python client to:

  1. Authenticate with a service account.
  2. Tell the Google Drive API to watch a specific folder_id.
  3. Set the notification webhook address to the URL of the drive_file_downloader Cloud Function.

import uuid

from google.oauth2 import service_account
from googleapiclient.discovery import build


def setup_watch(folder_id, function_url, key_file_path):
    """Authenticates and creates a push notification channel to watch a Drive folder."""
    
    creds = service_account.Credentials.from_service_account_file(
        key_file_path, scopes=['https://www.googleapis.com/auth/drive'])
    
    drive_service = build('drive', 'v3', credentials=creds)

    channel_request_body = {
        'id': str(uuid.uuid4()),
        'type': 'web_hook',
        'address': function_url,
    }

    response = drive_service.files().watch(
        fileId=folder_id,
        body=channel_request_body,
        supportsAllDrives=True
    ).execute()
    
    return response['resourceId']

The API then returns a resourceId, a unique identifier for that notification channel. I store it in config.json to link incoming notifications back to the correct folder configuration, because the notification message doesn’t include the folder id, only the randomly generated resource id from when the watch was first created. For more details, check out the official documentation on Push Notifications.
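
To tie it together, the helper can be run once per folder and the returned resourceId pasted into config.json. A rough usage example with placeholder values:

# Example invocation of setup_channel.py (placeholder values).
if __name__ == '__main__':
    resource_id = setup_watch(
        folder_id='<FOLDER ID FROM GOOGLE DRIVE URL>',
        function_url='https://REGION-PROJECT.cloudfunctions.net/drive_file_downloader',
        key_file_path='service-account-key.json',
    )
    print(f'Add this resource_id to config.json: {resource_id}')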

Processing The Data

All the core logic for the two Cloud Functions is contained within main.py. Let’s break down how it works.

Function 1: drive_file_downloader

This function is triggered when Google Drive sends a push notification to the function’s HTTP endpoint. First, it inspects the request headers to get the X-Goog-Resource-ID, which tells us which notification channel was triggered.

# From main.py
resource_id = request.headers.get('X-Goog-Resource-ID')
folder_key = RESOURCE_ID_TO_FOLDER_KEY.get(resource_id)
folder_id = FOLDERS_TO_WATCH[folder_key]['folder_id']

Using this resource_id, it looks up the corresponding folder details from the config.json settings loaded at the start of the script. With the folder_id, it then queries the Google Drive API for any files created in that folder within a lookback_minutes window (also defined in the config). This prevents processing old files if the function is triggered unexpectedly.

# From main.py
time_now = datetime.now(timezone.utc)
time_past = time_now - timedelta(minutes=LOOKBACK_MINUTES)
time_filter = time_past.isoformat()

response = DRIVE_SERVICE.files().list(
    q=f"'{folder_id}' in parents and createdTime > '{time_filter}' and trashed = false",
    fields='files(id, name, mimeType)',
    orderBy='createdTime'
).execute()

For each new file found, the function downloads it from Google Drive. A nice feature here is that it checks the file’s mimeType: if it’s a Google Workspace file (like a Sheet or Doc), it uses the export_media method to convert it to a standard format (e.g., .xlsx); otherwise, it downloads the binary file directly. The export path isn’t needed for this project, but it’s good to plan ahead. Finally, it uploads the file content to the specified Google Cloud Storage bucket.
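
Here’s a rough sketch of that download-and-upload step. The helper shape, the dictionary of export formats and the variable names are illustrative rather than lifted from main.py.

# Illustrative sketch of downloading a Drive file and landing it in GCS.
import io

from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage

EXPORT_FORMATS = {
    # Google Workspace types are exported to standard formats.
    'application/vnd.google-apps.spreadsheet': (
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', '.xlsx'),
    'application/vnd.google-apps.document': (
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document', '.docx'),
}

def download_and_upload(drive_service, file_id, file_name, mime_type, bucket_name):
    if mime_type in EXPORT_FORMATS:
        # Google Workspace files can't be downloaded directly; export them instead.
        export_mime, extension = EXPORT_FORMATS[mime_type]
        request = drive_service.files().export_media(fileId=file_id, mimeType=export_mime)
        file_name += extension
    else:
        request = drive_service.files().get_media(fileId=file_id)

    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()

    # Upload the downloaded bytes to the GCS landing bucket.
    bucket = storage.Client().bucket(bucket_name)
    bucket.blob(file_name).upload_from_string(buffer.getvalue())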

Function 2: process_csv_to_bigquery

This second function is triggered automatically when the first function uploads a new file to the GCS bucket. The event payload gives the function the bucket and the file_name of the object that was just added.

The first step is to determine how to parse this file. It iterates through the parsers defined in config.json and uses the filename_pattern (a regular expression) to find a match.

# From main.py
file_name = cloud_event.data["name"]
parser_config = None
for parser in PARSERS.values():
    if re.match(parser['filename_pattern'], file_name):
        parser_config = parser
        break

If a matching parser is found, the function downloads the file from GCS. Since the config specifies the file_type as csv, it uses Python’s built-in csv module to read the data row by row.
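
Fetching the object’s contents is a small step with the Cloud Storage client; something along these lines (a sketch, not the verbatim main.py code):

# Sketch of reading the uploaded object from GCS; variable names are illustrative.
from google.cloud import storage

bucket_name = cloud_event.data["bucket"]
file_data = storage.Client().bucket(bucket_name).blob(file_name).download_as_text()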

The real transformation happens next. For each row in the CSV, it iterates through the schema defined in the parser’s configuration. This schema maps the source_column from the CSV to the destination name and type in BigQuery. It performs data type conversions as specified, for example, converting date strings into the correct format and cleaning up numbers.

# From main.py
reader = csv.DictReader(io.StringIO(file_data), ...)
rows_to_insert = []
for row in reader:
    new_row = {}
    for col_schema in parser_config['schema']:
        raw_value = row[col_schema['source_column']]
        # --- Data Type Conversion ---
        if col_schema['type'] == 'DATE':
            converted_value = ...  # convert using csv_options['date_format'] into a BigQuery DATE
        elif col_schema['type'] == 'FLOAT':
            converted_value = ...  # clean up the number and convert to a float
        else:
            converted_value = raw_value  # STRING and other types pass through unchanged
        new_row[col_schema['name']] = converted_value
    rows_to_insert.append(new_row)

Finally, with a list of cleaned and structured rows, it uses the BigQuery client library to load the data. The write_disposition from the config determines whether to append the data or, in this case, WRITE_TRUNCATE the table before loading the new data.
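
That load step might look roughly like the following, building a LoadJobConfig from the parser settings; this is a sketch based on the config fields rather than the exact main.py code.

# Sketch of the BigQuery load step (illustrative, based on the config fields).
from google.cloud import bigquery

bq_client = bigquery.Client(project=parser_config['project_id'])
table_id = f"{parser_config['project_id']}.{parser_config['dataset_id']}.{parser_config['table_id']}"

job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField(col['name'], col['type']) for col in parser_config['schema']],
    write_disposition=parser_config['write_disposition'],  # e.g. WRITE_TRUNCATE
)

load_job = bq_client.load_table_from_json(rows_to_insert, table_id, job_config=job_config)
load_job.result()  # Wait for the load to finish and surface any errors.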

Conclusion & Dashboard

This project was something I’d wanted to do for a while, so it was great to get it completed in a couple of evenings. I love building event-driven, serverless architecture on Google Cloud and should think of more projects like this to complete. The configuration-driven design also makes the pipeline robust and adaptable for future data integration needs, so there are other exports I could potentially tackle next.

With the data now in BigQuery, I have created a Looker Studio dashboard to visualise my spending habits over time. I’m going to experiment with connecting this data to LLMs via MCP Servers to provide some interesting insights next.

The full source code for this project is available on my GitHub. Google Gemini helped with writing this blog post, which would explain the empty and soulless writing style. I wrote the previous sentence and this one too.