以太坊 JDK动态代理 centos email asynchronous pip ant base64 Avalon cocos2d html5 vue的优点 webform开发教程 solidworks图库 mysql获取当前时间戳 flutter ui构建工具 python数据格式 linux重启mysql python环境变量配置 java开发 搭建java开发环境 java迭代器 java创建文件 java版本查看 php语言入门 javascript实例 垃圾邮件数据集 魔兽世界字体包 磁盘分区软件 unix操作系统下载 电池救星 高等数学同济第七版 跳一跳脚本 c语言代码表白 spoonwep 魔兽七个人 掌门一对一下载 今日头条邀请码 电脑cmd命令大全 数独软件 ps3d字体
当前位置: 首页 > 学习教程  > 编程语言

sparkstreamming 消费kafka(2)

2020/9/19 15:27:24 文章标签:

spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。

这两种方式分别是:

  • Receiver-base

  • Direct

 

一 、Receiver-base:

Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险。

Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:

spark streaming启动过后,会选择一台excetor作为ReceiverSupervior

1:Reciver的父级ReciverTracker分发多个job(task)到不同的executor,并启动ReciverSupervisor.

2:ReceiverSupervior会启动对应的实例reciver(kafkareciver,TwitterReceiver),并调用onstart()

3:kafkareciver在通过onstart()启动后就开启线程源源不断的接收数据,并交给ReceiverSupervior,通过ReceiverSupervior.store函数一条一条接收

4:ReceiverSupervior会调用BlockGenertor.adddata填充数据。

 

所有的中间数据都缓存在BlockGenertor

1:首先BlockGenertor维护了一个缓冲区,currentbuffer,一个无限长度的arraybuffer。为了防止内存撑爆,这个currentbuffer的大小可以被限制,通过设置参数spark.streaming.reciver.maxRate,以秒为单位。currentbuffer所使用的内存不是storage(负责spark计算过程中的所有存储,包括磁盘和内存),而是珍贵的计算内存。所以currentbuffer应该被限制,防止占用过多计算内存,拖慢任务计算效率,甚至有可能拖垮Executor甚至集群。

2:维护blockforpushing队列,它是等待被拉到到BlockManager的中转站。它是currentbuffer和BlockManager的中间环节。它里面的每一个元素其实就是一个currentbuffer。

3:维护两个定时器,其实就是一个生产-消费模式。blockintervaltimer定时器,负责生产端,定时将currentbuffer放进blockforpushing队列。blockforpushingthread负责消费端,定时将blockforpushing里的数据转移到BlockManager。

Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming计算引擎时,我们优先考虑采用此种方式来读取数据,具体的代码如下:

如上述代码,函数getKafkaInputStream提供了zookeeper, topic, groupId, numReceivers, partition以及ssc,其传入函数分别对应:

  • zookeeper: ZooKeeper连接信息

  • topic: Kafka中输入的topic信息

  • groupId: consumer信息

  • numReceivers: 打算开启的receiver个数, 并用来调整并发

  • partition: Kafka中对应topic的分区数

以上几个参数主要用来连接Kafka并读取Kafka数据。具体执行的步骤如下:

  • Kafka相关读取参数配置,其中 zookeeper.connect即传入进来的zookeeper参数;auto.offset.reset设置从topic的最新处开始读取数据;zookeeper.connection.timeout.ms指zookeepr连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes则是指单次读取数据的大小;group.id则是指定consumer。

  • 指定topic的并发数,当指定receivers个数之后,但是如果配置当receivers个数小于topic的partition个数时,在每个receiver上面会起相应的线程来读取不同的partition。(感觉这里是相当于一个分区分配一个线程来消费,但是实际上kafka消费者一个消费者可以消费多个分区数据,只是一个分区数据不能被多个消费者消费而已。)

  • 读取Kafka数据,numReceivers的参数在此用于指定我们需要多少Executor来作为Receivers,开多个Receivers是为了提高应用吞吐量。

  • union用于将多个Receiver读取的数据关联起来。

二、Direct:

这种方式是延迟的。也就是说当action真正触发时才会去kafka里接数据。因此不存在currentbuffer的概念。它把kafka每个分区里的数据,映射为KafkaRdd的概念。题外话,在structured streaming中,也已经向DataFrame和DataSet统一了,弱化了RDD的概念。

真正与kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括设备kafka各种参数,连接,获取分区,以及偏移量,设置偏移量范围等。

Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:

源码:
 

Spark Streaming提供了一些重载读取Kafka数据的方法,本文中关注两个基于Scala的方法,这在我们的应用场景中会用到,具体的方法代码如下:

  • 方法createDirectStream中,ssc是StreamingContext;kafkaParams的具体配置见Receiver-based之中的配置,与之一样;这里面需要指出的是fromOffsets ,其用来指定从什么offset处开始读取数据。

方法createDirectStream中,该方法只需要3个参数,其中kafkaParams还是一样,并未有什么变化,不过其中有个配置auto.offset.reset可以用来指定是从largest或者是smallest处开始读取数据;topic是指Kafka中的topic,可以指定多个。具体提供的方法代码如下

在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:

  • 应用启动。当程序开发并上线,还未消费Kafka数据,此时从largest处读取数据,采用第二种方法;

  • 应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景

总体方向上,我们采用以上方法满足我们的需要,当然具体的策略我们不在本篇中讨论,后续会有专门的文章来介绍。从largest或者是smallest处读Kafka数据代码实现如下:

程序失败重启的逻辑代码如下:

代码中的fromOffsets参数从外部存储获取并需要处理转换,其代码如下:

该方法提供了从指定offsets处读取Kafka数据。如果发现读取数据异常,我们认为是offsets失败,此种情况去捕获这个异常,然后从largest处读取Kafka数据。

 

Receive_base VS   Direct两种方式的优缺点:

Direct方式具有以下方面的优势:

1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应

2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。

3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。

4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。

6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

 

Direct方式的缺点:

  • 提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。

  • 监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。

 

Receive-base优点:

1、Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

 

Receive-base的缺点:

1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。

2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。

3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费

4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。

6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


本文链接: http://www.dtmao.cc/news_show_200248.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?