BIGDATA April 13, 2017

Spark


简单之美 | RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Spark on Yarn

Spark officially provides three cluster deployment options: Standalone, Mesos, and YARN. They differ only in the resource management and scheduling platform.

To use Spark on an existing Hadoop cluster (Spark on YARN), just edit the configuration file (vi ./conf/spark-env.sh) and add the following:

export HADOOP_HOME=/share/apps/hadoop

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Main parameters for job submission (using them well lets you make full use of cluster resources):

Parameter / Meaning
--master MASTER_URL: spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, or local
--deploy-mode DEPLOY_MODE: where the driver runs, client or cluster
--class CLASS_NAME: main class name, including the package
--name NAME: application name
--jars JARS: third-party jars the driver depends on
--py-files PY_FILES: comma-separated list of .zip, .egg and .py files to place on the PYTHONPATH of the Python application
--files FILES: comma-separated list of files to place in the working directory of each executor
--executor-memory MEM: executor memory, default 1G
--total-executor-cores NUM: total number of cores across all executors (Standalone and Mesos only)
--executor-cores NUM: number of cores per executor, default 1 (YARN only)
--queue QUEUE_NAME: YARN queue to submit to, default is the default queue (YARN only)
--num-executors NUM: number of executors to launch, default 2 (YARN only)

For more details, see the blog post.

Test code

./bin/spark-submit --master yarn --name spark-test --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10
#!/usr/bin/python
# -*- coding: UTF-8 -*-

# test_spark.py
from pyspark import SparkContext

# use a raw string so the Windows backslashes are not treated as escapes
logFile = r"C:\spark-2.0.2-bin-hadoop2.6\README.md"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# testDFrame.py
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import sys
reload(sys)                       # Python 2 only
sys.setdefaultencoding("utf-8")


def schema_inference_example(spark):
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("/user/jiangmanhua/Experiment/url_domain_labeled")
    parts = lines.map(lambda l: l.split("\t"))
    label = parts.map(lambda p: Row(label_name=p[0], type=p[1]))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(label)
    schemaPeople.createOrReplaceTempView("label")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT type, count(label_name) as cc FROM label group by type")

    # The results of SQL queries are DataFrame objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.type + ", cnt: " + str(p.cc)).collect()
    for name in teenNames:
        print(name)


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    schema_inference_example(spark)

    spark.stop()




Accessing Spark from PyCharm

1. Install py4j

There are two main steps: install py4j and configure the paths.

# Windows
pip install py4j
# Linux
sudo pip2 install py4j
2. Configure environment variables
HADOOP_HOME = C:\hadoop-2.6.5
SPARK_HOME = C:\spark-2.0.2-bin-hadoop2.6

Alternatively, in PyCharm go to "Run" -> "Edit Configurations" -> "Environment variables" and add the SPARK_HOME and HADOOP_HOME directories there.

3. Link Spark and Hadoop

Under $PYTHON_HOME\lib\site-packages, create a file pyspark.pth whose content is the pyspark directory (e.g. D:\spark-2.0.2-bin-hadoop2.6\python).

# Linux
echo /home/manhua/app/spark/python > /home/manhua/.local/lib/python2.7/site-packages/pyspark.pth
4. Windows-specific issue

Download winutils.exe and put it in C:\hadoop-2.6.5\bin.

Print column headers in the spark-sql CLI: set spark.sql.cli.print.header=true;

DataFrame with too many columns

case class V1402(
  phone: String,
  infoSelected: Boolean
  // ...
)

object V1402 {
  def apply(phone: String, flags: Array[Boolean]): V1402 = {
    new V1402(phone,
      flags(0), flags(1), flags(2), flags(3), flags(4), flags(5), flags(6), flags(7), flags(8),
      flags(9), flags(10), flags(11), flags(12), flags(13), flags(14), flags(15), flags(16), flags(17),
      flags(18), flags(19), flags(20), flags(21), flags(22), flags(23), flags(24), flags(25),
      flags(26), flags(27), flags(28), flags(29), flags(30), flags(31), flags(32), flags(33),
      flags(34), flags(35), flags(36), flags(37), flags(38), flags(39), flags(40), flags(41)
    )
  }
}

MergeDataframe

Merge new and old data: if a row exists in the right table, take it; otherwise take the left one.

import spark.implicits._

df.joinWith(d, df("slot") === d("slot") &&
    df("servicetype") === d("servicetype") &&
    df("utype") === d("utype") &&
    df("userid") === d("userid") &&
    df("id") === d("id"), "fullouter")
  .map {
    case (left, right) =>
      Vcollection.apply(if (right != null) right else left)
  }
  .filter(_.op_type != 2)
  .write.mode("overwrite")
  .orc(s"/tmp/drs_collectino")

Merging updated data

val inc_result = spark.sql(
  s"""
     |select rq_channel, 1 from dim.mcloud_channel
     |limit 3
     |""".stripMargin)
inc_result.cache()

val result = inc_result

inc_result.joinWith(result, inc_result("rq_channel") === result("rq_channel"), "fullouter")
  .map { case (left, right) =>
    val newValues = Seq(
      left.getAs[String]("rq_channel"),
      left.getAs[Int]("1") + right.getAs[Int]("1")
    )
    Row.fromSeq(newValues)
  }(result.encoder).show

Converting scientific-notation strings to numbers

select cast(cast('1.3169590272E11' as float) as decimal(19,0))
select cast(cast(personaldiskspace as float) as decimal(19,0))

Security

UI filter

package spark;

import org.apache.commons.codec.binary.Base64;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.StringTokenizer;
import javax.servlet.*;
import javax.servlet.http.*;

/* spark-defaults.conf
spark.ui.filters=spark.HcyFilter                 // the filter class
spark.spark.HcyFilter.param.username=foo         // web UI login user; key format: spark.<filter class>.param.<param name>
spark.spark.HcyFilter.param.password=foo         // web UI login password; key format: spark.<filter class>.param.<param name>
spark.acls.enable=true

spark-sql --master yarn --jars basic-1.0-SNAPSHOT.jar --conf spark.ui.filters=spark.HcyFilter --conf spark.spark.HcyFilter.param.username=foo --conf spark.spark.HcyFilter.param.password=foo --conf spark.acls.enable=true
*/
public class HcyFilter implements Filter {

    private String username = "";
    private String password = "";
    private String realm = "Protected";

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        username = filterConfig.getInitParameter("username");
        password = filterConfig.getInitParameter("password");
    }

    private void unauthorized(HttpServletResponse response, String message) throws IOException {
        response.setHeader("WWW-Authenticate", "Basic realm=\"" + realm + "\"");
        response.sendError(401, message);
    }

    private void unauthorized(HttpServletResponse response) throws IOException {
        unauthorized(response, "Unauthorized - ui filter");
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        HttpServletResponse response = (HttpServletResponse) servletResponse;

        String authHeader = request.getHeader("Authorization");
        if (authHeader != null) {
            StringTokenizer st = new StringTokenizer(authHeader);
            if (st.hasMoreTokens()) {
                String basic = st.nextToken();
                if (basic.equalsIgnoreCase("Basic")) {
                    try {
                        String credentials = new String(Base64.decodeBase64(st.nextToken()), "UTF-8");

                        int p = credentials.indexOf(":");
                        if (p != -1) {
                            String _username = credentials.substring(0, p).trim();
                            String _password = credentials.substring(p + 1).trim();

                            if (!username.equals(_username) || !password.equals(_password)) {
                                unauthorized(response, "Bad credentials");
                                return; // stop here instead of continuing down the filter chain
                            }

                            filterChain.doFilter(servletRequest, servletResponse);
                        } else {
                            unauthorized(response, "Invalid authentication token");
                        }
                    } catch (UnsupportedEncodingException e) {
                        throw new Error("Couldn't retrieve authentication", e);
                    }
                }
            }
        } else {
            unauthorized(response);
        }
    }

    @Override
    public void destroy() {

    }
}

ACL approach

When using YARN, the default login user is dr.who (it can be changed with the hadoop.http.staticuser.user property in Hadoop's core-site.xml).

Restrict access
[hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true

Allow viewing
[hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.ui.view.acls=dr.who

Allow killing jobs
[hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.ui.view.acls=dr.who --conf spark.modify.acls=dr.who

Administrator configuration
[hadoop@wl1 ~]$ spark-shell --master yarn --conf spark.acls.enable=true --conf spark.admin.acls=dr.who

ACLs can also be configured per user group; see spark.user.groups.mapping.

Adding username/password authentication to the Thrift server

  • Implement org.apache.hive.service.auth.PasswdAuthenticationProvider (a sketch follows this list)
  • Modify/add {spark_home}/conf/hive-thrift-site.xml
    • hive.server2.authentication=CUSTOM (not put into this config file, to avoid affecting compute jobs; passed as a startup parameter instead)
    • hive.server2.custom.authentication.class=me.jmh.spark.MyAuth
    • hive.server2.custom.authentication.filepath=/data/soft/spark/conf/thriftAuth
  • Startup parameter --hiveconf hive.server2.authentication=CUSTOM
  • Edit the username/password file
    <property>
      <name>hive.server2.custom.authentication.class</name>
      <value>me.jmh.spark.MyAuth</value>
    </property>

    <property>
      <name>hive.server2.custom.authentication.filepath</name>
      <value>/data/soft/spark/conf/thriftAuth</value>
    </property>
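
A minimal sketch of the custom authenticator referenced above. PasswdAuthenticationProvider exposes a single Authenticate(user, password) method that throws AuthenticationException on failure; the "user:password"-per-line format of the credential file is an assumption, and the class name me.jmh.spark.MyAuth matches the config above.

package me.jmh.spark

import java.nio.file.{Files, Paths}
import javax.security.sasl.AuthenticationException
import scala.collection.JavaConverters._
import org.apache.hive.service.auth.PasswdAuthenticationProvider

class MyAuth extends PasswdAuthenticationProvider {
  // matches hive.server2.custom.authentication.filepath above
  private val credFile = "/data/soft/spark/conf/thriftAuth"

  override def Authenticate(user: String, password: String): Unit = {
    // assumed file format: one "user:password" pair per line
    val ok = Files.readAllLines(Paths.get(credFile)).asScala
      .map(_.split(":", 2))
      .exists(parts => parts.length == 2 && parts(0) == user && parts(1) == password)
    if (!ok) {
      throw new AuthenticationException(s"Invalid credentials for user $user")
    }
  }
}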

Thrift Server

Limiting the size of returned results

  • Enable incremental collection via spark.sql.thriftServer.incrementalCollect; results are returned in batches of 1000 rows
  • Configure the maximum result size to fetch on the client side, e.g. in Zeppelin (a client-side sketch follows)
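
As a loosely related client-side sketch (not from the original notes): over JDBC the Hive driver lets you pull results from the Thrift server in batches instead of materializing everything at once. The URL reuses the local port from the beeline section further below; the table name is a placeholder.

import java.sql.DriverManager

// assumption: hive-jdbc driver is on the classpath; some_db.some_table is a placeholder
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10199", "anonymous", "")
val stmt = conn.createStatement()
stmt.setFetchSize(1000)                 // fetch rows from the server in batches of 1000
val rs = stmt.executeQuery("select * from some_db.some_table")
while (rs.next()) {
  // process one row at a time instead of holding the full result in memory
}
rs.close(); stmt.close(); conn.close()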

监听执行

spark.sql.queryExecutionListeners
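
A hedged sketch of what such a listener might look like (the class name is made up); it is wired in with --conf spark.sql.queryExecutionListeners=<fully qualified class name>.

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// hypothetical listener; it needs a no-arg constructor so Spark can instantiate it
class LoggingQueryListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    println(s"$funcName finished in ${durationNs / 1000000} ms")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    println(s"$funcName failed: ${exception.getMessage}")
  }
}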

How files under a path are read (source walk-through)

DataFrameReader
- source = format
- load: DataSource.lookupDataSourceV2 (the source is registered via `with DataSourceRegister`; returns ParquetDataSourceV2)
- getTableFromProvider; the schema is inferred from the files
- build the plan:
Dataset.ofRows(
  sparkSession,
  DataSourceV2Relation.create(table, catalog=None, ident=None, dsOptions))

[The key step is obtaining the Table object and turning it into a DataSourceV2Relation]
After that, the logical plan is turned into a physical plan.

newScanBuilder

format -> datasource -> table -> relation (LogicalPlan) -> newScanBuilder

[analyzer] FindDataSourceTable#readDataSourceTable
DataSource#resolveRelation

FileSourceStrategy -> FileSourceScanExec (the reader comes from hadoopFsRelation.fileFormat: buildReaderWithPartitionValues) -> FileScanRDD

The function that actually performs the read is buildReaderWithPartitionValues.
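
A tiny sketch of the entry point that triggers this chain; whether the plan shows the V2 relation or the older HadoopFsRelation path depends on the Spark version and spark.sql.sources.useV1SourceList (the path below is hypothetical).

// explain(true) prints the analyzed/optimized logical plans and the physical FileScan,
// which makes the format -> relation -> scan chain described above visible
val df = spark.read.format("parquet").load("/tmp/some/parquet/dir")
df.explain(true)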

When a table has too many partitions, SHOW PARTITIONS can take a filter:

show partitions db.tbl PARTITION (year=2022)
Supports and / or / > / < / =

Setting up a local Spark Job History server

Set spark.history.fs.logDirectory to a local directory and drop event-log files such as application_1652535872918_10360.lz4 into it.
Start the service: sbin/start-history-server.sh
Access it on port 18080.

Specifying the metastore thrift address in a test environment

val spark = SparkSession.builder()
.master("local[1]")
.enableHiveSupport()
.config("spark.hadoop.hive.metastore.uris", "thrift://192.168.56.112:9083")
.getOrCreate()

beeline

When accessing the thrift server with beeline -u jdbc:hive2://localhost:10199, the identity is anonymous.

Connecting to Oracle

https://mvnrepository.com/artifact/com.oracle/ojdbc7/12.1.0.2
Put the jar into SPARK_HOME/jars, start beeline, and run !scan to confirm the driver has been loaded.

!addlocaldriverjar datax/plugin/reader/oraclereader/libs/ojdbc6-11.2.0.3.jar
!scan
!connect

beeline -u "jdbc:oracle:thin:@[ip]:[port]/srvName" -n xxx -p xxx -e "$sql"
If connecting directly with -u fails, start beeline first, then run !connect 'jdbc:oracle:thin:@[ip]:[port]/srvName' and enter the username and password interactively.

// SID-based listener: the SID is orcl
database.url=jdbc:oracle:thin:@171.xxx.96.xx:xxxx:orcl
// service-name-based listener: the service name is orcl
database.url=jdbc:oracle:thin:@171.xxx.96.xx:xxxx/orcl

SELECT table_name, column_name, data_type
FROM all_tab_cols
WHERE table_name = 'TABLE_NAME';
Note: the table name must be uppercase.

Loading Oracle DATE columns into Hive

  1. Convert to string: TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')
  2. Insert EFECTIVETIME directly: a seatunnel import then keeps only the date part
  3. Convert to seconds: (EFECTIVETIME - to_date('1970-01-01 08:00:00','yyyy-mm-dd hh24:mi:ss')) * 86400
    Problems: numeric overflow; values displayed in scientific notation.
    * 86400000 / 1000 works fine.
    WHERE ROWNUM < 5;
  4. Convert to timestamp: TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss'),'yyyy-MM-dd HH24:mi:ss') efectivetime

OceanBase in Oracle mode: list all tables of the current user
SELECT owner, table_name FROM all_tables;

Replace limit 3 with FETCH FIRST 3 ROWS ONLY

Extracting CSV data

create table tmp.mytable
USING CSV as
select /*+COALESCE(1)*/ * from dim.mcloud_behavior
;
-- source table
create temporary view s using csv options (path 'file:/tmp/spark-kyuubi-source-test.csv',header "true");

insert overwrite directory using csv options (path 'file:/tmp/spark-kyuubi-target', delimiter '|',header "false", emptyValue '') select * from s;

Supporting the JSON format

Add the jar from Hive's lib directory, then use it in the table definition:

ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE

zip -u spark-jar.zip lib/hive-hcatalog-core-2.3.9.jar

Converting rows to columns

// convert row to column
spark.sql(
s"""
|select msisdn phone,
| substr(msisdn,-1) part,
| kv['0'] sdk_core,
| kv['1'] wap,
| kv['2'] web,
| kv['3'] android,
| kv['4'] ios,
| kv['5'] pc,
| kv['6'] guanjia,
| kv['7'] wechat,
| kv['8'] WP8,
| kv['9'] other,
| kv['10'] TV,
| kv['11'] miniprogram,
| kv['12'] smart_hardware,
| kv['13'] harmony
|from (
| select msisdn,
| str_to_map(concat_ws(',',collect_set(concat_ws(':',media_id,last_active_date)))) kv
| from ${Table.ads_huadan_user_media_last_active_date_df}
| where tp='$year$month$day'
| group by msisdn
|)t
|""".stripMargin)

Running jobs in parallel

import java.util.UUID
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

var df = spark.read.parquet("s3://data/type=access/interval=1551484800").repartition(55)
df.cache()

val executorService = Executors.newFixedThreadPool(5)
val latch = new CountDownLatch(5)     // one count per submitted job
for (_ <- Range(0, 5)) {
  executorService.execute(new Runnable {
    override def run(): Unit = {
      val id = UUID.randomUUID().toString()
      df.coalesce(11).write.parquet(s"s3://data/test/${id}")
      latch.countDown()
    }
  })
}
executorService.shutdown()            // stop accepting new tasks
latch.await(1, TimeUnit.HOURS)        // wait until all jobs finish (or time out)

Built-in Functions

Spark SQL, Built-in Functions (apache.org)

UDAF for merging lists/maps

package com.cmic.mcloud.label.bigtable.v1

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{SparkSession, functions, Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val seq = List(("American Person", List("Tom", "Jim")),
      ("China Person", List("LiLei", "HanMeiMei")),
      ("China Person", List("Red", "Blue")))
    spark.sparkContext.parallelize(seq).toDF("n", "l").createOrReplaceTempView("t")
    spark.udf.register("merge", functions.udaf(MyListMerge))
    spark.sql(
      s"""
         |select n, collect_list(l), merge(l)
         |from t
         |group by n
         |""".stripMargin).show(false)

    val seq2 = List(("American Person", Map("Tom" -> "Jim")),
      ("China Person", Map("LiLei" -> "HanMeiMei")),
      ("China Person", Map("Red" -> "Blue")))
    spark.sparkContext.parallelize(seq2).toDF("n", "l").createOrReplaceTempView("t2")
    spark.udf.register("merge2", functions.udaf(MyMapMerge))
    // spark.sql("create or replace temporary function merge2 as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge'")
    spark.sql(
      s"""
         |select n, merge2(l)
         |from t2
         |group by n
         |""".stripMargin).show(false)
  }
}



object MyListMerge extends Aggregator[List[String], List[String], List[String]] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: List[String] = List.empty[String]
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: List[String], employee: List[String]): List[String] = {
    buffer.++(employee)
  }
  // Merge two intermediate values
  def merge(b1: List[String], b2: List[String]): List[String] = {
    b1.++(b2)
  }
  // Transform the output of the reduction
  def finish(reduction: List[String]): List[String] = reduction
  // Specifies the Encoder for the intermediate value type
  // (List[String] is not a Product type, so use ExpressionEncoder, as in MyMapMerge below)
  def bufferEncoder: Encoder[List[String]] = ExpressionEncoder()
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[List[String]] = ExpressionEncoder()
}


object MyMapMerge extends Aggregator[Map[String,String], Map[String,String], Map[String,String]] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Map[String,String] = Map.empty[String,String]
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Map[String,String], employee: Map[String,String]): Map[String,String] = {
    buffer ++ employee
  }
  // Merge two intermediate values
  def merge(b1: Map[String,String], b2: Map[String,String]): Map[String,String] = {
    b1 ++ b2
  }
  // Transform the output of the reduction
  def finish(reduction: Map[String,String]): Map[String,String] = reduction
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Map[String,String]] = ExpressionEncoder()
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Map[String,String]] = ExpressionEncoder()
}


// Use this class if the UDAF must be registrable through SQL (create function ...)
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, MapType, StringType, StructField, StructType}

import java.util.{ArrayList, List}

class HiveMapMergeUDAF extends UserDefinedAggregateFunction {
  private val inputFields: List[StructField] = new ArrayList[StructField]
  inputFields.add(DataTypes.createStructField("labelmap", MapType(StringType, StringType), true))
  private val _inputDataType = DataTypes.createStructType(inputFields)

  override def inputSchema: StructType = _inputDataType

  override def bufferSchema: StructType = _inputDataType

  override def dataType: DataType = MapType(StringType, StringType)

  override def deterministic: Boolean = false

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map.empty[String, String])

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getMap(0) ++ input.getMap(0))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getMap(0) ++ buffer2.getMap(0))
  }

  override def evaluate(buffer: Row): Any = buffer.getMap(0)

}

Usage: merge many small maps into one big map

def init(spark: SparkSession): Unit = {
  spark.sql("create or replace temporary function map_merge as 'com.cmic.mcloud.label.bigtable.common.HiveMapMergeUDAF'")
}

def map_merge(spark: SparkSession, tp: String) = {
  init(spark)
  spark.sql(
    s"""
       | select msisdn, map_merge(kv) kv
       | from bigtable.basetag_map
       | where tp='$tp'
       | group by msisdn
       |""".stripMargin)
}

def exportAll(spark: SparkSession, tp: String) = {
  val mapTypeDF = map_merge(spark, tp)
  // will query all rows
  val keys = mapTypeDF.select(explode(map_keys(col("kv")))).distinct().collect().map(_.get(0))
  val keyCols = keys.map(f => col("kv").getItem(f).as(f.toString))
  mapTypeDF.select(col("msisdn") +: keyCols: _*)
}

Query usage

select msisdn,tags,tags["storage_purchase_m"],
map_contains_key(tags,'function_purchase_m'),
map_filter(tags, (k, v) -> k in ('function_purchase_m','pay_flag'))
from bigtable.tag_m
where tp=202312 and taggroup='member_order'
and (map_contains_key(tags,'function_purchase_m')
or map_contains_key(tags,'pay_flag'))
and tags['storage_purchase_m'] > 1
limit 10;

map_contains_key is a newly added function; on older versions use tags['key'] is not null or array_contains(map_keys(tags), 'key') instead (a sketch follows).
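
A small sketch of that fallback, reusing the same table and predicates as the query above:

// equivalent predicate for Spark versions without map_contains_key
spark.sql(
  """
    |select msisdn, tags
    |from bigtable.tag_m
    |where tp=202312 and taggroup='member_order'
    |  and (tags['function_purchase_m'] is not null
    |       or array_contains(map_keys(tags), 'pay_flag'))
    |limit 10
    |""".stripMargin).show(false)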

Registering UDFs

(1) Temporary function
Create a temporary function within a session with the following statements:

add jar hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar;
create or replace temporary function collect_map as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge';

select * from bigtable.basetag_map where msisdn ='19802021823';
select collect_map(kv) from bigtable.basetag_map where msisdn ='19802021823';

(2) Permanent function
This requires a reasonably new Hive version. The benefit is that the UDF jar can be stored on HDFS and the function only needs to be created once:
CREATE FUNCTION collect_map AS 'com.cmic.mcloud.label.bigtable.v1.udf.MyMapMerge' USING JAR 'hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar';

run SQL on File

select * from FORMAT.`PATH`

When the path contains subdirectories, select * from orc.`/system/hdfs/*` is not supported,
but select * from orc.`/system/hdfs/2*` is.
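
The same thing from code, for reference (the path is the one from the note above):

// query ORC files directly by path, no table required
spark.sql("select * from orc.`/system/hdfs/2*`").show(10)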

Debugging with inline data

SELECT a, count(*) as cnt 
FROM
VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b)
group by 1

Temporary table

create table tmp.flat_gd_202410(
msisdn string,
opr_type string,
opr_time string)
using csv options(delimiter '|', header false)

Avoiding java.net.UnknownHostException when submitting jobs from a machine outside the cluster

Add the configuration SPARK_LOCAL_HOSTNAME or spark.driver.host (see the sketch below).
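
A minimal sketch of the second option, setting spark.driver.host programmatically; the IP is a placeholder and must be an address of the submitting machine that the cluster can route back to.

import org.apache.spark.sql.SparkSession

// placeholder IP: an address of the submitting host reachable from the cluster
val spark = SparkSession.builder()
  .master("yarn")
  .config("spark.driver.host", "192.168.56.1")
  .getOrCreate()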

Downloading an HDFS file in code

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.{BufferedOutputStream, FileOutputStream}
import java.net.URI

def get(hdfsFilePath: String, localFilePath: String): Unit = {
  val conf = new Configuration()
  val hdfsUri = "hdfs://ns1"

  val fs = FileSystem.get(new URI(hdfsUri), conf)

  if (fs.exists(new Path(hdfsFilePath))) {
    val inputStream = fs.open(new Path(hdfsFilePath))
    val outputStream = new BufferedOutputStream(new FileOutputStream(localFilePath))

    val buffer = new Array[Byte](1024)
    var bytesRead = inputStream.read(buffer)
    while (bytesRead != -1) {
      outputStream.write(buffer, 0, bytesRead)
      bytesRead = inputStream.read(buffer)
    }

    inputStream.close()
    outputStream.close()
  } else {
    println(s"File $hdfsFilePath does not exist in HDFS.")
  }

  fs.close()
}

//get("/user/ydy_bi_user48/spark/jdbc/hive-jdbc-2.3.10-standalone.jar", "/data01/zeppelin/ydy_bi_user48/local-repo/tmp/hive-jdbc-2.3.10-standalone.jar")

get("/user/ydy_bi_user48/spark/jdbc/hive-jdbc-uber-2.6.5.0-292.jar", "/data01/zeppelin/ydy_bi_user48/local-repo/tmp/hive-jdbc-uber-2.6.5.0-292.jar")