Synthesizing with PySpark

Spark/PySpark allows for fast, large-scale data processing across the machines of a cluster. For now, the Synthesized package is designed to run on a single machine with all data held in memory. To incorporate it into a Spark pipeline, users need to collect a sample of the data they are interested in onto a single machine and train the Synthesizer there.

Synthesized requires data to be in the pandas.DataFrame format. The PySpark method toPandas converts a Spark DataFrame into the required format, and spark.createDataFrame converts a synthesized pandas.DataFrame back to Spark.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from pyspark.sql import Row

df_spark = spark.createDataFrame([
    Row(a=1, b=2., c='string1'),
    Row(a=2, b=3., c='string2'),
    Row(a=4, b=5., c='string3'),
    Row(a=1, b=2., c='string1'),
    Row(a=2, b=3., c='string2'),
    Row(a=4, b=5., c='string3'),
    Row(a=1, b=2., c='string1'),
    Row(a=2, b=3., c='string2'),
    Row(a=4, b=5., c='string3'),
])  # create simple Spark DataFrame

df = df_spark.sample(fraction=2/3).toPandas()  # sample a subset, collect it and convert to pandas

Now that we have a sample of the data in a pandas.DataFrame, we can use the standard workflow for the Synthesized package, documented in the Single Table Guide.

import synthesized
df_meta = synthesized.MetaExtractor.extract(df)
synth = synthesized.HighDimSynthesizer(df_meta)
synth.fit(df)
df_synth = synth.sample(num_rows=100)
df_synth_spark = spark.createDataFrame(df_synth)  # convert back to a Spark DataFrame

Incorporating into a distributed pipeline

The key step in this process is:

df = df_spark.sample(fraction=2/3).toPandas()

Here we take a sample of the data and load it into memory on the driver node of the cluster. It is therefore important that the sampled data fits comfortably in the driver's memory and that the driver node has enough processing power to train the Synthesizer. If it is preferable to run the Synthesizer on a worker node instead, the Synthesized package can be set up on that worker node and a similar script executed there remotely.
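One way to keep the collected sample within a known size is to derive the sampling fraction from a row budget rather than hard-coding it. The following is a minimal sketch, not part of the Synthesized package: the helper names sampling_fraction and collect_capped_sample and the target_rows parameter are hypothetical, and the sketch assumes df_spark is a pyspark.sql.DataFrame. Because Spark's sample does not guarantee an exact row count, limit is applied afterwards to enforce the cap.

```python
def sampling_fraction(total_rows: int, target_rows: int) -> float:
    """Return the fraction to pass to DataFrame.sample so that roughly
    target_rows rows are drawn from total_rows (hypothetical helper)."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    if target_rows <= 0:
        raise ValueError("target_rows must be positive")
    # Never ask for more than the whole dataset.
    return min(1.0, target_rows / total_rows)


def collect_capped_sample(df_spark, target_rows: int, seed: int = 42):
    """Sample about target_rows rows and collect them to the driver as a
    pandas.DataFrame (hypothetical helper; df_spark is assumed to be a
    pyspark.sql.DataFrame)."""
    # count() runs a full Spark job; reuse a known row count if one is available.
    fraction = sampling_fraction(df_spark.count(), target_rows)
    # sample() returns approximately fraction * count rows, so limit()
    # enforces a hard upper bound before the data reaches the driver.
    sampled = df_spark.sample(fraction=fraction, seed=seed).limit(target_rows)
    return sampled.toPandas()
```

For the nine-row example above, sampling_fraction(9, 6) gives 2/3, the same fraction used in the key step. A suitable value for target_rows depends on the number and width of the columns and on the memory available on the driver, so it is best chosen by measuring a small collected sample first.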