RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
在SparkStreamingjob中如何读取Kafkamessages及其offsetRange

本篇文章为大家展示了在Spark Streaming job中如何读取Kafka messages及其offsetRange,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

创新互联是一家集网站建设,红安企业网站建设,红安品牌网站建设,网站定制,红安网站建设报价,网络营销,网络优化,红安网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offsetRange。要达到这一目的,下面这两段代码(代码1和代码2)都是正确的,而且是等价的。

代码1(正确):

-----------------------

JavaPairInputDStream messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.foreachRDD(

new Function, Void>() {

@Override

public Void call(JavaPairRDD rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

JavaRDD valueRDD = rdd.values();

long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码2(正确):

-----------------------

JavaPairInputDStream messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

final AtomicReference offsetRanges=new AtomicReference();

lines = messages.transformToPair(new Function, JavaPairRDD>() {

@Override

public JavaPairRDD call(JavaPairRDD rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

offsetRanges.set(offsets);

return rdd;

}

}).map(new Function, String>() {

@Override

public String call(Tuple2 tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function, Void>() {

@Override

public Void call(JavaRDD rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

但是要注意,下面这两段代码(代码3和代码4)是错误的,它们都会抛出一个exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

代码3(错误):

-----------------------

JavaPairInputDStream messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.transform(new Function, JavaRDD>() {

@Override

public JavaRDD call(JavaPairRDD rdd) throws Exception {

return rdd.values();

}

}).foreachRDD(new Function, Void>() {

@Override

public Void call(JavaRDD rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码4(错误):

-----------------------

JavaPairInputDStream messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.map(new Function, String>() {

@Override

public String call(Tuple2 tuple2) {

return tuple2._2();

}

}).foreachRDD(new Function, Void>() {

@Override

public Void call(JavaRDD rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

上述内容就是在Spark Streaming job中如何读取Kafka messages及其offsetRange,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


名称栏目:在SparkStreamingjob中如何读取Kafkamessages及其offsetRange
文章转载:http://scyingshan.cn/article/gejdho.html