Iterating on columns in dataframe

asked by Dark Shadows

I have the following two data frames. df1:

+----------+----+----+----+-----+
|      WEEK|DIM1|DIM2|  T1|   T2|
+----------+----+----+----+-----+
|2016-04-02|  14|NULL|9874|  880|
|2016-04-30|  14|  FR|9875|   13|
|2017-06-10|  15| PQR|9867|57721|
+----------+----+----+----+-----+

df2:

+----------+----+----+----+-----+
|      WEEK|DIM1|DIM2|  T1|   T2|
+----------+----+----+----+-----+
|2016-04-02|  14|NULL|9879|  820|
|2016-04-30|  14|  FR|9785|    9|
|2017-06-10|  15| XYZ|9967|57771|
+----------+----+----+----+-----+

I need to produce the following output:

+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+
|      WEEK|DIM1|DIM2|  T1|   T2|  T1|   T2|t1_diff|t2_diff|pr_primary|pr_reference|
+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+
|2016-04-02|  14|NULL|9874|  880|9879|  820|     -5|     60|         Y|           Y|
|2017-06-10|  15| PQR|9867|57721|null| null|   null|   null|         Y|           N|
|2017-06-10|  15| XYZ|null| null|9967|57771|   null|   null|         N|           Y|
|2016-04-30|  14|  FR|9875|   13|9785|    9|     90|      4|         Y|           Y|
+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+

Here, t1_diff is the difference between the left T1 and the right T1, and t2_diff is the difference between the left T2 and the right T2. pr_primary is Y if the row is present in df1 (N if it exists only in df2), and similarly pr_reference is Y if the row is present in df2. I have generated the above with the following piece of code:

val df1 = Seq(
  ("2016-04-02", "14", "NULL", 9874, 880), ("2016-04-30", "14", "FR", 9875, 13), ("2017-06-10", "15", "PQR", 9867, 57721)
).toDF("WEEK", "DIM1", "DIM2","T1","T2")

val df2 = Seq(
  ("2016-04-02", "14", "NULL", 9879, 820), ("2016-04-30", "14", "FR", 9785, 9), ("2017-06-10", "15", "XYZ", 9967, 57771)
).toDF("WEEK", "DIM1", "DIM2","T1","T2")

import org.apache.spark.sql.functions._

val joined = df1.as("l").join(df2.as("r"), Seq("WEEK", "DIM1", "DIM2"), "fullouter")

val j1 = joined
  .withColumn("t1_diff", col("l.T1") - col("r.T1"))
  .withColumn("t2_diff", col("l.T2") - col("r.T2"))

val isPresentSubstitution = udf((x: String, y: String) => if (x == null && y == null) "N" else "Y")

j1.withColumn("pr_primary", isPresentSubstitution(col("l.T1"), col("l.T2")))
  .withColumn("pr_reference", isPresentSubstitution(col("r.T1"), col("r.T2")))
  .show

I want to generalize this for any number of columns, not just T1 and T2. Can someone suggest a better way to do this? I am running this in Spark.

Tags: scala, apache-spark, dataframe, spark-dataframe

Answers

answered 6 months ago Satish Karuturi #1

You can do it by adding a sequence number to each dataframe and then joining the two dataframes on that sequence number.

    val df3 = df1.withColumn("SeqNum", monotonicallyIncreasingId)
    val df4 = df2.withColumn("SeqNum", monotonicallyIncreasingId)

    df3.as("l").join(df4.as("r"),"SeqNum").withColumn("t1_diff",col("l.T1") - col("r.T1")).withColumn("t2_diff",col("l.T2") - col("r.T2")).drop("SeqNum").show()

answered 6 months ago Antot #2

To be able to add any number of columns like t1_diff, with arbitrary expressions computing their values, we need some refactoring that lets us use withColumn in a more generic manner.

First, we need to collect the target values: the names of the target columns and the expressions that calculate their contents. This can be done with a sequence of Tuples:

val diffColumns = Seq(
  ("t1_diff", col("l.T1") - col("r.T1")),
  ("t2_diff", col("l.T2") - col("r.T2"))
)
// or, to make it more readable, create a dedicated "case class DiffColumn(colName: String, expression: Column)"
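
As an aside, a minimal sketch of that case class variant (hypothetical naming, kept separate from the tuple-based diffColumns used by the fold below):

import org.apache.spark.sql.Column

// hypothetical case class suggested in the comment above
case class DiffColumn(colName: String, expression: Column)

val diffColumnsCc = Seq(
  DiffColumn("t1_diff", col("l.T1") - col("r.T1")),
  DiffColumn("t2_diff", col("l.T2") - col("r.T2"))
)
// a fold over diffColumnsCc would then read dc.colName and dc.expression instead of _1/_2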

Now we can use foldLeft over this sequence, starting from joined, to produce the result:

val joinedWithDiffCols = 
  diffColumns.foldLeft(joined) { case(df, diffTuple) =>
    df.withColumn(diffTuple._1, diffTuple._2)
  }

joinedWithDiffCols contains the same data as j1 from the question.

To append new columns, you now only have to modify the diffColumns sequence. You can even put the calculation of pr_primary and pr_reference into this sequence (renaming it to appendedColumns in that case, to be more precise), as sketched below.
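
For example, a sketch of that extended sequence (assuming the presence check from the question's UDF is rewritten with when/isNull; the l/r aliases refer to joined):

val appendedColumns = Seq(
  ("t1_diff",      col("l.T1") - col("r.T1")),
  ("t2_diff",      col("l.T2") - col("r.T2")),
  // presence flags: "N" only when both columns of that side are null after the full outer join
  ("pr_primary",   when(col("l.T1").isNull && col("l.T2").isNull, "N").otherwise("Y")),
  ("pr_reference", when(col("r.T1").isNull && col("r.T2").isNull, "N").otherwise("Y"))
)

val result = appendedColumns.foldLeft(joined) { case (df, (name, expr)) =>
  df.withColumn(name, expr)
}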

Update

To facilitate the creation of the tuples for diffColumns, this can also be generalized, for example:

// when both column names are same:
def generateDiff(column: String): (String, Column) = generateDiff(column, column)

// when left and right column names are different:
def generateDiff(leftCol: String, rightCol: String): (String, Column) =
  (s"${leftCol}_diff", col("l." + leftCol) - col("r." + rightCol))

val diffColumns = Seq("T1", "T2").map(generateDiff)

End-of-update

answered 6 months ago philantrovert #3

Assuming the columns are named the same in both df1 and df2, you can do something like:

val diffCols = df1.columns
                  .filter(_.matches("T\\d+"))
                  .map(c => col(s"l.$c") - col(s"r.$c") as (s"${c.toLowerCase}_diff") )

And then use it with joined like:

joined.select((col("*") +: diffCols): _*).show(false)
//+----------+----+----+----+-----+----+-----+-------+-------+
//|WEEK      |DIM1|DIM2|T1  |T2   |T1  |T2   |t1_diff|t2_diff|
//+----------+----+----+----+-----+----+-----+-------+-------+
//|2016-04-02|14  |NULL|9874|880  |9879|820  |-5     |60     |
//|2017-06-10|15  |PQR |9867|57721|null|null |null   |null   |
//|2017-06-10|15  |XYZ |null|null |9967|57771|null   |null   |
//|2016-04-30|14  |FR  |9875|13   |9785|9    |90     |4      |
//+----------+----+----+----+-----+----+-----+-------+-------+
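
The pr_primary / pr_reference flags from the question can be built in the same generic style (a sketch, not part of the original answer, assuming a row's presence on one side is indicated by its T* columns being non-null after the full outer join):

// derive the presence flags from the same T* columns used for the diffs
val tCols = df1.columns.filter(_.matches("T\\d+"))

val presenceCols = Seq(
  when(tCols.map(c => col(s"l.$c").isNull).reduce(_ && _), "N").otherwise("Y").as("pr_primary"),
  when(tCols.map(c => col(s"r.$c").isNull).reduce(_ && _), "N").otherwise("Y").as("pr_reference")
)

joined.select((col("*") +: (diffCols ++ presenceCols)): _*).show(false)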
