- 浏览: 72422 次
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
- 博客分类:
- Spark
最近在玩spark streaming, 感觉到了他的强大。 然后看 StreamingContext的源码去理解spark是怎么完成计算的。 大部分的源码比较容易看懂, 但是这个
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
还是花了不少时间。 主要还是由于对spark不熟悉造成的吧, 还好基本弄明白了。
总的来说SparkStreaming提供这个方法主要是出于效率考虑。 比如说我要每10秒计算一下前15秒的内容,(每个batch 5秒), 可以想象每十秒计算出来的结果和前一次计算的结果其实中间有5秒的时间值是重复的。
那么就是通过如下步骤
1. 存储上一个window的reduce值
2.计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
3.重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
4.重复时间段的值等于上一个window的值减去oldRDD
这样就不需要去计算每个batch的值, 只需加加减减就能得到新的reduce出来的值。
从代码上面来看, 入口为:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
先计算oldRDD 和newRDD
//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值
得到newRDD和oldRDD后就要拿到previous windows的值: 如果第一次没有previous window那么建一个空RDD, 为最后计算结果时 arrayOfValues(0).isEmpty 铺垫
然后把所有的值放到一个数组里面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd
将每个RDD的(K,V) 转变成(K, Iterator(V))的形式:
比如说有两个值(K,a) 和(K,b) 那么coGroup后就会成为(K, Iterator(a,b))这种形式
进行最后的计算:
首先判断RDD的value数量是不是正确 previous window因为已经计算过所以只有一组值
正确值为 1 (previous window value) + numOldValues (oldRDD 每个RDD的value) + numNewValues (newRDD 每个RDD的value)
接下来取出oldRDD的值和newRDD的值:
如果previous window是空的, 那么就直接计算newRDD的值(这也是为什么每次计算时候第一次打出来的值都比较少, 因为他只有newRDD部分没有重合部分, 也就是只有10秒的内容而不是15秒)
如果有previous window的值, 那么先存到tempValue, 如果有oldRDD那么减去oldRDD, 如果有newRDD (一般都有) 那么加上newRDD的值 这样就组成上图里面10到25秒区间的值了
最后如果有filter的function的话就filter一下:
这样就返回了新window内的值
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
还是花了不少时间。 主要还是由于对spark不熟悉造成的吧, 还好基本弄明白了。
总的来说SparkStreaming提供这个方法主要是出于效率考虑。 比如说我要每10秒计算一下前15秒的内容,(每个batch 5秒), 可以想象每十秒计算出来的结果和前一次计算的结果其实中间有5秒的时间值是重复的。
那么就是通过如下步骤
1. 存储上一个window的reduce值
2.计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
3.重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
4.重复时间段的值等于上一个window的值减去oldRDD
这样就不需要去计算每个batch的值, 只需加加减减就能得到新的reduce出来的值。
从代码上面来看, 入口为:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]] .mapValues(mergeValues)
先计算oldRDD 和newRDD
//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值
// 0秒 10秒 15秒 25秒 // _____________________________ // | previous window _________|___________________ // |___________________| current window | --------------> Time // |_____________________________| // // |________ _________| |________ _________| // | | // V V // old RDDs new RDDs //
val currentTime = validTime val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) val previousWindow = currentWindow - slideDuration val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) logDebug("# old RDDs = " + oldRDDs.size) // Get the RDDs of the reduced values in "new time steps" val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) logDebug("# new RDDs = " + newRDDs.size)
得到newRDD和oldRDD后就要拿到previous windows的值: 如果第一次没有previous window那么建一个空RDD, 为最后计算结果时 arrayOfValues(0).isEmpty 铺垫
val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
然后把所有的值放到一个数组里面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
将每个RDD的(K,V) 转变成(K, Iterator(V))的形式:
比如说有两个值(K,a) 和(K,b) 那么coGroup后就会成为(K, Iterator(a,b))这种形式
val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
进行最后的计算:
val mergeValues = (arrayOfValues: Array[Iterable[V]]) => { ... }
首先判断RDD的value数量是不是正确 previous window因为已经计算过所以只有一组值
正确值为 1 (previous window value) + numOldValues (oldRDD 每个RDD的value) + numNewValues (newRDD 每个RDD的value)
if (arrayOfValues.size != 1 + numOldValues + numNewValues) { throw new Exception("Unexpected number of sequences of reduced values") }
接下来取出oldRDD的值和newRDD的值:
val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head) val newValues = (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
如果previous window是空的, 那么就直接计算newRDD的值(这也是为什么每次计算时候第一次打出来的值都比较少, 因为他只有newRDD部分没有重合部分, 也就是只有10秒的内容而不是15秒)
if (arrayOfValues(0).isEmpty) { // If previous window's reduce value does not exist, then at least new values should exist if (newValues.isEmpty) { throw new Exception("Neither previous window has value for key, nor new values found. " + "Are you sure your key class hashes consistently?") } // Reduce the new values newValues.reduce(reduceF) // return }
如果有previous window的值, 那么先存到tempValue, 如果有oldRDD那么减去oldRDD, 如果有newRDD (一般都有) 那么加上newRDD的值 这样就组成上图里面10到25秒区间的值了
else { // Get the previous window's reduced value var tempValue = arrayOfValues(0).head // If old values exists, then inverse reduce then from previous value if (!oldValues.isEmpty) { tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) } // If new values exists, then reduce them with previous value if (!newValues.isEmpty) { tempValue = reduceF(tempValue, newValues.reduce(reduceF)) } tempValue // return }
最后如果有filter的function的话就filter一下:
if (filterFunc.isDefined) { Some(mergedValuesRDD.filter(filterFunc.get)) } else { Some(mergedValuesRDD) }
这样就返回了新window内的值
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1044最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1356之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1788前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1370前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3287之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4042在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6558前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2191前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1412前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10083前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4581一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4659在sparkstreaming中对窗口 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7353林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
安装$ npm install --save node-video-duration用法 const getVideoDuration = require ( 'node-video-duration' ) ;getVideoDuration ( '...
import * as duration from 'duration-fns' // Parsing / stringifying // --------------------------------------------- duration . parse ( 'PT1M30S' ) // { minutes: 1, seconds: 30 } duration . toString ...
前端项目-moment-duration-format,用于格式化持续时间的moment.js插件。
这里给大家介绍如果一键将视频拆分为多个场景视频。... + " -t " + durationTime + " -max_muxing_queue_size 1024" + " -strict -2 -keyint_min 8 -g 8 -sc_threshold 0" + " " + DstFile + " -y ";
前端项目-humanize-duration,将毫秒持续时间转换为英语和许多其他语言。
// Beep frequency and duration (in ms) for successful // fan state changes. (Set either or both to zero to // disable) // 当成功转换风扇状态时,蜂鸣器的频率和时长(毫秒),任何一个值设定为0时,禁止蜂鸣. ...
lr_get_trans_instance_duration/获取事务实例的持续时间(由它的句柄指定) lr_get_trans_instance_wasted_time/获取事务实例浪费的时间(由它的句柄指定) lr_get_transaction_duration/获取事务的持续时间(按...
开源项目-icholy-Duration.py.zip,Python code for parsing time.Duration strings
Kaggle比赛New-York-City-Taxi-Trip-Duration方案
youtube-duration-sort按时长排序YouTube订阅视频去做添加更高质量的图标添加默认排序选项清理插入的图标清理代码
开源项目-senseyeio-duration.zip,Duration: Time-shifting library for Go
用法作为守护程序: gtracker --daemon 列印统计资料: gtracker --today+-------------- | ------------+| Name | Duration |+-------------- | ------------+| Finder | 0h 0m 7s || Sublime Text | 0h 1m 45s || ...
Android sensor校准,包括加速度、陀螺仪、地磁校准。
前端开源库-duration-js持续时间js,用于处理持续时间的小型简单库
matlab开发-duration。计算分布式作业或单个任务的持续时间
前端开源库-gulp-duration咕噜持续时间,跟踪咕噜任务的部分持续时间
The energy of a laser pulse was higher than 3 mJ with duration of 0.9 ns. The proposed system has the ability to choose independently the focus of each beam. Such a laser device can be used for ...
小程序前端源码无需后端直接导入开发者工具就可以使用了 <view class="content"><top-nav vue-id="8dd740cc-1" background="{{({backgroundColor:'#F3F4F6'})}}" border-bottom="{{false}}" bind:__l="__l" vue-...
In this paper, the current state of the art for measuring function duration with FTrace is described. This includes recent work to add a new capability to filter the trace data by function duration, ...
PI CPP 版权Tecnalia 2020 这是用C ++实现的性能指标的一个示例。 它准备在Eurobench基准测试软件中使用。 Docker映像生成 运行以下命令以为此PI创建docker镜像: docker build ....启动Docker映像 ...