Flink CDC有两种方式同步数据库:
1. 一种是通过 Flink SQL 直接定义源表与目标表的映射来同步数据,缺点是每条同步语句只能同步单张表;
2. 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行,可以多表多库。
本方案使用DataStream方法,同步两表中的数据。
不需要部署Flink,可单独使用。
工程下载链接:点击进入



主要源码:
package org.apache.flink;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jvnet.hk2.annotations.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;/*** @author dake*/
@Service
public class MysqlSink extends RichSinkFunction {private Connection connection = null;Statement sqlExecute;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);try {if (connection == null) {Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动connection = DriverManager.getConnection("jdbc:mysql://IP地址:3306/test_target?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true", "账号", "密码");//获取连接sqlExecute = connection.createStatement();}}catch (Exception e){e.printStackTrace();}}@Overridepublic void invoke(String value, Context context) {try {sqlExecute.execute(value);}catch (Exception e){e.printStackTrace();}System.out.println("value = " + value);}@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}super.close();}
}
package org.apache.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.config.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class MysqlCDC {public static void main(String[] args) throws Exception {MySqlSource mySqlSource = MySqlSource.builder().hostname("IP地址").port(3306).startupOptions(StartupOptions.initial()) //全量同步.scanNewlyAddedTableEnabled(true) // 开启支持新增表.databaseList("test_master_resource") // set captured database.tableList("test_master_resource.test1,test_master_resource.test2") // set captured table.username("账号").password("密码").debeziumProperties(getDebeziumProperties()).serverTimeZone("Asia/Shanghai").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();Configuration configuration = new Configuration();// 生产环境夏下,改成参数传进来
// configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// enable checkpointenv.enableCheckpointing(3000);// 设置本地env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(4).addSink(new MysqlSink());
// .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}