Service for extracting tables from the CCAO system-of-record and uploading them to the Data Department's data warehouse
This repository contains the dependencies and code necessary to run Spark extract jobs targeting Cook County's iasWorld property system-of-record. It is a replacement for `service-sqoop-iasworld`, which is now deprecated.
Each Spark job pulls an iasWorld table (or part of a table) via JDBC and writes it as Hive-partitioned Parquet files to AWS S3. The Data Department then queries the Parquet files using AWS Athena, providing a 1-1 mirror of the system-of-record for analytical queries.
Jobs are submitted in "batches" (called applications by Spark). Each batch may contain multiple extract jobs. Once all jobs for a batch are complete, we also (optionally) trigger four additional processes. In order:
1. Upload of the extracted Parquet files to the iasWorld S3 bucket
2. Triggering of the iasWorld AWS Glue crawler to refresh table metadata
3. Dispatch of the `test_dbt_models` GitHub Actions workflow
4. Upload of batch logs to AWS CloudWatch

These processes are controlled by the command-line flags described below and by settings in the `.env` file.

> [!NOTE]
> Before attempting to submit batches to the cluster, first make sure the Spark Docker Compose stack is active by running `docker compose up -d` in the repository. Also, make sure all secret and `.env` files are populated; see Files not included for more information.
`service-spark-iasworld` job batches are submitted via JSON, either as a string or as a file. All batches should have the format below. Note that the name of the job itself (e.g. `job2` below) is arbitrary.
{
"addn": {
"table_name": "iasworld.addn",
"min_year": 2020,
"max_year": 2024,
"cur": ["Y", "D"],
"predicates_path": "default_predicates.sql"
},
"job2": {
"table_name": "iasworld.asmt_all",
"min_year": 2021,
"max_year": 2021,
"cur": ["Y"],
"predicates_path": "default_predicates.sql"
}
}
- `table_name` (required) - Name of the iasWorld table to extract. Must be prefixed with `iasworld.` (or `ias.` for the test environment).
- `min_year` (optional) - Minimum tax year (inclusive) to extract from the table. Set to `null` in a job definition to ignore this column when filtering. Defaults to `1999`.
- `max_year` (optional) - Maximum tax year (inclusive) to extract from the table. Set `min_year` and `max_year` to the same value to extract a single year. Set to `null` in a job definition to ignore this column when filtering.
- `cur` (optional) - Values of the `cur` column to extract from the table. Set to `null` in a job definition to ignore this column when filtering. Defaults to `["Y", "N", "D"]`.
- `predicates_path` (optional) - String path to a SQL file within the `config/` directory that defines the mutually exclusive `BETWEEN` expressions used to chunk table reads. Set to `null` in a job definition to disable predicate-based chunking. Defaults to `default_predicates.sql`.

The example batch above contains two separate jobs, one per table. If you want to add additional tables/jobs to the batch, you can manually add the corresponding table objects and modify the fields as listed above.
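For instance, a hypothetical job that extracts an entire table without filtering on tax year or `cur` would set those fields to `null` (the table name below is only a placeholder):

```json
{
  "job3": {
    "table_name": "iasworld.example_table",
    "min_year": null,
    "max_year": null,
    "cur": null,
    "predicates_path": "default_predicates.sql"
  }
}
```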
In practice, modifying JSON is a bit of a pain, so we store long-lived batch and job definitions in YAML, then convert them to JSON using `yq`.
The file `config/default_jobs.yaml` contains definitions for three common job batches.
Batches are submitted to the Spark Docker cluster via the command line. The main job submission argument is either `--json-string` or `--json-file`. For example, to submit the test jobs in `config/default_jobs.yaml` via `--json-string`, run the following command:
docker exec spark-node-master ./submit.sh \
--json-string "$(yq -o=json .test_jobs ./config/default_jobs.yaml)"
Or from a file:
yq -o=json .test_jobs ./config/default_jobs.yaml > /tmp/jobs.json
docker exec spark-node-master ./submit.sh --json-file /tmp/jobs.json
The command line interface also has multiple optional flags:

- `--extract-target` - iasWorld target environment to extract data from. Must be `prod` or `test`. Defaults to `prod`.
- `--run-github-workflow/--no-run-github-workflow` - Run the `test_dbt_models` GitHub Actions workflow once the batch completes.
- `--run-glue-crawler/--no-run-glue-crawler` - Run the iasWorld Glue crawler after the extracted data is uploaded.
- `--upload-data/--no-upload-data` - Upload extracted data to the iasWorld S3 bucket.
- `--upload-logs/--no-upload-logs` - Upload batch logs to AWS CloudWatch.

The default values for these flags are set in the `config/default_settings.yaml` file. The boolean flags are all `True` by default.
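For example, the flags can be combined with the file-based submission shown earlier to skip the dbt workflow run and the log upload:

```bash
# Submit a previously generated batch file, disabling two of the optional
# post-extract steps via the boolean flags described above
docker exec spark-node-master ./submit.sh \
  --json-file /tmp/jobs.json \
  --no-run-github-workflow \
  --no-upload-logs
```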
Spark automatically attempts to mirror the data types within iasWorld using its own equivalent types. However, on occasion, it may use an incorrect or undesirable type. In such cases, this repository provides a hierarchical system of column-level schema/type overrides, with each type overriding the previous one:
1. By default, all `NUMBER` Oracle types are converted to `DECIMAL(10,0)` and all `TIMESTAMP` Oracle types are converted to `STRING`. This behavior is configured in `config/default_settings.yaml`.
2. Table- and column-level type overrides are defined in `config/table_definitions.yaml`.

> [!WARNING]
> `NUMERIC` types are implicitly converted to `DECIMAL(10,0)` because, as of 2024, all `NUMERIC` columns without a specified precision and scale are actually just integers. If this changes in the future, it's possible that we could begin to silently truncate numbers via this implicit type conversion. As such, stay on top of schema updates from the iasWorld team.
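As a rough sketch of what a column-level type override does (not necessarily how this repository implements it), Spark's JDBC reader accepts a `customSchema` option that forces specific column types in place of the inferred ones. The connection details and column names below are placeholders:

```python
# Sketch only: Spark's generic customSchema option for overriding inferred
# JDBC column types. Connection details and column names are placeholders;
# the repository's actual override logic lives under src/utils/.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("type-override-sketch").getOrCreate()

addn = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//iasworld-host:1521/service")  # placeholder
    .option("dbtable", "iasworld.addn")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("user", "user")              # placeholder credentials
    .option("password", "password")
    # Force two hypothetical columns to the global defaults described above:
    # NUMBER -> DECIMAL(10,0) and TIMESTAMP -> STRING
    .option("customSchema", "lline DECIMAL(10,0), wen STRING")
    .load()
)
```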
Predicates, filters, and partitions are Spark concepts used to construct individual jobs in a batch. They are mostly handled automatically, but you may need to change them in rare cases. The list below outlines the role of each concept and how to change them if needed:
- Predicates are mutually exclusive SQL `BETWEEN` expressions used to split table reads into chunks so that multiple Spark workers can extract a table in parallel. They are defined in a SQL file in `config/` and referenced via the `predicates_path` field of a job definition.
- Filters are SQL `WHERE` conditions used to limit the rows returned for each table. They are constructed automatically from the `min_year`, `max_year`, and `cur` values passed as part of a job definition. If these values are all null, the entire table is extracted.
- Partitions determine the Hive partitioning of the output Parquet files, e.g. `$TABLE/taxyr=$YEAR/cur=$CUR_VALUE/part-0.parquet`. They are derived from whichever `min_year`, `max_year`, and/or `cur` values are set. If these values are all null, the table is written as a single unpartitioned output, e.g. `$TABLE/part-0.parquet`.
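To make these concepts concrete, here is a simplified PySpark sketch of how predicates, filters, and partitions map onto the standard JDBC read and Parquet write APIs. The connection details, predicate column, ranges, and paths are placeholders, and the real logic in `src/utils/spark.py` may differ:

```python
# Simplified sketch of predicates, filters, and partitions using stock PySpark
# APIs. All connection details, predicate ranges, and paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("extract-sketch").getOrCreate()

# Predicates: mutually exclusive chunks, each read by a separate Spark task
predicates = [
    "parid BETWEEN '00000000000000' AND '09999999999999'",  # hypothetical ranges
    "parid BETWEEN '10000000000000' AND '19999999999999'",
    # ...remaining chunks, e.g. from config/default_predicates.sql
]

df = spark.read.jdbc(
    url="jdbc:oracle:thin:@//iasworld-host:1521/service",  # placeholder
    table="iasworld.addn",
    predicates=predicates,
    properties={"user": "user", "password": "password"},  # placeholder
)

# Filters: limit rows using the job's min_year/max_year/cur values
df = df.filter("taxyr BETWEEN 2020 AND 2024").filter("cur IN ('Y', 'D')")

# Partitions: Hive-partition the output, producing paths like
# addn/taxyr=2020/cur=Y/part-0.parquet
df.write.partitionBy("taxyr", "cur").parquet("target/initial/addn")
```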
Some necessary setup and credential files are not included in this repository for security or licensing reasons. Templated versions are included for instructional purposes. If you want to use this repository, you will need to populate the following:
- `drivers/ojdbc8.jar` - This is the JDBC driver for our Oracle backend and is not included for licensing reasons.
- `secrets/` - These are credential files needed to connect to other systems.
- `.env` - This file sets a few non-critical but still private options.

Batches are currently scheduled via `cron`. To edit the schedule file, use `crontab -e` as the main server user. The example `crontab` file below schedules daily jobs for frequently updated tables and weekly ones for rarely-updated tables. Note that the jobs currently must be run as user 1003.
# Extract recent years from frequently used tables on weekdays at 1 AM CST
0 6 * * 1,2,3,4,5 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .default_jobs /full/path/to/default_jobs.yaml)"
# Extract all tables on Saturday at 1 AM CST
0 6 * * 6 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .weekend_jobs /full/path/to/default_jobs.yaml)"
# Extract all test environment tables on Sunday at 1 AM CST
0 6 * * 7 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .weekend_jobs_test /full/path/to/default_jobs.yaml)" --no-run-github-workflow --extract-target test
Here's a breakdown of important files and the purpose of each one:
.
├── docker-compose.yaml - Defines the Spark nodes, environment, and networking
├── Dockerfile - Defines dependencies bundled in each Spark node
├── .env - Runtime configuration variables passed to containers
├── pyproject.toml - Project metadata and tool settings
├── README.md - This file!
├── run.sh - Entrypoint shell script to create Spark jobs
├── .github/ - GitHub Actions workflows for linting, builds, etc.
├── config/
│ ├── default_jobs.yaml - Define batches of Spark jobs (one per table)
│ ├── default_predicates.sql - List of mutually exclusive SQL BETWEEN expressions
│ ├── default_settings.yaml - Runtime defaults and schema overrides
│ ├── spark-defaults.conf - Spark memory and driver settings
│ └── table_definitions.yaml - Possible job values per table and schema overrides
├── drivers/
│ └── ojdbc8.jar - Not included, but necessary to connect to iasWorld
├── secrets/
│ ├── AWS_CREDENTIALS_FILE - AWS credentials config file specific to this job
│ ├── GH_PEM - GitHub PEM file used to authorize workflow dispatch
│ └── IPTS_PASSWORD - Password file loaded at runtime into containers
├── src/
│ ├── submit_jobs.py - Job submission entrypoint. Takes JSON as input
│ ├── submit.sh - Helper to launch jobs using spark-submit
│ └── utils/
│ ├── aws.py - AWS client class for triggering Glue crawlers
│ ├── github.py - GitHub client class for running Actions workflows
│ ├── helpers.py - Miscellaneous helper functions
│ └── spark.py - Spark job and session classes
└── target/
├── final/ - Landing directory after Parquet repartitioning
└── initial/ - Landing directory for initial JDBC read output