PySpark in Databricks

Pathan
Oct 12, 2023

I believe most of you already work with Databricks and use PySpark for data wrangling and ETL tasks. I want to share a few nuances I found while working with PySpark that may be useful for at least a few folks.

1. Choosing the right cluster

I chose the smallest cluster (8 GB, 2 cores) with two workers and auto-scaling enabled. That was a mistake. Below are a few pointers on how to select a cluster.

a. Decide based on the data being used. For example, if there will be a lot of small DataFrames, it is better to choose a smaller cluster with more workers to distribute the tasks across. If there are very large DataFrames (>10M records), it is better to choose an extra-large cluster with a few workers and enable autoscaling; this reduces how much data gets redistributed across the cluster. We often think a smaller cluster will save costs, but in the long run the larger cluster executes the same workload in half the time.

b. Some use cases require more memory to hold DataFrames until the end of execution.

c. When a very large DataFrame (>5M records) has to be joined with a smaller DataFrame, force-distribute (broadcast) the smaller DataFrame to every worker, which reduces processing time:

from pyspark.sql.functions import broadcast

# ship the small DataFrame to every executor instead of shuffling the large one
joined_df = large_df1.join(broadcast(small_df), on=["common_column"], how="left") \
    .select(
        large_df1["*"],
        small_df["column2"]
    )

broadcast forces the smaller DataFrame to be redistributed (copied to every worker) rather than shuffling the large one. We should own how data is distributed across the cluster: if we know the workloads, it is better to fix the infrastructure up front than to rely on autoscaling.
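If you want to confirm the broadcast actually happened, or control when Spark broadcasts on its own, here is a minimal sketch (it assumes the large_df1/small_df join from above):

# inspect the physical plan; a successful broadcast shows a BroadcastHashJoin node
joined_df = large_df1.join(broadcast(small_df), on=["common_column"], how="left")
joined_df.explain()

# Spark also auto-broadcasts tables below this size threshold (in bytes);
# raise it, or set it to -1 to disable automatic broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)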

2. Working with PySpark

a. PySpark uses lazy execution. The commands you submit are stored and added to an execution plan, and they only actually run, after being optimized together, when you trigger an action such as save, cache, persist, display, or count. So hoarding a lot of complex transformations in a single plan can take a long time to execute. I followed the steps below to reduce the time and get better performance. Example (a minimal sketch follows this outline):

transformation-1

transformation-2

transformation-3

save file

read from file

transformation-4
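A small sketch of that pattern (raw_df and stage_path are placeholder names, not from the original post):

from pyspark.sql import functions as F

staged_df = (
    raw_df
    .filter(F.col("amount") > 0)                                  # transformation-1
    .withColumn("year", F.year("order_date"))                     # transformation-2
    .groupBy("year").agg(F.sum("amount").alias("total_amount"))   # transformation-3
)

# materialize the work so far, then continue from a fresh, much smaller plan
staged_df.write.format("avro").mode("overwrite").save(stage_path)
staged_df = spark.read.format("avro").load(stage_path)

# transformation-4 now runs against the saved data, not the whole plan above
final_df = staged_df.withColumn("avg_per_month", F.col("total_amount") / 12)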

i. Write to a file whenever a few transformations are complete, and read it back into a DataFrame:

df.write.format('avro').mode("overwrite").save(df_target_file_path)

ii. Choose whether to use Avro or Parquet. Avro is row-oriented and more favorable when you want to read whole rows in the next steps rather than a few columns. Parquet stores data in columnar format and is better when you are reading only a few columns and values. Both are far better than plain CSV.
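For instance, with Parquet, selecting just a couple of columns lets Spark skip the rest of the data on disk, while Avro reads whole rows; a small sketch (the paths are placeholders):

# Parquet: only the selected columns are actually read from disk (column pruning)
subset_df = spark.read.format("parquet").load(parquet_path).select("Column1", "Column2")

# Avro: a better fit when the next step needs every column of each row anyway
full_rows_df = spark.read.format("avro").load(avro_path)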

iii. I save the files using an explicit schema, so we can ensure the data gets stored exactly as we expect it.

# inspect the schema of an existing DataFrame
df.schema
StructType([StructField('Column1', StringType(), True), StructField('Column2', DecimalType(25, 4), True)])
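The same schema can also be declared up front instead of inspected; a minimal sketch with the required imports:

from pyspark.sql.types import StructType, StructField, StringType, DecimalType

my_custom_schema = StructType([
    StructField("Column1", StringType(), True),
    StructField("Column2", DecimalType(25, 4), True),
])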

We can force the file to be written with the schema we want. For Avro, the avroSchema option accepts a user-provided schema in Avro JSON format:

df.write.format('avro').option("avroSchema", myCustomSchemaString).mode("overwrite").save(df_target_file_path)
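A format-agnostic alternative (a sketch that assumes the my_custom_schema StructType defined above) is to cast the DataFrame to the target types before writing, so whatever lands in the file already matches the expected schema:

from pyspark.sql import functions as F

# cast every column to the type declared in the target schema before saving
conformed_df = df.select(
    [F.col(field.name).cast(field.dataType).alias(field.name) for field in my_custom_schema.fields]
)
conformed_df.write.format("avro").mode("overwrite").save(df_target_file_path)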

iv. While reading the file back, enforce the schema to read the file faster. Never use inferSchema=True unless you really don't know the file's structure. If you are processing a monthly file, save the schema and reuse it while loading the file; it will be much faster (5-10x).

df = spark.read.schema(my_custom_schema).format("avro").load(df_target_file_path)
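One way to "save the schema" for a recurring monthly file is to persist its JSON representation once and reload it on every run; a minimal sketch (the /dbfs path is a placeholder):

import json
from pyspark.sql.types import StructType

# capture the schema once, from a load you have already validated
with open("/dbfs/schemas/monthly_file_schema.json", "w") as f:
    f.write(df.schema.json())

# on later runs, reload it and skip schema inference completely
with open("/dbfs/schemas/monthly_file_schema.json") as f:
    saved_schema = StructType.fromJson(json.load(f))

monthly_df = spark.read.schema(saved_schema).format("avro").load(df_target_file_path)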

v. Writing a Parquet file creates several files based on the default partitioning. A lot of the time we stick with the default partitioning, and the individual files end up too large to load or read. We can cap file size by telling Spark not to write more than a set number of records (for example 10,000) per file:

df.write.format("parquet").option("maxRecordsPerFile", 10000).mode("overwrite").save(df_target_file_path)
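The number of output files also tracks the DataFrame's partition count, so a related knob (a sketch, not a rule) is to repartition before writing:

# fewer, more even files: repartition first, then cap the records per file
df.repartition(8) \
    .write.format("parquet") \
    .option("maxRecordsPerFile", 10000) \
    .mode("overwrite") \
    .save(df_target_file_path)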
