Cloud Composer
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow.
A typical situation where running Synthesized under Airflow pays off is an overnight scheduled synthesis batch. For this example we will use a simple script adapted from the single table synthesis guide:
import synthesized
df = synthesized.util.get_example_data()
df_meta = synthesized.MetaExtractor.extract(df)
synthesizer = synthesized.HighDimSynthesizer(df_meta)
synthesizer.learn(df)
data = synthesizer.synthesize(num_rows=42)
print(data) (1)
(1) Dump the data to stdout.
For demonstration purposes, we dump the produced dataframe to stdout. In real-life scenarios, the synthesized data would typically be written to a database or some other form of storage.
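As a rough illustration of that pattern, the sketch below replaces the final print(data) with a write to a relational database via pandas' to_sql. The connection string, table name and the presence of a suitable database driver (for example psycopg2 for PostgreSQL) in the image are assumptions for illustration, not part of the original example:

import sqlalchemy
import synthesized

df = synthesized.util.get_example_data()
df_meta = synthesized.MetaExtractor.extract(df)
synthesizer = synthesized.HighDimSynthesizer(df_meta)
synthesizer.learn(df)
data = synthesizer.synthesize(num_rows=42)

# Hypothetical target database; replace the placeholders with real values.
# Requires a driver such as psycopg2 to be installed in the Docker image.
engine = sqlalchemy.create_engine(
    "postgresql://{{ DB-USER }}:{{ DB-PASSWORD }}@{{ DB-HOST }}:5432/{{ DB-NAME }}"
)

# Write the synthesized rows to a table instead of printing them.
data.to_sql("synthesized_output", engine, if_exists="replace", index=False)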
To run Airflow workloads with Composer, we will build a Docker image with the Synthesized SDK and the script above preinstalled. We will then write, deploy and launch a DAG that runs the synthesis using KubernetesPodOperator.
Building the Docker image
Create a Dockerfile with the following content:
FROM synthesizedio.jfrog.io/synthesized-docker/synthesized:1.8
ADD synthesis.py /
CMD python /synthesis.py
With this file in the current directory, build and push the Docker image to GCR:
docker build -t gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1 .
docker push gcr.io/{{ GCP-PROJECT-ID }}/synthesizer:0.0.1
You will need to have Docker authenticated with your GCP account, with permissions to push to GCR: https://cloud.google.com/container-registry/docs/advanced-authentication. You can read more about working with GCR here: https://cloud.google.com/container-registry/docs
Authoring and deploying the Airflow DAG
Now that the Docker image is published to GCR, we can create and publish an Airflow DAG. Before that, we need to set up a Kubernetes secret that will hold the Synthesized licence key:
kubectl create secret generic synthesized --from-literal licence-key={{SYNTHESIZED-LICENCE-KEY}}
This secret will later be injected as an environment variable when Kubernetes launches the pod.
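If you want the container to fail fast with a clear error when the secret has not been injected, a small check like the following can be added at the top of synthesis.py. This is an optional addition, not part of the original script:

import os

# Fail fast if the licence key was not injected into the environment,
# e.g. because the Kubernetes secret is missing or misnamed.
if "SYNTHESIZED_KEY" not in os.environ:
    raise RuntimeError(
        "SYNTHESIZED_KEY is not set - check the 'synthesized' Kubernetes secret"
    )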
The DAG definition is as follows:
import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

secret_env = Secret(
    # Inject the secret as an environment variable
    deploy_type='env',
    # Name of the environment variable to inject the secret value into
    deploy_target='SYNTHESIZED_KEY',
    # Name of the Kubernetes Secret we created previously
    secret='synthesized',
    # Key of the entry stored in that Secret object
    key='licence-key')

with models.DAG(
        dag_id='synthesized',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY
) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task
        task_id='synthesized-sdk-example',
        # Name of the task, used to generate the Pod name
        name='synthesized-sdk-example',
        namespace='default',
        image='gcr.io/{GCP-PROJECT-ID}/synthesizer:0.0.1',
        secrets=[secret_env]
    )
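Before uploading the file, it can be useful to check locally that it parses and that the DAG is discoverable. The sketch below assumes Airflow and the cncf.kubernetes provider are installed locally and that the DAG is saved as synthesized_dag.py in the current directory:

from airflow.models import DagBag

# Load DAGs from the current directory only (skip Airflow's bundled examples).
dag_bag = DagBag(dag_folder=".", include_examples=False)

# Any import errors here would also prevent Composer from scheduling the DAG.
assert not dag_bag.import_errors, dag_bag.import_errors

# Confirm the DAG and its single task were picked up.
dag = dag_bag.get_dag("synthesized")
print(dag.task_ids)  # expected: ['synthesized-sdk-example']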
Create a new Cloud Composer environment as described here.
Upload synthesized_dag.py to the environment's Cloud Storage bucket according to these instructions.
Go to the Airflow UI and wait for the newly uploaded DAG to be discovered by Airflow. Once it is discovered, a synthesized-sdk-example task run will be kicked off. Discovery may take up to 5 minutes with the default settings.
To reduce this time, you can modify the Airflow configuration (in Cloud Composer this is done through the environment's Airflow configuration overrides rather than by editing airflow.cfg directly). For example, the following sets the DAG directory scan interval to 30 seconds:
[scheduler]
dag_dir_list_interval = 30
Navigate to the task logs. You will see initialization and progress logs there.
Once the task is complete, you will see the contents of the resulting dataframe dumped to the log (and likely truncated by the default Airflow logging setup):
...
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO - SeriousDlqin2yrs ... DebtRatio
[2022-03-22, 13:32:22 UTC] {pod_manager.py:203} INFO - 0 0 ... 0.106937
...