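Contents
  1. Spark
  2. Spark on Yarn
  3. Test code
  4. Accessing Spark from PyCharm
    1. Install py4j
    2. Configure environment variables
    3. Link Spark and Hadoop
    4. Issues on Windows
  • Too many DataFrame columns
  • MergeDataframe
    1. Converting scientific-notation strings to numbers
    2. Connecting to Oracle
  • Security
    1. UI filter
    2. ACLs
    3. Adding username/password authentication to the Thrift server
  • Thrift Server
    1. Limiting the result size
  • How files under a path are read
  • Filtering partitions when there are too many
  • Running a local Spark Job History server
  • Specifying the Thrift metastore address in a test environment
  • beeline
  • Exporting CSV data
  • JSON format support
  • Rows to columns
  • Running jobs in parallel
  • Built-in Functions
  • list/map merge UDAF
    1. Query usage
  • Registering UDFs
  • Run SQL on files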
  • Spark

    简单之美 | RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

    Spark on Yarn

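    Spark officially provides three cluster deployment options: Standalone, Mesos, and YARN. They differ only in the resource management and scheduling platform.

    To run Spark on an existing Hadoop cluster (Spark on Yarn), you only need to edit the config file with vi ./conf/spark-env.sh and add the following: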

    export HADOOP_HOME=/share/apps/hadoop

    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

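    Main job-submission parameters (using them well lets you make full use of cluster resources)

    Parameter                     Meaning
    --master MASTER_URL           One of spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, or local
    --deploy-mode DEPLOY_MODE     Where the driver program runs: client or cluster
    --class CLASS_NAME            Main class name, including the package
    --name NAME                   Application name
    --jars JARS                   Third-party jars the driver depends on
    --py-files PY_FILES           Comma-separated list of .zip, .egg, and .py files to place on the PYTHONPATH of Python applications
    --files FILES                 Comma-separated list of files to place in the working directory of each executor
    --executor-memory MEM         Memory per executor; defaults to 1G
    --total-executor-cores NUM    Total cores across all executors; Spark Standalone and Spark on Mesos only
    --executor-cores NUM          Cores per executor; defaults to 1; Spark on Yarn only
    --queue QUEUE_NAME            YARN queue to submit the application to; defaults to the default queue; Spark on Yarn only
    --num-executors NUM           Number of executors to launch; defaults to 2; Spark on Yarn only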
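The parameters above are easier to keep consistent when the command line is assembled in one place. The following is a minimal sketch, assuming a hypothetical helper named build_spark_submit and example resource values (2G, 2 cores, 4 executors) that do not come from the original notes; it only builds the argument list and does not launch anything.

```python
import shlex


def build_spark_submit(app, options, app_args=()):
    """Return a spark-submit argument list.

    `options` maps flag names without the leading dashes (for example
    "executor-memory") to their values. This helper is illustrative only;
    it is not part of Spark.
    """
    cmd = ["./bin/spark-submit"]
    for flag, value in options.items():
        cmd += ["--" + flag, str(value)]
    cmd.append(app)
    cmd += [str(arg) for arg in app_args]
    return cmd


cmd = build_spark_submit(
    "lib/spark-examples.jar",  # hypothetical jar path
    {
        "master": "yarn",
        "deploy-mode": "cluster",
        "name": "spark-test",
        "class": "org.apache.spark.examples.SparkPi",
        "executor-memory": "2G",  # example value
        "executor-cores": 2,      # example value
        "num-executors": 4,       # example value
        "queue": "default",
    },
    app_args=[10],
)

# Quote each argument so the printed line can be pasted into a shell.
print(" ".join(shlex.quote(part) for part in cmd))
```

The resulting list can be passed to subprocess.run(cmd) if the script runs from the Spark installation directory.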

    For more details, see the blog post.

    Test code

    ./bin/spark-submit --master yarn --name spark-test --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-

    # test_spark.py
    from pyspark import SparkContext

    # Raw string, so the backslashes in the Windows path are not treated as escapes.
    logFile = r"C:\spark-2.0.2-bin-hadoop2.6\README.md"
    sc = SparkContext("local", "Simple App")
    logData = sc.textFile(logFile).cache()

    # Count the lines that contain the letters 'a' and 'b'.
    numAs = logData.filter(lambda s: 'a' in s).count()
    numBs = logData.filter(lambda s: 'b' in s).count()

    print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-

    # testDFrame.py
    import sys

    from pyspark.sql import Row, SparkSession

    # Python 2 only: make UTF-8 the default string encoding.
    if sys.version_info[0] == 2:
        reload(sys)
        sys.setdefaultencoding("utf-8")


    def schema_inference_example(spark):
        sc = spark.sparkContext

        # Load a text file and convert each line to a Row.
        lines = sc.textFile("/user/jiangmanhua/Experiment/url_domain_labeled")
        parts = lines.map(lambda l: l.split("\t"))
        label = parts.map(lambda p: Row(label_name=p[0], type=p[1]))

        # Infer the schema, and register the DataFrame as a table.
        schemaPeople = spark.createDataFrame(label)
        schemaPeople.createOrReplaceTempView("label")

        # SQL can be run over DataFrames that have been registered as a table.
        teenagers = spark.sql("SELECT type, count(label_name) as cc FROM label group by type")

        # The results of SQL queries are DataFrame objects.
        # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
        teenNames = teenagers.rdd.map(lambda p: "Name: " + p.type + " cnt: " + str(p.cc)).collect()
        for name in teenNames:
            print(name)


    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
        schema_inference_example(spark)

        spark.stop()




    Accessing Spark from PyCharm

    1. Install py4j

    There are two main steps: install py4j and configure the paths.

    # Windows
    pip install py4j
    # Linux
    sudo pip2 install py4j

    2. Configure environment variables

    HADOOP_HOME = C:\hadoop-2.6.5
    SPARK_HOME = C:\spark-2.0.2-bin-hadoop2.6

    Alternatively, in PyCharm choose "Run" -> "Edit Configurations" -> "Environment variables" and add the SPARK_HOME and HADOOP_HOME directories there.

    3. Link Spark and Hadoop

    Under $PYTHON_HOME\lib\site-packages, create a file named pyspark.pth whose content is the pyspark directory (for example D:\spark-2.0.2-bin-hadoop2.6\python).

    # Linux
    echo /home/manhua/app/spark/python > /home/manhua/.local/lib/python2.7/site-packages/pyspark.pth
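The pyspark.pth step works because Python's site module reads .pth files in site-packages at startup and appends each listed directory that exists to sys.path. Below is a minimal sketch of the same step in Python, assuming a hypothetical helper named write_pyspark_pth; it writes into a temporary directory so it can be run safely, and a real setup would pass the actual site-packages directory instead.

```python
import os
import tempfile


def write_pyspark_pth(site_packages_dir, spark_python_dir):
    """Write pyspark.pth containing the path of Spark's python directory.

    Hypothetical helper for illustration; it mirrors the echo command above.
    """
    pth_path = os.path.join(site_packages_dir, "pyspark.pth")
    with open(pth_path, "w") as f:
        f.write(spark_python_dir + "\n")
    return pth_path


# Demonstrated against a temporary directory instead of a real site-packages.
demo_dir = tempfile.mkdtemp()
pth = write_pyspark_pth(demo_dir, "/home/manhua/app/spark/python")

with open(pth) as f:
    pth_content = f.read()
print(pth, "->", pth_content.strip())
```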

    4. Issues on Windows

    Download winutils.exe and place it in C:\hadoop-2.6.5\bin

    set spark.sql.cli.print.header=true;

    Too many DataFrame columns

    case class V1402(
      phone: String,
      infoSelected: Boolean
      // ...
    )
    object V1402 {
      // Build the case class from an array of flags instead of passing every column by hand
      def apply(phone: String, flags: Array[Boolean]): V1402 = {
        new V1402(phone,
          flags(0), flags(1), flags(2), flags(3), flags(4), flags(5), flags(6), flags(7), flags(8),
          flags(9), flags(10), flags(11), flags(12), flags(13), flags(14), flags(15), flags(16), flags(17),
          flags(18), flags(19), flags(20), flags(21), flags(22), flags(23), flags(24), flags(25),
          flags(26), flags(27), flags(28), flags(29), flags(30), flags(31), flags(32), flags(33),
          flags(34), flags(35), flags(36), flags(37), flags(38), flags(39), flags(40), flags(41)
        )
      }
    }

    MergeDataframe

    Merging old and new data: take the row from the right-hand table when it exists, otherwise take the row from the left-hand table.

    import spark.implicits._
    df.joinWith(d, df("slot") === d("slot") &&
        df("servicetype") === d("servicetype") &&
        df("utype") === d("utype") &&
        df("userid") === d("userid") &&
        df("id") === d("id"), "fullouter")
      .map {
        case (left, right) => {
          // Prefer the right-hand (new) row; fall back to the left-hand (old) row
          Vcollection.apply(if (right != null) right else left)
        }
      }.filter(_.op_type != 2)
      .write.mode("overwrite")
      .orc(s"/tmp/drs_collectino")

    Converting scientific-notation strings to numbers

    select cast(cast('1.3169590272E11' as float) as decimal(19,0))
    select cast(cast(personaldiskspace as float) as decimal(19,0))
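One thing to watch with the casts above: float in Spark SQL is a 32-bit type with roughly 7 significant decimal digits, so going through double may be safer for long values. The sketch below illustrates the effect in plain Python, using a made-up input value and a hypothetical to_float32 helper; it does not run Spark.

```python
import struct
from decimal import Decimal


def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", x))[0]


text = "1.23456789012E11"  # made-up sample, not from the original notes

exact = int(Decimal(text))               # exact decimal parse
via_float64 = int(float(text))           # 64-bit double keeps every digit here
via_float32 = int(to_float32(float(text)))  # 32-bit float rounds the low digits

print(exact, via_float64, via_float32)
```

Whether a given value survives the 32-bit cast depends on the value itself, so a spot check on a few real rows is a reasonable precaution.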

    Connecting to Oracle

    https://mvnrepository.com/artifact/com.oracle/ojdbc7/12.1.0.2
    Put the jar in SPARK_HOME/jars, start beeline, and run !scan to confirm that the driver has loaded.

    beeline -u "jdbc:oracle:thin:@[ip]:[port]/srvName" -n xxx -p xxx -e "$sql"
    If connecting directly with -u fails, start beeline first and then run !connect 'jdbc:oracle:thin:@[ip]:[port]/srvName', entering the username and password interactively.

    // Connect by SID: here the SID is orcl
    database.url=jdbc:oracle:thin:@171.xxx.96.xx:xxxx:orcl
    // Connect by service name: here the service name is orcl
    database.url=jdbc:oracle:thin:@171.xxx.96.xx:xxxx/orcl
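The only difference between the two URL forms is the separator before the last component: a colon for a SID and a slash for a service name. A minimal sketch, assuming a hypothetical helper oracle_jdbc_url and placeholder host and port values:

```python
def oracle_jdbc_url(host, port, name, use_sid=False):
    """Build an Oracle thin-driver JDBC URL.

    use_sid=True  -> jdbc:oracle:thin:@host:port:SID
    use_sid=False -> jdbc:oracle:thin:@host:port/service_name
    Hypothetical helper for illustration only.
    """
    separator = ":" if use_sid else "/"
    return "jdbc:oracle:thin:@{}:{}{}{}".format(host, port, separator, name)


# Placeholder host and port; substitute the real listener address.
sid_url = oracle_jdbc_url("db.example.com", 1521, "orcl", use_sid=True)
service_url = oracle_jdbc_url("db.example.com", 1521, "orcl")

print(sid_url)
print(service_url)
```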

    SELECT table_name, column_name, data_type
    FROM all_tab_cols
    WHERE table_name = 'TABLE_NAME';
    Note: the table name must be written in uppercase.

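    Loading Oracle DATE columns into Hive

    1. Convert to a string: TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')
    2. Insert directly: EFECTIVETIME (when imported with seatunnel, only the date part is kept)
    3. Convert to seconds: (EFECTIVETIME - to_date('1970-01-01 08:00:00','yyyy-mm-dd hh24:mi:ss')) * 86400
      Problem: numeric overflow; the value is displayed in scientific notation.
      Multiplying by 86400000/1000 instead works correctly
      WHERE ROWNUM < 5;
    4. Convert to a timestamp: TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss'),'yyyy-MM-dd HH24:mi:ss') efectivetime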
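The arithmetic in option 3 can be checked outside the database. Subtracting two Oracle DATE values yields a number of days, so multiplying by 86400 gives seconds; the 08:00:00 in the base date suggests the stored values are UTC+8 wall-clock times, which is an assumption in this sketch. The function name and sample date are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# The Unix epoch expressed as UTC+8 wall-clock time (assumed local zone).
EPOCH_LOCAL = datetime(1970, 1, 1, 8, 0, 0)


def oracle_date_to_epoch_seconds(local_dt):
    """Mirror (date - epoch_local) * 86400 from the SQL expression."""
    days = (local_dt - EPOCH_LOCAL) / timedelta(days=1)  # fractional days
    return round(days * 86400)


sample = datetime(2023, 12, 1, 10, 30, 0)  # made-up sample value
seconds = oracle_date_to_epoch_seconds(sample)

# Cross-check against Python's own time zone handling.
utc_plus_8 = timezone(timedelta(hours=8))
expected = int(sample.replace(tzinfo=utc_plus_8).timestamp())

print(seconds, expected)
```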

    Security

    UI filter

    package spark;

    import org.apache.commons.codec.binary.Base64;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.StringTokenizer;
    import javax.servlet.*;
    import javax.servlet.http.*;

    /* spark-defaults.conf
    spark.ui.filters=spark.HcyFilter // filter class
    spark.spark.HcyFilter.param.username=foo // user for logging in to the Spark web UI: spark.<filter class>.param.<parameter name>
    spark.spark.HcyFilter.param.password=foo // password for logging in to the Spark web UI: spark.<filter class>.param.<parameter name>
    spark.acls.enable=true

    spark-sql --master yarn --jars basic-1.0-SNAPSHOT.jar --conf spark.ui.filters=spark.HcyFilter --conf spark.spark.HcyFilter.param.username=foo --conf spark.spark.HcyFilter.param.password=foo --conf spark.acls.enable=true
    */
    public class HcyFilter implements Filter {

        private String username = "";
        private String password = "";
        private String realm = "Protected";

        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
            username = filterConfig.getInitParameter("username");
            password = filterConfig.getInitParameter("password");
        }

        private void unauthorized(HttpServletResponse response, String message) throws IOException {
            response.setHeader("WWW-Authenticate", "Basic realm=\"" + realm + "\"");
            response.sendError(401, message);
        }

        private void unauthorized(HttpServletResponse response) throws IOException {
            unauthorized(response, "Unauthorized - ui filter");
        }

        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            HttpServletResponse response = (HttpServletResponse) servletResponse;

            String authHeader = request.getHeader("Authorization");
            if (authHeader == null) {
                unauthorized(response);
                return;
            }

            // Expect exactly "Basic <base64(username:password)>"
            StringTokenizer st = new StringTokenizer(authHeader);
            if (!st.hasMoreTokens() || !st.nextToken().equalsIgnoreCase("Basic") || !st.hasMoreTokens()) {
                unauthorized(response, "Invalid authentication token");
                return;
            }

            String credentials = new String(Base64.decodeBase64(st.nextToken()), StandardCharsets.UTF_8);
            int p = credentials.indexOf(":");
            if (p == -1) {
                unauthorized(response, "Invalid authentication token");
                return;
            }

            String _username = credentials.substring(0, p).trim();
            String _password = credentials.substring(p + 1).trim();
            if (!username.equals(_username) || !password.equals(_password)) {
                unauthorized(response, "Bad credentials");
                return; // do not continue the chain after rejecting the request
            }

            filterChain.doFilter(servletRequest, servletResponse);
        }

        @Override
        public void destroy() {

        }
    }
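The core of the filter is parsing the HTTP Basic Authorization header: split off the scheme, Base64-decode the rest, and split on the first colon. The sketch below shows the same steps in plain Python; the function names are hypothetical, and the constant-time comparison via hmac.compare_digest is an addition that the Java filter above does not use.

```python
import base64
import hmac


def parse_basic_auth(header):
    """Return (username, password) from a Basic Authorization header, or None."""
    if not header:
        return None
    parts = header.split()
    if len(parts) != 2 or parts[0].lower() != "basic":
        return None
    try:
        decoded = base64.b64decode(parts[1]).decode("utf-8")
    except ValueError:  # covers malformed Base64 and invalid UTF-8
        return None
    username, separator, password = decoded.partition(":")
    if not separator:
        return None
    return username, password


def is_authorized(header, expected_user, expected_password):
    """Check the header against the configured credentials."""
    parsed = parse_basic_auth(header)
    if parsed is None:
        return False
    user_ok = hmac.compare_digest(parsed[0].encode(), expected_user.encode())
    pass_ok = hmac.compare_digest(parsed[1].encode(), expected_password.encode())
    return user_ok and pass_ok


good_header = "Basic " + base64.b64encode(b"foo:foo").decode("ascii")
print(parse_basic_auth(good_header))
print(is_authorized(good_header, "foo", "foo"))
print(is_authorized("Bearer abc", "foo", "foo"))
```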

    ACLs

    When running on YARN, the default login user is dr.who (the login user can be set with the hadoop.http.staticuser.user property in Hadoop's core-site.xml).

    Restrict access
    [hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true

    Allow viewing
    [hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.ui.view.acls=dr.who

    Allow killing jobs
    [hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.ui.view.acls=dr.who --conf spark.modify.acls=dr.who

    Admin configuration
    [hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.admin.acls=dr.who

    ACLs can also be configured per user group; in that case pay attention to spark.user.groups.mapping.

    Adding username/password authentication to the Thrift server

    • Implement org.apache.hive.service.auth.PasswdAuthenticationProvider
    • Modify or create {spark_home}/conf/hive-thrift-site.xml
      • hive.server2.authentication=CUSTOM (leave this out of the config file so that compute jobs are not affected)
      • hive.server2.custom.authentication.class=me.jmh.spark.MyAuth
      • hive.server2.custom.authentication.filepath=/data/soft/spark/conf/thriftAuth
    • Startup parameter: --hiveconf hive.server2.authentication=CUSTOM
    • Edit the username/password file
      <property>
        <name>hive.server2.custom.authentication.class</name>
        <value>me.jmh.spark.MyAuth</value>
      </property>

      <property>
        <name>hive.server2.custom.authentication.filepath</name>
        <value>/data/soft/spark/conf/thriftAuth</value>
      </property>

    Thrift Server

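    Limiting the result size

    • Enable incremental result return with spark.sql.thriftServer.incrementalCollect; each batch returns 1,000 rows
    • Configure the maximum number of fetched results on the client side, for example in Zeppelin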

    How files under a path are read

    DataFrameReader
    - source=format
    - load: DataSource.lookupDataSourceV2 (sources are registered via `with DataSourceRegister`; returns ParquetDataSourceV2)
    - get[Table]FromProvider; the schema is inferred from the files
    - build the plan
    Dataset.ofRows(
      sparkSession,
      DataSourceV2Relation.create(table, catalog-none, ident-none, dsOptions))

    [The key step is obtaining the table object and converting it to a DataSourceV2Relation]
    After that, the logical plan is turned into a physical plan

    newScanBuilder

    format -> datasource -> table -> relation(LogicalPlan) -> newScanBuilder

    [analyzer]FindDataSourceTable#readDataSourceTable
    DataSource#resolveRelation

    FileSourceStrategy -> FileSourceScanExec (obtains the reader from hadoopFsRelation.fileFormat: buildReaderWithPartitionValues) -> FileScanRDD

    The function that actually performs the read is buildReaderWithPartitionValues

    Filtering partitions when there are too many

    show partitions db.tbl PARTITION (year=2022)
    Supported operators: and / or / > / < / =

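    Running a local Spark Job History server

    Set spark.history.fs.logDirectory to a local directory and drop event-log files such as application_1652535872918_10360.lz4 into it.
    Start the service with sbin/start-history-server.sh
    Open port 18080 in a browser.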

    Specifying the Thrift metastore address in a test environment

    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .config("spark.hadoop.hive.metastore.uris", "thrift://192.168.56.112:9083")
      .getOrCreate()

    beeline

    When connecting to the Thrift server with the command beeline -u jdbc:hive2://localhost:10199, the session identity is anonymous.

    Exporting CSV data

    create table tmp.mytable
    USING CSV as
    select /*+COALESCE(1)*/ * from dim.mcloud_behavior
    ;

    -- source table
    create temporary view s using csv options (path 'file:/tmp/spark-kyuubi-source-test.csv',header "true");

    insert overwrite directory using csv options (path 'file:/tmp/spark-kyuubi-target', delimiter ',',header "false") select * from s;

    JSON format support

    Add the jar from Hive's lib directory, then create the table with:

    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
    STORED AS TEXTFILE

    zip -u spark-jar.zip lib/hive-hcatalog-core-2.3.9.jar

    Rows to columns

    // convert row to column
    spark.sql(
      s"""
         |select msisdn phone,
         |  substr(msisdn,-1) part,
         |  kv['0'] sdk_core,
         |  kv['1'] wap,
         |  kv['2'] web,
         |  kv['3'] android,
         |  kv['4'] ios,
         |  kv['5'] pc,
         |  kv['6'] guanjia,
         |  kv['7'] wechat,
         |  kv['8'] WP8,
         |  kv['9'] other,
         |  kv['10'] TV,
         |  kv['11'] miniprogram,
         |  kv['12'] smart_hardware,
         |  kv['13'] harmony
         |from (
         |  select msisdn,
         |    str_to_map(concat_ws(',',collect_set(concat_ws(':',media_id,last_active_date)))) kv
         |  from ${Table.ads_huadan_user_media_last_active_date_df}
         |  where tp='$year$month$day'
         |  group by msisdn
         |)t
         |""".stripMargin)
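The query above first folds each user's (media_id, last_active_date) rows into one map and then reads fixed keys out of that map as columns. The sketch below reproduces that logic on in-memory tuples so the shape of the result is easy to see; the sample rows and the reduced column list are made up, and no Spark is involved. Note that the SQL version relies on ',' and ':' as delimiters, so values containing those characters would be split incorrectly.

```python
from collections import defaultdict

# Subset of the media_id -> column name mapping used in the query.
MEDIA_COLUMNS = {"0": "sdk_core", "1": "wap", "2": "web", "3": "android"}

# Made-up sample rows: (msisdn, media_id, last_active_date)
rows = [
    ("13800000001", "1", "20240101"),
    ("13800000001", "3", "20240105"),
    ("13800000002", "2", "20240103"),
]


def pivot(rows, columns):
    """Group rows per msisdn into a map, then spread fixed keys into columns."""
    kv_by_user = defaultdict(dict)
    for msisdn, media_id, last_active in rows:
        kv_by_user[msisdn][media_id] = last_active
    return {
        msisdn: {name: kv.get(key) for key, name in columns.items()}
        for msisdn, kv in kv_by_user.items()
    }


result = pivot(rows, MEDIA_COLUMNS)
for msisdn, cols in result.items():
    print(msisdn, cols)
```

Keys missing from a user's map come out as None, which corresponds to NULL columns in the SQL result.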

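    Running jobs in parallel

    import java.util.UUID
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

    val df = spark.read.parquet("s3://data/type=access/interval=1551484800").repartition(55)
    df.cache()

    val jobs = 5
    val executorService = Executors.newFixedThreadPool(jobs)
    // The latch count must equal the number of submitted tasks;
    // with a larger count, await() would only return on timeout
    val latch = new CountDownLatch(jobs)
    for (_ <- Range(0, jobs)) {
      executorService.execute(new Runnable {
        override def run(): Unit = {
          try {
            val id = UUID.randomUUID().toString()
            df.coalesce(11).write.parquet(s"s3://data/test/${id}")
          } finally {
            latch.countDown() // count down even if the write fails
          }
        }
      })
    }
    executorService.shutdown() // stop accepting new tasks
    latch.await(1, TimeUnit.HOURS) // wait until the task count reaches zero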
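The same submit-then-wait pattern can be sketched in Python with a thread pool, where the futures take the place of the latch. This is a minimal illustration under the assumption that each job is an independent write; write_copy is a hypothetical stand-in that only returns the path it would write to, so the example runs without Spark or S3.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor, wait

JOBS = 5


def write_copy(job_index):
    """Stand-in for df.coalesce(11).write.parquet(path); returns the target path."""
    return "s3://data/test/{}".format(uuid.uuid4())


with ThreadPoolExecutor(max_workers=JOBS) as pool:
    futures = [pool.submit(write_copy, i) for i in range(JOBS)]
    # Wait up to one hour for all jobs, like latch.await(1, TimeUnit.HOURS).
    done, not_done = wait(futures, timeout=3600)

# result() re-raises any exception from the worker, so failures are not lost.
paths = [future.result() for future in done]
print(len(paths), "jobs finished,", len(not_done), "still pending")
```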

    Built-in Functions

    Spark SQL, Built-in Functions (apache.org)

    list/map merge UDAF

    package com.cmic.mcloud.label.bigtable.v1

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.{SparkSession, functions, Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    object Demo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .master("local[1]")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._

        val seq = List(("American Person", List("Tom", "Jim")),
          ("China Person", List("LiLei", "HanMeiMei")),
          ("China Person", List("Red", "Blue")))
        spark.sparkContext.parallelize(seq).toDF("n", "l").createOrReplaceTempView("t")
        spark.udf.register("merge", functions.udaf(MyListMerge))
        spark.sql(
          s"""
             |select n, collect_list(l), merge(l)
             |from t
             |group by n
             |""".stripMargin).show(false)


        val seq2 = List(("American Person", Map("Tom" -> "Jim")),
          ("China Person", Map("LiLei" -> "HanMeiMei")),
          ("China Person", Map("Red" -> "Blue")))
        spark.sparkContext.parallelize(seq2).toDF("n", "l").createOrReplaceTempView("t2")
        spark.udf.register("merge2", functions.udaf(MyMapMerge))
        // spark.sql("create or replace temporary function merge2 as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge'")
        spark.sql(
          s"""
             |select n, merge2(l)
             |from t2
             |group by n
             |""".stripMargin).show(false)
      }
    }



    object MyListMerge extends Aggregator[List[String], List[String], List[String]] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      def zero: List[String] = List.empty[String]
      // Combine two values to produce a new value. For performance, the function may modify `buffer`
      // and return it instead of constructing a new object
      def reduce(buffer: List[String], input: List[String]): List[String] = {
        buffer ++ input
      }
      // Merge two intermediate values
      def merge(b1: List[String], b2: List[String]): List[String] = {
        b1 ++ b2
      }
      // Transform the output of the reduction
      def finish(reduction: List[String]): List[String] = reduction
      // Specifies the Encoder for the intermediate value type
      def bufferEncoder: Encoder[List[String]] = Encoders.product
      // Specifies the Encoder for the final output value type
      def outputEncoder: Encoder[List[String]] = Encoders.product
    }


    object MyMapMerge extends Aggregator[Map[String,String], Map[String,String], Map[String,String]] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      def zero: Map[String,String] = Map.empty[String,String]
      // Combine two values to produce a new value; on duplicate keys the later value wins
      def reduce(buffer: Map[String,String], input: Map[String,String]): Map[String,String] = {
        buffer ++ input
      }
      // Merge two intermediate values
      def merge(b1: Map[String,String], b2: Map[String,String]): Map[String,String] = {
        b1 ++ b2
      }
      // Transform the output of the reduction
      def finish(reduction: Map[String,String]): Map[String,String] = reduction
      // Specifies the Encoder for the intermediate value type
      def bufferEncoder: Encoder[Map[String,String]] = ExpressionEncoder()
      // Specifies the Encoder for the final output value type
      def outputEncoder: Encoder[Map[String,String]] = ExpressionEncoder()
    }


    // Use this class when the function has to be registered through SQL
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DataTypes, MapType, StringType, StructField, StructType}

    import java.util.{ArrayList, List}

    class HiveMapMergeUDAF extends UserDefinedAggregateFunction {
      private val inputFields: List[StructField] = new ArrayList[StructField]
      inputFields.add(DataTypes.createStructField("labelmap", MapType(StringType, StringType), true))
      private val _inputDataType = DataTypes.createStructType(inputFields)

      override def inputSchema: StructType = _inputDataType

      override def bufferSchema: StructType = _inputDataType

      override def dataType: DataType = MapType(StringType, StringType)

      override def deterministic: Boolean = false

      override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map.empty[String, String])

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer.update(0, buffer.getMap[String, String](0) ++ input.getMap[String, String](0))
      }

      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1.update(0, buffer1.getMap[String, String](0) ++ buffer2.getMap[String, String](0))
      }

      override def evaluate(buffer: Row): Any = buffer.getMap[String, String](0)

    }
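What the two aggregators compute per group can be shown without Spark: start from an empty value (zero), fold each input into the buffer (reduce), and combine partial buffers (merge). The sketch below mirrors that with functools.reduce on the same sample data as the demo; the function names are hypothetical, and the key point it illustrates is that for maps a later value overwrites an earlier one when keys collide.

```python
from functools import reduce


def merge_lists(lists):
    """Concatenate lists in order, starting from an empty list (the zero value)."""
    return reduce(lambda buffer, item: buffer + item, lists, [])


def merge_maps(maps):
    """Merge dicts in order, starting from an empty dict; later keys win."""
    return reduce(lambda buffer, item: {**buffer, **item}, maps, {})


china_lists = [["LiLei", "HanMeiMei"], ["Red", "Blue"]]
china_maps = [{"LiLei": "HanMeiMei"}, {"Red": "Blue"}]

merged_list = merge_lists(china_lists)
merged_map = merge_maps(china_maps)
overwritten = merge_maps([{"k": "old"}, {"k": "new"}])

print(merged_list)
print(merged_map)
print(overwritten)
```

In a distributed run the order in which partial buffers are merged is not fixed, so with colliding keys the surviving value may vary between runs.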

    Usage: merging many small maps into one large map

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, explode, map_keys}

    def init(spark: SparkSession): Unit = {
      spark.sql("create or replace temporary function map_merge as 'com.cmic.mcloud.label.bigtable.common.HiveMapMergeUDAF'")
    }

    def map_merge(spark: SparkSession, tp: String) = {
      init(spark)
      spark.sql(
        s"""
           | select msisdn, map_merge(kv) kv
           | from bigtable.basetag_map
           | where tp='$tp'
           | group by msisdn
           |""".stripMargin)
    }

    def exportAll(spark: SparkSession, tp: String) = {
      val mapTypeDF = map_merge(spark, tp)
      // will query all rows
      val keys = mapTypeDF.select(explode(map_keys(col("kv")))).distinct().collect().map(_.get(0))
      // one output column per distinct map key
      val keyCols = keys.map(f => col("kv").getItem(f).as(f.toString))
      mapTypeDF.select(col("msisdn") +: keyCols: _*)
    }

    Query usage

    select msisdn,tags,tags["storage_purchase_m"],
      map_contains_key(tags,'function_purchase_m'),
      map_filter(tags, (k, v) -> k in ('function_purchase_m','pay_flag'))
    from bigtable.tag_m
    where tp=202312 and taggroup='member_order'
      and (map_contains_key(tags,'function_purchase_m')
        or map_contains_key(tags,'pay_flag'))
      and tags['storage_purchase_m'] > 1
    limit 10;

    map_contains_key is a newer function; on older versions use tags['key'] is not null or array_contains(map_keys(tags), 'key')
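The two fallbacks are not quite equivalent: a null check on the value treats a key that is present with a null value the same as a missing key, while checking the key list does not. The sketch below shows the difference with a plain Python dict standing in for the map column; the sample tags and function names are made up for illustration.

```python
# Made-up sample: 'pay_flag' is present but has no value.
tags = {"storage_purchase_m": "2", "pay_flag": None}


def contains_key(mapping, key):
    """Analogue of array_contains(map_keys(tags), key)."""
    return key in mapping


def value_is_not_null(mapping, key):
    """Analogue of tags[key] is not null."""
    return mapping.get(key) is not None


for key in ("storage_purchase_m", "pay_flag", "function_purchase_m"):
    print(key, contains_key(tags, key), value_is_not_null(tags, key))
```

If the map never stores null values, either fallback gives the same answer.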

    Registering UDFs

    (1) Temporary functions
    Within a single session, create a temporary function with the following statements:

    add jar hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar;
    create or replace temporary function collect_map as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge';

    select * from bigtable.basetag_map where msisdn = '19802021823';
    select collect_map(kv) from bigtable.basetag_map where msisdn = '19802021823';

    (2) Permanent functions
    This feature requires a newer version of Hive. Its advantage is that the UDF jar can be stored on HDFS, and the function only needs to be created once to be usable permanently:
    CREATE FUNCTION collect_map AS 'com.cmic.mcloud.label.bigtable.v1.udf.MyMapMerge' USING JAR 'hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar';

    Run SQL on files

    select * from FORMAT.`PATH`

    When the path contains subdirectories, a bare wildcard is not supported: select * from orc.`/system/hdfs/*`
    A wildcard with a prefix is supported: select * from orc.`/system/hdfs/2*`