- 浏览: 72531 次
林林总总玩了Spark快一个月了, 打算试一下kafka的消息系统加上Spark Streaming 进行实时推送数据的处理。
简单的写了一个类作为kafka的producer, 然后SparkStreaming的类作为consumer
Producer 的run方法产生数据:
SparkStreaming的consumer:
zookeeper和Kafka的config都是默认配置, 由于资源不够, 目前都是单机环境, 就改了一下zookeeper的server port, 和kafka这边zookeeper的host+port
结果运行的时候就报错:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
可以看到kafka的broker是去找localhost:9092, 由于eclipse环境在本地, kafka和zookeeper是在vm上面, 基本锁定是这个原因。
关键是, localhost是在哪里改。。。。
尝试跟踪源码, 发现KafkaUtils.createStream 有一个方法是可以传一个Map进去, Map里面存的configuration。 尝试传入"metadata.broker.list", 结果从spark的日志中看到这个属性不能在这里设置, 直接被ingore了。
折腾了两天, 基本上把本机可以动的地方都动了, 没用。
后来想到会不会是kafka启动的时候用的server.properties里面有设置, 打开一看, 果然, 有一个属性:
#advertised.listeners=PLAINTEXT://your.host.name:9092
默认被注释掉了, 看说明如果被注释掉了后就直接设置成localhost了, 果断修改成Kafka的IP:Port,重启Kafka, 启动producer, 运行Consumer, 错误消失, 分析结果出来了
简单的写了一个类作为kafka的producer, 然后SparkStreaming的类作为consumer
Producer 的run方法产生数据:
public void run() { KafkaProducer<Integer, String> producer = getProducer(); int messageNum = 0; Random rd = new Random(); while(true){ String page = "Page_" + rd.nextInt(15) + ".html"; Integer click = rd.nextInt(10); float stayTime = rd.nextFloat(); Integer likeOrNot = rd.nextInt(3); String messageStr = page + "\t" + click + "\t" + stayTime + "\t" + likeOrNot; long startTime = System.currentTimeMillis(); System.out.println("sending message: " + messageStr); producer.send(new ProducerRecord<>(topic, 999, messageStr)); messageNum++; try { Thread.sleep(200); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
SparkStreaming的consumer:
val ssc = new StreamingContext("local[2]", "appName", Seconds(5),System.getenv("SPARK_HOME")) ssc.checkpoint("./") val kafkaStream = KafkaUtils.createStream(ssc, "10.32.190.165:2181", "test", Map("test"->1)) // val kafkaStream = KafkaUtils.createStream(ssc, Map("group.id"->"test","zookeeper.connect"->"10.32.190.165:2181", "zookeeper.connection.timeout.ms"->"10000"), Map("test"->1),StorageLevel.MEMORY_AND_DISK_SER_2) // kafkaStream.ytt val msgRDD = kafkaStream.map(_._2) val newRdd = msgRDD.map { x => (x.split("\t")(0), getValueOfPage(x.split("\t"))) }.reduceByKey((a,b) => a + b) val resultRdd = newRdd.transform(x =>x.sortByKey(false)) var updateFunc = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(x=> { var page = x._1 var nowValue = x._2.sum var oldValue : Double = x._3.getOrElse(0) Some(nowValue + oldValue) }.map { y => (x._1, y) }) } val initRDD = ssc.sparkContext.parallelize(List(("page_0.html", 0.0))) val stateRDD = newRdd.updateStateByKey[Double](updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true,initRDD); // val sortRDD = stateRDD.map(x => (x._2, x._1)) // newRdd. stateRDD.foreachRDD(r =>{ val sortedRDD = r.map(x => (x._2, x._1)).sortByKey(false) val topK = sortedRDD.take(3) topK.foreach(y => println(y)) }) // resultRdd.print() ssc.start() ssc.awaitTermination()
zookeeper和Kafka的config都是默认配置, 由于资源不够, 目前都是单机环境, 就改了一下zookeeper的server port, 和kafka这边zookeeper的host+port
结果运行的时候就报错:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
可以看到kafka的broker是去找localhost:9092, 由于eclipse环境在本地, kafka和zookeeper是在vm上面, 基本锁定是这个原因。
关键是, localhost是在哪里改。。。。
尝试跟踪源码, 发现KafkaUtils.createStream 有一个方法是可以传一个Map进去, Map里面存的configuration。 尝试传入"metadata.broker.list", 结果从spark的日志中看到这个属性不能在这里设置, 直接被ingore了。
折腾了两天, 基本上把本机可以动的地方都动了, 没用。
后来想到会不会是kafka启动的时候用的server.properties里面有设置, 打开一看, 果然, 有一个属性:
#advertised.listeners=PLAINTEXT://your.host.name:9092
默认被注释掉了, 看说明如果被注释掉了后就直接设置成localhost了, 果断修改成Kafka的IP:Port,重启Kafka, 启动producer, 运行Consumer, 错误消失, 分析结果出来了
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1048最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1363之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1795前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1373前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3290之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4051在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6559前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2195前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1420前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10090前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4584一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4664在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8704最近在玩spark streaming, 感觉到了他的强大。 ...
相关推荐
徐老师大数据培训Hadoop+HBase+ZooKeeper+Spark+Kafka+Scala+Ambari
1、内容概要:Hadoop+Spark+Hive+HBase+Oozie+Kafka+Flume+Flink+Elasticsearch+Redash等大数据集群及组件搭建指南(详细搭建步骤+实践过程问题总结)。 2、适合人群:大数据运维、大数据相关技术及组件初学者。 3、...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
通过VirtualBox安装多台虚拟机,实现集群环境搭建。 优势:一台电脑即可。 应用场景:测试,学习。...内附百度网盘下载地址,有hadoop+zookeeper+spark+kafka等等·····需要的安装包和配置文件
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
基于Flink+ClickHouse构建的分析平台,涉及 Flink1.9.0 、ClickHouse、Hadoop、Hbase、Kafka、Hive、Jmeter、Docker 、HDFS、MapReduce 、Zookeeper 等技术
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
新闻、健身实时数据,基于spark+kafka+flume+echarts可视化+hadoop的项目。有文档有教程。
大数据 hadoop spark hbase ambari全套视频教程(购买的付费视频)
基于Spark+Kafka+Flume实现的电影推荐系统.zip
毕业设计,课程设计,项目源码均经过助教老师测试,运行无误,欢迎下载交流 ----- 下载后请首先打开README.md文件(如有)
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...
毕业设计:基于Spark+Kafka+Hive的智能货运系统设计与实现.zip
毕业设计 基于Hadoop+Hive+Kafka+Spark+ElasticSearch+Canal大数据酒店预订平台设计与实现源码+详细文档+全部数据资料 高分项目.zip毕业设计 基于Hadoop+Hive+Kafka+Spark+ElasticSearch+Canal大数据酒店预订平台...
亲手在Centos7上安装,所用软件列表 apache-flume-1.8.0-bin.tar.gz ...kafka_2.12-1.0.0.tgz scala-2.12.4.tar.gz scala-2.12.4.tgz spark-2.2.0-bin-hadoop2.7.tgz spark-2.2.0.tgz zookeeper-3.4.11.tar.gz
flume+kafka+flink+mysql实现nginx数据统计与分析
搭建spark分布式环境的安装包以及spark连接mysql、sparkstreaming连接kafka需要的jar包
Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家多学习基础。 流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无...
大数据实习hdfs+flume+kafka+spark+hbase+hive项目.zip