EtlFlow is an ecosystem of functional libraries in Scala, based on ZIO, for running complex auditable workflows that can interact with Google Cloud Platform, AWS, Kubernetes, databases, SFTP servers, on-prem systems and more.
APACHE-2.0 License
Below are some important features of this library, some of which come from ZIO.
This project is compiled with Scala versions 2.12.17, 2.13.10 and 3.3.0.
Available via Maven Central. Add the latest release below as a dependency to your project:
SBT
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.3"
Maven
<dependency>
    <groupId>com.github.tharwaninitin</groupId>
    <artifactId>etlflow-core_2.12</artifactId>
    <version>1.7.3</version>
</dependency>
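With sbt, the `%%` operator appends the Scala binary version to the artifact name, which is why the Maven coordinates above spell out `etlflow-core_2.12` explicitly. The sketch below mimics that naming rule so you can work out the Maven artifactId for each Scala version listed above. `ArtifactName` is a hypothetical helper, not part of EtlFlow or sbt, and it assumes final release versions (milestones and release candidates are named differently).

```scala
// Hypothetical helper that mimics how sbt's %% operator builds artifact names.
// Assumes final releases such as "2.13.10" or "3.3.0" (no milestones or RCs).
object ArtifactName {
  // Scala 2.x artifacts use "major.minor"; Scala 3.x artifacts use just "3".
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split('.').toList match {
      case "3" :: _            => "3"
      case major :: minor :: _ => s"$major.$minor"
      case _ => throw new IllegalArgumentException(s"Unexpected Scala version: $scalaVersion")
    }

  // e.g. module "etlflow-core" with Scala "2.12.17" gives "etlflow-core_2.12"
  def artifactId(module: String, scalaVersion: String): String =
    s"${module}_${binaryVersion(scalaVersion)}"

  def main(args: Array[String]): Unit =
    Seq("2.12.17", "2.13.10", "3.3.0").foreach { v =>
      println(s"Scala $v -> ${artifactId("etlflow-core", v)}")
    }
}
```

For Scala 3.3.0 this yields `etlflow-core_3`, because Scala 3 artifacts use the major version alone as their suffix.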
The core module provides the Task and Audit APIs, which are used by all tasks in the different modules. It also provides a Job API that groups multiple tasks together so that auditing and logging are available at the job/workflow level.
Below is the simplest example of creating a Task and running it with EtlFlow. It uses the noop audit backend, which does nothing; this is useful when you want to test a task that requires an audit backend to be passed in.
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
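import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio._
object Job1 extends ZIOAppDefault {
  // The effect that the task executes
  def executeTask(): Task[Unit] = ZIO.logInfo("Hello EtlFlow Task")
  // Wrap the effect in a GenericTask so that its execution is audited
  val genericTask1: GenericTask[Any, Unit] = GenericTask(
    name = "Generic Task",
    task = executeTask()
  )
  // Converting the task to a ZIO effect adds a dependency on Audit
  val task1: RIO[Audit, Unit] = genericTask1.toZIO
  // Provide the noop audit backend, which records nothing
  override def run: Task[Unit] = task1.provide(etlflow.audit.noop)
}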
EtlFlow provides an auditing interface for tracking the execution of tasks and jobs (collections of tasks). The auditing interface is integrated with the Task interface: each task uses it to record its state in the backend of choice, which gives end-to-end auditability of the whole job/workflow. Audit backends are currently available for BigQuery, MySQL and Postgres, and because the Audit interface is small and concise, adding a new backend is straightforward.
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
import etlflow.task.GenericTask
import etlflow.model.Credential.JDBC
import zio._
object Job2 extends ZIOAppDefault {
  private val task1 = GenericTask(
    name = "Generic Task 1",
    task = ZIO.logInfo("Task 1")
  )
  private val task2 = GenericTask(
    name = "Generic Task 2",
    task = ZIO.logInfo("Task 2")
  )
  // Run the tasks sequentially; each one is audited through the Audit layer
  val job = for {
    _ <- task1.toZIO
    _ <- task2.toZIO
  } yield ()
  // Database connection details are read from environment variables
  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))
  // Persist audit records in the database instead of discarding them
  override def run: Task[Unit] = job.provide(etlflow.audit.DB(cred))
}
Here's a snapshot of data in the task_run table after this job has run:
task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
---|---|---|---|---|---|---|---|
1 | 100 | Task 1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
2 | 100 | Task 2 | GenericTask | {} | RUNNING | 2023-07-13 12:00:00 UTC | 2023-07-13 13:00:00 UTC |
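The Audit interface itself is not reproduced in this README. As a rough model of what an audit backend does for each task, the sketch below defines a tiny `AuditBackend` trait, a noop implementation like the one used in the first example, and an in-memory implementation that keeps one row per task in the spirit of the task_run table: the row is written as RUNNING when the task starts and updated when it ends. All names and signatures here are hypothetical and are not EtlFlow's actual API; FAILED is an assumed status name, since the tables in this README only show RUNNING and SUCCESS.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical audit interface modelling the idea only; NOT EtlFlow's Audit API.
trait AuditBackend {
  def logTaskStart(taskRunId: Long, taskName: String, taskType: String): Unit
  def logTaskEnd(taskRunId: Long, error: Option[Throwable]): Unit
}

// Discards every event, similar in spirit to the noop backend used earlier.
object NoopAudit extends AuditBackend {
  def logTaskStart(taskRunId: Long, taskName: String, taskType: String): Unit = ()
  def logTaskEnd(taskRunId: Long, error: Option[Throwable]): Unit = ()
}

// A few of the task_run columns.
final case class TaskRunRow(taskRunId: Long, taskName: String, taskType: String, status: String)

// Keeps one row per task run, in insertion order.
final class InMemoryAudit extends AuditBackend {
  val rows: mutable.LinkedHashMap[Long, TaskRunRow] = mutable.LinkedHashMap.empty

  def logTaskStart(taskRunId: Long, taskName: String, taskType: String): Unit =
    rows.update(taskRunId, TaskRunRow(taskRunId, taskName, taskType, "RUNNING"))

  // "FAILED" is an assumed status name for tasks that threw.
  def logTaskEnd(taskRunId: Long, error: Option[Throwable]): Unit =
    rows.get(taskRunId).foreach { row =>
      rows.update(taskRunId, row.copy(status = if (error.isEmpty) "SUCCESS" else "FAILED"))
    }
}

// Runs a block as an audited task: log the start, run it, then log the outcome.
final class TaskRunner(audit: AuditBackend) {
  private var nextId = 0L

  def run[A](taskName: String)(body: => A): Try[A] = {
    nextId += 1
    val taskRunId = nextId
    audit.logTaskStart(taskRunId, taskName, "GenericTask")
    val result = Try(body)
    audit.logTaskEnd(taskRunId, result.failed.toOption)
    result
  }
}

object AuditDemo {
  def main(args: Array[String]): Unit = {
    val audit = new InMemoryAudit
    val runner = new TaskRunner(audit)
    runner.run("Task 1")(println("Task 1"))
    runner.run("Task 2")(throw new RuntimeException("boom"))
    audit.rows.values.foreach(println)
  }
}
```

Swapping `InMemoryAudit` for `NoopAudit` leaves the task logic untouched, which is the same property that lets the EtlFlow examples switch between the noop and database backends by changing only the provided layer.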
The Job API groups multiple tasks together so they can be audited at the job level. Below is an example of creating a JobApp and running it with EtlFlow. By default, JobApp uses the noop audit layer, but here we use the database audit layer (etlflow.audit.DB) to persist audit information in a database.
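import etlflow._
import etlflow.audit.Audit
import etlflow.model.Credential.JDBC
import etlflow.task._
import zio._
object MyJobApp extends JobApp {
  // Database connection details are read from environment variables
  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))
  // Override the default noop audit layer to persist audit records in the database
  override val auditLayer: Layer[Throwable, Audit] = etlflow.audit.DB(cred)
  private val task1 = GenericTask(
    name = "Task_1",
    task = ZIO.logInfo("Hello EtlFlow Task")
  )
  // The job body; tasks run here are audited as part of this job run
  def job(args: Chunk[String]): RIO[Audit, Unit] = task1.toZIO
}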
Here's a snapshot of data in the job_run and task_run tables after this job has run:
job_run_id | job_name | metadata | status | created_at | modified_at |
---|---|---|---|---|---|
1 | MyJobApp | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |

task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
---|---|---|---|---|---|---|---|
1 | 1 | Task 1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
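The job_run_id column is what ties each task_run row to the job run that executed it. The hypothetical sketch below models that relationship in plain Scala: `JobRunner.runJob` writes a job_run row, runs the tasks in order, records a task_run row carrying the same job run id for every task it attempted, stops at the first failure (as a ZIO for-comprehension would), and finally marks the job run SUCCESS or FAILED. None of these types come from EtlFlow; the FAILED status and the in-memory storage are assumptions made for illustration.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical rows mirroring a few job_run / task_run columns; NOT EtlFlow types.
final case class JobRunRow(jobRunId: Long, jobName: String, status: String)
final case class TaskRow(taskRunId: Long, jobRunId: Long, taskName: String, status: String)

final class JobAuditLog {
  val jobRuns: ListBuffer[JobRunRow] = ListBuffer.empty
  val taskRuns: ListBuffer[TaskRow] = ListBuffer.empty
}

object JobRunner {
  // Runs tasks in order and stops at the first failure. Each attempted task gets a
  // row carrying the job's run id, which links task_run rows to their job_run row.
  def runJob(log: JobAuditLog, jobRunId: Long, jobName: String)(tasks: (String, () => Unit)*): Boolean = {
    log.jobRuns += JobRunRow(jobRunId, jobName, "RUNNING")
    val succeeded = tasks.forall { case (taskName, body) =>
      val ok = Try(body()).isSuccess
      val taskRunId = log.taskRuns.size + 1L
      log.taskRuns += TaskRow(taskRunId, jobRunId, taskName, if (ok) "SUCCESS" else "FAILED")
      ok
    }
    // Mark the job run with its final status ("FAILED" is an assumed status name).
    val idx = log.jobRuns.indexWhere(_.jobRunId == jobRunId)
    log.jobRuns.update(idx, log.jobRuns(idx).copy(status = if (succeeded) "SUCCESS" else "FAILED"))
    succeeded
  }

  def main(args: Array[String]): Unit = {
    val log = new JobAuditLog
    runJob(log, 1L, "MyJobApp")("Task_1" -> (() => println("Hello EtlFlow Task")))
    log.jobRuns.foreach(println)
    log.taskRuns.foreach(println)
  }
}
```

Because `forall` stops at the first task that returns false, tasks after a failure get no row at all, which keeps the task_run table limited to work that was actually attempted.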
# To run the GCP examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account JSON key file.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
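Besides GOOGLE_APPLICATION_CREDENTIALS, which the Google client libraries read on their own, the JDBC examples above read DB_URL, DB_USER, DB_PWD and DB_DRIVER with `sys.env(name)`, which throws `NoSuchElementException` when a variable is unset. A minimal sketch of checking all required variables up front and reporting every missing one at once is shown below; `RequiredEnv` is a hypothetical helper, not part of EtlFlow.

```scala
// Hypothetical helper, not part of EtlFlow: checks all required variables up front
// and reports every missing one, instead of failing on the first sys.env(name),
// which throws NoSuchElementException for an unset variable.
object RequiredEnv {
  def read(names: Seq[String], env: Map[String, String] = sys.env): Either[String, Map[String, String]] = {
    val missing = names.filterNot(env.contains)
    if (missing.isEmpty) Right(names.map(name => name -> env(name)).toMap)
    else Left(s"Missing environment variables: ${missing.mkString(", ")}")
  }

  def main(args: Array[String]): Unit =
    read(Seq("GOOGLE_APPLICATION_CREDENTIALS", "DB_URL", "DB_USER", "DB_PWD", "DB_DRIVER")) match {
      case Right(vars) => println(s"Found all ${vars.size} required variables")
      case Left(error) => System.err.println(error)
    }
}
```

Taking the environment as a parameter (defaulting to `sys.env`) keeps the check easy to exercise with a plain `Map` in tests.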
import etlflow.task._
import gcp4zio.dp._
import etlflow.audit
import etlflow.audit.Audit
import zio._
val gcpProject: String = "GCP_PROJECT"
val gcpRegion: String = "GCP_REGION"
val dpCluster: String = "DP_CLUSTER"
val dpEndpoint: String = "DP_ENDPOINT"
val dpBucket: String = "DP_BUCKET"
// Tasks that create and delete the Dataproc cluster
val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket))
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster)
// Spark job to submit: the SparkPi example from the Spark examples jar on the cluster
val args = List("1000")
val mainClass = "org.apache.spark.examples.SparkPi"
val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")
val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf)
// Create the cluster, run the Spark job, then delete the cluster
val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
  _ <- createCluster.toZIO
  _ <- sparkJob.toZIO
  _ <- deleteCluster.toZIO
} yield ()
// Layers for the Dataproc job and cluster clients, plus the noop audit backend
val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)
programGCP.provide(dpJobLayer, dpClusterLayer, audit.noop)
Check this for a complete example.
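In the for-comprehension above, a failure in `sparkJob` short-circuits the remaining steps, so `deleteCluster` would not run and the Dataproc cluster would be left running. One way to guard against that is to treat cluster deletion as a finalizer. The sketch below shows the pattern in plain Scala with a hypothetical `FakeDataproc` class standing in for the EtlFlow tasks (it is not an EtlFlow or gcp4zio API). In ZIO the same intent is typically expressed with a finalizer combinator such as `ensuring` or `ZIO.acquireReleaseWith`; how that interacts with EtlFlow's task auditing is not covered here.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical stand-ins for the create/run/delete steps; NOT EtlFlow APIs.
final class FakeDataproc(failJob: Boolean) {
  val events: ListBuffer[String] = ListBuffer.empty
  def createCluster(name: String): Unit = events += s"create $name"
  def runSparkJob(mainClass: String): Unit = {
    events += s"run $mainClass"
    if (failJob) throw new RuntimeException(s"$mainClass failed")
  }
  def deleteCluster(name: String): Unit = events += s"delete $name"
}

object ClusterWorkflow {
  // Create the cluster, run the job, and always attempt deletion once creation succeeded.
  def run(dp: FakeDataproc, cluster: String, mainClass: String): Try[Unit] = Try {
    dp.createCluster(cluster)
    try dp.runSparkJob(mainClass)
    finally dp.deleteCluster(cluster)
  }

  def main(args: Array[String]): Unit = {
    val dp = new FakeDataproc(failJob = true)
    println(run(dp, "DP_CLUSTER", "org.apache.spark.examples.SparkPi"))
    dp.events.foreach(println)
  }
}
```

With `failJob = true`, `run` returns a `Failure`, yet the recorded events still end with the delete step.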
This module depends on the official Kubernetes Java client library, version 18.0.1.
import etlflow.task._
import etlflow.k8s._
import etlflow.audit
import etlflow.audit.Audit
import zio._
// Name of the Kubernetes Job that is created, tracked and deleted below
val jobName: String = "hello"
val programK8S: RIO[K8S with Audit, Unit] = for {
  // Create a Kubernetes Job that runs a short shell command in a busybox container
  _ <- K8SJobTask(
    name = "CreateKubeJobTask",
    jobName = jobName,
    image = "busybox:1.28",
    command = Some(List("/bin/sh", "-c", "sleep 5; ls /etc/key; date; echo Hello from the Kubernetes cluster"))
  ).toZIO
  // Track the Job until it completes
  _ <- K8STrackJobTask("TrackKubeJobTask", jobName).toZIO
  // Fetch the Job's logs
  _ <- K8SJobLogTask("GetKubeJobLogTask", jobName).toZIO
  // Delete the Job
  _ <- K8SDeleteJobTask("DeleteKubeJobTask", jobName).toZIO
} yield ()
programK8S.provide(K8S.live(), audit.noop)
Check this for a complete example.
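In the Kubernetes example, `K8STrackJobTask` runs between creating the Job and fetching its logs. How etlflow-k8s implements tracking is not shown in this README; as a hypothetical illustration of the general technique, a tracker can poll the Job's status at a fixed interval until it reaches a terminal state or a maximum number of attempts is used up. The `JobStatus` values and `pollUntilDone` below are assumptions for illustration, not the etlflow-k8s or Kubernetes client API.

```scala
import scala.annotation.tailrec

// Hypothetical job states; NOT the Kubernetes client or etlflow-k8s types.
sealed trait JobStatus
object JobStatus {
  case object Running extends JobStatus
  case object Succeeded extends JobStatus
  case object Failed extends JobStatus
}

object JobTracker {
  // Polls fetchStatus until a terminal state is seen or maxAttempts polls have been made.
  // Returns the terminal status, or None if the job was still running after the last poll.
  def pollUntilDone(fetchStatus: () => JobStatus, maxAttempts: Int, intervalMillis: Long): Option[JobStatus] = {
    @tailrec
    def loop(attempt: Int): Option[JobStatus] =
      fetchStatus() match {
        case JobStatus.Running if attempt < maxAttempts =>
          Thread.sleep(intervalMillis)
          loop(attempt + 1)
        case JobStatus.Running => None
        case terminal          => Some(terminal)
      }
    loop(1)
  }

  def main(args: Array[String]): Unit = {
    val statuses = Iterator[JobStatus](JobStatus.Running, JobStatus.Running, JobStatus.Succeeded)
    println(pollUntilDone(() => statuses.next(), maxAttempts = 10, intervalMillis = 100L))
  }
}
```

Returning `None` on timeout, rather than throwing, leaves it to the caller to decide whether a Job that is still running should fail the workflow or be deleted.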
Please feel free to open issues to report bugs or propose new features.