基于Flink CDC datastream mysql to mysql 序列化sql 数据同步
创始人
2024-05-28 17:11:39
0

基于Flink CDC datastream mysql to mysql 序列化sql 数据同步

Flink CDC有两种方式同步数据库:

1. 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步;
2. 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行,可以多表多库。

本方案使用DataStream方法,同步两表中的数据。
不需要部署Flink,可单独使用。

主要工作

工程下载连接,点击进入

  • 运行简单,只需要配置源数据库与目标数据库信息,运行MysqlCDC文件中的main函数即可使用。
  • 修复了mysql数据时区问题、修复了deatatime同步到数据库报错问题
  • 使用分为以下三步:

一、 修改源数据库信息

在这里插入图片描述

二、 修改目标数据库信息

在这里插入图片描述

三、 启动服务

在这里插入图片描述
主要源码:

package org.apache.flink;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jvnet.hk2.annotations.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;/*** @author dake*/
@Service
public class MysqlSink extends RichSinkFunction {private Connection connection = null;Statement sqlExecute;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);try {if (connection == null) {Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动connection = DriverManager.getConnection("jdbc:mysql://IP地址:3306/test_target?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true", "账号", "密码");//获取连接sqlExecute = connection.createStatement();}}catch (Exception e){e.printStackTrace();}}@Overridepublic void invoke(String value, Context context) {try {sqlExecute.execute(value);}catch (Exception e){e.printStackTrace();}System.out.println("value = " + value);}@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}super.close();}
}
package org.apache.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.config.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class MysqlCDC {public static void main(String[] args) throws Exception {MySqlSource mySqlSource = MySqlSource.builder().hostname("IP地址").port(3306).startupOptions(StartupOptions.initial()) //全量同步.scanNewlyAddedTableEnabled(true) // 开启支持新增表.databaseList("test_master_resource") // set captured database.tableList("test_master_resource.test1,test_master_resource.test2") // set captured table.username("账号").password("密码").debeziumProperties(getDebeziumProperties()).serverTimeZone("Asia/Shanghai").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();Configuration configuration = new Configuration();// 生产环境夏下,改成参数传进来
//        configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// enable checkpointenv.enableCheckpointing(3000);// 设置本地env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(4).addSink(new MysqlSink());
//                .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}

相关内容

热门资讯

招商证券:已审议通过市值管理制... 有投资者在互动平台向招商证券提问:“为什么股价一直承压,有促进公司股价回升的计划吗?后续改革是否有计...
中缅泰联合清剿!952名涉电诈... 中缅泰联合开展清剿缅甸妙瓦底地区 赌诈园区行动 952名缅甸妙瓦底地区涉电诈犯罪嫌疑人 被押解回国 ...
鑫铂股份:正在研究市值管理相关... 证券之星消息,鑫铂股份(003038)12月26日在投资者关系平台上答复投资者关心的问题。 投资者提...
敦化市黑石乡卫生院开展医保缴费... 为进一步优化医保服务,提升辖区居民对医保政策的知晓率,12月24日,敦化市黑石乡卫生院组织医保工作人...
八道沟人民法庭:调解继承纠纷,... 八道沟人民法庭扎根基层沃土,以新时代枫桥式人民法庭的宝贵经验为指引,积极践行马锡武式“就地办案、司法...
【科技部:开展创新积分制政策实... 【科技部:开展创新积分制政策实施“揭榜挂帅”】科技部办公厅发布关于开展创新积分制“揭榜挂帅”的通知。...
明阳电气:已制定《市值管理制度... 证券之星消息,明阳电气(301291)12月26日在投资者关系平台上答复投资者关心的问题。 投资者提...
长白法院秉持高效便民理念,以柔... 长白县人民法院诉讼服务中心紧跟时代步伐,扎实做好先行调解工作,牢牢筑起维护社会和谐稳定的前沿防线。该...
安徽含山:政策上门零距离 惠农... 人民网记者 李希蒙 “我都快八十了,从没生过病,每年花400块买医保钱图个啥?”在“联通惠民 合作兴...