Calling another custom Python function from a PySpark UDF
PySpark, the Python API for Apache Spark, was created for distributed data processing. It lets users perform complex computations and transformations on large datasets efficiently and at scale. One of PySpark's main features is User-Defined Functions (UDFs), which let users write their own functions and apply them to Spark DataFrames or RDDs. Using UDFs, PySpark's capabilities can be extended and customized to meet specific needs. In this article, we will learn how to call another custom Python function from a PySpark UDF.
Calling Another Custom Python Function from Pyspark UDF
PySpark UDFs are written in Python, so they can call other Python functions, whether those are built-ins, user-defined helpers, or functions from external libraries. This improves the modularity and reusability of UDFs by letting users reuse existing Python code. Within the distributed PySpark environment, users can implement domain-specific logic, carry out complex calculations, or apply specialized algorithms. By invoking Python functions from PySpark UDFs, users can take advantage of Python's vast ecosystem of libraries.
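As a minimal sketch of this idea (the function names here are illustrative, not from any library), the function passed to 'udf()' is ordinary Python and can freely delegate to other functions:

```python
# A plain Python helper that a UDF can delegate to.
def normalize(text):
    return text.strip().lower()

# The function we would pass to udf(); it simply calls the helper.
# Guarding against None matters in Spark, where column values can be null.
def clean_name(text):
    if text is None:
        return None
    return normalize(text)

print(clean_name("  Alice "))  # alice
```

Because the wrapper is ordinary Python, the helper can live in any importable module, as long as that module is available on the Spark workers.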
Steps to Call another Custom Python Function from a PySpark UDF
Let us see a step-by-step process to call another custom Python function from a PySpark UDF.
Step 1: Import the necessary modules
First, import the ‘udf’ from the ‘pyspark.sql.functions’ module, which offers tools for dealing with Spark DataFrames.
from pyspark.sql.functions import udf
Step 2: Start Spark Session
Next, create a Spark session by importing 'SparkSession' and calling its builder.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Step 3: Create a Dataframe
The next step is to create a DataFrame on which the operations will be performed in Spark.
data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)]
df = spark.createDataFrame(data, ["name", "age"])
Step 4: Define the custom Python function
Then define the custom Python function that we wish to invoke from the PySpark UDF. It can contain any logic or calculations we need, for example, a function that converts a string to uppercase.
def to_uppercase(string):
    return string.upper()
Step 5: Create a PySpark UDF
After defining the custom Python function, use the 'udf()' function from the 'pyspark.sql.functions' module to construct a PySpark UDF, passing the custom Python function as an argument. This registers the custom function as a UDF so that it can be applied to DataFrame columns.
to_uppercase_udf = udf(to_uppercase)
Step 6: Apply the UDF to a DataFrame
After creating the PySpark UDF, use the 'withColumn()' method to apply it to a DataFrame column. This method returns a new DataFrame with the given column added (or replaced, if a column of that name already exists). The UDF is called once per row, applying the custom Python function to the designated column and producing the desired result.
df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))
Step 7: Display the DataFrame
At last, we will use the 'show()' method to display the DataFrame and see the changes made to it.
df.show()
By following these steps, we can perform customized calculations and transformations on PySpark DataFrames by calling another custom Python function from a PySpark UDF.
Example to Call another Custom Python Function from a PySpark UDF
Now, let us see a few different examples of calling custom Python functions from a PySpark UDF.
Example 1: Converting a DataFrame column to uppercase
In this example, we create a Spark DataFrame 'df' that contains the names and ages of some people. We define a custom Python function 'to_uppercase()' that takes a string as an argument and returns it in uppercase. We then create a PySpark UDF from it using the 'udf()' function and apply it to the 'name' column, storing the result in a new column of the DataFrame.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Define the custom Python function
def to_uppercase(string):
    return string.upper()

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# Make a PySpark UDF
to_uppercase_udf = udf(to_uppercase)

# Apply the UDF to the 'name' column
df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))

# Show the DataFrame
df.show()
Output:
Example 2: Calling a custom Python function that combines multiple DataFrame columns
In this example, we create a DataFrame with two columns, 'first_name' and 'last_name'. We then define a custom Python function 'combine_columns' that takes the 'first_name' and 'last_name' values as parameters and returns them joined together to form a 'full_name'.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Define the custom Python function
def combine_columns(col1, col2):
    return col1 + " " + col2

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("John", "Doe"), ("Ram", "Kumar"), ("Smith", "Jones")]
df = spark.createDataFrame(data, ["first_name", "last_name"])

# Make a PySpark UDF
combine_columns_udf = udf(combine_columns)

# Apply the UDF to the 'first_name' and 'last_name' columns
df = df.withColumn("full_name", combine_columns_udf(df["first_name"], df["last_name"]))

# Show the DataFrame
df.show()
Output:
Example 3: Calling a Custom Python Function from PySpark UDF with External Libraries
For more complex calculations, PySpark lets us use external Python libraries within custom functions. Suppose we want to use the fuzzy string matching library 'fuzzywuzzy' and a custom Python function named 'calculate_similarity' to compute the similarity between two strings.
In this example, we import the 'fuzz' module from the fuzzywuzzy library and use its 'fuzz.ratio()' function to determine the degree of similarity between two strings. We define the custom Python function 'calculate_similarity()', which invokes 'fuzz.ratio()' on the input strings. Using the 'udf()' function, we build a UDF named 'similarity_udf' and specify the return type. Finally, we use the 'withColumn()' method to apply the UDF to the 'string1' and 'string2' columns, and the resulting DataFrame with the similarity ratios is displayed.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from fuzzywuzzy import fuzz

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample DataFrame with columns 'string1' and 'string2'
data = [("apple", "apples"), ("banana", "bananas"), ("cat", "dog")]
df = spark.createDataFrame(data, ["string1", "string2"])

# Custom Python function
def calculate_similarity(str1, str2):
    return fuzz.ratio(str1, str2)

# Create a UDF from the custom function
similarity_udf = udf(calculate_similarity, IntegerType())

# Apply the UDF to calculate similarity
df.withColumn("similarity", similarity_udf(df["string1"], df["string2"])).show()
Output:
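If 'fuzzywuzzy' is not installed, a comparable similarity score can be computed with the standard library's 'difflib'; this is a sketch of the same idea, not an exact drop-in replacement for every 'fuzz.ratio()' result:

```python
from difflib import SequenceMatcher

# Similarity as a 0-100 integer, analogous in spirit to fuzz.ratio()
def calculate_similarity(str1, str2):
    return int(round(SequenceMatcher(None, str1, str2).ratio() * 100))

print(calculate_similarity("apple", "apples"))  # 91
print(calculate_similarity("cat", "dog"))       # 0
```

The same function could be wrapped with 'udf(calculate_similarity, IntegerType())' and applied exactly as in the example above.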
Example 4: Applying a Custom Python Function with Complex Logic
Let's look at an example where we have a DataFrame with a column of sentences, and we want to use a custom Python function called 'count_words' to determine how many words each sentence contains.
In this example, the custom Python function 'count_words' uses the 'split()' method to break the input sentence into words and the 'len()' function to count them. Using the 'udf()' function, we build a UDF named 'count_udf' and specify the return type. Finally, we use the 'withColumn()' method to apply the UDF to the 'sentence' column, and the resulting DataFrame with the word counts is displayed.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample DataFrame with a column 'sentence'
data = [("Hello, PySpark!",),
        ("PySpark is great in today's world",),
        ("Spark DataFrames are powerful in python to work on",)]
df = spark.createDataFrame(data, ["sentence"])

# Custom Python function
def count_words(sentence):
    return len(sentence.split())

# Create a UDF from the custom function
count_udf = udf(count_words, IntegerType())

# Apply the UDF to count words in each sentence
df.withColumn("word_count", count_udf(df["sentence"])).show()
Output:
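One advantage of writing the logic as a plain Python helper, as in the examples above, is that it can be checked without starting Spark at all. A quick sanity check, assuming words are whitespace-separated:

```python
def count_words(sentence):
    return len(sentence.split())

# split() with no arguments collapses repeated whitespace,
# so extra spaces do not inflate the count.
print(count_words("Hello, PySpark!"))         # 2
print(count_words("  spaced   out  words "))  # 3
```

Once the helper behaves as expected, wrapping it with 'udf()' is the only Spark-specific step.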