
Notes on the Flink Official Docs 07: Fraud Detection with the DataStream API

2020/7/24 10:38:31

Table of Contents

    • Fraud Detection with the DataStream API
    • What Are You Building?
    • Prerequisites
    • Help, I’m Stuck!
    • How to Follow Along
  • Importing Flink into an IDE
    • Preparation
    • IntelliJ IDEA
    • Installing the Scala plugin
    • Importing Flink
    • Checkstyle For Java
  • Breaking Down the Code
    • Writing a Real Application (v1)
    • Fraud Detector v2: State + Time = ❤️
    • Final Application
    • Expected Output

Fraud Detection with the DataStream API

Apache Flink offers a DataStream API for building robust, stateful streaming applications. It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems. In this step-by-step guide you’ll learn how to build a stateful streaming application with Flink’s DataStream API.

What Are You Building?

Credit card fraud is a growing concern in the digital age. Criminals steal credit card numbers by running scams or hacking into insecure systems. Stolen numbers are tested by making one or more small purchases, often for a dollar or less. If that works, they then make more significant purchases to get items they can sell or keep for themselves.

In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions. Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time.

Prerequisites

This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.

Help, I’m Stuck!

If you get stuck, check out the community support resources. In particular, Apache Flink’s user mailing list is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
In other words: if you run into a bug you can't solve on your own, this is where you can ask for help.

How to Follow Along

If you want to follow along, you will require a computer with:

  • Java 8 or 11
  • Maven

My setup here is Windows 10 and Java 8, with a Maven project created in IDEA, connected to Flink running on a virtual machine.

A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies, so you only need to focus on filling out the business logic. These dependencies include flink-streaming-java, which is the core dependency for all Flink streaming applications, and flink-walkthrough-common, which has data generators and other classes specific to this walkthrough.

Note: Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available at the bottom of the page.

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.11.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

You can edit the groupId, artifactId and package if you like. With the above parameters, Maven will create a folder named frauddetection that contains a project with all the dependencies to complete this tutorial. After importing the project into your editor, you can find a file FraudDetectionJob.java (or FraudDetectionJob.scala) with the following code which you can run directly inside your IDE. Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.

The guide builds the project with Maven on Linux, while I'm using IDEA, which is genuinely frustrating, so the first step is to understand how to develop Flink applications from IDEA.

Importing Flink into an IDE

The sections below describe how to import the Flink project into an IDE for the development of Flink itself. For writing Flink programs, please refer to the Java API and the Scala API quickstart guides.

NOTE: Whenever something is not working in your IDE, try with the Maven command line first (mvn clean package -DskipTests) as it might be your IDE that has a bug or is not properly set up.

Preparation

To get started, please first check out the Flink sources from one of our repositories, e.g.

git clone https://github.com/apache/flink.git

IntelliJ IDEA

A brief guide on how to set up IntelliJ IDEA IDE for development of the Flink core. As Eclipse is known to have issues with mixed Scala and Java projects, more and more contributors are migrating to IntelliJ IDEA.

The following documentation describes the steps to setup IntelliJ IDEA 2019.1.3 (https://www.jetbrains.com/idea/download/) with the Flink sources.

Installing the Scala plugin

The IntelliJ installation setup offers to install the Scala plugin. If it is not installed, follow these instructions before importing Flink to enable support for Scala projects and files:

  1. Go to IntelliJ plugins settings (IntelliJ IDEA -> Preferences ->
    Plugins) and click on “Install Jetbrains plugin…”.
  2. Select and install the “Scala” plugin.
  3. Restart IntelliJ

The detailed steps for installing plugins are covered in step 3 of my earlier post:
https://blog.csdn.net/weixin_42072754/article/details/104982002

Importing Flink

  1. Start IntelliJ IDEA and choose “New -> Project from Existing Sources”
  2. Select the root folder of the cloned Flink repository
  3. Choose “Import project from external model” and select “Maven”
  4. Leave the default options and successively click “Next” until you reach the SDK section.
  5. If there is no SDK listed, create one using the “+” sign on the top left. Select “JDK”, choose the JDK home directory and click “OK”. Select the most suiting JDK version. NOTE: A good rule of thumb is to select the JDK version matching the active Maven profile.
  6. Continue by clicking “Next” until finishing the import.
  7. Right-click on the imported Flink project -> Maven -> Generate Sources and Update Folders. Note that this will install Flink libraries in your local Maven repository, located by default at “/home/$USER/.m2/repository/org/apache/flink/”. Alternatively, mvn clean package -DskipTests also creates the files necessary for the IDE to work but without installing the libraries.
  8. Build the Project (Build -> Make Project)

The procedure above clones the entire Flink repository and installs it into the local Maven repository via Generate Sources and Update Folders. In practice, you can simply declare the dependencies in the pom file and let them import automatically.

Checkstyle For Java

IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.

  1. Install the “Checkstyle-IDEA” plugin from the IntelliJ plugin repository.
  2. Configure the plugin by going to Settings -> Other Settings -> Checkstyle.
  3. Set the “Scan Scope” to “Only Java sources (including tests)”.
  4. Select 8.14 in the “Checkstyle Version” dropdown and click apply. This step is important, don’t skip it!
  5. In the “Configuration File” pane, add a new configuration using the plus icon:
      1. Set the “Description” to “Flink”.
      2. Select “Use a local Checkstyle file”, and point it to "tools/maven/checkstyle.xml" within your repository.
      3. Check the box for “Store relative to project location”, and click “Next”.
      4. Configure the “checkstyle.suppressions.file” property value to "suppressions.xml", and click “Next”, then “Finish”.
  6. Select “Flink” as the only active configuration file, and click “Apply” and “OK”.
  7. Checkstyle will now give warnings in the editor for any Checkstyle violations.

Once the plugin is installed you can directly import “tools/maven/checkstyle.xml” by going to Settings -> Editor -> Code Style -> Java -> Gear Icon next to Scheme dropbox. This will for example automatically adjust the imports layout.

You can scan an entire module by opening the Checkstyle tools window and clicking the “Check Module” button. The scan should report no errors.

Note Some modules are not fully covered by checkstyle, which include flink-core, flink-optimizer, and flink-runtime. Nevertheless please make sure that code you add/modify in these modules still conforms to the checkstyle rules.

That wraps up the IDEA setup for Flink. I still hadn't fully figured it out, though, so I opted for the following pom file instead.

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.2</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <iheart.version>1.4.3</iheart.version>
        <fastjson.version>1.2.7</fastjson.version>
    </properties>

    <dependencies>
        <!-- Scala library dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Flink streaming Scala dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Scala dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink clients API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hadoop client API -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>xml-apis</groupId>
                    <artifactId>xml-apis</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- MySQL connector dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- fastjson dependency -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- flink-connector-kafka dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JSON/XML (de)serialization dependencies -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.9</version>
        </dependency>
        <!-- Redis client dependency -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.1</version><!-- adjust the version to fit your setup -->
        </dependency>


    </dependencies>

However, version 1.7.2 may not include the walkthrough package, so the example can't run in IDEA this way, and my Linux machine has no Maven either; reinstalling everything carries a high learning cost. So I'm setting this aside for now and reading on, aiming first to understand the core ideas of the example.

FraudDetectionJob.java

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");
        
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

FraudDetector.java

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}

Breaking Down the Code

Let’s walk step-by-step through the code of these two files. The FraudDetectionJob class defines the data flow of the application and the FraudDetector class defines the business logic of the function that detects fraudulent transactions.


We begin by describing how the Job is assembled in the main method of the FraudDetectionJob class.

The Execution Environment

The first line sets up your StreamExecutionEnvironment. The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Creating a Source

Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs.

This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.

Each transaction contains an account ID (accountId), timestamp (timestamp) of when the transaction occurred, and US$ amount (amount).

The name attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.

DataStream<Transaction> transactions = env
    .addSource(new TransactionSource())
    .name("transactions");
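For orientation, the Transaction entity referenced above boils down to a small POJO with the three fields just described. This is a sketch only — the field names are inferred from the getters used later in this guide, and the real class ships in flink-walkthrough-common:

```java
// Sketch of the walkthrough's Transaction entity: account id, event
// timestamp (epoch millis), and amount in US dollars. Field names are
// assumptions based on the getters used elsewhere in this walkthrough.
public class Transaction {
    private final long accountId;
    private final long timestamp;
    private final double amount;

    public Transaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    public long getAccountId() { return accountId; }
    public long getTimestamp() { return timestamp; }
    public double getAmount()  { return amount; }
}
```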

Partitioning Events & Detecting Fraud

The transactions stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks.

Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.

To ensure that the same physical task processes all records for a particular key, you can partition a stream using DataStream#keyBy.

The process() call adds an operator that applies a function to each partitioned element in the stream.

It is common to say the operator immediately after a keyBy, in this case FraudDetector, is executed within a keyed context.

DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector())
    .name("fraud-detector");
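To build intuition for the routing guarantee, here is a toy, non-Flink sketch of hash partitioning: every record with the same key maps to the same parallel task index, so each key's state lives on exactly one task. Flink's real assignment goes through murmur-hashed key groups, so treat this only as the idea, not the actual algorithm:

```java
// Toy model of what keyBy guarantees: equal keys always route to the same
// task index, which is why per-key state can stay local to one task.
// The modulo scheme below is an illustration, not Flink's real key-group hash.
public class KeyRouting {
    public static int taskFor(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    public static void main(String[] args) {
        // every transaction of account 42 lands on the same task
        System.out.println(taskFor(42L, 4) == taskFor(42L, 4)); // true
    }
}
```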

Outputting Results

A sink writes a DataStream to an external system; such as Apache Kafka, Cassandra, and AWS Kinesis.

The AlertSink logs each Alert record with log level INFO, instead of writing it to persistent storage, so you can easily see your results.


alerts.addSink(new AlertSink());

Executing the Job

Flink applications are built lazily and shipped to the cluster for execution only once fully formed.

Call StreamExecutionEnvironment#execute to begin the execution of our Job and give it a name.

env.execute("Fraud Detection");

The Fraud Detector

The fraud detector is implemented as a KeyedProcessFunction.

Its method KeyedProcessFunction#processElement is called for every transaction event.

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
  
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}

This first version produces an alert on every transaction, which some may say is overly conservative.

The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.

Writing a Real Application (v1)

For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one.

Where small is anything less than $1.00 and large is more than $500.

Imagine your fraud detector processes the following stream of transactions for a particular account.
(Figure: an example stream of transactions for one account, illustrating the small-then-large pattern.)

Transactions 3 and 4 should be marked as fraudulent because it is a small transaction, $0.09, followed by a large one, $510. Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one;

instead, there is an intermediate transaction that breaks the pattern.

To do this, the fraud detector must remember information across events;

a large transaction is only fraudulent if the previous one was small.

Remembering information across events requires state, and that is why we decided to use a KeyedProcessFunction.

It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.

The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.

When a large transaction comes through, you can simply check if the flag is set for that account.
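Before wiring this into Flink, the small-then-large rule itself can be dry-run in plain Java over one account's amounts. This is a sketch that ignores keying, fault tolerance, and the one-minute timeout introduced later; the sample amounts in the usage are loosely modeled on the figure's example ($0.09 followed by $510, and a $0.02 test broken up by an intermediate purchase — the other amounts are made up):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java version of the small-then-large rule for a single account:
// remember whether the previous transaction was small, and alert when a
// large one follows immediately. Returns the indices of alerting transactions.
public class FlagLogic {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    public static List<Integer> detect(double[] amounts) {
        List<Integer> alerts = new ArrayList<>();
        boolean lastWasSmall = false;
        for (int i = 0; i < amounts.length; i++) {
            if (lastWasSmall && amounts[i] > LARGE_AMOUNT) {
                alerts.add(i);
            }
            // the flag is "cleared" on every event and re-armed only by a small one
            lastWasSmall = amounts[i] < SMALL_AMOUNT;
        }
        return alerts;
    }
}
```

With amounts like {12.00, 50.00, 0.09, 510.00, 32.00, 12.00, 0.02, 10.00, 600.00}, only the $510 following the $0.09 alerts; the $600 does not, because the intermediate $10.00 broke the pattern.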

However, merely implementing the flag as a member variable in the FraudDetector class will not work.

Flink processes the transactions of multiple accounts with the same object instance of FraudDetector, which means if accounts A and B are routed through the same instance of FraudDetector, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.

We could of course use a data structure like a Map to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information be lost in case of a failure.

Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.

To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.

The most basic type of state in Flink is ValueState, a data type that adds fault tolerance to any variable it wraps.

ValueState is a form of keyed state, meaning it is only available in operators that are applied in a keyed context;

any operator immediately following DataStream#keyBy.

A keyed state of an operator is automatically scoped to the key of the record that is currently processed.

In this example, the key is the account id for the current transaction (as declared by keyBy()), and FraudDetector maintains an independent state for each account.

ValueState is created using a ValueStateDescriptor which contains metadata about how Flink should manage the variable.

The state should be registered before the function starts processing data.

The right hook for this is the open() method.

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
    }

ValueState is a wrapper class, similar to AtomicReference or AtomicLong in the Java standard library.

It provides three methods for interacting with its contents; update sets the state, value gets the current value, and clear deletes its contents.
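Those three methods behave much like a nullable AtomicReference from the standard library, which makes the semantics easy to try outside Flink. This is only an analogy — real ValueState is scoped per key and checkpointed by Flink:

```java
import java.util.concurrent.atomic.AtomicReference;

// Non-Flink analogy for ValueState<Boolean>: update() ~ set(),
// value() ~ get(), clear() ~ set(null). As with ValueState, the "empty"
// state reads back as null, so callers must null-check before use.
public class ValueStateAnalogy {
    public static void main(String[] args) {
        AtomicReference<Boolean> flag = new AtomicReference<>();

        System.out.println(flag.get());  // null: nothing stored yet
        flag.set(true);                  // ~ ValueState#update
        System.out.println(flag.get());  // true: ~ ValueState#value
        flag.set(null);                  // ~ ValueState#clear
        System.out.println(flag.get());  // null again
    }
}
```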

If the state for a particular key is empty, such as at the beginning of an application or after calling ValueState#clear, then ValueState#value will return null.

Modifications to the object returned by ValueState#value are not guaranteed to be recognized by the system, and so all changes must be performed with ValueState#update.

Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.

Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.

@Override
public void processElement(
        Transaction transaction,
        Context context,
        Collector<Alert> collector) throws Exception {

    // Get the current state for the current key
    Boolean lastTransactionWasSmall = flagState.value();

    // Check if the flag is set
    if (lastTransactionWasSmall != null) {
        if (transaction.getAmount() > LARGE_AMOUNT) {
            // Output an alert downstream
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());

            collector.collect(alert);            
        }

        // Clean up our state
        flagState.clear();
    }

    if (transaction.getAmount() < SMALL_AMOUNT) {
        // Set the flag to true
        flagState.update(true);
    }
}

For every transaction, the fraud detector checks the state of the flag for that account.

Remember, ValueState is always scoped to the current key, i.e., account. If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.

After that check, the flag state is unconditionally cleared.

Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.

Finally, the transaction amount is checked to see if it is small.

If so, then the flag is set so that it can be checked by the next event.

Notice that ValueState<Boolean> actually has three states: unset (null), true, and false, because all ValueStates are nullable.

This job only makes use of unset (null) and true to check whether the flag is set or not.

Fraud Detector v2: State + Time = ❤️

Scammers don’t wait long to make their large purchase to reduce the chances their test transaction is noticed.

For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.

Flink’s KeyedProcessFunction allows you to set timers which invoke a callback method at some point in time in the future.

Let’s see how we can modify our Job to comply with our new requirements:

  • Whenever the flag is set to true, also set a timer for 1 minute in the future.
  • When the timer fires, reset the flag by clearing its state.
  • If the flag is ever cleared the timer should be canceled.
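These rules can be simulated in plain Java by carrying the flag's expiry timestamp instead of registering a real timer. A sketch for intuition only: in Flink the timer service does the scheduling, whereas this loop "fires" the timer lazily by checking the deadline on the next event:

```java
import java.util.ArrayList;
import java.util.List;

// Timed variant of the rule for one account: a small transaction arms the
// flag with a deadline one minute out; a large transaction only alerts if
// it arrives before that deadline. Clearing the flag and cancelling the
// timer are modeled together by resetting the deadline.
public class TimedFlagLogic {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60_000L;

    /** amounts[i] occurs at times[i] (epoch millis); returns alerting indices. */
    public static List<Integer> detect(double[] amounts, long[] times) {
        List<Integer> alerts = new ArrayList<>();
        long deadline = Long.MIN_VALUE;               // no flag armed
        for (int i = 0; i < amounts.length; i++) {
            boolean flagLive = times[i] <= deadline;  // timer not yet "fired"
            if (flagLive && amounts[i] > LARGE_AMOUNT) {
                alerts.add(i);
            }
            deadline = Long.MIN_VALUE;                // clear flag + cancel timer
            if (amounts[i] < SMALL_AMOUNT) {
                deadline = times[i] + ONE_MINUTE;     // arm flag for one minute
            }
        }
        return alerts;
    }
}
```

A $0.50 test followed 30 seconds later by a $600 purchase alerts; the same pair separated by two minutes does not.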

To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.

private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;

@Override
public void open(Configuration parameters) {
    ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
            "flag",
            Types.BOOLEAN);
    flagState = getRuntimeContext().getState(flagDescriptor);

    ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timer-state",
            Types.LONG);
    timerState = getRuntimeContext().getState(timerDescriptor);
}

KeyedProcessFunction#processElement is called with a Context that contains a timer service.

The timer service can be used to query the current time, register timers, and delete timers.

With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in timerState.

if (transaction.getAmount() < SMALL_AMOUNT) {
    // set the flag to true
    flagState.update(true);

    // set the timer and timer state
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
    context.timerService().registerProcessingTimeTimer(timer);
    timerState.update(timer);
}

Processing time is wall clock time, and is determined by the system clock of the machine running the operator.

When a timer fires, it calls KeyedProcessFunction#onTimer. Overriding this method is how you can implement your callback to reset the flag.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
    // remove flag after 1 minute
    timerState.clear();
    flagState.clear();
}

Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.

You can wrap this in a helper method and call this method instead of flagState.clear().

private void cleanUp(Context ctx) throws Exception {
    // delete timer
    Long timer = timerState.value();
    ctx.timerService().deleteProcessingTimeTimer(timer);

    // clean up all state
    timerState.clear();
    flagState.clear();
}

And that’s it, a fully functional, stateful, distributed streaming application!

Final Application

package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Clean up our state
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}


Expected Output

Running this code with the provided TransactionSource will emit fraud alerts for account 3. You should see the following output in your task manager logs:

2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}

This post mainly covered concrete API examples that combine state and time. You don't need to run the code; if you understand what it means and the execution logic it describes, and commit these few classes and methods to memory, you've absorbed most of it. As for my earlier problem of not being able to run this code in IDEA, I'll keep working on a solution and update this post as soon as I have one.


Original link: http://www.dtmao.cc/news_show_50323.shtml
