Memory Leaks When Using Pandas_udf And Parquet Serialization?
Solution 1:
I wanted to comment on your post, but my reputation is too low.
In my experience, UDFs slow down performance drastically, especially when they are written in Python (or pandas). There is an article explaining why you shouldn't use Python UDFs and should use Scala UDFs instead: https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9
In my case it was possible to replace the UDF with built-in functions, even though it was fairly complicated, and the runtime dropped to about 5% of what it was before.
I have no explanation for your OOM error or for why a repartition worked for you. The only advice I can give is to avoid UDFs as much as possible, although that does not seem easy in your case.
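As a minimal sketch of that advice (the column name "value" and the +100.0 transformation are made up for this example), here is a Python UDF replaced by an equivalent built-in Column expression, which stays inside the JVM and avoids the Python serialization overhead:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,)], ["value"])

# UDF version: every row is serialized to a Python worker and back.
add_100_udf = F.udf(lambda x: x + 100.0, "double")
df_udf = df.withColumn("shifted", add_100_udf("value"))

# Built-in version: the same arithmetic expressed as a Column expression,
# evaluated entirely by Spark's native functions.
df_builtin = df.withColumn("shifted", F.col("value") + 100.0)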
Solution 2:
This thread is a bit old, but I stumbled across the exact same problem and spent quite a few hours on it, so I just wanted to explain how I solved it, in the hope that it saves some hours for anyone else hitting the same issue in the future.
The problem here is not related to pandas_udf or parquet, but to the use of withColumn to generate the columns. When adding multiple columns to a dataframe, it is much more efficient to use the select method: each withColumn call introduces a new projection in the query plan, so calling it in a loop makes the plan (and its analysis cost) grow with every iteration, whereas a single select adds all the columns at once. This article explains why.
So for example, instead of
for j in range(z):
    df = df.withColumn(
        f"N{j}",
        F.col("ID") + float(j)
    )
you should write
df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(z)]
)
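To see the difference for yourself, a quick toy-sized experiment (the sizes here are arbitrary, chosen just for illustration) is to compare the plans produced by the two approaches with explain(); the withColumn loop stacks one projection per iteration in the parsed/analyzed plan, while the select version adds everything in a single projection:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
base = spark.range(10).withColumn("ID", F.col("id").cast("double"))

# Loop of withColumn: one projection stacked on top of another per iteration.
looped = base
for j in range(20):
    looped = looped.withColumn(f"N{j}", F.col("ID") + float(j))
looped.explain(extended=True)

# Single select: all 20 columns added in one projection.
selected = base.select(
    *base.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(20)]
)
selected.explain(extended=True)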
The rewritten script looks like this. (Note that I still had to increase the driver memory to 2 GB, but at least that is a reasonable amount of memory.)
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = (pyspark.sql.SparkSession.builder
         .appName("mre")
         .config("spark.driver.memory", "2g")
         .master("local[3]").getOrCreate())

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)

df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(z)]
)

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"

# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.select(
        *cur_df.columns,
        predict(in_col).alias(out_col)
    )

    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)
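As a quick sanity check (a hedged sketch that reuses out_path, in_col, out_col and m from the script above and assumes it has just run in the same session), the written file can be read back to confirm that the prediction column is present and that the per-group union did not lose any rows:
# Read the result back and verify its shape.
check = spark.read.parquet(out_path)

assert out_col in check.columns   # the pandas_udf output column exists
assert check.count() == m         # the group-wise union preserved all rows

check.select("ID", in_col, out_col).show(5)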