Spark Scala: joining two tables and extracting the records with the max date (see description)

Vaishak

I want to join two tables A and B and pick the records having max date from table B for each value.

Consider the following tables:

Table A:

+---+-----+----------+
| id|Value|start_date|
+---+-----+----------+
| 1 |   a | 1/1/2018 |
| 2 |   a | 4/1/2018 |
| 3 |   a | 8/1/2018 |
| 4 |   c | 1/1/2018 |
| 5 |   d | 1/1/2018 |
| 6 |   e | 1/1/2018 |
+---+-----+----------+

Table B:

+---+-----+----------+
|Key|Value|sent_date |
+---+-----+----------+
| x |   a | 2/1/2018 |
| y |   a | 7/1/2018 |
| z |   a | 11/1/2018|
| p |   c | 5/1/2018 |
| q |   d | 5/1/2018 |
| r |   e | 5/1/2018 |
+---+-----+----------+

The aim is to bring column id from Table A into Table B for each record in Table B. To do this, Tables A and B need to be joined on column Value, and for each record in B, the row in A with max(A.start_date) is selected, subject to the condition A.start_date < B.sent_date.

Let's consider Value=a. In Table A there are 3 records for Value=a, each with a different start_date. So when joining with Table B, for the record with Value=a and sent_date=2/1/2018, the Table A row with the max(start_date) among those start_date values less than the sent_date is taken (in this case 1/1/2018), and the corresponding value in column A.id is pulled into Table B.

Similarly, for the record with Value=a and sent_date=11/1/2018 in Table B, id=3 from Table A needs to be pulled into Table B.

The result must be as follows:

+---+-----+----------+---+
|Key|Value|sent_date |id |
+---+-----+----------+---+
| x |   a | 2/1/2018 | 1 |
| y |   a | 7/1/2018 | 2 |
| z |   a | 11/1/2018| 3 |
| p |   c | 5/1/2018 | 4 |
| q |   d | 5/1/2018 | 5 |
| r |   e | 5/1/2018 | 6 |
+---+-----+----------+---+

I am using Spark 2.3. I have joined the two tables (using DataFrames) and found the max(start_date) based on the condition, but I am unable to figure out how to pull the corresponding records.
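For reference, what I have so far looks roughly like this (a simplified sketch; tableA and tableB are illustrative names, not my exact code):

import org.apache.spark.sql.functions.{col, max}

// Join on Value, keep only A rows that start before B's sent_date,
// then take max(start_date) per B record. The aggregation finds the
// right date but drops the id I actually need.
val joined = tableB.join(tableA, Seq("Value"))
  .filter(col("start_date") < col("sent_date"))

val maxDates = joined
  .groupBy("Key", "Value", "sent_date")
  .agg(max("start_date").as("max_start_date"))

How do I get the id that corresponds to max_start_date back into the result?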

Can anyone help me out here?

Thanks in advance!

Tags: scala, apache-spark, hive, apache-spark-sql

Answers

#1: stack0114106, answered 5 days ago

I changed the date "11/1/2018" to "9/1/2018", since sorting the raw strings gives incorrect results for it. Once the column is converted to a proper date type, the logic works unchanged. See below:
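To see the pitfall, a quick REPL check: string comparison goes character by character, and '1' < '9', so "11/1/2018" sorts before "9/1/2018" even though November is after September.

scala> "11/1/2018" < "9/1/2018"   // lexicographic, not chronological
res0: Boolean = true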

scala> val df_a = Seq((1,"a","1/1/2018"),
     | (2,"a","4/1/2018"),
     | (3,"a","8/1/2018"),
     | (4,"c","1/1/2018"),
     | (5,"d","1/1/2018"),
     | (6,"e","1/1/2018")).toDF("id","value","start_date")
df_a: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> val df_b = Seq(("x","a","2/1/2018"),
     | ("y","a","7/1/2018"),
     | ("z","a","9/1/2018"),
     | ("p","c","5/1/2018"),
     | ("q","d","5/1/2018"),
     | ("r","e","5/1/2018")).toDF("key","valueb","sent_date")
df_b: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 1 more field]

scala> val df_join = df_b.join(df_a, 'valueb === 'valuea, "inner")
df_join: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 4 more fields]

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> df_join.filter('sent_date >= 'start_date).withColumn("rank", rank().over(Window.partitionBy('key, 'valueb, 'sent_date).orderBy('start_date.desc))).filter('rank === 1).drop("valuea", "start_date", "rank").show()
+---+------+---------+---+
|key|valueb|sent_date| id|
+---+------+---------+---+
|  q|     d| 5/1/2018|  5|
|  p|     c| 5/1/2018|  4|
|  r|     e| 5/1/2018|  6|
|  x|     a| 2/1/2018|  1|
|  y|     a| 7/1/2018|  2|
|  z|     a| 9/1/2018|  3|
+---+------+---------+---+


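As a side note, the same result can be had without a window by taking the max of a (start_date, id) struct per group, since struct comparison goes field by field; this is just a sketch and carries the same string-date caveat. Also, rank() can return several rows on ties, so row_number() is the safer pick if start_date values can repeat within a group.

import org.apache.spark.sql.functions.{col, max, struct}

// Windowless sketch: max(struct(start_date, id)) keeps the id that
// belongs to the latest start_date in each group.
val df_alt = df_join
  .filter(col("sent_date") >= col("start_date"))
  .groupBy("key", "valueb", "sent_date")
  .agg(max(struct(col("start_date"), col("id"))).as("best"))
  .select(col("key"), col("valueb"), col("sent_date"), col("best.id").as("id"))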

UPDATE

Below is a UDF to convert the M/d/yyyy date strings into yyyy-MM-dd so they can be cast to dates:

scala> def dateConv(x: String): String = {
     |   val y = x.split("/").map(_.toInt).map("%02d".format(_))  // zero-pad month and day
     |   y(2) + "-" + y(0) + "-" + y(1)                           // reassemble as yyyy-MM-dd
     | }
dateConv: (x: String)String

scala>  val udfdateconv = udf( dateConv(_:String):String )
udfdateconv: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> val df_a_dt = df_a.withColumn("start_date",date_format(udfdateconv('start_date),"yyyy-MM-dd").cast("date"))
df_a_dt: org.apache.spark.sql.DataFrame = [id: int, valuea: string ... 1 more field]

scala> df_a_dt.printSchema
root
 |-- id: integer (nullable = false)
 |-- valuea: string (nullable = true)
 |-- start_date: date (nullable = true)


scala> df_a_dt.show()
+---+------+----------+
| id|valuea|start_date|
+---+------+----------+
|  1|     a|2018-01-01|
|  2|     a|2018-04-01|
|  3|     a|2018-08-01|
|  4|     c|2018-01-01|
|  5|     d|2018-01-01|
|  6|     e|2018-01-01|
+---+------+----------+
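As an aside, since the question mentions Spark 2.3, the built-in to_date with a format pattern (available since Spark 2.2) should make the UDF unnecessary; a minimal sketch:

import org.apache.spark.sql.functions.{col, to_date}

// to_date parses M/d/yyyy strings (one- or two-digit month/day) directly.
val df_a_dt2 = df_a.withColumn("start_date", to_date(col("start_date"), "M/d/yyyy"))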


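Putting it together, a sketch of the full pipeline with both columns converted to real dates: the original 11/1/2018 value can then be kept in df_b, and the strict < condition from the question is used directly.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank, to_date}

// With real dates, chronological ordering is correct, so no data tweak
// is needed for 11/1/2018.
val df_b_dt = df_b.withColumn("sent_date", to_date(col("sent_date"), "M/d/yyyy"))
val w = Window.partitionBy("key", "valueb", "sent_date").orderBy(col("start_date").desc)

df_b_dt.join(df_a_dt, col("valueb") === col("valuea"))
  .filter(col("start_date") < col("sent_date"))   // strict "<" as in the question
  .withColumn("rnk", rank().over(w))
  .filter(col("rnk") === 1)
  .drop("valuea", "start_date", "rnk")
  .show()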
