Using Spark Datasources API

In this post, let us take a look at the Apache Spark Datasources API: its concepts, and how it can be implemented, using an example from WSO2 DAS.

Datasources API

The Spark Datasources API is an important extension point in Apache Spark and Spark SQL. It allows users to link a DataFrame to a variety of datasources. A DataFrame is an extension of an RDD with a schema attached to it, which allows users to query the underlying datasource using SQL (Spark SQL, specifically).

Here, we will discuss the concepts and motivation behind the API, followed by a concrete implementation of the interfaces in the Java environment.

API Code and Concepts 

You can find the code for the API in the org.apache.spark.sql.sources package. The extension points are marked with the @DeveloperApi annotation, and interfaces.scala contains all the traits required by the API.

Rather than discussing the traits in the interfaces.scala file one by one, let us go through the code, linking it with the concepts behind the API.

How datasources are connected to an RDD 

Spark Core requires an RDD object to perform any computation on data, and Spark SQL needs a schema around an RDD so that it can link it to a DataFrame.

This process starts by creating a “Relation” and a “Scan”.

Let us look at this in a little more detail.

BaseRelation 

This is the abstract class which specifies the schema of the DataFrame. Classes which extend it should implement a method that returns the schema as a StructType. The code looks like this:


@DeveloperApi
abstract class BaseRelation {
 def sqlContext: SQLContext
 def schema: StructType

 def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes

 def needConversion: Boolean = true
}

As you can see, it has two abstract methods, sqlContext and schema, which need to be implemented when extending this abstract class (for example, by taking the required values through the constructor).
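
Since the concrete implementations we will discuss later are in Java, here is a minimal, hypothetical sketch of extending BaseRelation from Java (the class name and columns are my own, not from any real datasource):

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical relation with a fixed two-column schema
public class MyRelation extends BaseRelation {

    private final SQLContext sqlContext;

    public MyRelation(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    @Override
    public SQLContext sqlContext() {
        return this.sqlContext;
    }

    @Override
    public StructType schema() {
        // Describe the columns of the DataFrame as a StructType
        return DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, true)
        });
    }
}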

TableScan, PrunedScan, PrunedFilteredScan

These traits (similar to interfaces in Java) are responsible for creating the RDD from the underlying datasource. The code is in the same interfaces.scala file; for example, TableScan and PrunedScan look like this:


@DeveloperApi
trait TableScan {
 def buildScan(): RDD[Row]
}

@DeveloperApi
trait PrunedScan {
 def buildScan(requiredColumns: Array[String]): RDD[Row]
}

To clarify further: the buildScan method in TableScan creates the RDD for the whole table, while PrunedScan creates the RDD from only the given columns.

So, as I mentioned earlier, a custom relation should extend BaseRelation and implement one or more of these scans. This fulfills the Spark runtime's requirements of a schema and an RDD to create a DataFrame, as the sketch below illustrates.
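
Continuing the hypothetical MyRelation above, a hedged sketch of a relation that also implements TableScan could look like this; instead of reading a real datasource, it simply parallelizes a couple of in-memory rows matching the schema:

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.TableScan;

// Hypothetical relation that can be fully scanned into an RDD of Rows
public class MyScanRelation extends MyRelation implements TableScan {

    public MyScanRelation(SQLContext sqlContext) {
        super(sqlContext);
    }

    @Override
    public RDD<Row> buildScan() {
        // A real implementation would read from the underlying datasource here;
        // this sketch just builds a tiny RDD of Rows matching the schema.
        JavaSparkContext jsc = new JavaSparkContext(sqlContext().sparkContext());
        return jsc.parallelize(Arrays.asList(
                RowFactory.create(1, "foo"),
                RowFactory.create(2, "bar"))).rdd();
    }
}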

RelationProvider, SchemaRelationProvider, CreatableRelationProvider

Once we have a concrete relation implementation, we are in a position to create a relation from a ‘CREATE TEMPORARY TABLE’ query. This is done by implementing the RelationProvider trait.

trait RelationProvider {

 def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

The CREATE TEMPORARY TABLE query syntax looks like this:

create temporary table [table name] using [relation provider class] options ([parameters]);  

Therefore, when you run this query in Spark SQL, it sends the map of parameters and the SQLContext object to the given relation provider class (which should implement the RelationProvider trait) so that it can create a BaseRelation.
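
For illustration, a hypothetical Java provider (names assumed by me) implementing this trait could look like the sketch below; note that the parameters map arrives as a Scala immutable Map:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;
import scala.collection.immutable.Map;

// Hypothetical provider referenced in the USING clause of CREATE TEMPORARY TABLE
public class MyRelationProvider implements RelationProvider {

    @Override
    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
        // The OPTIONS(...) key/value pairs arrive in 'parameters' (a Scala Map);
        // a real provider would read options such as a table name or URL from it.
        return new MyScanRelation(sqlContext);
    }
}

With such a provider on the classpath, you could register and query the table from a SQLContext, for example (table, package, and option names here are again hypothetical):

sqlContext.sql("create temporary table my_table using com.example.MyRelationProvider options (key 'value')");
sqlContext.sql("select * from my_table").show();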

InsertableRelation

So far, we have discussed relations and relation providers to scan a table. InsertableRelation is where you can push data into a given datasource. The code looks like this:

@DeveloperApi
trait InsertableRelation {
 def insert(data: DataFrame, overwrite: Boolean): Unit
}

InsertableRelation is coupled with the INSERT INTO/INSERT OVERWRITE TABLE queries. When you run such a query, the resultant DataFrame of the query is passed to this relation, together with a boolean flag indicating whether the existing data should be overwritten.
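
Continuing the hypothetical example, a Java sketch of an insertable relation might look like this; the actual write logic depends entirely on the underlying datasource:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.InsertableRelation;

// Hypothetical relation that also accepts INSERT INTO/OVERWRITE TABLE queries
public class MyInsertableRelation extends MyScanRelation implements InsertableRelation {

    public MyInsertableRelation(SQLContext sqlContext) {
        super(sqlContext);
    }

    @Override
    public void insert(DataFrame data, boolean overwrite) {
        if (overwrite) {
            // a real implementation would first clear the existing data in the datasource
        }
        // Collecting to the driver keeps the sketch short; a real implementation would
        // write each partition in parallel, e.g. with data.foreachPartition(...).
        for (Row row : data.collect()) {
            // write 'row' to the underlying datasource
        }
    }
}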

An overview of the API is depicted in the following image

[Image: Spark Datasources API overview]

As we have seen, the Spark Datasources API provides a very simple set of interfaces which can be used to connect Spark to an outside data source.

In the next post, I will explain how WSO2 DAS uses this API to connect to the DAS Data Access Layer. The above concepts will become much clearer when you see the implementation.

best!