Scheduling with Cloud Composer#

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow.

A situation where Synthesized benefits from Airflow is creating an overnight scheduled batch. For this example we will use a simple script adapted from the single table synthesis guide:


import synthesized

df = synthesized.util.get_example_data()
df_meta = synthesized.MetaExtractor.extract(df)

synthesizer = synthesized.HighDimSynthesizer(df_meta)

data = synthesizer.synthesize(num_rows=42)
print(data) # dump the dataframe to stdout


For demonstration purposes, we dump the produced dataframe to stdout. In real-life scenarios, the synthesized data would typically be written to a database or another kind of storage.
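As an illustration of the database case, here is a minimal sketch of writing a dataframe to a table with pandas and SQLAlchemy. The connection string and table name are illustrative stand-ins, and the dataframe is a dummy in place of the output of `synthesizer.synthesize()`:

```python
import pandas as pd
from sqlalchemy import create_engine

# Stand-in for the dataframe returned by synthesizer.synthesize()
data = pd.DataFrame({"SeriousDlqin2yrs": [0, 1], "DebtRatio": [0.11, 0.35]})

# Illustrative connection string; point this at your real database
engine = create_engine("sqlite:///:memory:")

# Write the synthetic data to a table instead of dumping it to stdout
data.to_sql("synthetic_credit", engine, index=False, if_exists="replace")
```

Swapping the SQLite URL for, say, a PostgreSQL one is all that changes when moving from this sketch to a production target.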

To run Airflow workloads with Composer, we will build a Docker image with the Synthesized SDK and the script above preinstalled. We will then write, deploy and launch a DAG that runs the synthesis using KubernetesPodOperator.

Building Docker image#

Create a Dockerfile with the following content (here the script above is assumed to be saved as synthesize.py; the base image and install step are illustrative and may differ in your setup):

# Dockerfile
FROM python:3.8

# Install the Synthesized SDK (assumes your package index provides it)
RUN pip install synthesized

# Copy the synthesis script from above into the image
COPY synthesize.py /synthesize.py

CMD python /synthesize.py

With this file in the current directory, build and push the Docker image to GCR:

docker build -t gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1 .
docker push gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1


You can read more about working with GCR in the Container Registry documentation.
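Pushing to GCR requires Docker to be authenticated against the registry. One way to do this is with the gcloud credential helper (a one-off configuration step, assuming the gcloud CLI is installed and logged in):

```shell
# Configure Docker to authenticate to gcr.io with your gcloud credentials
gcloud auth configure-docker
```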

Authoring and deploying Airflow DAG#

Now that the Docker image is published to GCR, we can create and publish an Airflow DAG. Before that, we need to set up a Kubernetes secret that will hold the Synthesized licence key:

kubectl create secret generic synthesized --from-literal=licence-key={{SYNTHESIZED-LICENCE-KEY}}

This secret will later be injected as an environment variable when Kubernetes launches the pod.
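Inside the container, the script can then pick up the licence key from its environment. A minimal sketch of the lookup is below; the variable name SYNTHESIZED_KEY is an assumption about how the SDK reads the key, and the value is set here only to simulate the injection:

```python
import os

# Simulate the variable Kubernetes injects from the secret
# (the name SYNTHESIZED_KEY is an assumption; check the SDK docs)
os.environ["SYNTHESIZED_KEY"] = "dummy-licence-key"

licence_key = os.environ.get("SYNTHESIZED_KEY")
if licence_key is None:
    # Fail fast if the secret was not injected into the pod
    raise RuntimeError("licence key not found in the pod environment")
```

Failing fast like this makes a misconfigured secret show up immediately in the task logs rather than partway through training.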

The DAG definition is as follows:


import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

secret_env = Secret(
    # Inject the secret as an environment variable
    deploy_type='env',
    # The name of the environment variable to inject the secret value to
    deploy_target='SYNTHESIZED_KEY',
    # Name of the Kubernetes Secret we created previously
    secret='synthesized',
    # Key of a secret stored in this Secret object
    key='licence-key')

with models.DAG(
        dag_id='synthesized-sdk-example',
        # Run the batch once a day
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='synthesize',
        # Name of task you want to run, used to generate Pod ID.
        name='synthesize',
        # The Docker image we built and pushed earlier
        image='gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1',
        # Inject the licence key secret into the pod's environment
        secrets=[secret_env])

Create a new Cloud Composer environment as described in the Cloud Composer documentation. Upload the DAG file to the environment’s Cloud Storage bucket. Go to the Airflow UI and wait for the newly uploaded DAG to be discovered by Airflow. Once it is discovered, a synthesized-sdk-example run will be kicked off. Discovery may take up to 5 minutes with the default settings. To reduce this time, you can override the Airflow configuration. For example, this scheduler option sets the scan interval to 30 seconds:

[scheduler]
dag_dir_list_interval = 30
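On Cloud Composer, such overrides are applied to the environment rather than edited in airflow.cfg directly. A sketch of the corresponding gcloud command is below; the environment name and location are hypothetical placeholders for your own setup:

```shell
# Override the scheduler's DAG directory scan interval on an existing
# Composer environment (name and location are placeholders)
gcloud composer environments update my-composer-env \
    --location us-central1 \
    --update-airflow-configs=scheduler-dag_dir_list_interval=30
```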

Navigate to the task logs. There you will see initialization and progress logs like this:

Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏
Training stage 3/5:  42%|████▏

Once the task is complete, you will see the resulting dataframe contents dumped to the log (and probably truncated by the default Airflow logging setup):

[2022-03-22, 13:32:22 UTC] {} INFO -     SeriousDlqin2yrs  ...    DebtRatio
[2022-03-22, 13:32:22 UTC] {} INFO - 0                  0  ...     0.106937