Each input is partitioned by the phone number's tail digit before the join. When the input data is known to be uniformly distributed across tail digits, this avoids skew more effectively than hash partitioning.
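As a standalone sketch of the idea (the class name and sample numbers here are made up for illustration), the key-to-partition mapping is simply the tail digit, so with ten partitions and uniformly distributed tail digits every partition receives the same share of keys:

import org.apache.spark.Partitioner

// Same scheme as the CustomPartitioner in the job below:
// the partition index is the key's last digit.
class TailDigitPartitioner extends Partitioner {
  override def numPartitions: Int = 10
  override def getPartition(key: Any): Int = key.toString.last.asDigit
}

object TailDigitDemo {
  def main(args: Array[String]): Unit = {
    val p = new TailDigitPartitioner
    // Made-up sample numbers; each maps to the partition named by its tail digit.
    Seq("13800000007", "13911112223", "15055556660").foreach { n =>
      println(s"$n -> partition ${p.getPartition(n)}")
    }
  }
}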

package ads_labels

import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

import java.sql.Date
import java.text.SimpleDateFormat
import java.util

object wp05_part {

  // Routes each key to one of ten partitions by the last digit of the
  // phone number, so all inputs keyed this way end up co-partitioned.
  class CustomPartitioner() extends Partitioner {
    override def numPartitions: Int = 10

    override def getPartition(key: Any): Int = {
      // The last character of a phone number is always a digit 0-9.
      key.toString.substring(key.toString.length - 1).toInt
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .enableHiveSupport()
      .getOrCreate()

    val dateStr = args(0)

    // One shared partitioner instance so every input is partitioned identically.
    val phonePartitioner = new CustomPartitioner()

    import spark.implicits._

    // Input 1: aggregate per phone, then repartition by tail digit.
    val d1 = spark.sql(
      s"""
         |select distinct phone, id, parent_id
         |from t1
         |where ...
         |""".stripMargin)
      .groupByKey(_.getAs[String]("phone"))
      .mapGroups((phone, rows) => {
        ...
        (phone, ...)
      })
      .rdd.keyBy(_._1).partitionBy(phonePartitioner)
      .map(_._2)
      .toDF("msisdn", "wp0503")

    // Input 2: already grouped by msisdn in SQL; repartition by tail digit.
    val d2 = spark.sql(
      s"""
         |select msisdn, ...
         |from t2
         |group by msisdn
         |""".stripMargin)
      .rdd.keyBy(_.getString(0)).partitionBy(phonePartitioner)
      .map(r => (r._2.getString(0), r._2.getDate(1), r._2.getDate(2)))
      .toDF("msisdn", ...)

    // Input 3: same treatment as d2.
    val d3 = spark.sql(
      s"""
         |select servnumber as msisdn, ...
         |from t3
         |where ...
         |group by servnumber
         |""".stripMargin)
      .rdd.keyBy(_.getString(0)).partitionBy(phonePartitioner)
      .map(r => (r._2.getString(0), r._2.getDate(1), r._2.getDate(2), r._2.getInt(3)))
      .toDF("msisdn", ...)

    // Full outer joins on the co-partitioned key, then write the result.
    d2.join(d1, Seq("msisdn"), "outer")
      .join(d3, Seq("msisdn"), "outer")
      .createOrReplaceTempView("result")

    spark.sql(
      s"""
         |insert overwrite table R1 partition (dtime='test${dateStr}')
         |select msisdn, ...
         |from result
         |""".stripMargin)
  }
}
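For reference, the job might be submitted along these lines (the jar name, master, and date argument are hypothetical; the single argument becomes dateStr via args(0)):

spark-submit \
  --class ads_labels.wp05_part \
  --master yarn \
  --deploy-mode cluster \
  wp05_part.jar 20240101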