百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Flink 实现 MySQL CDC 动态同步表结构

toyiye 2024-07-02 03:04 14 浏览 0 评论

使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。

背景

MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flink SQL 里的 MySQL CDC Connector 将数据同步到其他数据存储是常见的一种处理方式。

例如 CDC 到 ES 实现数据检索,CDC 到 ClikHouse 进行 OLAP 分析,CDC 到 Kafka 实现数据同步等,然而目前官方 MySQL CDC Connector 还无法实现动态同步表结构,如果新增字段,则下游无法收到新增字段的数据,如果删除字段,那 Flink 任务将会报错退出,需要修改 SQL 后才能正常启动。

对于某些业务来说,数据库 Schema 变动是非常频繁的操作,如果只是变动就需要修改 SQL 并重启 Flink 任务,那么会带来很多不必要的维护成本。

适用版本

flink 1.11

flink-cdc-connector 1.x

无法同步表结构的原因

那么为什么 Flink SQL 无法通过 binlog 来同步表结构呢?查阅下源码可以发现,Flink 进行 binlog 数据转换时主要是通过 Flink SQL 种类似 Create Table 的语法预先定义的 Schema 来进行转换的,具体代码如下:

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {

    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

    TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());

    DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(

      rowType,

      typeInfo,

      ((rowData, rowKind) -> {}),

      serverTimeZone);

         ...

}

DebeziumDeserializationSchema 是用于转换 binlog 数据到 RowData 的核心接口,创建这个类时传入了 Flink SQL 定义的物理 Schema(封装为 RowType)。

public RowDataDebeziumDeserializeSchema(RowType rowType, TypeInformation<RowData> resultTypeInfo, ValueValidator validator, ZoneId serverTimeZone) {

    this.runtimeConverter = createConverter(rowType);

    this.resultTypeInfo = resultTypeInfo;

    this.validator = validator;

    this.serverTimeZone = serverTimeZone;

}

RowDataDebeziumDeserializeSchema 是 DebeziumDeserializationSchema 核心实现类,可以看到 createConverter 方法创建了用于转换 binlog 数据的转换器。

private DeserializationRuntimeConverter createRowConverter(RowType rowType) {

    final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream()

      .map(RowType.RowField::getType)

      .map(this::createConverter)

      .toArray(DeserializationRuntimeConverter[]::new);

    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);




    return (dbzObj, schema) -> {

      Struct struct = (Struct) dbzObj;

      int arity = fieldNames.length;

      GenericRowData row = new GenericRowData(arity);

      for (int i = 0; i < arity; i++) {

        String fieldName = fieldNames[i];

        Object fieldValue = struct.get(fieldName);

        Schema fieldSchema = schema.field(fieldName).schema();

                ...

}

在最核心的转换方法中,Flink 通过 rowType.getFieldNames 获取到了 SQL 定义好的 fieldNames,并在后续的转换函数中通过 fieldName 来读取 binlog 的 schema 和 value,因此当数据库的表结构发生变更时,binlog 数据中即使已经有了新增的 schema 结构与数据,但因为 fieldNames 依然还是旧的,因此无法获取到新的变更。

解决方案

既然 Flink SQL 无法实现需求,那么很容易想到,使用 JAR 作业进行一些自定义扩展是非常适合这个场景的。

  1. 首先我们需要实现自己的 DebeziumDeserializationSchema,这里实现了一个名为 JsonStringDebeziumDeserializationSchema 的简单示例,实现将 binlog 数据转换为 JSON,在实际业务中可以根据业务需求实现更个性化的操作,例如向下游发送自定义的 Schema 变更通知等等。
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {

   

   @Override

   public void deserialize(SourceRecord record, Collector out) throws Exception {

       Envelope.Operation op = Envelope.operationFor(record);

       Struct value = (Struct) record.value();

       Schema valueSchema = record.valueSchema();

       if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {

           String insert = extractAfterRow(value, valueSchema);

           out.collect(new Tuple2<>(true, insert));

      } else if (op == Envelope.Operation.DELETE) {

           String delete = extractBeforeRow(value, valueSchema);

           out.collect(new Tuple2<>(false, delete));

      }else {

           String after = extractAfterRow(value, valueSchema);

           out.collect(new Tuple2<>(true, after));

      }

  }




   private Map<String,Object> getRowMap(Struct after){

       return after.schema().fields().stream().collect(Collectors.toMap(Field::name,f->after.get(f)));

  }




   private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {

       Struct after = value.getStruct(Envelope.FieldName.AFTER);

       Map<String,Object> rowMap = getRowMap(after);

       ObjectMapper objectMapper = new ObjectMapper();

       return objectMapper.writeValueAsString(rowMap);

  }

   private String extractBeforeRow(Struct value, Schema valueSchema) throws Exception {

       Struct after = value.getStruct(Envelope.FieldName.BEFORE);

       Map<String,Object> rowMap = getRowMap(after);

       ObjectMapper objectMapper = new ObjectMapper();

       return objectMapper.writeValueAsString(rowMap);

  }




   @Override

   public TypeInformation getProducedType() {

       return TypeInformation.of(new TypeHint<Tuple2<Boolean,String>>(){});

  }

}

实现 DebeziumDeserializationSchema 需要实现 deserialize、getProducedType 两个函数。 deserialize 实现转换数据的逻辑,getProducedType 定义返回的类型,这里返回两个参数,第一个Boolean 类型的参数表示数据是 upsert 或是 delete,第二个参数返回转换后的 JSON string,这里的 JSON 将会包含 Schema 变更后的 Column 与对应的 Value。

  1. 编写启动 Main 函数,将我们自定义的 DebeziumDeserializationSchema 实现设置到 SourceFunction 中
public class MySQLCDC{

   public static void main(String[] args) throws Exception {

       ParameterTool params = ParameterTool.fromArgs(args);

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       // 关闭 Operator Chaining, 令运行图更容易初学者理解

       env.disableOperatorChaining();

       env.setParallelism(1);

       //checkpoint的一些配置

       env.enableCheckpointing(params.getInt("checkpointInterval",60000));

       env.getCheckpointConfig().setCheckpointTimeout(5000);

       env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

       SourceFunction source = MySQLSource.builder()

              .hostname(params.get("hostname","127.0.0.1"))

              .port(params.getInt("port",3306))

              .username(params.get("username","root"))

              .password(params.get("password",""))

              .serverTimeZone("Asia/Shanghai")

               //设置我们自己的实现

              .deserializer(new JsonStringDebeziumDeserializationSchema())

              .databaseList(params.get("databaseList","test"))

              .tableList(params.get("tableList","test.my_test"))

              .build();

       // 定义数据源

       DataStream<Tuple2<Boolean, String>> streamSource =

       env.addSource(source).name("MySQLSource");

      ...

       env.execute(MySQLCDC.class.getSimpleName());

  }

}

建立测试数据库,并插入几条数据

CREATE TABLE `my_test` (

 `f_sequence` int(11) DEFAULT NULL,

 `f_random` int(11) DEFAULT NULL,

 `f_random_str` varchar(255) NOT NULL DEFAULT '',

 `Name` varchar(255) DEFAULT '',

 `f_date` date DEFAULT NULL,

 `f_datetime` datetime DEFAULT NULL,

 `f_timestamp` bigint(20) DEFAULT NULL,

 PRIMARY KEY (`f_random_str`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

这个时候运行程序,已经可以看到一些输出了。

Schema 变更前输出:

(true,{"f_date":18545,"f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":1,"Name":"1"})

(true,{"f_date":18545,"f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":2,"Name":"2"})

(true,{"f_date":18545,"f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":3,"Name":"3"})

(true,{"f_date":18545,"f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":4,"Name":"3"})

但是与数据库对比可以发现,这里的时间戳与数据库时间刚好相差了 8 个小时

f_sequence|f_random|f_random_str|Name|f_date    |f_datetime         |f_timestamp|

----------+--------+------------+----+----------+-------------------+-----------+

        1|       1|1           |1   |2020-10-10|2020-10-10 11:11:11| 1630486762|

        2|       2|2           |2   |2020-10-10|2020-10-10 11:11:11| 1630486762|

        3|       3|3           |3   |2020-10-10|2020-10-10 11:11:11| 1630486762|

    33333|       4|4           |3   |2020-10-10|2020-10-10 11:11:11| 1630486762|

说明我们启动时设置的 .serverTimeZone("Asia/Shanghai") 并没有生效,查源码可以发现,底层的 Debezium 并没有实现 serverTimeZone 的配置,相应的转换是在 RowDataDebeziumDeserializeSchema 内实现的,源码如下:

private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {

  if (dbzObj instanceof Long) {

    switch (schema.name()) {

      case Timestamp.SCHEMA_NAME:

        return TimestampData.fromEpochMillis((Long) dbzObj);

      case MicroTimestamp.SCHEMA_NAME:

        long micro = (long) dbzObj;

        return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));

      case NanoTimestamp.SCHEMA_NAME:

        long nano = (long) dbzObj;

        return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));

    }

  }

  //这里的serverTimeZone来自于Bean构造函数传入的配置项

  LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);

  return TimestampData.fromLocalDateTime(localDateTime);

}

因此如果要实现完整的功能,那么我们自己实现的 JsonStringDebeziumDeserializationSchema 也需要包含对应的 Converter,最终代码如下:

public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {







  public JsonStringDebeziumDeserializationSchema(int zoneOffset) {

      //实现一个用于转换时间的Converter  

      this.runtimeConverter = (dbzObj,schema) -> {

            if(schema.name() != null){

                switch (schema.name()) {

                    case Timestamp.SCHEMA_NAME:

                        return TimestampData.fromEpochMillis((Long) dbzObj).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

                    case MicroTimestamp.SCHEMA_NAME:

                        long micro = (long) dbzObj;

                        return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

                    case NanoTimestamp.SCHEMA_NAME:

                        long nano = (long) dbzObj;

                        return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

                    case Date.SCHEMA_NAME:

                        return TemporalConversions.toLocalDate(dbzObj).format(DateTimeFormatter.ISO_LOCAL_DATE);

                }

            }

            return dbzObj;

        };

    }

    

    //定义接口

    private interface DeserializationRuntimeConverter extends Serializable {

        Object convert(Object dbzObj, Schema schema);

    }




    private final JsonStringDebeziumDeserializationSchema.DeserializationRuntimeConverter runtimeConverter;







    private Map<String,Object> getRowMap(Struct after){

        //转换时使用对应的转换器

        return after.schema().fields().stream()

          .collect(Collectors.toMap(Field::name,f->runtimeConverter.convert(after.get(f),f.schema())));

    }




    ...

}

同时修改 Main 函数,在构造 JsonStringDebeziumDeserializationSchema 时传入对应的时区,再次运行时就可以看到符合我们预期的输出了。

修改时区后输出:

(true,{"f_date":"2020-10-10","f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":1,"Name":"1"})

(true,{"f_date":"2020-10-10","f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":2,"Name":"2"})

(true,{"f_date":"2020-10-10","f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":3,"Name":"3"})

(true,{"f_date":"2020-10-10","f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":4,"Name":"3"})

最后我们可以验证一下 Schema 变更是不是可以及时同步到输出的 JSON 中,通过语句在数据库中新增一个字段,并插入一条新数据:

ALTER TABLE my_test ADD f_added_string varchar(255) NOT NULL DEFAULT '' COMMENT '新增字段';

INSERT INTO my_test VALUES(1,1,'new','new','2020-10-10 10:10:10',1630486762,'new');

可以看到输出中已经出现了新增的字段

Schema 变更后输出:

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码