
Window Operations in Spark Streaming

2020/12/28 18:48:40


Contents

    • 1. window: output the contents of the window
    • 2. countByWindow: count the elements in the window
    • 3. countByValueAndWindow: count occurrences of each distinct element
    • 4. reduceByWindow: reduce the elements in the window
    • 5. reduceByKeyAndWindow: reduceByKey over the window
    • 6. reduceByKeyAndWindow: incremental reduceByKey over data entering and leaving the window

A window operation works on a DStream: a window of configurable length slides forward at a configurable rate (the slide interval), and the chosen operator is applied to whatever data currently falls inside the window.

Note that both the window length and the slide interval must be integer multiples of the batch interval; a minimal, Kafka-free sketch after the dependency list below illustrates this.
Add the pom dependencies:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.6</version>
    </dependency>
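
With the dependencies in place, the following is a minimal, self-contained sketch that only illustrates the timing constraint. It reads from a hypothetical socket source on localhost:9999 instead of Kafka (the host, port, and window/slide values are assumptions chosen for illustration): the batch interval is 2 seconds, so both the 8-second window and the 4-second slide interval are integer multiples of it.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowTimingSketch {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("WindowTimingSketch").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf, Seconds(2))   //batch interval: a new micro-batch every 2 seconds

    //hypothetical socket source; any DStream supports window() in the same way
    val lines = streamingContext.socketTextStream("localhost", 9999)

    //window length (8s) and slide interval (4s) are both integer multiples of the 2s batch interval
    val windowed = lines.flatMap(_.split("\\s+")).window(Seconds(8), Seconds(4))
    windowed.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}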

1. window: output the contents of the window

window(windowLength, slideInterval)
Called on a DStream with a window-length parameter and a slide-interval parameter; it takes the elements that fall inside the current window and returns them as a new DStream.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval set to 2 seconds

    streamingContext.checkpoint("checkpoint")

    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams) //SparkKafkaDemo is a Kafka topic
    )

    val numStream: DStream[(String, Int)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      //8-second window, sliding every 6 seconds; the slide interval (step) must be a multiple of the batch interval
      .window(Seconds(8),Seconds(6))
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
java
scala
# Output:
(java,1)
(java,1)
(scala,1)

2. countByWindow: count the elements in the window

countByWindow(windowLength, slideInterval)

Returns the number of elements in a window of the given length.

It only counts; it applies no other logic, and the output is a plain number.

Note: checkpointing must be enabled.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval (i.e. collection interval) set to 2 seconds
    //enable checkpointing
    streamingContext.checkpoint("file:\\D:\\test\\checkpoint")

    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[Long] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      //8-second window, sliding every 4 seconds; the slide interval (step) must be a multiple of the batch interval
      //counts the words that appeared within the 8-second window
      //countByWindow returns the number of elements in the given window
      .countByWindow(Seconds(8),Seconds(4))
    numStream.print()   //the output is a plain number

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
java
scala
# Output:
3

3. countByValueAndWindow: count occurrences of each distinct element

countByValueAndWindow(windowLength, slideInterval, [numTasks])

Counts, within the current window, how many times each distinct element value occurs.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkWindowDemo3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval (i.e. collection interval) set to 2 seconds
    //enable checkpointing
    streamingContext.checkpoint("file:\\D:\\test\\checkpoint")


    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[(String, Long)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      //8-second window, sliding every 4 seconds; the slide interval (step) must be a multiple of the batch interval
      //word occurrences within the 8-second window
      //countByValueAndWindow counts how many times each distinct value occurs in the current window
      .countByValueAndWindow(Seconds(8),Seconds(4))
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
java
scala
# Output:
(java,2)
(scala,1)

4. reduceByWindow: reduce the elements in the window

reduceByWindow(func, windowLength, slideInterval)
First collects the elements of the current window of the calling DStream into a new DStream, then reduces those elements with func.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval (i.e. collection interval) set to 2 seconds

    //streamingContext.checkpoint("file:\\D:\\test\\checkpoint")


    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[String] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      //8-second window, sliding every 4 seconds; the slide interval (step) must be a multiple of the batch interval
      //within the 8-second window, concatenate all arriving data: _+":"+_
      .reduceByWindow(_+":"+_,Seconds(8),Seconds(4))
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
spark
scala
java
# Output:
java:spark:scala:java

5. reduceByKeyAndWindow: reduceByKey over the window

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

reduceByKeyAndWindow computes over all of the data that falls inside the DStream's window. The operation takes an optional parallelism (number of tasks) parameter.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo5 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval (i.e. collection interval) set to 2 seconds

    //streamingContext.checkpoint("file:\\D:\\test\\checkpoint")

    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[(String,Int)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_,1))
      //8-second window, sliding every 2 seconds; the slide interval (step) must be a multiple of the batch interval
      //word count (reduceByKey) within the 8-second window
      .reduceByKeyAndWindow((x:Int,y:Int)=>{x+y},Seconds(8),Seconds(2))
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
java
scala
scala
spark
# Output:
(spark,1)
(scala,2)
(java,2)

6. reduceByKeyAndWindow: incremental reduceByKey over data entering and leaving the window

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) differs from the previous operation by taking an extra function invFunc. func plays the same role as before and is applied to data entering the window, while invFunc is its inverse and is applied to the RDDs that leave the window, so the window result is updated incrementally instead of being recomputed from scratch. Note that checkpointing must be enabled for this variant; the small sketch after the sample output below shows the incremental bookkeeping.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo6 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkWindowDemo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(2))   //batch interval (i.e. collection interval) set to 2 seconds

    //reduceByKeyAndWindow with an inverse function requires checkpointing
    streamingContext.checkpoint("file:\\D:\\test\\checkpoint")

    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[(String,Int)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_,1))
      //8-second window, sliding every 2 seconds; the slide interval (step) must be a multiple of the batch interval
      //word count within the 8-second window, maintained incrementally
      //.reduceByKeyAndWindow((x:Int,y:Int)=>{x+y},Seconds(8),Seconds(2))
      .reduceByKeyAndWindow((x:Int,y:Int)=>{x+y},(x:Int,y:Int)=>{x-y},Seconds(8),Seconds(2))   //x+y folds in data entering the window, x-y removes data leaving it

    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start a console producer to send test messages:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
# Input:
java
java
scala

# Output:
-------------------------------------------
Time: 1608800912000 ms
-------------------------------------------
(java,1)

-------------------------------------------
Time: 1608800914000 ms
-------------------------------------------
(java,2)

-------------------------------------------
Time: 1608800916000 ms
-------------------------------------------
(scala,1)
(java,2)

-------------------------------------------
Time: 1608800918000 ms
-------------------------------------------
(scala,1)
(java,2)

-------------------------------------------
Time: 1608800920000 ms
-------------------------------------------
(scala,1)
(java,1)

-------------------------------------------
Time: 1608800922000 ms
-------------------------------------------
(scala,1)
(java,0)

-------------------------------------------
Time: 1608800924000 ms
-------------------------------------------
(scala,0)
(java,0)
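
To see why the counts in the output above decay back to zero, here is a minimal, Spark-free Scala sketch with hypothetical per-batch counts for the key java (the two java messages are assumed to arrive in two consecutive 2-second batches). At every slide, func ((x, y) => x + y) folds in the batch that just entered the 8-second window, and invFunc ((x, y) => x - y) subtracts the batch that just slid out of it, so the window total is maintained incrementally.

object IncrementalWindowSketch {
  def main(args: Array[String]): Unit = {
    //hypothetical per-batch counts of the word "java": typed twice early on, then nothing more
    //batch interval 2s, window 8s, slide 2s, so the window always covers the last 4 batches
    val perBatchCounts = Seq(1, 1, 0, 0, 0, 0, 0)
    val windowSizeInBatches = 4

    var windowCount = 0
    perBatchCounts.zipWithIndex.foreach { case (entering, i) =>
      //the count of the batch that just slid out of the window, if any
      val leaving = if (i >= windowSizeInBatches) perBatchCounts(i - windowSizeInBatches) else 0
      windowCount = windowCount + entering - leaving   //func adds, invFunc subtracts
      println(s"slide $i: (java,$windowCount)")
    }
  }
}

Run by itself, the sketch prints (java,1), (java,2), (java,2), (java,2), (java,1), (java,0), (java,0), which matches the java column in the sample output.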

