时间:2021-07-01 10:21:17 帮助过:18人阅读
这里翻译一下:
我们为我们的缓冲区设置初始值,我们不仅可以设置数字,还可以使用index getBoolen等去改变他的值,但是我们需要知道的是,在这个缓冲区中,数组和map依然是不可变的。
其实最后一句我也是不太明白,等我以后如果能研究并理解这句话,再回来补充吧。
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
这个是重要的update函数,对于平均值,我们可以不断迭代输入的值进行累加。buffer(0)统计总和,buffer(1)统计长度。
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
在做完update后spark 需要将结果进行merge到我们的区域,因此有一个merge 进行覆盖buffer
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
这是将最终的结果进行计算。
在写完这个类以后我们在我们的sparksession里面进行编写测试案例。
spark.sparkContext.textFile("file:///Users/4pa/Desktop/people.txt")
.map(_.split(","))
.map(agg=>Person(agg(0),agg(1).trim.toInt))
.toDF().createOrReplaceTempView("people")
spark.udf.register("myAverage",Myaverage)
val udfRes = spark.sql("select name,myAverage(age) as avgAge from people group by name")
udfRes.show()
从上面我们可以看出来,这种自定义函数不是类型安全的,因此能否实现一个安全的自定义函数呢?
个人觉得最好的例子还是官网给的例子,具体的解释都已经给了出来,思路其实和上面是一样的,只不过定义了两个caseclass,用于类型的验证。
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// 初始化
def zero: Average = Average(0L, 0L)
// 这个其实有点map-reduce的意思,只不过是对一个类的reduce,第一个值是和,第二个是总数
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// 实现缓冲区的一个覆盖
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 计算最终数值
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// 指定返回类型
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
SparkSQL 如何自定义函数
标签:its struct 统计 ack init apply ini rac sid