# Load a text file and convert each line to a Row.
from pyspark.sql import Row

lines = sc.textFile("/user/jiangmanhua/Experiment/url_domain_labeled")
parts = lines.map(lambda l: l.split("\t"))
label = parts.map(lambda p: Row(label_name=p[0], type=p[1]))

# Infer the schema, and register the DataFrame as a table.
labelDF = spark.createDataFrame(label)
labelDF.createOrReplaceTempView("label")

# SQL can be run over DataFrames that have been registered as a table.
typeCounts = spark.sql("SELECT type, count(label_name) as cc FROM label GROUP BY type")

# The results of SQL queries are DataFrame objects.
# .rdd returns the content as a pyspark.RDD of Row.
results = typeCounts.rdd.map(lambda p: "type: " + p.type + ", cnt: " + str(p.cc)).collect()
for line in results:
    print(line)
import org.apache.spark.sql.{Encoder, SparkSession, functions}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Register an Aggregator-based UDAF that merges the grouped lists into a single list.
    val seq = List(
      ("American Person", List("Tom", "Jim")),
      ("China Person", List("LiLei", "HanMeiMei")),
      ("China Person", List("Red", "Blue")))
    spark.sparkContext.parallelize(seq).toDF("n", "l").createOrReplaceTempView("t")
    spark.udf.register("merge", functions.udaf(MyListMerge))
    spark.sql(
      s"""
         |select n, collect_list(l), merge(l)
         |from t
         |group by n
         |""".stripMargin).show(false)

    // Same idea for maps: merge the grouped maps into a single map.
    val seq2 = List(
      ("American Person", Map("Tom" -> "Jim")),
      ("China Person", Map("LiLei" -> "HanMeiMei")),
      ("China Person", Map("Red" -> "Blue")))
    spark.sparkContext.parallelize(seq2).toDF("n", "l").createOrReplaceTempView("t2")
    spark.udf.register("merge2", functions.udaf(MyMapMerge))
    // spark.sql("create or replace temporary function merge2 as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge'")
    spark.sql(
      s"""
         |select n, merge2(l)
         |from t2
         |group by n
         |""".stripMargin).show(false)
  }
}
object MyListMerge extends Aggregator[List[String], List[String], List[String]] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: List[String] = List.empty[String]
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: List[String], item: List[String]): List[String] = buffer ++ item
  // Merge two intermediate values
  def merge(b1: List[String], b2: List[String]): List[String] = b1 ++ b2
  // Transform the output of the reduction
  def finish(reduction: List[String]): List[String] = reduction
  // Specifies the Encoder for the intermediate value type
  // (ExpressionEncoder is used here; Encoders.product expects a case-class/Product type)
  def bufferEncoder: Encoder[List[String]] = ExpressionEncoder()
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[List[String]] = ExpressionEncoder()
}
object MyMapMerge extends Aggregator[Map[String, String], Map[String, String], Map[String, String]] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Map[String, String] = Map.empty[String, String]
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Map[String, String], item: Map[String, String]): Map[String, String] = buffer ++ item
  // Merge two intermediate values
  def merge(b1: Map[String, String], b2: Map[String, String]): Map[String, String] = b1 ++ b2
  // Transform the output of the reduction
  def finish(reduction: Map[String, String]): Map[String, String] = reduction
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Map[String, String]] = ExpressionEncoder()
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Map[String, String]] = ExpressionEncoder()
}
import org.apache.spark.sql.functions._

def init(spark: SparkSession): Unit = {
  spark.sql("create or replace temporary function map_merge as 'com.cmic.mcloud.label.bigtable.common.HiveMapMergeUDAF'")
}

def map_merge(spark: SparkSession, tp: String) = {
  init(spark)
  spark.sql(
    s"""
       | select msisdn, map_merge(kv) kv
       | from bigtable.basetag_map
       | where tp='$tp'
       | group by msisdn
       |""".stripMargin)
}

// Pivot the merged map into one column per distinct key.
def exportAll(spark: SparkSession, tp: String) = {
  val mapTypeDF = map_merge(spark, tp) // will query all rows
  val keys = mapTypeDF.select(explode(map_keys(col("kv")))).distinct().collect().map(_.get(0))
  val keyCols = keys.map(f => col("kv").getItem(f).as(f.toString))
  mapTypeDF.select(col("msisdn") +: keyCols: _*)
}
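A possible way to call exportAll and persist the widened result is sketched below; the partition value and output path are illustrative only, not taken from the original code.

// Sketch only: pivot one partition's merged tags into columns and save them as CSV.
// "202312" and the output path are example values.
val wide = exportAll(spark, "202312")
wide.coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/basetag_wide_202312")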
Query usage
select msisdn,
       tags,
       tags['storage_purchase_m'],
       map_contains_key(tags, 'function_purchase_m'),
       map_filter(tags, (k, v) -> k in ('function_purchase_m', 'pay_flag'))
from bigtable.tag_m
where tp=202312
  and taggroup='member_order'
  and (map_contains_key(tags, 'function_purchase_m') or map_contains_key(tags, 'pay_flag'))
  and tags['storage_purchase_m'] > 1
limit 10;
map_contains_key is a newly added function; on older versions, use tags['key'] is not null or array_contains(map_keys(tags), 'key') instead, as in the sketch below.
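For example, the filter from the query above could be rewritten without map_contains_key roughly as follows (a sketch run through spark.sql, assuming the same bigtable.tag_m table; both workaround forms are shown):

// Sketch: the same filter for versions without map_contains_key.
spark.sql(
  s"""
     |select msisdn, tags
     |from bigtable.tag_m
     |where tp=202312
     |  and taggroup='member_order'
     |  and (array_contains(map_keys(tags), 'function_purchase_m')
     |       or tags['pay_flag'] is not null)
     |  and tags['storage_purchase_m'] > 1
     |limit 10
     |""".stripMargin).show(false)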
Registering UDF functions
(1) Temporary function. Within a single session, create a temporary function with the following statements:
add jar hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar;
create or replace temporary function collect_map as 'com.cmic.mcloud.label.bigtable.v1.udf.HiveMapMerge';
select * from bigtable.basetag_map where msisdn = '19802021823';
select collect_map(kv) from bigtable.basetag_map where msisdn = '19802021823';
(2) Permanent function. This feature requires a newer Hive version. Its advantage is that the UDF jar can be stored on HDFS and the function only needs to be created once to be usable permanently:

CREATE FUNCTION collect_map AS 'com.cmic.mcloud.label.bigtable.v1.udf.MyMapMerge' USING JAR 'hdfs://bicluster/dolphinscheduler/hadoop/resources/test/jmh/label-1.0.jar';
Run SQL on files
select * from FORMAT.`PATH`
When the path contains subdirectories, select * from orc.`/system/hdfs/*` is not supported, but select * from orc.`/system/hdfs/2*` is.
Debugging with temporary data
SELECT a, count(*) as cnt
FROM VALUES
  ('A1', '2021-01-01 00:00:00'),
  ('A1', '2021-01-01 00:04:30'),
  ('A1', '2021-01-01 00:06:00'),
  ('A2', '2021-01-01 00:01:00') AS tab(a, b)
GROUP BY 1
import java.io.{BufferedOutputStream, FileOutputStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Download a file from HDFS to the local filesystem.
def get(hdfsFilePath: String, localFilePath: String): Unit = {
  val conf = new Configuration()
  val hdfsUri = "hdfs://ns1"

  val fs = FileSystem.get(new URI(hdfsUri), conf)

  if (fs.exists(new Path(hdfsFilePath))) {
    val inputStream = fs.open(new Path(hdfsFilePath))
    val outputStream = new BufferedOutputStream(new FileOutputStream(localFilePath))

    val buffer = new Array[Byte](1024)
    var bytesRead = inputStream.read(buffer)
    while (bytesRead != -1) {
      outputStream.write(buffer, 0, bytesRead)
      bytesRead = inputStream.read(buffer)
    }

    inputStream.close()
    outputStream.close()
  } else {
    println(s"File $hdfsFilePath does not exist in HDFS.")
  }
}
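A minimal usage sketch; the HDFS path comes from the example at the top of this post, and the local path is just an illustration:

// Example call: copy one HDFS file to a local path (local path is hypothetical).
get("/user/jiangmanhua/Experiment/url_domain_labeled", "/tmp/url_domain_labeled.txt")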