Building a data pipeline CLI
I want to write a CLI that fetches CSV files from an SFTP server, transforms them, and loads them into a database. The CLI is written in Python, and I need a reproducible environment in which every developer on my team gets the same Python version and the same pinned dependencies.
Here is a simplified version of my transform step — it uppercases the name column:
"""Tiny CSV transform: read from stdin, uppercase the name column, write to stdout."""
import csv
import sys
reader = csv.DictReader(sys.stdin)
writer = csv.DictWriter(sys.stdout, fieldnames=reader.fieldnames, lineterminator="\n")
writer.writeheader()
for row in reader:
    row["name"] = row["name"].upper()
    writer.writerow(row)
And a sample input file:
name,email
alice,alice@example.com
bob,bob@example.com
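Before containerizing anything, the transform can be sanity-checked in plain Python. This is a minimal sketch that feeds the sample rows through the same DictReader/DictWriter logic, using in-memory buffers in place of stdin and stdout:

```python
import csv
import io

# The sample input from above, as an in-memory file.
sample = "name,email\nalice,alice@example.com\nbob,bob@example.com\n"

out = io.StringIO()
reader = csv.DictReader(io.StringIO(sample))
writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
writer.writeheader()
for row in reader:
    row["name"] = row["name"].upper()  # same uppercasing as transform.py
    writer.writerow(row)

print(out.getvalue())
# name,email
# ALICE,alice@example.com
# BOB,bob@example.com
```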
First I set up my Dagger project and install daggerlib:
dagger init --sdk=python
dagger install github.com/Konubinix/daggerlib
The following is only for testing purposes — it makes the project use the local daggerlib checkout instead of the published version:
sed -i '/pin/d; s|"github.com/Konubinix/daggerlib@main"|"../.."|; s|"\.\./\.\.",|"../.."|' dagger.json
Then I write my Dagger module. Each function is explained in its own
section below:
from typing import Annotated
import dagger
from dagger import DefaultPath, dag, function, object_type
@object_type
class DataPipeline:
    @function
    async def transform(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Run the CSV transform in a clean Alpine container."""
        return await (
            dag.lib().alpine_python_user_venv()
            .with_file('/app/transform.py', src.file('transform.py'))
            .with_file('/app/sample.csv', src.file('sample.csv'))
            .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
            .stdout()
        )

    @function
    async def check_requests(self) -> str:
        """Verify requests is installed in the venv."""
        return await (
            dag.lib().alpine_python_user_venv(pip_packages=['requests'])
            .with_exec(['python3', '-c', 'import requests; print(requests.__version__)'])
            .stdout()
        )

    @function
    async def transform_debian(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Run the same transform on Debian for native library compatibility."""
        return await (
            dag.lib().debian_python_user_venv()
            .with_file('/app/transform.py', src.file('transform.py'))
            .with_file('/app/sample.csv', src.file('sample.csv'))
            .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
            .stdout()
        )

    @function
    async def pip_lock(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
        """Lock dependencies with pip-compile."""
        return await (
            dag.lib().pip_tools()
            .with_file('/home/sam/requirements.in', src.file('requirements.in'))
            .with_exec(['sh', '-c', 'pip-compile --quiet requirements.in && head -5 requirements.txt'])
            .stdout()
        )
1. Running the transform in a clean environment
I write a Dagger function that gets a Python container, injects my script and data, and runs the transform:
@function
async def transform(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Run the CSV transform in a clean Alpine container."""
    return await (
        dag.lib().alpine_python_user_venv()
        .with_file('/app/transform.py', src.file('transform.py'))
        .with_file('/app/sample.csv', src.file('sample.csv'))
        .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
        .stdout()
    )
dagger call transform
2. Adding requests for the SFTP client wrapper
My pipeline depends on requests for the download step. I add it
to the container:
@function
async def check_requests(self) -> str:
    """Verify requests is installed in the venv."""
    return await (
        dag.lib().alpine_python_user_venv(pip_packages=['requests'])
        .with_exec(['python3', '-c', 'import requests; print(requests.__version__)'])
        .stdout()
    )
dagger call check-requests
3. Switching to Debian for native database drivers
The load step uses psycopg2 to talk to PostgreSQL. That requires
libpq-dev and a C compiler, which are easier on Debian. Same
workflow — my transform runs identically:
@function
async def transform_debian(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Run the same transform on Debian for native library compatibility."""
    return await (
        dag.lib().debian_python_user_venv()
        .with_file('/app/transform.py', src.file('transform.py'))
        .with_file('/app/sample.csv', src.file('sample.csv'))
        .with_exec(['sh', '-c', 'python3 transform.py < sample.csv'])
        .stdout()
    )
dagger call transform-debian
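The load step itself doesn't appear in the module yet. As a rough sketch of the pattern, here is the transformed CSV being inserted into a database; I use the stdlib sqlite3 module so the example is self-contained, whereas the real pipeline would use psycopg2 against PostgreSQL with the same executemany shape (note psycopg2 uses %s placeholders rather than ?):

```python
import csv
import io
import sqlite3

# Transformed output from the pipeline (stand-in for the real CSV).
transformed = "name,email\nALICE,alice@example.com\nBOB,bob@example.com\n"

# In-memory SQLite keeps the sketch runnable anywhere; swap in a
# psycopg2 connection for the real PostgreSQL load.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, email TEXT)")
rows = [(r["name"], r["email"]) for r in csv.DictReader(io.StringIO(transformed))]
conn.executemany("INSERT INTO people (name, email) VALUES (?, ?)", rows)
conn.commit()

print(conn.execute("SELECT COUNT(*) FROM people").fetchone()[0])
# 2
```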
4. Pinning dependencies with pip-tools
I don't want surprise upgrades breaking the pipeline at 3am. I use
pip-compile to lock all transitive dependencies from my
requirements.in:
requests
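pip-compile expands that single top-level requirement into a fully pinned list covering every transitive dependency, annotated with where each pin comes from. The exact versions depend on when you run it; the output looks roughly like this (version numbers illustrative):

```
certifi==2024.2.2
    # via requests
charset-normalizer==3.3.2
    # via requests
idna==3.6
    # via requests
requests==2.31.0
    # via -r requirements.in
urllib3==2.2.1
    # via requests
```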
@function
async def pip_lock(self, src: Annotated[dagger.Directory, DefaultPath(".")]) -> str:
    """Lock dependencies with pip-compile."""
    return await (
        dag.lib().pip_tools()
        .with_file('/home/sam/requirements.in', src.file('requirements.in'))
        .with_exec(['sh', '-c', 'pip-compile --quiet requirements.in && head -5 requirements.txt'])
        .stdout()
    )
dagger call pip-lock