当前位置: 首页 > news >正文

网站开发接外包/360指数

网站开发接外包,360指数,门户网站html模板,wordpress显示标签1.开窗函数 1. row_number() over() 示例: row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank 作用: 给每个分组的数据,按照排序顺序,打上分组内的行号(分组topN) 2. sum、max、min、count、avg等聚合函数 示例:sum(pv) over(pa…

1.开窗函数

1. row_number() over()

  • 示例: row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank
    作用: 给每个分组的数据,按照排序顺序,打上分组内的行号(分组topN)

2. sum、max、min、count、avg等聚合函数
示例:sum(pv) over(partition by cookieid order by createtime) as pv1
作用: 给每个分组按顺序排序后, 前后两两累加

window子句:

  • rows between … and …(开始到结束,位置不能交换)
  • unbounded preceding :从第一行开始
  • current row:到当前行
  • 首行:unbounded preceding
  • 末行:unbounded following
  • 前 n 行:n preceding
  • 后 n 行:n following

rows & range

  • 是逻辑窗口,是指定当前行对应值的范围取值,列数不固定,只要行值在范围内,对应列都包含在内
  • 是物理窗口,根据order by子句排序后,取前n行的数据以及后n行的数据进行计算(与当前行的值无关,至于排序由的行号有关)

3. 几种排序函数

  • row_number() :顺序排,忽略并列排名
  • dense_rank() :有并列,后面的元素接着排名
  • rank() :有并列,后面的元素跳着排名
  • ntile(n)   :用于将分组数据按照顺序切分成n片

4. lag & lead

  • lag(field,n):取前 n 行的值
  • lead(field n):取后 n 行的值

5. first_value & last_value

  • first_value(field) :取分组内排序后,截止到当前行的第一个值
  • last_value(field) :取分组内排序后,截止到当前行的最后一个值

6. cube & rollup

  • cube:根据group by维度的所有组合进行聚合
  • rollup:是cube的自己,以左侧的维度为主,进行层级聚合

DSL
import org.apache.spark.sql.expressions.Window
import ssc.implicits._
import org.apache.spark.sql.functions._
val w1 = Window.partitionBy(“cookieid”).orderBy(“createtime”)
val w2 = Window.partitionBy(“cookieid”).orderBy(“pv”)

//聚合函数
df.select($ “cookieid”, $ “pv”, sum(“pv”).over(w1).alias(“pv1”)).show()

//排名
df.select($ “cookieid”, $ “pv”, rank().over(w2).alias(“rank”)).show()
df.select($ “cookieid”, $ “pv”, dense_rank().over(w2).alias(“dense_rank”)).show()
df.select($ “cookieid”, $ “pv”, row_number().over(w2).alias(“row_number”)).show()

//lag、lead
df.select($ “cookieid”, $ “pv”, lag(“pv”, 2).over(w2).alias(“row_number”)).show()
df.select($ “cookieid”, $ “pv”, lag(“pv”, -2).over(w2).alias(“row_number”)).show()

//cube、rollup
df.cube(“cookieid”, “createtime”).agg(sum(“pv”)).show()
df.rollup(“cookieid”, “createtime”).agg(sum(“pv”)).show()

round(): 函数用于把数值字段舍入为指定的小数位数。

percent_rank(): 用于按排名的百分比统计

2.集合函数:
array_contains(column,value): 检查array类型字段是否包含指定元素

explode(): 展开array或map为多行

from_json(): 解析JSON字符串为StructType or ArrayType,有多种参数形式,详见文档。

to_json(): 转为json字符串,支持StructType, ArrayType of StructTypes, a MapType or ArrayType of MapTypes。

get_json_object(column,path)
获取指定json路径的json对象字符串。
select get_json_object(’{"a"1,“b”:2}’,’$.a’);

json_tuple(column,fields)
获取json中指定字段值。select json_tuple(’{“a”:1,“b”:2}’,‘a’,‘b’);

map_keys(): 返回map的键组成的array

map_values(): 返回map的值组成的array

size(): array or map的长度

sort_array(e: Column, asc: Boolean): 将array中元素排序(自然排序),默认asc。

3.字符串函数
base64(e: Column): base64转码

unbase64(e: Column): base64解码

concat(exprs: Columns): 连接多列字符串
concat_ws(sep: String, exprs: Columns): 使用sep作为分隔符连接多列字符串
decode(value: Column, charset: String): 解码
encode(value: Column, charset: String): 转码,charset支持 ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’。

lower(e: Column): 转小写

upper(e: Column): 转大写

split(str: Column, pattern: String): 用pattern分割str

substring(str: Column, pos: Int, len: Int): 在str上截取从pos位置开始长度为len的子字符串。

repeat(str: Column, n: Int):将str重复n次返回

reverse(str: Column): 将str反转

length(e: Column): 字符串长度

trim(e: Column):剪掉左右两边的空格、空白字符

4.混杂函数
crc32(e: Column):计算CRC32,返回bigint

hash(cols: Column*):计算 hash code,返回int

md5(e: Column):计算MD5摘要,返回32位,16进制字符串

sha1(e: Column):计算SHA-1摘要,返回40位,16进制字符串

sha2(e: Column, numBits: Int):计算SHA-1摘要,返回numBits位,16进制字符串。numBits支持224, 256, 384, or 512.

5.聚合函数

avg: 平均值

collect_list: 聚合指定字段的值到list

collect_set: 聚合指定字段的值到set

max: 最大值

min: 最小值

sum: 求和

sumDistinct: 非重复值求和 SQL中用法 :select sum(distinct class)

count: 计数

countDistinct: 去重计数 SQL中用法:select count(distinct class)

其他函数
coalesce()函数: 找出给定的几个列中的第一个非空值

when()函数: if…else…

UDAF

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.*;import java.util.ArrayList;
import java.util.List;public class UDAFCountStrNum extends UserDefinedAggregateFunction {/*** 聚合函数的输入数据结构* @return*/public StructType inputSchema() {DataType strType = DataTypes.StringType;List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",strType,true));return DataTypes.createStructType(fields);}/*** 缓存区数据结构* @return*/public StructType bufferSchema() {DataType intType = DataTypes.IntegerType;List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("count",intType,true));return DataTypes.createStructType(fields);}/*** 返回值数据结构* @return*/public DataType dataType() {return DataTypes.IntegerType;}/*** 是否是幂等的* @return*/public boolean deterministic() {return true;}/*** 初始化缓冲区* @param buffer*/public void initialize(MutableAggregationBuffer buffer) {buffer.update(0,0);}/*** 给聚合函数传入一条新的数据进行处理* @param buffer* @param input*/public void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0,buffer.getInt(0) + input.getString(0).length());}/*** 合并聚合缓冲区* @param buffer1* @param buffer2*/public void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0,buffer1.getInt(0) + buffer2.getInt(0));}/*** 计算最终结果* @param buffer* @return*/public Object evaluate(Row buffer) {return buffer.getInt(0);}
}

1.编写自定义的UDAF类, 继承UserDefinedAggregateFunction类, 并重写其8个方法
2.session.udf().register(“countStrNum”, new UDAFCountStrNum()); 用SparkSession注册UDAF
3.Dataset res = session.sql(“select countStrNum(name) from table_name”); 在sql中使用udaf

相关文章: