Comparing DataFrames in Spark

I have two dataframes:

df1

+----------+----------------+--------------------+--------------+-------------+
|      WEEK|DIM1            |DIM2                |T1            |  T2         |
+----------+----------------+--------------------+--------------+-------------+
|2016-04-02|              14|                NULL|          9874|   880       |
|2016-04-30|              14|FR                  |          9875|    13       |
|2017-06-10|              15|                 PQR|          9867| 57721       |
+----------+----------------+--------------------+--------------+-------------+

df2

+----------+----------------+--------------------+--------------+-------------+
|      WEEK|DIM1            |DIM2                |T1            |  T2         |
+----------+----------------+--------------------+--------------+-------------+
|2016-04-02|              14|                NULL|          9879|   820       |
|2016-04-30|              14|FR                  |          9785|    9        |
|2017-06-10|              15|                 XYZ|          9967| 57771       |
+----------+----------------+--------------------+--------------+-------------+

I want to write a comparator in Spark that compares T1 and T2 in the two dataframes, matched on WEEK, DIM1, DIM2, where T1 and T2 in df1 should be greater than T1 and T2 in df2 by 3. I want to return all rows that do not meet this criterion, along with the difference in T1 and T2 between the dataframes. I also want the rows present in df1 but not in df2, and vice versa, for the same combination of WEEK, DIM1, DIM2.

The output should look like this:

+----------+----------------+--------------------+--------------+-------------+------------------+-----------------+
|      WEEK|DIM1            |DIM2                |T1_DIFF       |  T2_DIFF    | Present_In_DF1   | Present_In_DF2  |
+----------+----------------+--------------------+--------------+-------------+------------------+-----------------+
|2016-04-30|              14|FR                  |            90|    4        | Y                | Y               |
|2017-06-10|              15|PQR                 |          9867|    57721    | Y                | N               |
|2017-06-10|              15|XYZ                 |          9967|    57771    | N                | Y               |
+----------+----------------+--------------------+--------------+-------------+------------------+-----------------+

What is the best way to go about this?

I have implemented the following, but do not know how to proceed after this:

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF on a local Seq

val df1 = Seq(
  ("2016-04-02", "14", "NULL", 9874, 880), ("2016-04-30", "14", "FR", 9875, 13), ("2017-06-10", "15", "PQR", 9867, 57721)
).toDF("WEEK", "DIM1", "DIM2", "T1", "T2")

val df2 = Seq(
  ("2016-04-02", "14", "NULL", 9879, 820), ("2016-04-30", "14", "FR", 9785, 9), ("2017-06-10", "15", "XYZ", 9967, 57771)
).toDF("WEEK", "DIM1", "DIM2", "T1", "T2")

// Full outer join on the key columns, so rows missing from either side are kept
val joined = df1.as("l").join(df2.as("r"), Seq("WEEK", "DIM1", "DIM2"), "fullouter")

The joined dataframe looks like this:

+----------+----+----+----+-----+----+-----+
|      WEEK|DIM1|DIM2|  T1|   T2|  T1|   T2|
+----------+----+----+----+-----+----+-----+
|2016-04-02|  14|NULL|9874|  880|9879|  820|
|2017-06-10|  15| PQR|9867|57721|null| null|
|2017-06-10|  15| XYZ|null| null|9967|57771|
|2016-04-30|  14|  FR|9875|   13|9785|    9|
+----------+----+----+----+-----+----+-----+

I do not know how to proceed after this in a good way; I am relatively new to Scala.
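
My best guess for the next step is something like this, with T1_DIFF and T2_DIFF mirroring the expected output and coalesce treating a missing side as 0, but I do not know how to derive the presence flags or the filtering:

val diffs = joined.select(
  col("WEEK"), col("DIM1"), col("DIM2"),
  // l.T1/r.T1 resolve through the "l"/"r" aliases even after the join
  abs(coalesce($"l.T1", lit(0)) - coalesce($"r.T1", lit(0))).as("T1_DIFF"),
  abs(coalesce($"l.T2", lit(0)) - coalesce($"r.T2", lit(0))).as("T2_DIFF")
)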

Tags: scala, apache-spark, dataframe, spark-dataframe

Answers

answered 6 months ago Avishek Bhattacharya #1

One easy solution could be to join df1 and df2 with WEEK as the unique key. In the joined data you need to keep all the columns from both df1 and df2.

Then you can do a map operation on the dataframe to produce the rest of the columns.

Something like

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
val df = spark.sql("select df1.*, df2.DIM1 as df2_DIM1, df2.DIM2 as df2_DIM2, df2.T1 as df2_T1, df2.T2 as df2_T2 from df1 join df2 on df1.WEEK = df2.WEEK")
// Now map over the joined dataframe to produce the diff columns,
// or use SQL to do the same.
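
For example, continuing from df above (just a sketch: the tolerance of 3 and the DIFF column names are taken from the question; note that an inner join on WEEK alone will not surface rows missing from either side, so the Present_In_DF1/Present_In_DF2 flags would need the full outer join on WEEK, DIM1, DIM2 from the question's own attempt):

import org.apache.spark.sql.functions.{abs, col}

val diff = df
  // absolute differences between the df1 and df2 measures
  .withColumn("T1_DIFF", abs(col("T1") - col("df2_T1")))
  .withColumn("T2_DIFF", abs(col("T2") - col("df2_T2")))
  // keep only rows that violate the tolerance of 3
  .filter(col("T1_DIFF") > 3 || col("T2_DIFF") > 3)
  .select("WEEK", "DIM1", "DIM2", "T1_DIFF", "T2_DIFF")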
