Building a Flink Connector for GBase8s, Step by Step
Introduction
This article first explains what a Flink connector is and what CDC is, then walks through building a simple Flink connector for GBase8s and putting it to work in a hands-on project: using MySQL CDC to synchronize data into GBase8s in real time through the connector.
What is a Flink connector
Flink ships with a set of basic data sources and sinks that are always available. The predefined sources include files, MySQL, RabbitMQ, Kafka, Elasticsearch, and others, and data can likewise be written out to files, MySQL, RabbitMQ, Kafka, Elasticsearch, and more.
Put simply, a Flink connector packages up the loading of a data source and the writing of output. By adding the corresponding connector dependency, we can quickly read from a data source and write out results.
What is CDC (Change Data Capture)
First, what is CDC? It is short for Change Data Capture: with CDC we can capture committed changes from a database and deliver them downstream for consumption. These changes include INSERT, DELETE, and UPDATE operations.
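Conceptually, a captured change can be modeled as an operation type plus the row's before- and after-images. The sketch below shows that shape in plain Java; the class and field names are made up for illustration and are not any real CDC library's API.

```java
import java.util.Map;

// A minimal model of a change event: the operation, plus the row images
// before and after the change (names are illustrative only).
public class ChangeEvent {
    enum Op { INSERT, UPDATE, DELETE }

    final Op op;
    final Map<String, Object> before; // null for INSERT
    final Map<String, Object> after;  // null for DELETE

    ChangeEvent(Op op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }

    public static void main(String[] args) {
        ChangeEvent e = new ChangeEvent(Op.UPDATE,
                Map.of("id", 1, "name", "old"),
                Map.of("id", 1, "name", "new"));
        System.out.println(e.op + ": " + e.before.get("name") + " -> " + e.after.get("name"));
    }
}
```

Log-based CDC tools such as Debezium emit events of essentially this shape, which is what lets downstream consumers replay deletes and see a record's previous state.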
Its main application scenarios:
- Data synchronization or backup between heterogeneous databases, and building data-analytics and computing platforms
- Sharing data state between microservices
- Updating caches, and refreshing CQRS Query views
CDC is a fairly broad concept: any mechanism that can capture changed data can be called CDC. In practice there are two main flavors, query-based CDC and log-based CDC; the table below compares their capabilities and differences.

| | Query-based CDC | Log-based CDC |
|---|---|---|
| Concept | Each capture issues a SELECT that scans the full table and filters out the rows changed since the last query | Continuously monitors the data store's log, e.g. the MySQL binlog |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every change | ❌ | ✅ |
| Low latency, no extra database load | ❌ | ✅ |
| Non-intrusive to the business schema (no LastUpdated column) | ❌ | ✅ |
| Captures DELETE events | ❌ | ✅ |
| Captures the old state of a record | ❌ | ✅ |
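The query-based approach in the left column can be sketched in a few lines: each poll scans all rows and keeps those whose `lastUpdated` timestamp is newer than the previous watermark. This is an in-memory simulation; the `Row` shape and field names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class QueryBasedCdc {
    // One "table row" with the intrusive LastUpdated column the table above mentions.
    record Row(int id, String name, long lastUpdated) {}

    // Each poll does a full scan and filters rows changed since the watermark.
    // Deletes are invisible, and intermediate updates between polls are lost --
    // exactly the weaknesses listed in the comparison table.
    static List<Row> poll(List<Row> table, long watermark) {
        return table.stream()
                .filter(r -> r.lastUpdated() > watermark)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> table = List.of(
                new Row(1, "a", 100L),
                new Row(2, "b", 250L),
                new Row(3, "c", 300L));
        // Only rows changed after the watermark (id 2 and 3) are captured.
        System.out.println(poll(table, 200L));
    }
}
```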
flink-connector-gbasedbt
We could of course hand-write a Sink that pushes the CDC data straight into the target database, but that is not very elegant. Can we instead sink the data into GBase8s through Flink SQL? Absolutely. Next we will implement a simple Flink connector for GBase8s, in three steps:
- Build the row converter (RowConverter)
- Build the dialect (Dialect)
- Register the dynamic table factory (DynamicTableFactory) and the related sink
With these three steps in place we have a minimal connector. Let's look at how to implement each one.
Build the row converter (RowConverter)

```java
package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author lijiaqi
 */
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public GBasedbtRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "gbasedbt";
    }
}
```
Build the dialect (Dialect)

```java
package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
 * @author lijiaqi
 */
public class GBasedbtDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbasedbt";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:gbasedbt-sqli:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBasedbtRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.gbasedbt.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "'" + identifier + "'";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }
}
```
Register the dynamic table factory (DynamicTableFactory) and the related sink
First, create GBasedbtSinkFunction, which accepts RowData input and sinks it into the configured database.
```java
package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author lijiaqi
 */
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    private final JdbcOptions jdbcOptions;
    private final SerializationSchema<RowData> serializationSchema = null;
    private DataType dataType;
    private Connection conn;
    private Statement stmt;

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open connection !!!!!");
        try {
            if (null == conn) {
                Class.forName(jdbcOptions.getDriverName());
                conn = DriverManager.getConnection(
                        jdbcOptions.getDbURL(),
                        jdbcOptions.getUsername().orElse(null),
                        jdbcOptions.getPassword().orElse(null));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        try {
            stmt = conn.createStatement();
            String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
            for (int i = 0; i < value.getArity(); i++) {
                // dispatch on each field's data type here
                if (dataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    sql += value.getInt(i) + " ,";
                } else {
                    sql += "'" + value.getString(i) + "' ,";
                }
            }
            sql = sql.substring(0, sql.length() - 1);
            sql += " ); ";
            System.out.println("sql ==>" + sql);
            stmt.execute(sql);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
```
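The SQL assembly inside invoke can be pulled out and sanity-checked as a pure function. The sketch below mirrors its logic with a plain Object[] standing in for RowData (so it compiles without Flink) and keeps the same quoting behavior; the class and method names are made up for illustration. In production code, a PreparedStatement with bound parameters would be safer than string concatenation.

```java
public class InsertSqlBuilder {
    // Mirrors the invoke() logic: integers unquoted, everything else
    // single-quoted, trailing comma trimmed, then the closing paren appended.
    static String buildInsert(String tableName, Object[] fields) {
        String sql = "insert into " + tableName + " values ( ";
        for (Object field : fields) {
            if (field instanceof Integer) {
                sql += field + " ,";
            } else {
                sql += "'" + field + "' ,";
            }
        }
        sql = sql.substring(0, sql.length() - 1);
        sql += " ); ";
        return sql;
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("ta", new Object[]{1, "flink", "demo"}));
    }
}
```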
Build GBasedbtDynamicTableSink

```java
package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);
        GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(gbasedbtSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "gbasedbt Table Sink";
    }
}
```
Build GBasedbtDynamicTableFactory

```java
package wang.datahub.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbasedbt";
    private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

//    public static final ConfigOption<String> FORMAT = ConfigOptions
//            .key("format")
//            .stringType()
//            .noDefaultValue()
//            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
//        requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // GBasedbtDynamicTableSource is the companion source implementation; it is not covered in this article.
        return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
//        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//                SerializationFormatFactory.class,
//                FactoryUtil.FORMAT);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBasedbtDialect());
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
```
Finally, register the dynamic table factory via SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory, with the fully-qualified factory class name wang.datahub.table.GBasedbtDynamicTableFactory as its content.
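The entire content of the services file is that single line:

```
wang.datahub.table.GBasedbtDynamicTableFactory
```

Flink's FactoryUtil scans this file at startup and matches the factory to tables declared with 'connector' = 'gbasedbt'.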
With that, our Flink connector is complete. Next, let's use it in a real project.
Hands-on project
The overall architecture of the project: Flink CDC captures change data from MySQL, and Flink SQL sinks the data into GBase8s.
Let's see how to implement the CDC pipeline in Flink SQL; three SQL statements are all it takes.
Create the source table
```java
// source table
String sourceDDL =
        "CREATE TABLE mysql_binlog (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '3306',\n" +
        " 'username' = 'flinkcdc',\n" +
        " 'password' = '123456',\n" +
        " 'database-name' = 'test',\n" +
        " 'table-name' = 'test_cdc'\n" +
        ")";
```
Create the output table targeting GBase8s; here the connector is set to gbasedbt.

```java
String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
String userName = "gbasedbt";
String password = "123456";
String gbasedbtSinkTable = "ta";

// sink table
String sinkDDL =
        "CREATE TABLE test_cdc_sink (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING,\n" +
        " PRIMARY KEY (id) NOT ENFORCED \n" +
        ") WITH (\n" +
        " 'connector' = 'gbasedbt',\n" +
//        " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
        " 'url' = '" + url + "',\n" +
        " 'username' = '" + userName + "',\n" +
        " 'password' = '" + password + "',\n" +
        " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
        ")";
```
Finally, pipe the data straight across:

```java
String transformSQL =
        "insert into test_cdc_sink select * from mysql_binlog";
```
Complete reference code

```java
package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkcdc',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";

        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'gbasedbt',\n" +
//                " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
                ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();

        env.execute("sync-flink-cdc");
    }
}
```
Run result
Querying the target table shows the data has been written into the database.