Scheduling with Cloud Composer#

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow.

A typical situation where Synthesized benefits from Airflow is an overnight scheduled batch synthesis. For this example we will use a simple script adapted from the single table synthesis guide:

# synthesis.py

import synthesized

# load the bundled example dataset
df = synthesized.util.get_example_data()
# extract metadata describing the columns of the dataset
df_meta = synthesized.MetaExtractor.extract(df)

synthesizer = synthesized.HighDimSynthesizer(df_meta)

# train the synthesizer on the original data
synthesizer.learn(df)
# generate 42 synthetic rows
data = synthesizer.synthesize(num_rows=42)
print(data)  # dump the dataframe to stdout

Note

For demonstration purposes, we dump the produced dataframe to stdout. In real-world scenarios, the synthesized data would typically be written to a database or some other kind of storage.
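
For instance, writing the result to a CSV file in a Cloud Storage bucket could look like the sketch below. The bucket name is a placeholder, and the sketch assumes the pod can write to the bucket and that the gcsfs package is installed so that pandas can write to gs:// paths:

# synthesis.py (sketch: write to GCS instead of stdout)

data = synthesizer.synthesize(num_rows=42)
# pandas writes directly to Cloud Storage when gcsfs is available;
# "my-output-bucket" is a hypothetical bucket name
data.to_csv("gs://my-output-bucket/synthesized.csv", index=False)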

To run Airflow workloads with Composer, we will build a Docker image with the Synthesized SDK and the script above preinstalled. We will then write, deploy and launch a DAG that runs the synthesis using the KubernetesPodOperator.

Building the Docker image#

Create a Dockerfile with the following content:

# Dockerfile

FROM synthesizedio.jfrog.io/synthesized-docker/synthesized:1.8

ADD synthesis.py /
CMD python /synthesis.py

With this file in the current directory, build and push the Docker image to GCR:

docker build -t gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1 .
docker push gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1
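
Optionally, you can smoke-test the image locally. The sketch below passes the licence key through the SYNTHESIZED_KEY environment variable, the same variable the DAG below injects from a Kubernetes secret:

docker run --rm \
    -e SYNTHESIZED_KEY={{SYNTHESIZED-LICENCE-KEY}} \
    gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1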

Note

You can read more about working with GCR here: https://cloud.google.com/container-registry/docs

Authoring and deploying the Airflow DAG#

Now that the Docker image is published to GCR, we can create and publish an Airflow DAG. Before that, we need to set up a Kubernetes secret that will hold the Synthesized licence key:

kubectl create secret generic synthesized --from-literal licence-key={{SYNTHESIZED-LICENCE-KEY}}

This secret will later be injected as an environment variable when Kubernetes launches the pod.
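
You can verify that the secret was created, for example:

kubectl get secret synthesized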

The DAG definition is as follows:

# synthesized_dag.py

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

secret_env = Secret(
    deploy_type='env',
    # The name of the environment variable to inject the secret value to
    deploy_target='SYNTHESIZED_KEY',
    # Name of the Kubernetes Secret we created previously
    secret='synthesized',
    # Key of a secret stored in this Secret object
    key='licence-key')

with models.DAG(
        dag_id='synthesized',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='synthesized-sdk-example',
        # Name of task you want to run, used to generate Pod ID.
        name='synthesized-sdk-example',
        namespace='default',
        image='gcr.io/{GCP-PROJECT-ID}/synthesizer:0.0.1',
        secrets=[secret_env]
    )

Create a new Cloud Composer environment as described here. Upload synthesized_dag.py to the environment's Google Storage bucket according to these instructions. Go to the Airflow UI and wait for the newly uploaded DAG to be discovered by Airflow. Once it is discovered, the synthesized-sdk-example task will be kicked off. Discovery may take up to 5 minutes with the default settings. To reduce this time, you can modify the Airflow configuration. For example, the following setting reduces the DAG directory scan interval to 30 seconds:

[scheduler]
dag_dir_list_interval = 30
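
With Cloud Composer, this setting is applied as an Airflow configuration override on the environment. A sketch of doing this with the gcloud CLI (the environment name and location are placeholders):

gcloud composer environments update {{ ENVIRONMENT-NAME }} \
    --location {{ LOCATION }} \
    --update-airflow-configs=scheduler-dag_dir_list_interval=30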

Navigate to the task logs. You will see initialization and progress logs there, similar to the following:

...
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
...

Once the task is complete, you will see the resulting dataframe contents dumped to the log (probably truncated by the default Airflow logging setup):

...
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO -     SeriousDlqin2yrs  ...    DebtRatio
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO - 0                  0  ...     0.106937
...