scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
/**
 * AccumulatorParam instance that lets Spark accumulate `Vector` values.
 * (Legacy Spark 1.x accumulator API.)
 */
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  /** Identity element: a zero vector with the same dimension as `initialValue`. */
  def zero(initialValue: Vector): Vector =
    Vector.zeros(initialValue.size)

  /** Merge two partial results by adding `v2` into `v1` in place. */
  def addInPlace(v1: Vector, v2: Vector): Vector =
    v1 += v2
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here accum is still 0: `map` is a lazy transformation, and no action has
// been invoked yet to force it to actually execute.
/**
 * Example: blacklist filtering with a broadcast variable.
 * Each incoming record is checked against the broadcast blacklist and,
 * if it matches, dropped and counted in an accumulator.
 */
public class BroadcastAccumulator {
/**
 * Broadcast variable holding the blacklist (a List of words).
 */
private static volatile Broadcast<List<String>> broadcastList = null;
/**
 * Global counter: how many blacklisted records have been filtered so far.
 */
private static volatile Accumulator<Integer> accumulator = null;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").
setAppName("WordCountOnlineBroadcast");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/**
 * NOTE: an action must run before the broadcast actually reaches executors.
 * NOTE: what is broadcast is the Arrays.asList value itself, not an object
 * reference — broadcasting a reference to an Array object would misbehave.
 * The blacklist is shipped once to every Executor via broadcast.
 */
broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
/**
 * The accumulator serves as a global counter of how many blacklisted
 * records were filtered online. It is instantiated here.
 */
accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
/**
 * No flatMap step: each input line is assumed to be a single word already.
 */
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
});
/**
 * In Function2 the leading type parameters are the inputs and the last
 * one is the result type, as reflected in the call() signature below.
 */
wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
if (broadcastList.value().contains(wordPair._1)) {
/**
 * The accumulator is not limited to counting; the same hook
 * could also write to a database or a cache.
 */
accumulator.add(wordPair._2);
return false;
}else {
return true;
}
};
/**
 * The broadcast and the accumulator only take effect once an
 * action (collect) is executed.
 */
}).collect();
System.out.println("广播器里面的值"+broadcastList.value());
System.out.println("计时器里面的值"+accumulator.value());
return null;
}
});
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
* Created by lxh on 2016/6/30.
*/
/**
 * Streaming word count that uses a broadcast blacklist: per-batch word counts
 * are filtered against the blacklist, and an accumulator tallies how many
 * blacklisted occurrences were seen.
 */
object BroadcastAccumulatorStreaming {

  /** Broadcast blacklist and global counter; both are initialised in main(). */
  private var broadcastList: Broadcast[List[String]] = _
  private var accumulator: Accumulator[Int] = _

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    // Duration is in milliseconds: 2-second micro-batches.
    val ssc = new StreamingContext(sc, Duration(2000))

    // Broadcast a Scala List (not a java.util.List), so that `contains`
    // below works on the Scala collection directly.
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop", "Spark"))
    accumulator = ssc.sparkContext.accumulator(0, "broadcasttest")

    // Receive the input data from a socket.
    val lines = ssc.socketTextStream("localhost", 9999)

    /*
     * Pipeline:
     * 1. flatMap splits each line into words.
     * 2. map turns each word into a (word, 1) tuple.
     * 3. reduceByKey sums the counts per word.
     * 4. filter keeps only blacklisted words, adding their per-batch
     *    counts to the accumulator.
     * 5. print displays each batch's result.
     */
    val words = lines.flatMap(line => line.split(" "))
    val wordpair = words.map(word => (word, 1))
    val pair = wordpair.reduceByKey(_ + _)

    // For every (word, count) pair: if the word is in the broadcast
    // blacklist, add its count to the accumulator and keep it (so the
    // printed stream shows the blacklist hits); otherwise drop it.
    val filtedpair = pair.filter(record => {
      if (broadcastList.value.contains(record._1)) {
        accumulator.add(record._2)
        true
      } else {
        false
      }
    }).print

    // NOTE(review): this runs once at setup time, before any batch has been
    // processed, so it always prints 0. Read the accumulator inside a
    // foreachRDD to observe per-batch values.
    println("累加器的值" + accumulator.value)

    pair.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有