MIT License
SAME AIRFLOW DATA PIPELINES | WHEREVER YOU RUN THEM
From day 1 of any Airflow project, you can spin up a local desktop Kubernetes Airflow environment AND a Google Cloud Composer Airflow environment with working example DAGs across both ✨
It is a painful exercise to set up secure Airflow environments with parity (local desktop, dev, qa, prod). Too often, I've done all this work in my local desktop Airflow environment only to find out the DAGs don't work in a Kubernetes deployment, or vice versa. As I got more hands-on with infrastructure and networking, I was performing two jobs: Data Engineer and DevOps Engineer. The responsibilities overlap, and both roles are traditionally ill-equipped to come to consensus: either the networking specifics go over the Data Engineer's head, and/or the data pipeline IAM permissions and DAG idempotency go over the DevOps Engineer's head. There's also the issue of Security and DevOps saying that spinning up an airflow-dev-cloud-environment is too risky without several development cycles to set up bastion hosts, subnets, private IPs, etc. These conversations alone can lead to several weeks of delay before you can even START DRAFTING Airflow pipelines! It doesn't have to be this way.
This toolkit is for BOTH Data and DevOps engineers to solve the problems above 😲
High-Level Success Criteria:
In Scope
Out of Scope
terratest for terraform unit testing
Time to Complete: 5-10 minutes
Sign up for a free trial OR use an existing GCP account
Manually FORK the repo through the GitHub interface OR CLONE this repo: git clone https://github.com/sungchun12/airflow-toolkit.git
Create a new Google Cloud project
Get into starting position for deployment: cd airflow-toolkit/
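The "Create a new Google Cloud project" step above is typically done in the Cloud Console. Once the Google Cloud SDK is installed (next section), a minimal CLI sketch would look like the below; the project ID is a placeholder you should replace, and a billing account still needs to be linked for the paid APIs used later.
# create and select a new project (hypothetical project ID)
gcloud projects create my-airflow-toolkit-demo --name="airflow-toolkit-demo"
gcloud config set project my-airflow-toolkit-demo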
Time to Complete: 10-15 minutes
Download Docker Desktop and start it
Customize Docker Desktop with the settings below
Click Apply & Restart where appropriate
Run the below commands in your terminal
# install homebrew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
# install helm, terragrunt, terraform, kubectl to local desktop
brew install helm terragrunt terraform kubectl
# Install Google Cloud SDK and follow the prompts
# https://cloud.google.com/sdk/install
curl https://sdk.cloud.google.com | bash
Close the current terminal and start a new one for the above changes to take effect
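To confirm the tooling is on your PATH in the new terminal, you can print versions (exact versions will differ from mine):
# quick sanity check of the local tooling
helm version
terragrunt --version
terraform version
kubectl version --client
gcloud --version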
Add the Editor, Secret Manager Admin, Role Administrator, and Security Admin roles
Note: this grants wide permissions for the purposes of this demo; adjust them based on your specific situation
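If you prefer to grant the roles from the CLI instead of the console, a minimal sketch follows; the service account name is hypothetical, so replace it with the one you created.
# hypothetical service account; replace PROJECT_ID and SA_EMAIL with your own
PROJECT_ID="<your-project-id>"
SA_EMAIL="airflow-toolkit@${PROJECT_ID}.iam.gserviceaccount.com"
for ROLE in roles/editor roles/secretmanager.admin roles/iam.roleAdmin roles/iam.securityAdmin; do
  gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="$ROLE"
done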
Create a Service Account Key JSON file (it should automatically download)
Move the private JSON key into the root directory of the git repo you just cloned and rename it account.json (don't worry, it will be gitignored)
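The key can also be created from the CLI instead of downloading it from the console (same hypothetical service account name as the sketch above):
# create the key directly as account.json in the repo root
gcloud iam service-accounts keys create account.json \
  --iam-account="airflow-toolkit@<your-project-id>.iam.gserviceaccount.com"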
Run the below commands in your local desktop terminal
# Authenticate gcloud commands with service account key file
gcloud auth activate-service-account --key-file account.json
# Enable in scope Google Cloud APIs
gcloud services enable \
compute.googleapis.com \
iam.googleapis.com \
cloudresourcemanager.googleapis.com \
bigquery.googleapis.com \
storage-component.googleapis.com \
storage.googleapis.com \
container.googleapis.com \
containerregistry.googleapis.com \
composer.googleapis.com \
secretmanager.googleapis.com
# Store contents of private service account key in Secrets Manager to be used by airflow later within the `add_gcp_connections.py` DAG
# Create a secrets manager secret from the key
gcloud secrets create airflow-conn-secret \
--replication-policy="automatic" \
--data-file=account.json
# List the secret
gcloud secrets list
# verify secret contents ad hoc
gcloud secrets versions access latest --secret="airflow-conn-secret"
# if you run the below toolkits multiple times, there may be times where you'll have to delete and recreate the secret
gcloud secrets delete airflow-conn-secret
# Optional: install specific Google Cloud version of kubectl
# The homebrew installation earlier above will suffice
# ┌──────────────────────────────────────────────────────────────────┐
# │ These components will be installed. │
# ├─────────────────────┬─────────────────────┬──────────────────────┤
# │ Name │ Version │ Size │
# ├─────────────────────┼─────────────────────┼──────────────────────┤
# │ kubectl │ 1.15.11 │ < 1 MiB │
# │ kubectl │ 1.15.11 │ 87.1 MiB │
# └─────────────────────┴─────────────────────┴──────────────────────┘
gcloud components install kubectl
# Configure Docker
gcloud auth configure-docker
# Create SSH key pair for secure git clones
ssh-keygen
cat ~/.ssh/id_rsa.pub
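# add the public key printed above to your GitHub SSH keys: https://github.com/settings/keys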
Manually create a Cloud Source mirror repo based on the GitHub repo
Note: automating this is documented as not possible through the API in its current state (further reading)
Replace all relevant variables within the dags/ folder
# file location
# /airflow-toolkit/dags/add_gcp_connections.py
CONN_PARAMS_DICT = {
"gcp_project": "wam-bam-258119", # replace with your specific project
"gcp_conn_id": "my_gcp_connection",
"gcr_conn_id": "gcr_docker_connection",
"secret_name": "airflow-conn-secret",
}
# file location
# /airflow-toolkit/dags/bigquery_connection_check.py
TASK_PARAMS_DICT = {
"dataset_id": "dbt_bq_example",
"project_id": "wam-bam-258119", # replace with your specific project
"gcp_conn_id": "my_gcp_connection",
}
# file location
# /airflow-toolkit/dags/airflow_utils.py
GIT_REPO = "github_sungchun12_airflow-toolkit" # replace with the cloud source mirror repo name
PROJECT_ID = "wam-bam-258119" # replace with your specific project
After doing the above ONCE, you can run the below toolkits multiple times with the same results (idempotent)
Time to Complete: 5-8 minutes
Note: This was ONLY tested on a Mac desktop environment
General Mechanics
The shell script logs will show similar mechanics
Note: I plan to automate this yaml setup in a future feature
In custom-setup.yaml, starting at line 174, run pwd in your terminal from the root airflow-toolkit/ directory and replace all the <YOUR GIT REPO DIRECTORY HERE> placeholders
extraVolumes: # this will create the volume from the directory
- name: dags
hostPath:
path: <YOUR GIT REPO DIRECTORY HERE>/dags/
- name: dag-environment-configs
hostPath:
path: <YOUR GIT REPO DIRECTORY HERE>/dag_environment_configs/
- name: kube-config
hostPath:
path: <YOUR GIT REPO DIRECTORY HERE>/.kube/
- name: service-account
hostPath:
path: <YOUR GIT REPO DIRECTORY HERE>/account.json
- name: tests
hostPath:
path: <YOUR GIT REPO DIRECTORY HERE>/tests/
# example below
extraVolumes: # this will create the volume from the directory
- name: dags
hostPath:
path: /Users/sung/Desktop/airflow-toolkit/dags/
- name: dag-environment-configs
hostPath:
path: /Users/sung/Desktop/airflow-toolkit/dag_environment_configs/
- name: kube-config
hostPath:
path: /Users/sung/Desktop/airflow-toolkit/.kube/
- name: service-account
hostPath:
path: /Users/sung/Desktop/airflow-toolkit/account.json
- name: tests
hostPath:
path: /Users/sung/Desktop/airflow-toolkit/tests/
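If you'd rather not edit the placeholders by hand, a sed one-liner can fill them in; this is a sketch that assumes custom-setup.yaml sits in the repo root and uses the BSD sed syntax that ships with macOS.
# substitute the placeholder with your current repo path
REPO_DIR=$(pwd)
sed -i '' "s|<YOUR GIT REPO DIRECTORY HERE>|${REPO_DIR}|g" custom-setup.yaml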
#!/bin/bash
# follow terminal prompt after entering below command
# leave this terminal open to sustain airflow webserver
# Set of environment variables
export ENV="dev"
export PROJECT_ID="airflow-demo-build"
export DOCKER_DBT_IMG="gcr.io/$PROJECT_ID/dbt_docker:$ENV-latest"
source deploy_local_desktop_airflow.sh
Note: bigquery_connection_check will fail unless add_gcp_connections succeeds first
Note: the airflow webserver may freeze given resource limitations
Press ctrl + c within the terminal where you ran the deploy script
Run the below commands in your terminal (these already exist within the deploy script)
# view airflow UI
export POD_NAME=$(kubectl get pods --namespace airflow -l "component=web,app=airflow" -o jsonpath="{.items[0].metadata.name}")
echo "airflow UI webserver --> http://127.0.0.1:8080"
kubectl port-forward --namespace airflow $POD_NAME 8080:8080
# start a remote shell in the airflow worker for ad hoc operations or to run pytests
kubectl exec -it airflow-worker-0 -- /bin/bash
➜ airflow-toolkit git:(feature-docs) ✗ kubectl exec -it airflow-worker-0 -- /bin/bash
# list files in current working directory
airflow@airflow-worker-0:/opt/airflow$ ls
airflow.cfg dag_environment_configs dags logs tests unittests.cfg
# run all test scripts
airflow@airflow-worker-0:/opt/airflow$ pytest -vv --disable-pytest-warnings
======================================== test session starts ========================================
platform linux -- Python 3.6.10, pytest-5.4.3, py-1.9.0, pluggy-0.13.1 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /opt/airflow
plugins: celery-4.4.2
collected 19 items
tests/test_add_gcp_connections.py::test_import_dags PASSED [ 5%]
tests/test_add_gcp_connections.py::test_contains_tasks PASSED [ 10%]
tests/test_add_gcp_connections.py::test_task_dependencies PASSED [ 15%]
tests/test_add_gcp_connections.py::test_schedule PASSED [ 21%]
tests/test_add_gcp_connections.py::test_task_count_test_dag PASSED [ 26%]
tests/test_add_gcp_connections.py::test_tasks[t1] PASSED [ 31%]
tests/test_add_gcp_connections.py::test_tasks[t2] PASSED [ 36%]
tests/test_add_gcp_connections.py::test_end_to_end_pipeline SKIPPED [ 42%]
tests/test_dbt_example.py::test_import_dags PASSED [ 47%]
tests/test_dbt_example.py::test_contains_tasks PASSED [ 52%]
tests/test_dbt_example.py::test_task_dependencies PASSED [ 57%]
tests/test_dbt_example.py::test_schedule PASSED [ 63%]
tests/test_dbt_example.py::test_task_count_test_dag PASSED [ 68%]
tests/test_dbt_example.py::test_dbt_tasks[dbt_debug] PASSED [ 73%]
tests/test_dbt_example.py::test_dbt_tasks[dbt_run] PASSED [ 78%]
tests/test_dbt_example.py::test_dbt_tasks[dbt_test] PASSED [ 84%]
tests/test_dbt_example.py::test_end_to_end_pipeline SKIPPED [ 89%]
tests/test_sample.py::test_answer PASSED [ 94%]
tests/test_sample.py::test_f PASSED [100%]
============================= 17 passed, 2 skipped, 1 warning in 39.77s =============================
# list DAGs
airflow@airflow-worker-0:/opt/airflow$ airflow list_dags
[2020-08-18 19:17:09,579] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-08-18 19:17:09,580] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags
Set custom environment variable GOOGLE_APPLICATION_CREDENTIALS for deployment setup: local_desktop
Set custom environment variable GOOGLE_APPLICATION_CREDENTIALS for deployment setup: local_desktop
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
add_gcp_connections
bigquery_connection_check
dbt_example
kubernetes_sample
tutorial
# import, get, set airflow variables
airflow@airflow-worker-0:/opt/airflow$ airflow variables --import /opt/airflow/dag_environment_configs/test_airflow_variables.json
1 of 1 variables successfully updated.
airflow@airflow-worker-0:/opt/airflow$ airflow variables --get test_airflow_variable
do you see this?
airflow@airflow-worker-0:/opt/airflow$ airflow variables --set test_airflow_variable "lovely"
airflow@airflow-worker-0:/opt/airflow$ airflow variables --get test_airflow_variable
lovely
Time to Complete: 1-2 minutes
#!/bin/bash
source teardown_local_desktop_airflow.sh
➜ airflow-toolkit git:(feature-docs) ✗ source teardown_local_desktop_airflow.sh
***********************
Delete Kuberenetes Cluster Helm Deployment and Secrets
***********************
release "airflow" uninstalled
kill: illegal process id: f19
secret "dbt-secret" deleted
secret "gcr-key" deleted
secret "ssh-key-secret" deleted
namespace "airflow" deleted
You can run pytest directly within this setup
A service account within the airflow namespace is used to pull the image from Google Container Registry based on the manually created secret: gcr-key
kubectl get serviceaccounts
NAME SECRETS AGE
airflow 1 43m
default 1 43m
KubernetesPodOperator will pull the docker image based on the permissions above BUT will run dbt operations based on the manually created secret: dbt-secret
kubectl get secrets
NAME TYPE DATA AGE
airflow-postgresql Opaque 1 50m
airflow-redis Opaque 1 50m
airflow-token-zfpz8 kubernetes.io/service-account-token 3 50m
dbt-secret Opaque 1 50m
default-token-pz55g kubernetes.io/service-account-token 3 50m
gcr-key kubernetes.io/dockerconfigjson 1 50m
sh.helm.release.v1.airflow.v1 helm.sh/release.v1 1 50m
Optional: Detailed resource management view for local desktop
# install kubernetes dashboard
kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.1.0/aio/deploy/recommended.yaml
# start the web server
kubectl proxy
# view the dashboard
open http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy#/login
# copy and paste the token output into dashboard UI
kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | awk '/^deployment-controller-token-/{print $1}') | awk '$1=="token:"{print $2}'
Enter ctrl + c within the terminal where you ran the kubernetes dashboard script to close it
Follow the Post-Deployment Instructions for Toolkits #2 & #3 AFTER deployment
Time to Complete: 50-60 minutes (most of the time is spent waiting for Cloud Composer to finish deploying)
Note: This follows the example directory structure provided by terragrunt, with modules housed in the same git repo (further reading)
Do NOT run this in parallel with toolkit #3 as default variables will cause conflicts
terraform state will be split into multiple files per module
Read Post-Deployment Instructions for Toolkits #2 & #3 after this deployment
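Because the state is split per module, you can confirm the layout once the apply below finishes by listing the remote state bucket; the bucket name is whatever you configured in terragrunt.hcl.
# sketch: list the per-module terraform state objects (replace the bucket name with yours)
gsutil ls -r gs://<your-tfstate-bucket>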
#!/bin/bash
# create a secrets manager secret from the key
gcloud secrets create terraform-secret \
--replication-policy="automatic" \
--data-file=account.json
# List the secret
# example terminal output
# NAME CREATED REPLICATION_POLICY LOCATIONS
# airflow-conn-secret 2020-08-18T19:45:50 automatic -
# terraform-secret 2020-08-12T14:34:50 automatic -
gcloud secrets list
# verify secret contents ad hoc
gcloud secrets versions access latest --secret="terraform-secret"
# file location
# /airflow-toolkit/terragrunt_infrastructure_live/non-prod/account.hcl
locals {
project = "wam-bam-258119" # replace this with your specific project
service_account_email = "[email protected]" # replace this with your specific service account email
}
# file location
# /airflow-toolkit/terragrunt_infrastructure_live/terragrunt.hcl
remote_state {
backend = "gcs"
generate = {
path = "backend.tf"
if_exists = "overwrite"
}
config = {
project = "${local.project}"
location = "${local.region}"
credentials = "${local.credentials_file}"
bucket = "secure-bucket-tfstate-airflow-infra-${local.region}" # replace with something unique BEFORE `-${local.region}`
prefix = "${path_relative_to_include()}"
}
}
#!/bin/bash
# assumes you are already in the repo root directory
cd terragrunt_infrastructure_live/non-prod/us-central1/dev/
# export the Google Cloud project ID where the secrets live-to be used by terragrunt
# example: export PROJECT_ID="wam-bam-258119"
export PROJECT_ID=<your project id>
gcloud config set project $PROJECT_ID #TODO: add this step to the CICD pipeline rather than the get secret shell script
# this has mock outputs to emulate module dependencies with a prefix "mock-"
# OR you can run a more specific plan
# terragrunt run-all plan -out=terragrunt_plan
# --terragrunt-non-interactive flag if this is run for the first time to create the state gcs bucket without a prompt
# https://github.com/gruntwork-io/terragrunt/issues/486
terragrunt run-all plan --terragrunt-non-interactive
# this has mock outputs to emulate module dependencies
terragrunt run-all validate
# follow terminal prompt after entering below command
# do NOT interrupt this process until finished or it will corrupt terraform state
# OR you can apply a more specific plan
# terragrunt run-all apply terragrunt_plan
terragrunt run-all apply
Time to Complete: 5-10 minutes
#!/bin/bash
# follow terminal prompt after entering below command
terragrunt destroy-all
# you may occasionally see terragrunt errors related to duplicate files
# run the below often to avoid those errors
cd terragrunt_infrastructure_live/
bash terragrunt_cleanup.sh
Run the terragrunt plan-all command within the dir: ./terragrunt_infrastructure_live/non-prod/
Follow the Post-Deployment Instructions for Toolkits #2 & #3 AFTER deployment
Time to Complete: 50-60 minutes (most of the time is spent waiting for Cloud Composer to finish deploying)
Note: This uses terragrunt as a thin wrapper within a single subdirectory
Do NOT run this in parallel with toolkit #2 as default variables will cause conflicts
terraform state will be stored in one file
Read Post-Deployment Instructions for Toolkits #2 & #3 after this deployment
#!/bin/bash
# create a secrets manager secret from the key
gcloud secrets create terraform-secret \
--replication-policy="automatic" \
--data-file=account.json
# List the secret
# example terminal output
# NAME CREATED REPLICATION_POLICY LOCATIONS
# airflow-conn-secret 2020-08-18T19:45:50 automatic -
# terraform-secret 2020-08-12T14:34:50 automatic -
gcloud secrets list
# verify secret contents ad hoc
gcloud secrets versions access latest --secret="terraform-secret"
# file location
# /airflow-toolkit/terraform_simple_setup/terragrunt.hcl
remote_state {
backend = "gcs"
generate = {
path = "backend.tf"
if_exists = "overwrite"
}
config = {
project = "wam-bam-258119" # replace with your GCP project id
location = "US"
credentials = "service_account.json"
bucket = "secure-bucket-tfstate-composer" # replace with something unique
prefix = "dev"
}
}
# file location
# /airflow-toolkit/terraform_simple_setup/variables.tf
variable "project" {
description = "name of your GCP project"
type = string
default = "big-dreams-please" # replace with your GCP project id
}
variable "service_account_email" {
description = "Service account used for VMs"
type = string
default = "[email protected]" # replace with your service account email
}
Copy account.json into the directory below and rename it service_account.json
This avoids the hassle of calling the terraform-secret for this simple terraform setup
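For example, from the repo root (keeping the original account.json in place):
# copy the key into the simple terraform setup and rename it
cp account.json terraform_simple_setup/service_account.json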
#!/bin/bash
cd terraform_simple_setup/
# utilizes terragrunt as a thin wrapper utility to automatically create the gcs backend remote state bucket
terragrunt init
# preview the cloud resources you will create
# OR you can run a more specific plan
# terraform plan -out=terraform_plan
terraform plan
# validate terraform syntax and configuration
terraform validate
# follow terminal prompt after entering below command
# OR you can apply a more specific plan
# terraform apply terraform_plan
terraform apply
Time to Complete: 5-10 minutes
#!/bin/bash
# follow terminal prompt after entering below command
terraform destroy
Time to Complete: 5-10 minutes
Only compute instances on the same VPC as Cloud Composer can access the environment programmatically
gcloud composer commands will NOT work on your local desktop
Add the Compute Instance Admin (v1) and Service Account User roles to the iap ssh service account (adjust the terraform code less) OR create a custom role with compute.instances.setMetadata (adjust the terraform code more)
If you are the owner of the project, you can skip the identity aware proxy ssh step and simply ssh through the console itself
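For context, the iap_ssh_tunnel.sh script below relies on IAP TCP forwarding; a rough sketch of the kind of command involved is shown here, with a placeholder bastion instance name.
# sketch only: ssh to the bastion host through Identity-Aware Proxy
gcloud compute ssh <your-bastion-instance-name> \
  --zone=us-central1-b \
  --tunnel-through-iap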
#!/bin/bash
# ssh via identity aware proxy into the bastion host(which will then run commands against cloud composer)
# update the env vars before running ssh tunnel
ACCESS_KEY_FILE="account.json"
PROJECT_ID="airflow-demo-build" # your GCP project ID
ZONE="us-central1-b" # your GCP compute engine ZONE defined in terraform/terragrunt variables, likely us-central1-a or us-central1-b
# SERVICE_ACCOUNT_EMAIL="service-account-iap-ssh@$PROJECT_ID.iam.gserviceaccount.com" # Toolkit 3 Default
SERVICE_ACCOUNT_EMAIL="iap-ssh-sa-dev@$PROJECT_ID.iam.gserviceaccount.com" # Toolkit 2 Default
KEY_FILE="iap-ssh-access-sa.json"
source utils/cloud_composer/iap_ssh_tunnel.sh
# install basic software in the bastion host
sudo apt-get install kubectl git
# Set Composer project, location, and zone
# The hard-coded values are based on defaults set by terraform module variables
# Minimizes redundant flags in downstream commands
gcloud config set project airflow-demo-build # your GCP project ID
gcloud config set composer/location us-central1
gcloud config set compute/zone us-central1-b # your GCP compute engine ZONE defined in terraform/terragrunt variables, likely us-central1-a or us-central1-b
# list cloud composer DAGs
gcloud composer environments run dev-composer \
list_dags
# capture cloud composer environment config
COMPOSER_ENVIRONMENT="dev-composer"
COMPOSER_CONFIG=$(gcloud composer environments describe ${COMPOSER_ENVIRONMENT} --format='value(config.gkeCluster)')
# COMPOSER_CONFIG ex: projects/wam-bam-258119/zones/us-central1-b/clusters/us-central1-dev-composer-de094856-gke
# capture kubernetes credentials and have kubectl commands point to this cluster
gcloud container clusters get-credentials $COMPOSER_CONFIG
# copy and paste contents of service account json file from local machine into the bastion host
cat <<EOF > account.json
<paste service account file contents>
EOF
# be very careful with naming convention for this secret or else the KubernetesPodOperator will timeout
kubectl create secret generic dbt-secret --from-file=account.json
# Create SSH key pair for secure git clones
ssh-keygen
# copy and paste contents to your git repo SSH keys section
# https://github.com/settings/keys
cat ~/.ssh/id_rsa.pub
# create the ssh key secret
kubectl create secret generic ssh-key-secret \
--from-file=id_rsa=$HOME/.ssh/id_rsa \
--from-file=id_rsa.pub=$HOME/.ssh/id_rsa.pub
kubectl get secrets
#!/bin/bash
# these commands work from the `airflow-toolkit/` root directory
# reauthorize the main service account to gcloud
gcloud auth activate-service-account --key-file account.json
# add secrets manager IAM policy binding to composer service account
# The hard-coded values are based on defaults set by terraform module variables
PROJECT_ID="airflow-demo-build"
MEMBER_SERVICE_ACCOUNT_EMAIL="serviceAccount:composer-sa-dev@$PROJECT_ID.iam.gserviceaccount.com" # Toolkit 2 Default
# MEMBER_SERVICE_ACCOUNT_EMAIL="serviceAccount:composer-dev-account@$PROJECT_ID.iam.gserviceaccount.com" # Toolkit 3 Default
SECRET_ID="airflow-conn-secret"
gcloud secrets add-iam-policy-binding $SECRET_ID \
--member=$MEMBER_SERVICE_ACCOUNT_EMAIL \
--role="roles/secretmanager.secretAccessor"
# Configure variables to interact with cloud composer
export PROJECT_DIR=$PWD
# Set Composer location
gcloud config set composer/location us-central1
COMPOSER_ENVIRONMENT="dev-composer"
COMPOSER_BUCKET=$(gcloud composer environments describe ${COMPOSER_ENVIRONMENT} --format='value(config.dagGcsPrefix)' | sed 's/\/dags//g')
# sync files in dags folder to the gcs bucket linked to cloud composer
# this may not work if you have python 3.8.5 installed on macOS
# see: https://github.com/GoogleCloudPlatform/gsutil/issues/961
gsutil -m rsync -r $PROJECT_DIR/dags $COMPOSER_BUCKET/dags
Note: The airflow webserver will take 30 seconds to update the view with the updated DAGs. However, you can run DAGs as soon as you upload the new files to the gcs bucket.
Note: bigquery_connection_check will fail unless add_gcp_connections succeeds first
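To kick the DAGs off in that order from the bastion host (where gcloud composer commands work), here is a sketch using the same Airflow 1.x CLI that list_dags above relies on:
# trigger the connections DAG first, then the BigQuery check
gcloud composer environments run dev-composer trigger_dag -- add_gcp_connections
gcloud composer environments run dev-composer trigger_dag -- bigquery_connection_check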
| Folder | Purpose |
| --- | --- |
| .github/workflows | Quick terragrunt/terraform validations |
| dags | airflow pipeline code |
| dags_archive | draft DAG code |
| dbt_bigquery_example | Working and locally tested dbt code which performs BigQuery SQL transforms |
| Dockerfiles | Docker images to be used by Cloud Composer |
| docs | Images and other relevant documentation |
| terraform_simple_setup | Terraform modules for a terraform-only setup |
| terragrunt_infrastructure_live | Terragrunt orchestrator to run terraform operations |
| terragrunt_infrastructure_modules | Base terraform modules for terragrunt to consume in the live directory |
| tests | Example DAG test cases |
| utils | Various utilities to automate more specific ad hoc tasks |