Scheduling with Airflow

Airflow is a platform to programmatically author, schedule and monitor workflows. The user can create Directed Acyclic Graphs (DAGs) of tasks to set dependencies between them, schedule the workflow and they will be run in the provided order.

A situation where Synthesized would benefit from Airflow is when creating an overnight scheduled batch. For this example, we have a Python function populate_data() that populates the data source with new data.

Then, we create a function synthesize() that reads the data and synthesizes the same amount of samples (read more in the Single Table Guide):

import pandas as pd
import synthesized

def populate_data(df):
    """Example function that would populate data in our source database"""
    pass

def synthesize(df):
    df = pd.from_csv("[source]")

    # Extract meta-information and learn it
    df_meta = synthesized.MetaExtractor.extract(df)
    synthesizer = synthesized.HighDimSynthesizer(df_meta)
    synthesizer.learn(df_train=df)

    # Synthesize and store the results
    df_synthesized = synthesizer.synthesize(num_rows=len(df))
    df_synthesized.to_csv("[destination]")

Finally, we create a DAG called 'Data Synthesis Overnight Batch', that will be executed daily, starting from the 1st of January 2022, and will sequentially run the following steps:

  1. Execute populate_data() to populate the data source.

  2. Execute synthesize() to create a synthetic copy of it.

  3. Send an email to admin@synthesized.io confirming that the execution was finished.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Create Directed Acyclic Graph (DAG)
dag = DAG('data_synthesis', description='Data Synthesis Overnight Batch',
            schedule_interval='@daily', start_date=datetime(2022, 1, 1))

# Create operations and add them to the DAG
populate_data_op = PythonOperator(task_id='populate_data', python_callable=populate_data, dag=dag)
synthesis_op = PythonOperator(task_id='synthesize', python_callable=synthesize, dag=dag)
email_op = DummyOperator(task_id='send_email', dag=dag)

# Define dependencies
populate_data_op >> synthesis_op >> email_op

Example Dockerfile and docker-compose configuration files are given below:

FROM apache/airflow:2.2.1-python3.8
COPY ./synthesized-wheels /wheelhouse
RUN pip install -U pip
RUN pip install --no-cache-dir /wheelhouse/*.whl plyvel
   # .env
   AIRFLOW_IMAGE_NAME=ghcr.io/synthesized-io/synthesized-airflow:1.8rc0-airflow2.2.1-python3.8
   AIRFLOW_UID=0
   SYNTHESIZED_KEY={{SYNTHESIZED-LICENCE-KEY}}
# docker-compose.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Synthesized, Redis and PostgreSQL.
---
version: '3'
x-airflow-common:
    &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
    build: .
    environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    SYNTHESIZED_KEY: ${SYNTHESIZED_KEY}
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    user: "${AIRFLOW_UID:-50000}:0"
    depends_on:
    &airflow-common-depends-on
    redis:
        condition: service_healthy
    postgres:
        condition: service_healthy

services:
    postgres:
    image: postgres:13
    environment:
        POSTGRES_USER: airflow
        POSTGRES_PASSWORD: airflow
        POSTGRES_DB: airflow
    volumes:
        - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
        test: ["CMD", "pg_isready", "-U", "airflow"]
        interval: 5s
        retries: 5
    restart: always

    redis:
    image: redis:latest
    expose:
        - 6379
    healthcheck:
        test: ["CMD", "redis-cli", "ping"]
        interval: 5s
        timeout: 30s
        retries: 50
    restart: always

    airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
        - 8080:8080
    healthcheck:
        test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
        interval: 10s
        timeout: 10s
        retries: 5
    restart: always
    depends_on:
        <<: *airflow-common-depends-on
        airflow-init:
        condition: service_completed_successfully

    airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
        test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
        interval: 10s
        timeout: 10s
        retries: 5
    restart: always
    depends_on:
        <<: *airflow-common-depends-on
        airflow-init:
        condition: service_completed_successfully

    airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
        test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
        interval: 10s
        timeout: 10s
        retries: 5
    environment:
        <<: *airflow-common-env
        DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
        <<: *airflow-common-depends-on
        airflow-init:
        condition: service_completed_successfully

    airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
        test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
        interval: 10s
        timeout: 10s
        retries: 5
    restart: always
    depends_on:
        <<: *airflow-common-depends-on
        airflow-init:
        condition: service_completed_successfully

    airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
        - -c
        - |
        function ver() {
            printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
            echo
            echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
            echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
            echo
            exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
            echo
            echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
            echo "If you are on Linux, you SHOULD follow the instructions below to set "
            echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
            echo "For other operating systems you can get rid of the warning with manually created .env file:"
            echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
            echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
            echo
            echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
            echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
            echo
            warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
            echo
            echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
            echo "At least 2 CPUs recommended. You have $${cpus_available}"
            echo
            warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
            echo
            echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
            echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
            echo
            warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
            echo
            echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
            echo "Please follow the instructions to increase amount of resources available:"
            echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
            echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
        <<: *airflow-common-env
        _AIRFLOW_DB_UPGRADE: 'true'
        _AIRFLOW_WWW_USER_CREATE: 'true'
        _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
        _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
        - .:/sources

    airflow-cli:
    <<: *airflow-common
    profiles:
        - debug
    environment:
        <<: *airflow-common-env
        CONNECTION_CHECK_MAX_COUNT: "0"
    command:
        - bash
        - -c
        - airflow

    flower:
    <<: *airflow-common
    command: celery flower
    ports:
        - 5555:5555
    healthcheck:
        test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
        interval: 10s
        timeout: 10s
        retries: 5
    restart: always
    depends_on:
        <<: *airflow-common-depends-on
        airflow-init:
        condition: service_completed_successfully

volumes:
    postgres-db-volume:
Airflow full documentation can be found here: https://airflow.apache.org/docs/apache-airflow/stable/index.html