Scheduling with Cloud Composer

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow.

An overnight scheduled batch job is a typical case where Synthesized benefits from Airflow. For this example, we use a simple script adapted from the single table synthesis guide:

synthesis.py
import synthesized

df = synthesized.util.get_example_data()
df_meta = synthesized.MetaExtractor.extract(df)

synthesizer = synthesized.HighDimSynthesizer(df_meta)

synthesizer.learn(df)
data = synthesizer.synthesize(num_rows=42)
print(data)  # Dump the data to stdout.
For demonstration purposes, we dump the produced dataframe to stdout. In real-life scenarios, the synthesized data is typically written to a database or another kind of storage.
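For a nightly batch, each run usually writes to its own location so that runs do not overwrite each other, and settings such as the row count are easier to change without rebuilding the image. The sketch below shows one way to do this; the NUM_ROWS and OUTPUT_BASE environment variables and the date=YYYY-MM-DD folder layout are hypothetical choices, not part of the Synthesized SDK.

```python
import datetime
import os


def run_settings(env=os.environ) -> tuple:
    """Read the (hypothetical) NUM_ROWS and OUTPUT_BASE variables, with defaults."""
    num_rows = int(env.get("NUM_ROWS", "42"))
    base_uri = env.get("OUTPUT_BASE", "/tmp/synthesized")
    return num_rows, base_uri


def output_location(base_uri: str, run_date: datetime.date) -> str:
    """Build a per-run destination such as <base>/date=2022-03-22/synthetic.csv.

    Hypothetical naming scheme: one folder per run date, so consecutive
    nightly runs never overwrite each other's output.
    """
    return f"{base_uri.rstrip('/')}/date={run_date.isoformat()}/synthetic.csv"
```

In synthesis.py, the row count from run_settings() would replace the hard-coded 42, and print(data) could become data.to_csv(output_location(base_uri, datetime.date.today()), index=False). Writing to a gs:// path this way assumes the gcsfs package is installed in the image. The run date could also be passed in from the DAG, for instance through the operator's env_vars argument.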

To run this workload with Cloud Composer, we build a Docker image with the Synthesized SDK and the script above preinstalled. We then write, deploy and launch a DAG that runs the synthesis using KubernetesPodOperator.

Building Docker image

Create a Dockerfile with the following content:

FROM synthesizedio.jfrog.io/synthesized-docker/synthesized:1.8

COPY synthesis.py /
CMD ["python", "/synthesis.py"]

With the Dockerfile and synthesis.py in the current directory, build the Docker image and push it to GCR:

docker build -t gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1 .
docker push gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1
Docker must be authenticated with a GCP account that has permission to push to GCR: https://cloud.google.com/container-registry/docs/advanced-authentication. For more on working with GCR, see https://cloud.google.com/container-registry/docs
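If the build runs from a script or CI job rather than by hand, composing the image reference in one place keeps the tag used by docker build, docker push and the DAG's image argument in sync. A minimal sketch that wraps the two commands above; the function names and the example project ID are hypothetical.

```python
import subprocess


def image_reference(project_id: str, version: str, name: str = "synthesizer") -> str:
    """Compose the GCR image reference shared by docker and the DAG."""
    return f"gcr.io/{project_id}/{name}:{version}"


def build_and_push_commands(project_id: str, version: str) -> list:
    """Return the docker build and push commands as argv lists."""
    image = image_reference(project_id, version)
    return [
        ["docker", "build", "-t", image, "."],
        ["docker", "push", image],
    ]


def build_and_push(project_id: str, version: str) -> None:
    """Run both commands; check=True stops at the first failure (e.g. a build error)."""
    for cmd in build_and_push_commands(project_id, version):
        subprocess.run(cmd, check=True)
```

Passing argument lists to subprocess.run avoids shell quoting issues, and the same image_reference() value can be pasted into the DAG's image argument.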

Authoring and deploying Airflow DAG

Now that the Docker image is published to GCR, we can create and publish an Airflow DAG. First, we need to set up a Kubernetes secret that holds the Synthesized licence key:

kubectl create secret generic synthesized --from-literal licence-key={{SYNTHESIZED-LICENCE-KEY}}

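When Kubernetes launches the pod, this secret is injected into it as an environment variable. Kubernetes secrets are namespaced, so run this command against the GKE cluster of the Cloud Composer environment and create the secret in the namespace the pod runs in (default in the DAG below).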
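Inside the container, a missing or empty key is easier to diagnose if the script checks for it before doing any work. A sketch of such a fail-fast check, assuming the SDK reads the licence key from SYNTHESIZED_KEY, the variable name the DAG below maps the secret to; the helper and exception names are hypothetical.

```python
import os


class MissingLicenceKey(RuntimeError):
    """Raised when the licence key was not injected into the pod."""


def require_licence_key(env=os.environ, var: str = "SYNTHESIZED_KEY") -> str:
    """Return the licence key, or fail with a message pointing at the likely cause."""
    key = env.get(var, "").strip()
    if not key:
        raise MissingLicenceKey(
            f"{var} is not set: check the Kubernetes secret and the DAG's secrets=[...]"
        )
    return key
```

Calling require_licence_key() at the top of synthesis.py makes a misconfigured secret show up as a single clear error in the task log.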

The DAG definition is as follows:

synthesized_dag.py
import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

secret_env = Secret(
    deploy_type='env',
    # The name of the environment variable to inject the secret value to
    deploy_target='SYNTHESIZED_KEY',
    # Name of the Kubernetes Secret we created previously
    secret='synthesized',
    # Key of a secret stored in this Secret object
    key='licence-key')

with models.DAG(
    dag_id='synthesized',
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY
) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='synthesized-sdk-example',
        # Name of task you want to run, used to generate Pod ID.
        name='synthesized-sdk-example',
        namespace='default',
        # Replace {GCP-PROJECT-ID} with the project ID used in the docker push step.
        image='gcr.io/{GCP-PROJECT-ID}/synthesizer:0.0.1',
        secrets=[secret_env]
    )
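Setting start_date to YESTERDAY is what makes the DAG run soon after upload. The plain-Python model below illustrates this (it does not call Airflow): with a timedelta schedule, each run covers one data interval beginning at start_date, and Airflow triggers the run once that interval has ended. Since YESTERDAY is one day before the file is parsed, the first interval ends roughly at parse time.

```python
import datetime


def first_run(start_date: datetime.datetime, interval: datetime.timedelta) -> tuple:
    """Simplified model of the first run of a timedelta-scheduled Airflow DAG.

    Returns (data_interval_start, data_interval_end); the run is triggered
    once data_interval_end has passed.
    """
    data_interval_start = start_date
    data_interval_end = start_date + interval
    return data_interval_start, data_interval_end
```

Because the schedule is anchored to start_date, timedelta(days=1) repeats at whatever time of day the file was parsed. For a run at a fixed overnight hour, schedule_interval also accepts a cron expression such as '0 2 * * *', and the Airflow documentation recommends a static start_date over one computed from datetime.now().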

Create a new Cloud Composer environment as described in the Cloud Composer documentation. Upload synthesized_dag.py to the environment’s Cloud Storage bucket following the Cloud Composer instructions for adding DAGs. Open the Airflow UI and wait for Airflow to discover the newly uploaded DAG. Once it is discovered, the synthesized-sdk-example task is triggered. With default settings, discovery can take up to 5 minutes, because the scheduler rescans the DAG folder every dag_dir_list_interval seconds (300 by default). To shorten this, you can override the environment's Airflow configuration. For example, the following override sets the scan interval to 30 seconds:

[scheduler]
dag_dir_list_interval = 30
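Both the DAG upload and the configuration override can also be done from the gcloud CLI. The sketch below only builds the two commands as argument lists (and runs them if asked); the environment name and location are placeholders, and the override key follows gcloud's section-key convention, here scheduler-dag_dir_list_interval.

```python
import subprocess


def dag_upload_command(environment: str, location: str, dag_file: str) -> list:
    """gcloud command that copies a DAG file into the environment's dags/ folder."""
    return ["gcloud", "composer", "environments", "storage", "dags", "import",
            f"--environment={environment}", f"--location={location}",
            f"--source={dag_file}"]


def scan_interval_command(environment: str, location: str, seconds: int) -> list:
    """gcloud command that overrides [scheduler] dag_dir_list_interval."""
    return ["gcloud", "composer", "environments", "update", environment,
            f"--location={location}",
            f"--update-airflow-configs=scheduler-dag_dir_list_interval={seconds}"]


def run_all(commands: list) -> None:
    """Run each command in order, stopping at the first failure."""
    for cmd in commands:
        subprocess.run(cmd, check=True)
```

Note that updating Airflow configuration overrides makes Composer update the environment, which can take several minutes on its own.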

Open the task logs in the Airflow UI. They show initialization and progress messages.

Once the task completes, the resulting dataframe is printed to the log. Since pandas abbreviates wide dataframes when printing them, some columns are likely replaced with "...":

...
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO -     SeriousDlqin2yrs  ...    DebtRatio
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO - 0                  0  ...     0.106937
...