// Minimal local Spark job: sums the even numbers of 1..9 while a long
// accumulator counts how many odd numbers the filter rejected.
val sparkConf = new SparkConf()
  .setAppName("Test")
  .setMaster("local[2]")
val sc = new SparkContext(sparkConf)

// Counts the number of odd values seen by the filter below.
val accum = sc.longAccumulator("longAccum")

// NOTE(review): the accumulator is updated inside a transformation (filter),
// not an action. The RDD is evaluated once here (by reduce), but Spark's
// documentation warns that such updates may be applied more than once if
// tasks or stages are re-executed — confirm this is acceptable.
val sum = sc
  .parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
  .filter { n =>
    val isOdd = n % 2 != 0
    if (isOdd) accum.add(1L)
    // Keep only the even values for the sum.
    !isOdd
  }
  .reduce((left, right) => left + right)

println("sum: " + sum)
println("accum: " + accum.value)
sc.stop()
package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
/**
 * Spark [AccumulatorV2] that keeps a fixed set of counters packed into a single
 * string of the form `key=count|key=count|...`.
 *
 * The tracked keys are the session-count, time-period and step-period names
 * from [Constant], in the order listed in the companion object. Every counter
 * starts at `0`.
 *
 * Individual fields are read and written through the project's
 * `getFieldFromConcatString` / `setFieldFromConcatString` helpers.
 * NOTE(review): this class assumes the getter returns an empty string when the
 * key is not present — confirm against the `util` implementation.
 */
open class SessionAccmulator : AccumulatorV2<String, String>() {

    /** Current packed counter string. */
    private var result = ZERO_VALUE

    /** Returns the packed `key=count|...` string holding all counters. */
    override fun value(): String = result

    /**
     * Merges [other] into this accumulator by adding, field by field, the
     * counts held by [other] to the counts held by this instance.
     *
     * Nothing happens when [other] is `null` or is not a [SessionAccmulator].
     * A field that cannot be read from either side is left unchanged, so the
     * accumulated state is never replaced by an empty string.
     *
     * @throws NumberFormatException if a stored count is not a valid integer.
     */
    override fun merge(other: AccumulatorV2<String, String>?) {
        // `!is` is also true for null, so this single guard covers both cases.
        if (other !is SessionAccmulator) return

        var merged = result
        for (field in FIELDS) {
            val theirs = other.result.getFieldFromConcatString(DELIMITER, field)
            val ours = merged.getFieldFromConcatString(DELIMITER, field)
            if (theirs.isEmpty() || ours.isEmpty()) continue

            // Sum both partial counts; the previous code stored `theirs + 1`
            // and thereby threw away this accumulator's own count.
            val total = ours.toInt() + theirs.toInt()
            merged = merged.setFieldFromConcatString(DELIMITER, field, total.toString())
        }
        result = merged
    }

    /** Returns a new accumulator carrying the same packed counter string. */
    override fun copy(): AccumulatorV2<String, String> =
        SessionAccmulator().also { it.result = result }

    /**
     * Increments by one the counter whose key is [p0].
     *
     * A `null`, empty or unknown key is ignored; previously an unknown key
     * blanked the whole accumulator state.
     *
     * @throws NumberFormatException if the stored count is not a valid integer.
     */
    override fun add(p0: String?) {
        val key = p0?.takeIf { it.isNotEmpty() } ?: return
        val current = result.getFieldFromConcatString(DELIMITER, key)
        if (current.isEmpty()) return

        result = result.setFieldFromConcatString(DELIMITER, key, (current.toInt() + 1).toString())
    }

    /** Resets every counter to `0`. */
    override fun reset() {
        result = ZERO_VALUE
    }

    /** Returns `true` when every counter is still `0`. */
    override fun isZero(): Boolean = result == ZERO_VALUE

    private companion object {
        /** Separator between `key=count` pairs in the packed string. */
        private const val DELIMITER = "|"

        /** Counter keys, in the order they appear in the packed string. */
        private val FIELDS = listOf(
            Constant.SESSION_COUNT,
            Constant.TIME_PERIOD_1s_3s,
            Constant.TIME_PERIOD_4s_6s,
            Constant.TIME_PERIOD_7s_9s,
            Constant.TIME_PERIOD_10s_30s,
            Constant.TIME_PERIOD_30s_60s,
            Constant.TIME_PERIOD_1m_3m,
            Constant.TIME_PERIOD_3m_10m,
            Constant.TIME_PERIOD_10m_30m,
            Constant.TIME_PERIOD_30m,
            Constant.STEP_PERIOD_1_3,
            Constant.STEP_PERIOD_4_6,
            Constant.STEP_PERIOD_7_9,
            Constant.STEP_PERIOD_10_30,
            Constant.STEP_PERIOD_30_60,
            Constant.STEP_PERIOD_60
        )

        /** Packed string with every counter at `0`: `key=0|key=0|...|key=0`. */
        private val ZERO_VALUE = FIELDS.joinToString(DELIMITER) { "$it=0" }
    }
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有