Spark SQL UDFs (User Defined Functions)…

Apache Spark SQL allows users to define their own functions, just as in other query engines such as Apache Hive and Cloudera Impala.

Here we’ll go through an example of using Spark UDFs in the Java environment.

Spark UDFs

Spark SQL currently supports UDFs with up to 22 arguments (UDF1 to UDF22). Check the org.apache.spark.sql.api.java package for these UDF interfaces. A good place to see UDFs in action is the Java UDF test suite.

A UDF can be defined conveniently in Scala and Java 8 using anonymous functions. Here are some simple examples in Scala (from the Scala UDF test suite):


test("Simple UDF") {
 udf.register("strLenScala", (_: String).length)
 assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
 }

test("ZeroArgument UDF") {
 udf.register("random0", () => { Math.random()})
 assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
 }

test("TwoArgument UDF") {
 udf.register("strLenScala", (_: String).length + (_:Int))
 assert(sql("SELECT strLenScala('test', 1)").head().getInt(0) === 5)
 }

As you can see, a UDF is registered with the SQLContext instance.

Spark UDFs in Java 7

Things are slightly different in Java 7 (and earlier). Here, we have to implement the corresponding UDF interface, depending on the number of arguments the function takes.
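
The examples below assume a SQLContext has already been created. A minimal setup sketch for the Java environment (assuming the Spark 1.x APIs used throughout this post; the app name and master URL are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

public class UdfExample {
    public static void main(String[] args) {
        // App name and master URL are placeholders for local testing
        SparkConf conf = new SparkConf().setAppName("udf-example").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // The SQLContext exposes the udf() registration API used in the examples below
        SQLContext sqlContext = new SQLContext(jsc.sc());
    }
}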

A simple example from the Spark Java test suite:

@Test
public void udf1Test() {
    // With Java 8 lambdas:
    // sqlContext.registerFunction(
    //     "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);

    sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() {
        @Override
        public Integer call(String str) throws Exception {
            return str.length();
        }
    }, DataTypes.IntegerType);

    Row result = sqlContext.sql("SELECT stringLengthTest('test')").head();
    assert(result.getInt(0) == 4);
}

@Test
public void udf2Test() {
    // With Java 8 lambdas:
    // sqlContext.registerFunction(
    //     "stringLengthTest",
    //     (String str1, String str2) -> str1.length() + str2.length(),
    //     DataType.IntegerType);

    sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
        @Override
        public Integer call(String str1, String str2) throws Exception {
            return str1.length() + str2.length();
        }
    }, DataTypes.IntegerType);

    Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").head();
    assert(result.getInt(0) == 9);
}

In the Java implementation, we have to provide a name for the UDF, an implementation of the corresponding interface, and the return type.
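
With Java 8, the same registration can be written more compactly with a lambda. A minimal sketch, assuming Spark 1.3+ where sqlContext.udf().register() accepts a UDF interface and a return DataType; the cast to UDF1 is there so the compiler can pick the right overload:

// Register the UDF with a lambda instead of an anonymous class
sqlContext.udf().register("stringLengthTest",
        (UDF1<String, Integer>) str -> str.length(),
        DataTypes.IntegerType);

Row result = sqlContext.sql("SELECT stringLengthTest('test')").head();
assert(result.getInt(0) == 4);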

UDF return types 

When working with Spark SQL, it is vital to understand the data types UDFs can return. The return types currently supported by Spark SQL are listed below with their corresponding Java types (from the Spark SQL DataTypes reference); a short registration example follows the list.

ByteType = byte or Byte
ShortType = short or Short
IntegerType = int or Integer
LongType = long or Long
FloatType = float or Float
DoubleType = double or Double
DecimalType = java.math.BigDecimal
StringType = String
BinaryType = byte[]
BooleanType = boolean or Boolean
TimestampType = java.sql.Timestamp
DateType = java.sql.Date
ArrayType = java.util.List
MapType = java.util.Map
StructType = org.apache.spark.sql.Row
StructField = The value type in Java of the data type of this field (For example, int for a StructField with the data type IntegerType)
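
As a quick illustration of these mappings, here is a sketch of a UDF that returns a java.util.Map and is registered with the matching MapType, built with the DataTypes factory methods (the UDF name wordCountMap and its logic are made up for this example; java.util.Map and java.util.HashMap are assumed to be imported):

sqlContext.udf().register("wordCountMap", new UDF1<String, Map<String, Integer>>() {
    @Override
    public Map<String, Integer> call(String sentence) throws Exception {
        // Count how many times each word appears in the input string
        Map<String, Integer> counts = new HashMap<>();
        for (String word : sentence.split("\\s+")) {
            Integer current = counts.get(word);
            counts.put(word, current == null ? 1 : current + 1);
        }
        return counts;
    }
}, DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));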

A slightly different example from WSO2 DAS 

WSO2 DAS allows users to register their own UDFs and use those in the Carbon Analytics scripts and Console.

Here, users can create their UDFs by implementing the corresponding interface and putting them in the Carbon OSGi environment. The Analytics core will then register these classes using Java reflection.

Here is an example UDF which implements the UDF3 interface:


public class CompositeID implements UDF3<Integer, Integer, Integer, List<String>>, Serializable {

    private static final long serialVersionUID = -9073977448886578865L;

    @Override
    public List<String> call(Integer integer, Integer integer2, Integer integer3) throws Exception {
        ArrayList<String> arr = new ArrayList<>();
        arr.add(integer.toString());
        arr.add(integer2.toString());
        arr.add(integer3.toString());

        return arr;
    }
}

As you can see, the function accepts three integers and returns a list of strings.

This UDF is registered with the SQLContext as follows, inside the Analytics Core:


private void registerUDFs(SQLContext sqlCtx)
        throws IllegalAccessException, InvocationTargetException, InstantiationException,
        ClassNotFoundException, NoSuchMethodException {
    Class<?> udf1 = Class.forName("org.wso2.carbon.analytics.spark.core.udf.CompositeID");

    Object instance1 = udf1.newInstance();
    sqlCtx.udf().register("compositeID", (UDF3<?, ?, ?, ?>) instance1,
            DataTypes.createArrayType(DataTypes.StringType));
}

Here I have used reflection to instantiate the UDF. Note the return type: since the UDF returns a List<String>, we need to give the matching Spark DataType. The corresponding type for java.util.List is ArrayType, and ArrayType objects are created with the DataTypes.createArrayType() factory method. For a List<String>, that means an ArrayType with element type DataTypes.StringType.

The UDF can then be used in a query as follows:


INSERT INTO TABLE plug_usage SELECT MAX(value) - MIN(value) AS usage, compositeID(house_id, household_id, plug_id) AS composite_id FROM debsData WHERE property = false GROUP BY house_id, household_id, plug_id

Iterate through a Spark DataFrame using its partitions in Java

My work at WSO2 Inc. mainly revolves around the Business Activity Monitor (BAM)/Data Analytics Server (DAS). For the DAS 3.0 release, we are bringing in Apache Spark as the analytics engine of the WSO2 Carbon Platform, replacing Apache Hadoop and Apache Hive, and I am working on this Spark migration. Spark introduces the interesting concept of RDDs to the analytics community. I am not going to go into details about RDDs here; please click here for further information. Once an RDD is created in the Spark “world”, it can be used for data manipulation, analysis, etc.

Spark SQL, the SQL query engine for Spark, uses an extension of the RDD called a DataFrame, formerly known as a SchemaRDD. For further information, click here.

Here I will discuss how to use the partitions of a DataFrame to iterate through the underlying data, along with some useful debugging tips for the Java environment. (I thought this was useful because Spark is written in Scala and almost all of its features lean heavily on Scala functionality, so when we bring it to the Java environment, things might not work as expected!)

DataFrames… WTH?

As per Spark,

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
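
For example, a DataFrame can be created in Java with just a couple of lines. A minimal sketch, assuming a Spark 1.4+ SQLContext and a placeholder people.json file:

// Read a JSON file into a DataFrame; the schema is inferred from the data
DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");
people.printSchema();
people.show();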

Problem: How to retrieve data? Take all the elements?

A DataFrame (DF) encapsulates data in Rows, and we can retrieve these Rows as a list or as an array using the following collect methods of a DF:

def collect(): Array[Row]
def collectAsList(): java.util.List[Row]

But the problem here is that a collect method gathers all the data under a DF at once (in RDD jargon, it is an action). Since Spark processes data in memory, if the DF covers a large data set, the collect operation will be inefficient.

Solution: Take data using Partitions!

Using the underlying partitions of a DF gives a better solution for this!

def foreachPartition(f: Iterator[Row] => Unit): Unit

As you can see in the method signature, it takes a function as its parameter, and this function takes a Row iterator and returns Unit. The function is applied to each partition separately, so it does not collect all the data under the DF at once!

Example:

val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
b.foreachPartition(x => println(x.reduce(_ + _))) 

From Scala to Java… 

This looks fairly straightforward in Scala, but when it comes to Java, things are a little ‘messy’! There we have to implement the Scala ‘anonymous function’ ourselves.

In WSO2 DAS, we have done this in our implementation of the Spark BaseRelation.

data.foreachPartition(new AbstractFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> v1) {
        // your logic goes here...
        return BoxedUnit.UNIT;
    }
});

Here we have implemented scala.Function1 by extending scala.runtime.AbstractFunction1. BoxedUnit is the Scala runtime's equivalent of a void result in Java.

Troubleshooting tips….

  • One important thing to note here is that, in a distributed environment, this anonymous function will be serialized and shipped to all the Spark workers.
  • All non-serializable objects should be instantiated within the function itself, so that they are created on each worker instead of being serialized from the driver; see the sketch below.
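
A minimal sketch of that second point, using a hypothetical, non-serializable DatabaseClient purely for illustration: the client is created inside apply(), on the worker, instead of being captured from the driver (note that, as discussed in the next section, the function class itself also needs to be serializable):

data.foreachPartition(new AbstractFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> rows) {
        // DatabaseClient is a hypothetical non-serializable resource;
        // creating it here means it lives only on the worker
        DatabaseClient client = new DatabaseClient();
        while (rows.hasNext()) {
            client.insert(rows.next());
        }
        client.close();
        return BoxedUnit.UNIT;
    }
});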

Houston, We’ve Got a Problem!

BUT in the real DAS implementation, we came across the following exception…

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

It turns out that when we implement scala.runtime.AbstractFunction1 in the Java environment, it is not readily serializable (AbstractFunction1 does not implement the java.io.Serializable interface).

So, as a solution, we wrote our own AbstractFunction1 subclass as follows, and it actually worked!

public class AnalyticsFunction1 extends AbstractFunction1<Iterator<Row>, BoxedUnit>
        implements Serializable {

    private static final long serialVersionUID = -1919222653470217466L;
    private int tId;
    private String tName;
    private StructType sch;

    public AnalyticsFunction1(int tId, String tName, StructType sch) {
        this.tId = tId;
        this.tName = tName;
        this.sch = sch;
    }

    @Override
    public BoxedUnit apply(Iterator<Row> iterator) {
        List<Record> records = new ArrayList<>();
        while (iterator.hasNext()) {
            if (records.size() == AnalyticsConstants.MAX_RECORDS) {
                // Flush the current batch before consuming more rows
                try {
                    ServiceHolder.getAnalyticsDataService().put(records);
                } catch (AnalyticsException e) {
                    e.printStackTrace();
                }
                records.clear();
            } else {
                Row row = iterator.next();
                records.add(new Record(this.tId, this.tName,
                        convertRowAndSchemaToValuesMap(row, this.sch)));
            }
        }

        // Flush any remaining records
        if (!records.isEmpty()) {
            try {
                ServiceHolder.getAnalyticsDataService().put(records);
            } catch (AnalyticsException e) {
                e.printStackTrace();
            }
        }
        return BoxedUnit.UNIT;
    }

    private Map<String, Object> convertRowAndSchemaToValuesMap(Row row, StructType schema) {
        String[] colNames = schema.fieldNames();
        Map<String, Object> result = new HashMap<>();
        for (int i = 0; i < row.length(); i++) {
            result.put(colNames[i], row.get(i));
        }
        return result;
    }

}

It can be instantiated and passed to foreachPartition as follows:


data.foreachPartition(new AnalyticsFunction1(tenantId, tableName, data.schema()));

You can access the GitHub repo here.

To sum up!

  • While iterating through a DataFrame, try your best to avoid the collect() method.
  • Try to always use a method which returns an iterator to the data (see the sketch after this list).
  • When using foreachPartition() in Java, implement the anonymous function in a separate class which implements the java.io.Serializable interface.
  • Read this Databricks Knowledgebase: General Troubleshooting 
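
As a sketch of the second point above (assuming the Spark 1.x Java API): instead of collect(), the Rows can be pulled through an iterator, one partition at a time, via the DataFrame's underlying JavaRDD:

// Streams Rows to the driver one partition at a time instead of all at once
java.util.Iterator<Row> rows = data.javaRDD().toLocalIterator();
while (rows.hasNext()) {
    Row row = rows.next();
    // process the row...
}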

Hello world!

This is… the third time I’m trying to get myself registered in the blogging community! Let’s hope it kicks off this time! The aim is to shoot for the moon… so that it’d at least end up in a coconut tree! Fingers and toes crossed! Cheers!