Building a data pipeline CLI

I want to write a CLI that fetches CSV files from an SFTP server, transforms them, and loads them into a database. The pipeline is written in Python, and I need a reproducible environment where every developer on my team gets the same Python version and the same pinned dependencies.

Here is a simplified version of my transform step — it uppercases the name column:

"""Tiny CSV transform: read from stdin, uppercase the name column, write to stdout."""
import csv
import sys

reader = csv.DictReader(sys.stdin)
writer = csv.DictWriter(sys.stdout, fieldnames=reader.fieldnames, lineterminator="\n")
writer.writeheader()
for row in reader:
    row["name"] = row["name"].upper()
    writer.writerow(row)
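One small detail in the script: Python's csv writer defaults to the excel dialect, whose row terminator is "\r\n", which is why the script passes lineterminator="\n" explicitly. A quick check of the default:

```python
import csv
import io

# Write one row with the default dialect and inspect the raw bytes
buf = io.StringIO()
csv.writer(buf).writerow(["name", "email"])
print(repr(buf.getvalue()))  # 'name,email\r\n'
```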

And a sample input file:

name,email
alice,alice@example.com
bob,bob@example.com
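Before containerizing anything, the transform can be sanity-checked in plain Python. This sketch feeds the sample rows above through the same logic, reading from a string instead of stdin:

```python
import csv
import io

sample = "name,email\nalice,alice@example.com\nbob,bob@example.com\n"

# Same logic as transform.py, but in memory
reader = csv.DictReader(io.StringIO(sample))
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
writer.writeheader()
for row in reader:
    row["name"] = row["name"].upper()
    writer.writerow(row)

print(out.getvalue(), end="")
# name,email
# ALICE,alice@example.com
# BOB,bob@example.com
```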

First I set up my Dagger project and install daggerlib:

dagger init --sdk=python
dagger install github.com/Konubinix/daggerlib

The following is only for testing purposes — it makes the project use the local daggerlib checkout instead of the published version:

sed -i '/pin/d; s|"github.com/Konubinix/daggerlib@main"|"../.."|; s|"\.\./\.\.",|"../.."|' dagger.json

Then I write my Dagger module. The full module follows; each of its functions is explained in the sections below:

from typing import Annotated

import dagger
from dagger import DefaultPath, dag, function, object_type


@object_type
class DataPipeline:
    @function
    async def transform(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Run the CSV transform in a clean Alpine container."""
        return await (
            dag.lib().alpine_python_user_venv()
            .with_file('/app/transform.py', src.file('transform.py'))
            .with_file('/app/sample.csv', src.file('sample.csv'))
            .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
            .stdout()
        )

    @function
    async def check_requests(self) -> str:
        """Verify requests is installed in the venv."""
        return await (
            dag.lib().alpine_python_user_venv(pip_packages=['requests'])
            .with_exec(['python3', '-c', 'import requests; print(requests.__version__)'])
            .stdout()
        )

    @function
    async def transform_debian(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Run the same transform on Debian for native library compatibility."""
        return await (
            dag.lib().debian_python_user_venv()
            .with_file('/app/transform.py', src.file('transform.py'))
            .with_file('/app/sample.csv', src.file('sample.csv'))
            .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
            .stdout()
        )

    @function
    async def pip_lock(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Lock dependencies with pip-compile."""
        return await (
            dag.lib().pip_tools()
            .with_file('/home/sam/requirements.in', src.file('requirements.in'))
            .with_exec(['sh', '-c', 'pip-compile --quiet requirements.in && head -5 requirements.txt'])
            .stdout()
        )

1. Running the transform in a clean environment

I write a Dagger function that gets a Python container, injects my script and data, and runs the transform:

@function
async def transform(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Run the CSV transform in a clean Alpine container."""
    return await (
        dag.lib().alpine_python_user_venv()
        .with_file('/app/transform.py', src.file('transform.py'))
        .with_file('/app/sample.csv', src.file('sample.csv'))
        .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
        .stdout()
    )

dagger call transform

2. Adding requests for the SFTP client wrapper

My pipeline depends on requests for the download step. I add it to the container:

@function
async def check_requests(self) -> str:
    """Verify requests is installed in the venv."""
    return await (
        dag.lib().alpine_python_user_venv(pip_packages=['requests'])
        .with_exec(['python3', '-c', 'import requests; print(requests.__version__)'])
        .stdout()
    )

dagger call check-requests

3. Switching to Debian for native database drivers

The load step uses psycopg2 to talk to PostgreSQL. That requires libpq-dev and a C compiler, which are easier on Debian. Same workflow — my transform runs identically:

@function
async def transform_debian(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Run the same transform on Debian for native library compatibility."""
    return await (
        dag.lib().debian_python_user_venv()
        .with_file('/app/transform.py', src.file('transform.py'))
        .with_file('/app/sample.csv', src.file('sample.csv'))
        .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
        .stdout()
    )

dagger call transform-debian

4. Pinning dependencies with pip-tools

I don't want surprise upgrades breaking the pipeline at 3am. I use pip-compile to lock all transitive dependencies from my requirements.in:

requests

And the Dagger function that runs pip-compile against it:

@function
async def pip_lock(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Lock dependencies with pip-compile."""
    return await (
        dag.lib().pip_tools()
        .with_file('/home/sam/requirements.in', src.file('requirements.in'))
        .with_exec(['sh', '-c', 'pip-compile --quiet requirements.in && head -5 requirements.txt'])
        .stdout()
    )

dagger call pip-lock
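For orientation, pip-compile resolves requests plus all of its transitive dependencies and writes them into requirements.txt with exact pins. The output looks roughly like this — the header and the "# via" annotations are the shape pip-tools emits, but the version numbers here are placeholders, not real pins:

```
#
# This file is autogenerated by pip-compile with Python 3.x
# by the following command:
#
#    pip-compile requirements.in
#
certifi==X.Y.Z
    # via requests
charset-normalizer==X.Y.Z
    # via requests
idna==X.Y.Z
    # via requests
requests==X.Y.Z
    # via -r requirements.in
urllib3==X.Y.Z
    # via requests
```

Committing this file means every developer and CI run installs byte-identical dependency versions.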

Author: root

Created: 2026-04-18 Sat 21:17
