博客
关于我
Flink CDC的使用
阅读量:479 次
发布时间:2019-03-06

本文共 4602 字,大约阅读时间需要 15 分钟。

MySQL数据准备与Flink CDC实时数据捕获

MySQL数据准备

在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:

  • 创建并使用数据库
  • create database if not exists test;
    use test;
    1. 创建学生表
    2. drop table if exists stu;
      create table stu (
      id int primary key auto_increment,
      name varchar(100),
      age int
      );
      1. 插入初始数据
      2. insert into stu(name, age) values("张三", 18);
        insert into stu(name, age) values("李四", 20);
        insert into stu(name, age) values("王五", 21);

        注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。

        开启MySQL binlog

        为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。

      3. 修改MySQL配置文件
      4. sudo vim /etc/my.cnf
        1. 在配置文件中添加以下内容:
        2. server-id = 1
          log-bin=mysql-bin
          binlog_format=row
          binlog-do-db=test

          注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。

          1. 重启MySQL服务
          2. sudo systemctl restart mysqld

            Flink代码开发

            本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。

            依赖管理

          3. 添加Flink CDC依赖
          4. com.ververica
            flink-connector-mysql-cdc
            2.4.0
            1. 其他Flink依赖(如 flink-table-api-java-bridge 等)
            2. org.apache.flink
              flink-table-api-java-bridge
              ${flink.version}
              import com.ververica.cdc.connectors.mysql.source.MySqlSource;
              import com.ververica.cdc.connectors.mysql.table.StartupOptions;
              import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
              import org.apache.flink.api.common.eventtime.WatermarkStrategy;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              public class FlinkCDCDemo {
              public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(4);
              MySqlSource
              mySqlSource = MySqlSource.builder()
              .hostname("node4")
              .port(3306)
              .username("root")
              .password("000000")
              .databaseList("test")
              .tableList("test.stu")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .startupOptions(StartupOptions.initial())
              .build();
              DataStreamSource
              dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source")
              .setParallelism(1);
              dataStreamSource.print();
              env.execute();
              }
              }

              注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。

              测试与验证

              添加新数据

              执行以下SQL语句:

              mysql> insert into stu(name, age) values("赵六", 23);

              输出示例

              {
              "before": null,
              "after": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831654000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2300,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "c",
              "ts_ms": 1719831654692,
              "transaction": null
              }

              修改数据

              执行以下SQL语句:

              mysql> update stu set name="zl", age=19 where name="赵六";

              输出示例

              {
              "before": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "after": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831987000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2604,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "u",
              "ts_ms": 1719831987238,
              "transaction": null
              }

              删除数据

              执行以下SQL语句:

              mysql> delete from stu where id=4;

              输出示例

              {
              "before": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "after": null,
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719832151000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2913,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "d",
              "ts_ms": 1719832151198,
              "transaction": null
              }

              注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。

    转载地址:http://iaqbz.baihongyu.com/

    你可能感兴趣的文章
    Parameter ‘password‘ not found. Available parameters are [md5String, param1, username, param2]
    查看>>
    ParameterizedThreadStart task
    查看>>
    Paramiko exec_命令的实时输出
    查看>>
    Spring security之管理session
    查看>>
    paramiko模块
    查看>>
    param[:]=param-lr*param.grad/batch_size的理解
    查看>>
    spring mvc excludePathPatterns失效 如何解决spring拦截器失效 excludePathPatterns忽略失效 拦截器失效 spring免验证拦截器不起作用
    查看>>
    Spring Cloud 之注册中心 EurekaServerAutoConfiguration源码分析
    查看>>
    Parrot OS 6.2 重磅发布!推出全新 Docker 容器启动器
    查看>>
    Parrot OS 6.3 发布!全面提升安全性,新增先进工具,带来更高性能
    查看>>
    ParseChat应用源码ios版
    查看>>
    Part 2异常和错误
    查看>>
    Pascal Script
    查看>>
    Spring Boot集成Redis实现keyspace监听 | Spring Cloud 34
    查看>>
    Spring Boot中的自定义事件详解与实战
    查看>>
    Passport 密码模式
    查看>>
    Spring Boot(七十六):集成Redisson实现布隆过滤器(Bloom Filter)
    查看>>
    passport 简易搭配
    查看>>
    passwd命令限制用户密码到期时间
    查看>>
    Spring Boot 动态加载jar包,动态配置太强了!
    查看>>