Big Data (9h): Lookup Join in FlinkSQL
创始人
2024-02-28 12:12:23

Contents

  • Overview
  • pom.xml
  • Creating the MySQL table
    • Corresponding Flink DDL
  • Lookup Join
    • FlinkSQL
    • Complete Java code

Overview

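  • A lookup join usually enriches the main table of a FlinkSQL query with data queried from an external system
    For example, a fact table is joined with a dimension table that lives in an external system such as MySQL
  • Requirements:
    One table must have a processing-time attribute (the syntax is that of the Processing Time Temporal Join)
    Compared with a regular JOIN, the syntax adds FOR SYSTEM_TIME AS OF
    The other table must be backed by a lookup source connector
  • Lookup Cache
    The Lookup Cache is disabled by default
    Enable it by setting both lookup.cache.max-rows and lookup.cache.ttl
    With the cache enabled, Flink queries the cache first and only queries the external database on a cache miss
    Caching speeds up lookups, but the cached records are not guaranteed to be the latest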
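To make the join semantics concrete, here is a minimal plain-Java model. It is not Flink code and not how the JDBC connector is implemented: a HashMap stands in for the MySQL dimension table, and every incoming stream row triggers one point query by key. Rows with no match are dropped, as in an inner JOIN. The names LookupJoinSketch, ExternalTable and lookupJoin are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of lookup-join semantics; hypothetical names, not Flink's API.
public class LookupJoinSketch {

    // Stands in for the external dimension table (e.g. MySQL tb0: column a -> column b).
    static final class ExternalTable {
        private final Map<String, Integer> rows;
        int queries = 0; // number of point queries sent to the "database"

        ExternalTable(Map<String, Integer> rows) {
            this.rows = rows;
        }

        // Models a point query such as: SELECT b FROM tb0 WHERE a = ?
        Integer lookup(String key) {
            queries++;
            return rows.get(key);
        }
    }

    // For each stream row x, look up the matching dimension row; unmatched rows are dropped (inner join).
    static List<String> lookupJoin(List<String> stream, ExternalTable dim) {
        List<String> out = new ArrayList<>();
        for (String x : stream) {
            Integer b = dim.lookup(x);
            if (b != null) {
                out.add(x + "," + b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> rows = new HashMap<>();
        rows.put("aa", 1);
        rows.put("bb", 2);
        rows.put("cc", 3);
        ExternalTable dim = new ExternalTable(rows);

        List<String> joined = lookupJoin(Arrays.asList("aa", "zz", "bb", "aa"), dim);
        System.out.println(joined);      // [aa,1, bb,2, aa,1]
        System.out.println(dim.queries); // 4
    }
}
```

Key aa is queried twice here: without a cache, every stream row costs one round trip to the external database, which is exactly what the Lookup Cache is meant to reduce.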
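SQL parameters
Parameter               Description
connector               The connector to use, e.g. jdbc, kafka or filesystem
driver                  The database (JDBC) driver class
lookup.cache.ttl        Maximum time-to-live of each row in the Lookup Cache
lookup.cache.max-rows   Maximum number of rows in the Lookup Cache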

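When the number of cached rows exceeds lookup.cache.max-rows, the rows that have been cached the longest are evicted
A cached row is also evicted once it has been in the cache longer than lookup.cache.ttl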
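The caching rules above can be modeled in a few lines of plain Java. This is a simplified sketch, not the connector's real implementation (which the article does not show): an insertion-ordered LinkedHashMap makes the longest-cached row easy to find, and the current time is passed in as a parameter so the TTL behavior is deterministic. The names LookupCacheSketch and get(key, now) are hypothetical; maxRows and ttlMillis play the roles of lookup.cache.max-rows and lookup.cache.ttl.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a lookup cache with max-rows and ttl; hypothetical, not Flink's implementation.
public class LookupCacheSketch {

    // A cached value together with the time it was written into the cache.
    static final class Entry {
        final Integer value;
        final long writtenAt;

        Entry(Integer value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final int maxRows;             // plays the role of lookup.cache.max-rows
    private final long ttlMillis;          // plays the role of lookup.cache.ttl
    private final Map<String, Integer> db; // stands in for the external database
    // Insertion order: the first key is always the row that has been cached the longest.
    private final LinkedHashMap<String, Entry> cache = new LinkedHashMap<>();
    int dbQueries = 0;

    LookupCacheSketch(int maxRows, long ttlMillis, Map<String, Integer> db) {
        this.maxRows = maxRows;
        this.ttlMillis = ttlMillis;
        this.db = db;
    }

    // Looks up key at time now (ms): serve from the cache if present and not expired, else query the db.
    Integer get(String key, long now) {
        Entry e = cache.get(key);
        if (e != null) {
            if (now - e.writtenAt <= ttlMillis) {
                return e.value; // cache hit; may be stale if the db changed meanwhile
            }
            cache.remove(key); // older than the ttl: evict
        }
        dbQueries++;
        Integer value = db.get(key);
        cache.put(key, new Entry(value, now)); // this sketch also caches misses (null)
        if (cache.size() > maxRows) {
            // Over max-rows: evict the row that has been in the cache the longest.
            Iterator<String> oldest = cache.keySet().iterator();
            oldest.next();
            oldest.remove();
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Integer> db = new HashMap<>();
        db.put("aa", 1);
        // Same settings as the article's DDL: max-rows = 2, ttl = 30 s.
        LookupCacheSketch cache = new LookupCacheSketch(2, 30_000L, db);
        cache.get("aa", 0);                          // miss: queries the db
        db.put("aa", 100);                           // the row changes in the db
        System.out.println(cache.get("aa", 1_000));  // 1   (stale value served from the cache)
        System.out.println(cache.get("aa", 31_001)); // 100 (ttl exceeded, re-queried)
        System.out.println(cache.dbQueries);         // 2
    }
}
```

The usage in main shows both sides of the trade-off: within the TTL the cache saves a database query but can return a stale value, and after the TTL the fresh value is fetched again.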

pom.xml

Environment: WIN10 + IDEA + JDK 1.8 + Flink 1.13 + MySQL 8

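<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>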

Creating the MySQL table

DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;
CREATE TABLE db0.tb0 (a VARCHAR(255) PRIMARY KEY,b INT(3),c BIGINT(5),d FLOAT(3,2),e DOUBLE(4,2),f DATE DEFAULT '2022-10-24',g TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT db0.tb0 (a,b,c,d,e) VALUES
('aa',1,11,1.11,11.11),
('bb',2,22,2.22,22.22),
('cc',3,33,3.33,33.33);
SELECT * FROM db0.tb0;

Corresponding Flink DDL

SQL

CREATE TEMPORARY TABLE temp_tb0 (
  a STRING,
  b INT,
  c BIGINT,
  d FLOAT,
  e DOUBLE,
  f DATE,
  g TIMESTAMP,
  PRIMARY KEY(a) NOT ENFORCED
) WITH (
  'lookup.cache.max-rows' = '2',
  'lookup.cache.ttl' = '30 second',
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/db0',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'tb0'
)

Test code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create a table connected to the MySQL table
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n" +
                "  a STRING,\n" +
                "  b INT,\n" +
                "  c BIGINT,\n" +
                "  d FLOAT,\n" +
                "  e DOUBLE,\n" +
                "  f DATE,\n" +
                "  g TIMESTAMP,\n" +
                "  PRIMARY KEY(a) NOT ENFORCED)\n" +
                "WITH (\n" +
                "  'lookup.cache.max-rows' = '2',\n" +
                "  'lookup.cache.ttl' = '30 second',\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/db0',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'tb0'\n" +
                ")");
        // Run the query and print the result
        tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print();
    }
}

Printed test output

+----+----+---+----+------+-------+------------+----------------------------+
| op |  a | b |  c |    d |     e |          f |                          g |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+

Lookup Join

FlinkSQL

SELECT * FROM v
JOIN t
  FOR SYSTEM_TIME AS OF v.y
  ON v.x=t.a
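In this query v is the stream view whose column y is a processing-time attribute, and t is the JDBC-backed dimension table. Because the join is evaluated at processing time, each row of v sees t as it is at the moment the row is processed, and rows already emitted are not revised when t changes later. The plain-Java model below illustrates that behavior under simplifying assumptions (a single-threaded timeline, a HashMap in place of MySQL, no cache); the names ProcTimeJoinSketch, Event and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of processing-time lookup-join semantics; hypothetical names, not Flink code.
public class ProcTimeJoinSketch {

    // One step on the processing-time timeline: a row arriving on v, or an update to table t.
    static final class Event {
        final boolean isUpdate;
        final String key;
        final int b; // new value of column b (updates only)

        private Event(boolean isUpdate, String key, int b) {
            this.isUpdate = isUpdate;
            this.key = key;
            this.b = b;
        }

        static Event row(String x) { return new Event(false, x, 0); }

        static Event update(String a, int b) { return new Event(true, a, b); }
    }

    // Processes events in arrival order. Each row joins t as it looks at that moment;
    // emitted results are append-only (+I) and never retracted when t changes afterwards.
    static List<String> run(Map<String, Integer> initialT, List<Event> timeline) {
        Map<String, Integer> t = new HashMap<>(initialT); // copy so the caller's map is untouched
        List<String> emitted = new ArrayList<>();
        for (Event e : timeline) {
            if (e.isUpdate) {
                t.put(e.key, e.b);
            } else {
                Integer b = t.get(e.key);
                if (b != null) {
                    emitted.add("+I " + e.key + "," + b);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Integer> t = new HashMap<>();
        t.put("aa", 1);
        List<String> out = run(t, Arrays.asList(
                Event.row("aa"),       // joined with b = 1
                Event.update("aa", 9), // t changes after the first row was emitted
                Event.row("aa")));     // joined with b = 9
        System.out.println(out); // [+I aa,1, +I aa,9]
    }
}
```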

Complete Java code

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Scanner;

import static org.apache.flink.table.api.Expressions.$;

public class Hi {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create the left table: x holds each input line, y is a processing-time attribute
        DataStreamSource<String> d = env.addSource(new ManualSource());
        Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime());
        tbEnv.createTemporaryView("v", tb);
        // Create the right table (dimension table)
        tbEnv.executeSql("CREATE TEMPORARY TABLE t ( " +
                "  a STRING, " +
                "  b INT, " +
                "  c BIGINT, " +
                "  d FLOAT, " +
                "  e DOUBLE, " +
                "  f DATE, " +
                "  g TIMESTAMP, " +
                "  PRIMARY KEY(a) NOT ENFORCED) " +
                "WITH ( " +
                "  'lookup.cache.max-rows' = '2', " +
                "  'lookup.cache.ttl' = '30 second', " +
                "  'connector' = 'jdbc', " +
                "  'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "  'url' = 'jdbc:mysql://localhost:3306/db0', " +
                "  'username' = 'root', " +
                "  'password' = '123456', " +
                "  'table-name' = 'tb0' " +
                ")");
        // Run the lookup join and print the result
        tbEnv.sqlQuery("SELECT * FROM v " +
                "JOIN t " +
                "  FOR SYSTEM_TIME AS OF v.y " +
                "  ON v.x=t.a").execute().print();
    }

    /** Data source fed by manual console input; typing STOP ends the input */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {
                    break;
                }
                if (!str.equals("")) {
                    sc.collect(str);
                }
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }
}

Test results
