package ads_labels

import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

import java.sql.Date
import java.text.SimpleDateFormat
import java.util
object wp05_part {

  // Partitions by the last digit of the key's string form, so all records
  // for the same phone number land in the same partition.
  class CustomPartitioner() extends Partitioner {

    override def numPartitions: Int = 10

    override def getPartition(key: Any): Int = {
      key.toString.substring(key.toString.length - 1).toInt
    }
  }
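  // Illustrative check (values invented, not from the original job):
  //   new CustomPartitioner().getPartition("13800001234") == 4
  // Every key is routed by its final digit, so numPartitions of 10 covers
  // digits 0-9 and keeps identical phone numbers on one partition.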
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .enableHiveSupport()
      .getOrCreate()

    val dateStr = args(0)

    // A single shared instance: Partitioner equality defaults to reference
    // equality, so reusing this one object is what makes the three datasets
    // below count as co-partitioned.
    val phonePartitioner = new CustomPartitioner()
    import spark.implicits._

    // Deduplicate t1 by phone, aggregate per phone, then repartition the
    // result with the shared phone partitioner.
    val d1 = spark.sql(
      s"""
         |select distinct phone, id, parent_id
         |from t1
         |where ...
         |""".stripMargin)
      .groupByKey(_.getAs[String]("phone"))
      .mapGroups((phone, rows) => { ... (phone, ...) })
      .rdd
      .keyBy(_._1)
      .partitionBy(phonePartitioner)
      .map(_._2)
      .toDF("msisdn", "wp0503")
    // t2 and t3 are keyed and partitioned the same way, so they line up
    // partition-for-partition with d1.
    val d2 = spark.sql(
      s"""
         |select msisdn, ...
         |from t2
         |group by msisdn
         |""".stripMargin)
      .rdd
      .keyBy(_.getString(0))
      .partitionBy(phonePartitioner)
      .map(r => (r._2.getString(0), r._2.getDate(1), r._2.getDate(2)))
      .toDF("msisdn", ...)

    val d3 = spark.sql(
      s"""
         |select servnumber as msisdn, ...
         |from t3
         |where ...
         |group by servnumber
         |""".stripMargin)
      .rdd
      .keyBy(_.getString(0))
      .partitionBy(phonePartitioner)
      .map(r => (r._2.getString(0), r._2.getDate(1), r._2.getDate(2), r._2.getInt(3)))
      .toDF("msisdn", ...)
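    // Full outer joins align the three per-phone feature sets on msisdn;
    // a row survives for every phone seen in any of the three sources.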
    d2.join(d1, Seq("msisdn"), "outer")
      .join(d3, Seq("msisdn"), "outer")
      .createOrReplaceTempView("result")
    // Write the combined result into the target partition for this run date.
    spark.sql(
      s"""
         |insert overwrite table R1 partition (dtime='test${dateStr}')
         |select msisdn, ...
         |from result
         |""".stripMargin)
  }
}
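For reference, the pattern the job relies on can be shown in isolation. The sketch below is a minimal, self-contained example under stated assumptions (the object, class, and data names are invented, not from the original job): two pair RDDs are partitioned with one shared last-digit partitioner, so the RDD-level join can reuse the existing layout instead of shuffling both sides again. Note that this benefit applies at the RDD level; once the data is converted back to DataFrames, as in the job above, Catalyst plans its own exchanges.

import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

object LastDigitJoinSketch {

  // Same idea as CustomPartitioner above: route each key by its last digit.
  class LastDigitPartitioner extends Partitioner {
    override def numPartitions: Int = 10
    override def getPartition(key: Any): Int =
      key.toString.takeRight(1).toInt
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Reuse one instance: Partitioner equality defaults to reference
    // equality, and Spark only skips the shuffle when both sides of the
    // join report an equal partitioner.
    val p = new LastDigitPartitioner

    val left  = sc.parallelize(Seq("13800001234" -> "a", "13900005678" -> "b")).partitionBy(p)
    val right = sc.parallelize(Seq("13800001234" -> 1,   "13700009999" -> 2)).partitionBy(p)

    // Co-partitioned join: no extra shuffle; matching records already sit
    // in the partition chosen by the phone number's last digit.
    left.join(right).collect().foreach(println)  // prints (13800001234,(a,1))

    spark.stop()
  }
}

If separate partitioner instances had to be used instead of one shared object, overriding equals and hashCode on the partitioner class would preserve the same co-partitioning guarantee.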