Dynamically select multiple columns while joining different DataFrames in Scala Spark

Nish

I have two Spark dataframes, df1 and df2. Is there a way to select the output columns dynamically while joining them? The definition below outputs all columns from both df1 and df2 in the case of an inner join.

def joinDF(df1: DataFrame, df2: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
}

Input data:

val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")

Expected result:

val dfJoinResult = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("type"), df1("account"), df2("value")) 



I have looked at options like df.select(cols.head, cols.tail: _*), but that does not allow selecting columns from both dataframes. Is there a way to pass the columns to select dynamically into my def, together with the dataframe each one should be taken from? I'm using Spark 2.2.0.



answered 8 months ago Shaido #1

It is possible to pass the select expression as a Seq[Column] to the method:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr: _*)
}

To call the method use:

val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

This will give the desired result:

+------+-------+-----+
|  type|account|value|
+------+-------+-----+
|   new|current|    7|
|closed| saving|    5|
+------+-------+-----+
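If the column names only arrive as plain strings at run time (for example from a configuration file), the Seq[Column] can be built by resolving each name against the dataframe it belongs to. The following is a minimal sketch of that idea, not part of the original answer: columnsFrom is a hypothetical helper name, and the local SparkSession is assumed only so the example runs on its own.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object DynamicSelectExample {
  // Hypothetical helper: turn a list of names into columns bound to one dataframe.
  // df.col throws an AnalysisException if a name does not exist in df.
  def columnsFrom(df: DataFrame, names: Seq[String]): Seq[Column] =
    names.map(name => df.col(name))

  // Same shape as the joinDF above: join, then keep only the requested columns.
  def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column,
             joinType: String, selectExpr: Seq[Column]): DataFrame =
    df1.join(df2, joinExpr, joinType).select(selectExpr: _*)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("dynamic-select").getOrCreate()
    import spark.implicits._

    val df1 = List(("1", "new", "current"), ("2", "closed", "saving"), ("3", "blocked", "credit"))
      .toDF("id", "type", "account")
    val df2 = List(("1", "7"), ("2", "5"), ("5", "8")).toDF("id", "value")

    // The name lists could come from anywhere; they are hard-coded here.
    val fromDf1 = Seq("type", "account")
    val fromDf2 = Seq("value")
    val selectExpr = columnsFrom(df1, fromDf1) ++ columnsFrom(df2, fromDf2)

    joinDF(df1, df2, df1.col("id") === df2.col("id"), "inner", selectExpr).show()
    spark.stop()
  }
}
```

Keeping one name list per dataframe preserves the information about where each column comes from, so columns with the same name in both dataframes can still be told apart.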

In the selectExpr above, it is necessary to specify which dataframe the columns are coming from. However, this can be further simplified if the following assumptions are true:

  1. The columns to join on have the same name in both dataframes
  2. The columns to be selected have unique names (the other dataframe does not have a column with the same name)

In this case, the joinExpr: Column can be changed to joinExpr: Seq[String] and selectExpr: Seq[Column] to selectExpr: Seq[String]:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Seq[String], joinType: String, selectExpr: Seq[String]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail: _*)
}

Calling the method now looks cleaner:

val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)
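Since the string-based version only works when the two assumptions listed earlier hold, it can help to check them before joining. The sketch below is one possible way to do that using nothing but the column name lists; JoinAssumptions and violations are hypothetical names, not part of Spark or of the original answer. It compares names exactly, whereas Spark resolves column names case-insensitively by default, so treat it as a simplification.

```scala
object JoinAssumptions {
  // Hypothetical helper: returns a description of every violated assumption,
  // or an empty Seq when the string-based joinDF can be used safely.
  def violations(leftCols: Seq[String], rightCols: Seq[String],
                 joinCols: Seq[String], selectCols: Seq[String]): Seq[String] = {
    // Assumption 1: every join column exists under the same name on both sides.
    val missingJoin = joinCols
      .filterNot(c => leftCols.contains(c) && rightCols.contains(c))
      .map(c => s"join column '$c' is not present in both dataframes")

    // Assumption 2: no selected column (other than a join column) exists on both sides.
    val shared = leftCols.intersect(rightCols).toSet -- joinCols
    val ambiguous = selectCols
      .filter(c => shared.contains(c))
      .map(c => s"selected column '$c' exists in both dataframes")

    missingJoin ++ ambiguous
  }
}
```

With the dataframes from the question, JoinAssumptions.violations(df1.columns.toSeq, df2.columns.toSeq, Seq("id"), Seq("type", "account", "value")) returns an empty Seq, so the simplified joinDF applies.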

Note: When the join is performed using a Seq[String], the resulting dataframe differs from the one produced by a join expression: each join column appears only once instead of once per dataframe. If the two dataframes share any other column names, those columns cannot be selected separately by name afterwards.
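To make the note concrete, the sketch below prints the column lists produced by both join styles, and then shows one possible way to keep string-based selection working when names clash: alias each dataframe and select by qualified name. The aliases "l" and "r" and the helper selectQualified are assumptions made for this illustration, not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object DuplicateNamesExample {
  // Hypothetical helper: alias both sides so that names such as "l.id" and "r.id"
  // stay distinguishable, then select by qualified name.
  def selectQualified(left: DataFrame, right: DataFrame, joinCol: String,
                      joinType: String, qualifiedNames: Seq[String]): DataFrame = {
    val l = left.as("l")
    val r = right.as("r")
    l.join(r, col(s"l.$joinCol") === col(s"r.$joinCol"), joinType)
      .select(qualifiedNames.map(name => col(name)): _*)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("duplicate-names").getOrCreate()
    import spark.implicits._

    val df1 = List(("1", "new", "current"), ("2", "closed", "saving"), ("3", "blocked", "credit"))
      .toDF("id", "type", "account")
    val df2 = List(("1", "7"), ("2", "5"), ("5", "8")).toDF("id", "value")

    // Joining on column names keeps a single "id" column.
    println(df1.join(df2, Seq("id"), "inner").columns.mkString(", "))
    // prints: id, type, account, value

    // Joining on an expression keeps the "id" column of both sides.
    println(df1.join(df2, df1("id") === df2("id"), "inner").columns.mkString(", "))
    // prints: id, type, account, id, value

    // Qualified names say which side each column is taken from.
    selectQualified(df1, df2, "id", "inner", Seq("l.id", "l.type", "r.value")).show()
    spark.stop()
  }
}
```

The selected columns keep their unqualified names in the result, so selecting both "l.id" and "r.id" still yields two columns called id; renaming one of them with an alias avoids that.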
