QuarkXPress JavaWeb 细胞因子 Python 控制跳转 TensorRT ajax wpf dns msbuild sas vue下载 vue前端 传智播客python php抽奖源码 jq遍历对象 jquery绑定事件的方法 java两个数组合并 jquery获取兄弟节点 docker启动命令 python解析json数据 python环境搭建 python自学入门 python运行 python自定义异常 javatrim java数组反转 java8函数式接口 java中的继承 java运算 java中map java集成开发环境 java创建对象 javascript基础 垃圾邮件数据集 wps2011 服务器系统下载 js四舍五入 日历制作模板 飞猪ip
当前位置: 首页 > 学习教程  > 编程语言

Flink 入门实战之六Source自定义读取mysql数据

2020/12/5 10:05:04 文章标签:

自定义Source连接mysql 重写SourceRichFunction通过JDBC方式连接Mysql使用预执行语句执行带参数sql语句 package com.dayimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.flink.configuration.Configuration import org.apache.flink.stre…

自定义Source连接mysql

  • 重写SourceRichFunction通过JDBC方式连接Mysql
  • 使用预执行语句执行带参数sql语句
package com.day

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
object SourceMysql {
  case class User (uid: Int, name: String, age: Int, sex:String)

  def main(args: Array[String]): Unit = {
    // 1、获取流式环境变量
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、添加自定义source  连接mysql
    val dataStream = env.addSource(new MysqlSource)
    // 3、打印数据
    dataStream.print()
    // 4、执行任务
    env.execute("mysql Job")
  }

  //自定义source连接mysql
  class MysqlSource extends RichSourceFunction[User] {
    val uri = "jdbc:mysql://jeff200:3306/test_db?characterEncoding=utf-8"
    val user = "root"
    val pwd = "root"
    classOf[com.mysql.jdbc.Driver]
    var conn: Connection= _
    // 预编带参数语句
    var preStatement: PreparedStatement = _
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection(uri, user, pwd)
      val sql =
        """
          |SELECT uid, name, age, sex
          |FROM t_user
          |WHERE uid = ?
          |""".stripMargin
      preStatement = conn.prepareStatement(sql)
    }

    override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
      val uid = 1
      preStatement.setInt(1, uid) // 查询uid为1的用户信息
      val result = preStatement.executeQuery()
      while (result.next){
        ctx.collect(
          User(
            uid = result.getInt(1),
            name = result.getString(2),
            age = result.getInt(3),
            sex = result.getString(4)
          )
        )
      }
    }

    override def cancel(): Unit = {
      if (conn != null) conn.close
    }
  }
}
  • 核心依赖pom.xml
<!-- mysql 连接器 -->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.35</version>
</dependency>
  • 结果输出
    在这里插入图片描述
  • 注意
    由于使用第三方JDBC连接器,发布到生产环境时候要将mysql-connector-java-5.1.35.jar引入flink/lib下面

本文链接: http://www.dtmao.cc/news_show_450147.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?