Sail 0.2.1: Enhanced UDF Support and Steps towards Full Spark Parity
The LakeSail Team
January 15, 2025
LakeSail is excited to announce version 0.2.1 of Sail. This release improves Sail’s compatibility with Spark across both the SQL and DataFrame APIs. More importantly, Sail now comes with comprehensive support for PySpark user-defined functions (UDFs), bridging the gap between traditional ETL workloads and AI applications. With version 0.2.1, Sail is ready for an even wider range of data processing needs.
Of the tests mined from the Spark codebase, 2,754 (72.5%) of 3,800 now pass on Sail, a significant increase from the 2,504 (65.7%) of 3,809 that passed in 0.2.0 (9 fragile tests are skipped in this release, which accounts for the smaller total). The implemented features already support a broad range of data analytics tasks, including all 22 queries from the derived TPC-H benchmark and 94 of the 99 queries from the derived TPC-DS benchmark (up from 79 in version 0.2.0).
Sail with PySpark UDFs
The most notable feature in Sail 0.2.1 is comprehensive support for PySpark UDFs. A PySpark UDF allows you to integrate custom data processing logic written in Python with queries written in SQL or DataFrame APIs. The most straightforward UDF transforms a single column of tabular data, one row at a time. Here is an example:
from pyspark.sql.functions import udf

@udf("string")
def upper(s):
    return s.upper() if s is not None else s

spark.sql("SELECT 'hello' as v").select(upper("v")).show()
# +--------+
# |upper(v)|
# +--------+
# |   HELLO|
# +--------+
Despite the simplicity of the UDF concept, Spark offers a rich set of UDF types that allow you to transform a single record, a group of records in aggregation queries, or a partition of the dataset. The Python function can accept Python objects, Pandas objects, or Arrow record batches. The Sail documentation offers dozens of PySpark UDF examples that can serve as a cheatsheet for anyone who needs to navigate through such complexity.
We believe UDFs will serve as the bridge between traditional ETL workloads and AI applications. UDF support has been one of our top priorities since the creation of Sail, and the initial effort dates back to April 2024. We have now arrived at an implementation that we are generally happy with, and looking back, we are surprised by how far we have come. We now support all the PySpark UDF types except one (the experimental applyInPandasWithState() method of pyspark.sql.GroupedData). The correctness of our implementation is shown by hundreds of passed UDF tests in the PySpark unit test suite. We will continue tackling the remaining failed tests, many of which merely define how the API should behave given invalid input.
The beauty of Sail’s UDF support is that it boosts performance without changing a single line of your PySpark code. In Spark, the ETL code runs in the JVM, so the data must be moved between the JVM and the Python worker process that runs your UDF. This data serialization overhead is the key reason why PySpark is known to be slow. In Sail, the Python interpreter runs in the same process as the Rust-based query execution engine. This means your Python UDF code shares the memory space with the ETL code that manages your data. This is beneficial for all UDF types, especially for Pandas UDFs (introduced in Spark 2.3), since the conversion between Sail’s internal data format (Arrow) and Pandas objects can be zero-copy for certain table schemas and data distributions.
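As a rough analogy in plain Python (not Sail's actual implementation), the difference between the two approaches can be sketched with the standard library alone. Here `pickle` stands in for moving data across a process boundary, and `memoryview` stands in for two components looking at the same buffer in one process. The `column` buffer and the variable names are hypothetical and chosen only for illustration.

```python
import pickle

# A stand-in for a column of data owned by the query engine.
column = bytearray(1_000_000)

# Cross-process style: the data is serialized and then rebuilt as a new object.
copied = pickle.loads(pickle.dumps(column))
column[0] = 42
# The deserialized copy is detached, so it does not observe the update.
copied_sees_update = copied[0] == 42

# In-process style: a memoryview exposes the same buffer without copying it.
shared = memoryview(column)
column[1] = 7
# The view reads the same memory, so the update is visible immediately.
shared_sees_update = shared[1] == 7

print(copied_sees_update, shared_sees_update)
```

The serialized path pays for a full copy of the buffer in each direction, while the shared view costs the same regardless of how large the buffer is.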
The most performant UDF type, in our view, is the Arrow UDF (introduced in Spark 3.3), which can be utilized with the mapInArrow() method of pyspark.sql.DataFrame. The Arrow UDF accepts an iterator of Arrow record batches from one data partition, and returns an iterator for the transformed record batches for that partition. Here is an example showing how you can use the Arrow UDF to filter partitioned data and return a different number of records:
import pyarrow.compute as pc

def transform(iterator):
    for batch in iterator:
        yield batch.filter(pc.utf8_length(pc.field("text")) > 2)

df = spark.createDataFrame([(1, "Hello"), (2, "Hi")], ["id", "text"])
df.mapInArrow(transform, schema=df.schema).show()
# +---+-----+
# | id| text|
# +---+-----+
# |  1|Hello|
# +---+-----+
With Arrow UDFs, no data copy or serialization occurs when calling the Python function from Rust. The Rust-based query engine and the Python interpreter see the same data in the same memory space. This means you can operate on large datasets in your Python code (e.g., for AI model inference) without worrying about the overhead! This is the latest demonstration of how Sail is working towards a unified solution for data processing and AI.
Join the Slack Community
As we introduce version 0.2.1 of Sail, we are also thrilled to unveil our new Slack community. This workspace aims to improve interactions among our development team, users, and contributors, providing a more direct avenue for discussion and collaboration. We invite you to join our community on Slack and engage in the project on GitHub.
As an open-source project, we greatly value your involvement and insights in driving Sail’s ongoing development and success. Together, we aim to make a meaningful impact on the ever-demanding data processing needs of our time.
Get in Touch
If you are interested in Sail as a managed service, LakeSail offers flexible enterprise support options for Sail, including managed deployment on Kubernetes. Get in touch to learn more.
Get started with Sail 0.2.1 today!