1.开窗函数
1. row_number() over()
- 示例: row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank
作用: 给每个分组的数据,按照排序顺序,打上分组内的行号(分组topN)
2. sum、max、min、count、avg等聚合函数
示例:sum(pv) over(partition by cookieid order by createtime) as pv1
作用: 给每个分组按顺序排序后, 前后两两累加
window子句:
- rows between … and …(开始到结束,位置不能交换)
- unbounded preceding :从第一行开始
- current row:到当前行
- 首行:unbounded preceding
- 末行:unbounded following
- 前 n 行:n preceding
- 后 n 行:n following
rows & range
- 是逻辑窗口,是指定当前行对应值的范围取值,列数不固定,只要行值在范围内,对应列都包含在内
- 是物理窗口,根据order by子句排序后,取前n行的数据以及后n行的数据进行计算(与当前行的值无关,至于排序由的行号有关)
3. 几种排序函数
- row_number() :顺序排,忽略并列排名
- dense_rank() :有并列,后面的元素接着排名
- rank() :有并列,后面的元素跳着排名
- ntile(n) :用于将分组数据按照顺序切分成n片
4. lag & lead
- lag(field,n):取前 n 行的值
- lead(field n):取后 n 行的值
5. first_value & last_value
- first_value(field) :取分组内排序后,截止到当前行的第一个值
- last_value(field) :取分组内排序后,截止到当前行的最后一个值
6. cube & rollup
- cube:根据group by维度的所有组合进行聚合
- rollup:是cube的自己,以左侧的维度为主,进行层级聚合
DSL
import org.apache.spark.sql.expressions.Window
import ssc.implicits._
import org.apache.spark.sql.functions._
val w1 = Window.partitionBy(“cookieid”).orderBy(“createtime”)
val w2 = Window.partitionBy(“cookieid”).orderBy(“pv”)
//聚合函数
df.select($ “cookieid”, $ “pv”, sum(“pv”).over(w1).alias(“pv1”)).show()
//排名
df.select($ “cookieid”, $ “pv”, rank().over(w2).alias(“rank”)).show()
df.select($ “cookieid”, $ “pv”, dense_rank().over(w2).alias(“dense_rank”)).show()
df.select($ “cookieid”, $ “pv”, row_number().over(w2).alias(“row_number”)).show()
//lag、lead
df.select($ “cookieid”, $ “pv”, lag(“pv”, 2).over(w2).alias(“row_number”)).show()
df.select($ “cookieid”, $ “pv”, lag(“pv”, -2).over(w2).alias(“row_number”)).show()
//cube、rollup
df.cube(“cookieid”, “createtime”).agg(sum(“pv”)).show()
df.rollup(“cookieid”, “createtime”).agg(sum(“pv”)).show()
round(): 函数用于把数值字段舍入为指定的小数位数。
percent_rank(): 用于按排名的百分比统计
2.集合函数:
array_contains(column,value): 检查array类型字段是否包含指定元素
explode(): 展开array或map为多行
from_json(): 解析JSON字符串为StructType or ArrayType,有多种参数形式,详见文档。
to_json(): 转为json字符串,支持StructType, ArrayType of StructTypes, a MapType or ArrayType of MapTypes。
get_json_object(column,path)
获取指定json路径的json对象字符串。
select get_json_object(’{"a"1,“b”:2}’,’$.a’);
json_tuple(column,fields)
获取json中指定字段值。select json_tuple(’{“a”:1,“b”:2}’,‘a’,‘b’);
map_keys(): 返回map的键组成的array
map_values(): 返回map的值组成的array
size(): array or map的长度
sort_array(e: Column, asc: Boolean): 将array中元素排序(自然排序),默认asc。
3.字符串函数
base64(e: Column): base64转码
unbase64(e: Column): base64解码
concat(exprs: Columns): 连接多列字符串
concat_ws(sep: String, exprs: Columns): 使用sep作为分隔符连接多列字符串
decode(value: Column, charset: String): 解码
encode(value: Column, charset: String): 转码,charset支持 ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’。
lower(e: Column): 转小写
upper(e: Column): 转大写
split(str: Column, pattern: String): 用pattern分割str
substring(str: Column, pos: Int, len: Int): 在str上截取从pos位置开始长度为len的子字符串。
repeat(str: Column, n: Int):将str重复n次返回
reverse(str: Column): 将str反转
length(e: Column): 字符串长度
trim(e: Column):剪掉左右两边的空格、空白字符
4.混杂函数
crc32(e: Column):计算CRC32,返回bigint
hash(cols: Column*):计算 hash code,返回int
md5(e: Column):计算MD5摘要,返回32位,16进制字符串
sha1(e: Column):计算SHA-1摘要,返回40位,16进制字符串
sha2(e: Column, numBits: Int):计算SHA-1摘要,返回numBits位,16进制字符串。numBits支持224, 256, 384, or 512.
5.聚合函数
avg: 平均值
collect_list: 聚合指定字段的值到list
collect_set: 聚合指定字段的值到set
max: 最大值
min: 最小值
sum: 求和
sumDistinct: 非重复值求和 SQL中用法 :select sum(distinct class)
count: 计数
countDistinct: 去重计数 SQL中用法:select count(distinct class)
其他函数
coalesce()函数: 找出给定的几个列中的第一个非空值
when()函数: if…else…
UDAF
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.*;import java.util.ArrayList;
import java.util.List;public class UDAFCountStrNum extends UserDefinedAggregateFunction {/*** 聚合函数的输入数据结构* @return*/public StructType inputSchema() {DataType strType = DataTypes.StringType;List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",strType,true));return DataTypes.createStructType(fields);}/*** 缓存区数据结构* @return*/public StructType bufferSchema() {DataType intType = DataTypes.IntegerType;List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("count",intType,true));return DataTypes.createStructType(fields);}/*** 返回值数据结构* @return*/public DataType dataType() {return DataTypes.IntegerType;}/*** 是否是幂等的* @return*/public boolean deterministic() {return true;}/*** 初始化缓冲区* @param buffer*/public void initialize(MutableAggregationBuffer buffer) {buffer.update(0,0);}/*** 给聚合函数传入一条新的数据进行处理* @param buffer* @param input*/public void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0,buffer.getInt(0) + input.getString(0).length());}/*** 合并聚合缓冲区* @param buffer1* @param buffer2*/public void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0,buffer1.getInt(0) + buffer2.getInt(0));}/*** 计算最终结果* @param buffer* @return*/public Object evaluate(Row buffer) {return buffer.getInt(0);}
}
1.编写自定义的UDAF类, 继承UserDefinedAggregateFunction类, 并重写其8个方法
2.session.udf().register(“countStrNum”, new UDAFCountStrNum()); 用SparkSession注册UDAF
3.Dataset res = session.sql(“select countStrNum(name) from table_name”); 在sql中使用udaf