Spark SQL UDFs (User Defined Functions)

Apache Spark SQL allows users to define their own functions, as in other query engines such as Apache Hive and Cloudera Impala.

Here we’ll go through a few examples of using Spark UDFs in the Java environment.

Spark UDFs

Spark SQL currently supports UDFs with up to 22 arguments (UDF1 to UDF22). These UDF interfaces live in the org.apache.spark.sql.api.java package. A good place to see UDFs in action is the Java UDF test suite.
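
For reference, each of these interfaces is tiny: it extends java.io.Serializable and declares a single call method. A simplified sketch of the one-argument version (UDF2 through UDF22 follow the same pattern, with more type parameters and call() arguments):

// Simplified sketch of org.apache.spark.sql.api.java.UDF1;
// Serializable so that instances can be shipped to the executors.
public interface UDF1<T1, R> extends java.io.Serializable {
    R call(T1 t1) throws Exception;
}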

A UDF can be defined conveniently in Scala and Java 8 using anonymous functions. For a simple example in Scala (from the Scala UDF test suite):


test("Simple UDF") {
 udf.register("strLenScala", (_: String).length)
 assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
 }

test("ZeroArgument UDF") {
 udf.register("random0", () => { Math.random()})
 assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
 }

test("TwoArgument UDF") {
 udf.register("strLenScala", (_: String).length + (_:Int))
 assert(sql("SELECT strLenScala('test', 1)").head().getInt(0) === 5)
 }

As you can see here, a UDF has to be registered with the SQLContext instance before it can be used in a query.

Spark UDFs in Java 7

Things are slightly different in Java 7 (and earlier). Here, we have to implement the UDF interface that corresponds to the number of arguments of the function.

A simple example from the Spark Java test suite:

@Test
public void udf1Test() {
    // With Java 8 lambdas:
    // sqlContext.registerFunction(
    //     "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);

    sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() {
        @Override
        public Integer call(String str) throws Exception {
            return str.length();
        }
    }, DataTypes.IntegerType);

    Row result = sqlContext.sql("SELECT stringLengthTest('test')").head();
    assert(result.getInt(0) == 4);
}

@Test
public void udf2Test() {
    // With Java 8 lambdas:
    // sqlContext.registerFunction(
    //     "stringLengthTest",
    //     (String str1, String str2) -> str1.length() + str2.length(),
    //     DataType.IntegerType);

    sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
        @Override
        public Integer call(String str1, String str2) throws Exception {
            return str1.length() + str2.length();
        }
    }, DataTypes.IntegerType);

    Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").head();
    assert(result.getInt(0) == 9);
}

In the Java implementation, we have to provide a name for the UDF, an implementation of the corresponding interface, and the return DataType.
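
Putting the pieces together outside a test suite, a minimal, self-contained sketch could look like the following (the app name, master URL and UDF name here are illustrative, not from the Spark sources):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class UdfExample {
    public static void main(String[] args) {
        // Local-mode setup; names are only for illustration.
        SparkConf conf = new SparkConf().setAppName("udf-example").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Name, interface implementation and return DataType, as described above.
        sqlContext.udf().register("strLen", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataTypes.IntegerType);

        Row result = sqlContext.sql("SELECT strLen('test')").head();
        System.out.println(result.getInt(0)); // 4

        jsc.stop();
    }
}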

UDF return types 

When working with Spark SQL, it is important to understand the data types used by UDFs. The return types currently supported by Spark SQL are listed below with their corresponding Java types (from the Spark SQL DataTypes documentation); a short sketch of registering a UDF with a composite return type follows the list.

ByteType = byte or Byte
ShortType = short or Short
IntegerType = int or Integer
LongType = long or Long
FloatType = float or Float
DoubleType = double or Double
DecimalType = java.math.BigDecimal
StringType = String
BinaryType = byte[]
BooleanType = boolean or Boolean
TimestampType = java.sql.Timestamp
DateType = java.sql.Date
ArrayType = java.util.List
MapType = java.util.Map
StructType = org.apache.spark.sql.Row
StructField = The value type in Java of the data type of this field (For example, int for a StructField with the data type IntegerType)
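
For composite return types, the DataTypes factory methods are used to build the matching Spark type. Here is a sketch of registering a hypothetical charCounts UDF that returns a java.util.Map<String, Integer> (assuming the sqlContext from the earlier examples, plus imports for java.util.HashMap and java.util.Map):

// Hypothetical UDF returning a Map<String, Integer>; the matching Spark type
// is a MapType built from StringType keys and IntegerType values.
sqlContext.udf().register("charCounts", new UDF1<String, Map<String, Integer>>() {
    @Override
    public Map<String, Integer> call(String str) throws Exception {
        Map<String, Integer> counts = new HashMap<>();
        for (char c : str.toCharArray()) {
            String key = String.valueOf(c);
            Integer current = counts.get(key);
            counts.put(key, current == null ? 1 : current + 1);
        }
        return counts;
    }
}, DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));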

A slightly different example from WSO2 DAS 

WSO2 DAS allows users to register their own UDFs and use those in the Carbon Analytics scripts and Console.

Here, users can create a UDF by implementing the corresponding interface and deploying it in the Carbon OSGi environment. The Analytics core will then register these classes using Java reflection.

Here is an example UDF which implements the UDF3 interface:


import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.api.java.UDF3;

public class CompositeID implements UDF3<Integer, Integer, Integer, List<String>>, Serializable {

    private static final long serialVersionUID = -9073977448886578865L;

    @Override
    public List<String> call(Integer integer, Integer integer2, Integer integer3) throws Exception {
        ArrayList<String> arr = new ArrayList<>();
        arr.add(integer.toString());
        arr.add(integer2.toString());
        arr.add(integer3.toString());
        return arr;
    }
}

As you can see, the function accepts three integers and returns a list of strings.

This UDF is registered with the SQLContext inside the Analytics core as follows:


private void registerUDFs(SQLContext sqlCtx)
        throws IllegalAccessException, InvocationTargetException, InstantiationException,
        ClassNotFoundException, NoSuchMethodException {
    Class<?> udf1 = Class.forName("org.wso2.carbon.analytics.spark.core.udf.CompositeID");
    Object instance1 = udf1.newInstance();
    sqlCtx.udf().register("compositeID", (UDF3<?, ?, ?, ?>) instance1,
            DataTypes.createArrayType(DataTypes.StringType));
}

Here I have used reflection to instantiate the UDF. Note the return type: since the UDF returns a List<String>, we need to supply the matching Spark DataType. The Spark type corresponding to java.util.List is ArrayType, and ArrayType objects are created with the DataTypes.createArrayType() factory method, so here we create an ArrayType with element type DataTypes.StringType.
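
For completeness, createArrayType() also has an overload that lets you state whether the array may contain null elements; a small illustration (not from the DAS code):

// Element type only; the resulting ArrayType allows null elements by default.
ArrayType stringList = DataTypes.createArrayType(DataTypes.StringType);

// Explicitly declare that the array never contains null elements.
ArrayType stringListNoNulls = DataTypes.createArrayType(DataTypes.StringType, false);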

The UDF can then be used in a query as follows:


INSERT INTO TABLE plug_usage SELECT MAX(value) - MIN(value) AS usage, compositeID(house_id, household_id, plug_id) AS composite_id FROM debsData WHERE property = false GROUP BY house_id, household_id, plug_id
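
To sanity-check the registered UDF outside an analytics script, a plain query against the same SQLContext would do (a sketch, assuming registerUDFs() above has already been called):

// Invoke the UDF directly; the single column holds the three IDs as strings.
Row row = sqlCtx.sql("SELECT compositeID(1, 2, 3) AS composite_id").head();
System.out.println(row);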