In this post, let us take a look at the Apache Spark Datasources API: its concepts and how it can be implemented, using an example from WSO2 Data Analytics Server (DAS).
Datasources API
The Spark Datasources API is an important extension point in Apache Spark and Spark SQL. It allows users to link a DataFrame to a variety of data sources. A DataFrame is, conceptually, an RDD with a schema attached to it, and that schema is what allows users to query the underlying data source using SQL (Spark SQL, specifically).
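A minimal sketch of the "RDD plus schema" idea, assuming Spark 2.x or later (it uses SparkSession, which newer Spark versions provide alongside SQLContext). The object name RddWithSchema, the people view and the sample rows are hypothetical and exist only for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RddWithSchema {
  // Attaches a schema to a plain RDD[Row] and queries the result with Spark SQL.
  def run(spark: SparkSession): Seq[String] = {
    // On its own, this RDD is just a distributed collection of untyped rows.
    val rows = spark.sparkContext.parallelize(Seq(Row("alice", 34), Row("bob", 27)))
    // The schema names and types each position in a Row.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)))
    val df = spark.createDataFrame(rows, schema)
    df.createOrReplaceTempView("people")
    spark.sql("SELECT name FROM people WHERE age > 30").collect().map(_.getString(0)).toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-with-schema").getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

Without the StructType, Spark SQL would have no column names or types to resolve the query against.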
Here, we will discuss the concepts and motivation behind the API; a concrete implementation of the interfaces in Java will follow in the next post.
API Code and Concepts
You can find the code for the API in the org.apache.spark.sql.sources package. The extension points are marked with the @DeveloperApi annotation, and the file interfaces.scala contains all the traits that make up the API.
Rather than discussing the traits in interfaces.scala one by one, let us go through the code and link it to the concepts behind the API.
How datasources are connected to an RDD
Spark Core requires an RDD object to perform any computation on data, and Spark SQL additionally needs a schema around that RDD so that it can link it to a DataFrame.
This process starts by creating a “Relation” and a “Scan”.
Let us look at this in a little more detail.
BaseRelation
This is the abstract class that specifies the schema of the DataFrame. Classes that extend it must implement a method that returns the schema as a StructType. Its definition looks like this:
```scala
@DeveloperApi
abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
  def needConversion: Boolean = true
}
```
As you can see, it has two abstract members, sqlContext and schema, which every subclass must provide (sqlContext is typically passed in through the constructor). The other two members, sizeInBytes and needConversion, come with default implementations.
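As a sketch of what extending BaseRelation looks like, here is a hypothetical NumbersRelation (the class name and its single column n are made up for this example, assuming Spark 2.x or later). Declaring sqlContext as a constructor val satisfies the abstract member, and schema returns a StructType. On its own it cannot be queried yet, because it does not implement any of the scan traits.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation describing a single column of numbers.
// Declaring sqlContext as a `val` implements the abstract `def sqlContext`.
class NumbersRelation(val sqlContext: SQLContext) extends BaseRelation {
  // The schema can be fixed, as here, or computed from the data source
  // (for example, by reading table metadata).
  override def schema: StructType =
    StructType(Seq(StructField("n", LongType, nullable = false)))

  // sizeInBytes and needConversion keep their default implementations.
}
```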
TableScan, PrunedScan, PrunedFilteredScan
These traits (similar to interfaces in Java) are responsible for creating the RDD from the underlying data source. For example, TableScan and PrunedScan look like this:
```scala
@DeveloperApi
trait TableScan {
  def buildScan(): RDD[Row]
}

@DeveloperApi
trait PrunedScan {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}
```
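As you can see, the buildScan method in TableScan builds an RDD containing every column of the table, while the one in PrunedScan builds an RDD containing only the requested columns. PrunedFilteredScan goes one step further and also receives the filters from the query, so the source can skip data early; Spark still re-applies those filters, so this is an optimization rather than a requirement.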
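To illustrate the column-pruning contract, here is a hedged sketch of a relation that implements PrunedScan over a small in-memory data set. PeopleRelation, its records and its columns are hypothetical, and the sketch assumes Spark 2.x or later. The key detail is that each returned Row contains only the requested columns, in the order Spark asked for them.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical in-memory relation: each record maps column name to value.
class PeopleRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedScan {
  private val records: Seq[Map[String, Any]] = Seq(
    Map("name" -> "alice", "age" -> 34),
    Map("name" -> "bob", "age" -> 27))

  override def schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)))

  // Spark passes only the columns the query needs. Each returned Row must hold
  // exactly those columns, in the order given by requiredColumns.
  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    val projected = records.map(r => Row.fromSeq(requiredColumns.toSeq.map(r)))
    sqlContext.sparkContext.parallelize(projected)
  }
}
```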
So, as mentioned earlier, a custom relation should extend BaseRelation and implement one or more of these scans. Together they give the Spark runtime the two things it needs to create a DataFrame: a schema and an RDD.
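Putting the two halves together, the following sketch (assuming Spark 2.x or later; RangeRelation and RangeRelationDemo are hypothetical names) extends BaseRelation, implements TableScan, and then asks Spark to wrap the relation in a DataFrame through SQLContext.baseRelationToDataFrame.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation exposing the numbers 0 until `count` as one column.
// BaseRelation supplies the schema; TableScan supplies the RDD.
class RangeRelation(val sqlContext: SQLContext, count: Long)
    extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(Seq(StructField("n", LongType, nullable = false)))

  // Every Row must match the schema above: one non-null Long per row.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0L until count).map(n => Row(n))
}

object RangeRelationDemo {
  // With both a schema and an RDD available, Spark can build a DataFrame.
  def toDataFrame(sqlContext: SQLContext, count: Long): DataFrame =
    sqlContext.baseRelationToDataFrame(new RangeRelation(sqlContext, count))
}
```

The resulting DataFrame can be registered as a view and queried with SQL like any other.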
RelationProvider, SchemaRelationProvider, CreatableRelationProvider
Once we have a concrete relation implementation, we are in a position to create a relation from a ‘CREATE TEMPORARY TABLE’ query. This is done by implementing the RelationProvider trait.
```scala
trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
```
The CREATE TEMPORARY TABLE query syntax looks like this:

```sql
create temporary table [table name] using [relation provider class] options ([parameters]);
```
Therefore, when you run this query in Spark SQL, Spark passes the SQLContext object and a map of the OPTIONS parameters to the given provider class (which must implement the RelationProvider trait), and the provider uses them to create a BaseRelation.
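A minimal RelationProvider sketch, assuming Spark 2.x or later. CountingProvider, CountingRelation, CountingProviderDemo and the total option are all hypothetical. Spark creates the provider named in USING through its no-argument constructor and passes the OPTIONS entries to createRelation. Recent Spark versions log a deprecation warning for CREATE TEMPORARY TABLE ... USING and recommend CREATE TEMPORARY VIEW ... USING instead, so the demo uses the view form.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation: the numbers 0 until `total` in a single column "n".
class CountingRelation(val sqlContext: SQLContext, total: Long)
    extends BaseRelation with TableScan {
  override def schema: StructType =
    StructType(Seq(StructField("n", LongType, nullable = false)))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0L until total).map(n => Row(n))
}

// The class named in USING. Spark instantiates it through this no-argument
// constructor and passes the OPTIONS (...) entries in `parameters`.
class CountingProvider extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // "total" is a hypothetical option name used only in this sketch.
    val total = parameters.getOrElse("total", sys.error("option 'total' is required")).toLong
    new CountingRelation(sqlContext, total)
  }
}

object CountingProviderDemo {
  // Registers a view backed by CountingProvider, then counts its rows via SQL.
  def countRows(spark: SparkSession): Long = {
    val provider = classOf[CountingProvider].getName
    spark.sql(s"CREATE OR REPLACE TEMPORARY VIEW numbers USING $provider OPTIONS (total '4')")
    spark.sql("SELECT COUNT(*) FROM numbers").collect()(0).getLong(0)
  }
}
```

The same provider can also be used programmatically with spark.read.format(providerClassName).option("total", "4").load(), which goes through the same createRelation call.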
InsertableRelation
So far, we have discussed relations and relation providers for scanning a table. InsertableRelation is what lets you push data into a data source. Its definition looks like this:
```scala
@DeveloperApi
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```
InsertableRelation is used by INSERT INTO TABLE and INSERT OVERWRITE TABLE queries. When you run such a query, the DataFrame produced by the query is passed to the relation's insert method, together with a boolean flag that indicates whether the existing data should be overwritten.
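A hedged sketch of an InsertableRelation, assuming Spark 2.x or later. MemoryRelation is hypothetical and keeps inserted rows in a driver-side buffer purely to keep the example short; a real data source would write from the executors to its own storage instead of collecting the data to the driver. It also implements TableScan so that inserted rows can be read back.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation backed by an in-memory buffer on the driver.
class MemoryRelation(val sqlContext: SQLContext)
    extends BaseRelation with TableScan with InsertableRelation {

  private val stored = ArrayBuffer.empty[Row]

  override def schema: StructType =
    StructType(Seq(StructField("n", LongType, nullable = false)))

  // Reads take a snapshot of the buffer so concurrent inserts do not interfere.
  override def buildScan(): RDD[Row] = {
    val snapshot = synchronized { stored.toList }
    sqlContext.sparkContext.parallelize(snapshot)
  }

  // overwrite = false appends (INSERT INTO); overwrite = true replaces (INSERT OVERWRITE).
  override def insert(data: DataFrame, overwrite: Boolean): Unit = synchronized {
    if (overwrite) stored.clear()
    stored ++= data.collect()
  }
}
```

When an INSERT INTO or INSERT OVERWRITE statement targets a table backed by this relation, Spark calls insert with the query's result, and the overwrite flag decides whether the buffer is cleared first.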
An overview of the API is depicted in the following image.
As we have seen, the Spark Datasources API provides a small, simple set of interfaces for connecting Spark to an external data source.
In the next post, I will explain how WSO2 DAS uses this API to connect to the DAS Data Access Layer. The concepts above will become much clearer once you see the implementation.
Best!