Flink SQL 的 Lookup Join 使用 `FOR SYSTEM_TIME AS OF` 语法;Lookup Cache 默认不开启,需通过 lookup.cache.max-rows 和 lookup.cache.ttl 参数来启用。

| SQL参数 | 说明 |
|---|---|
| connector | 连接器,可以是 jdbc、kafka、filesystem 等 |
| driver | 数据库驱动 |
| lookup.cache.ttl | Lookup Cache 中每行数据的最大存活时间 |
| lookup.cache.max-rows | Lookup Cache 中的最大行数 |

当缓存的行数超过 lookup.cache.max-rows 时,将清除存活时间最久的记录;缓存中的行的存活时间超过 lookup.cache.ttl 时也会被清除。
环境:WIN10+IDEA+JDK1.8+Flink1.13+MySQL8
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
```
-- Rebuild the demo database from scratch so the script is rerunnable.
DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;

-- Dimension table probed by the Flink lookup join.
-- NOTE: INT(N)/BIGINT(N) display widths are deprecated in MySQL 8 and had no
-- effect on stored values, so they are dropped here. FLOAT(3,2)/DOUBLE(4,2)
-- are kept because (M,D) rounds values on insert in this demo.
CREATE TABLE db0.tb0 (
    a VARCHAR(255) PRIMARY KEY,
    b INT,
    c BIGINT,
    d FLOAT(3,2),
    e DOUBLE(4,2),
    f DATE DEFAULT '2022-10-24',
    g TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- row-creation instant
);

-- Seed rows; f and g fall back to their column defaults.
INSERT INTO db0.tb0 (a, b, c, d, e) VALUES
    ('aa', 1, 11, 1.11, 11.11),
    ('bb', 2, 22, 2.22, 22.22),
    ('cc', 3, 33, 3.33, 33.33);

-- Sanity check; ordered by the primary key for deterministic output.
SELECT * FROM db0.tb0 ORDER BY a;
SQL
-- Flink temporary table mapping MySQL db0.tb0 through the JDBC connector,
-- with a lookup cache of at most 2 rows, each kept for up to 30 seconds.
CREATE TEMPORARY TABLE temp_tb0 (
    a STRING,
    b INT,
    c BIGINT,
    d FLOAT,
    e DOUBLE,
    f DATE,
    g TIMESTAMP,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://localhost:3306/db0',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'tb0',
    'lookup.cache.max-rows' = '2',
    'lookup.cache.ttl' = '30 second'
)
测试代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Hello {public static void main(String[] args) {//创建流和表的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);//创建表,连接MySQL表tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n" +" a STRING,\n" +" b INT,\n" +" c BIGINT,\n" +" d FLOAT,\n" +" e DOUBLE,\n" +" f DATE,\n" +" g TIMESTAMP,\n" +" PRIMARY KEY(a) NOT ENFORCED)\n" +"WITH (\n" +" 'lookup.cache.max-rows' = '2',\n" +" 'lookup.cache.ttl' = '30 second',\n" +" 'connector' = 'jdbc',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'url' = 'jdbc:mysql://localhost:3306/db0',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'table-name' = 'tb0'\n" +")");//执行查询,打印tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print();}
}
测试结果打印
+----+----+---+----+------+-------+------------+----------------------------+
| op | a | b | c | d | e | f | g |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+
-- Processing-time lookup join: each row of stream v probes dimension table t
-- as of that row's processing time v.y, matching on v.x = t.a.
-- (Original snippet was garbled: "tFOR" and "v.yON" were missing whitespace.)
SELECT *
FROM v
JOIN t FOR SYSTEM_TIME AS OF v.y
    ON v.x = t.a
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Scanner;import static org.apache.flink.table.api.Expressions.$;public class Hi {public static void main(String[] args) {//创建流和表的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);//创建左表DataStreamSource d = env.addSource(new ManualSource());Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime());tbEnv.createTemporaryView("v", tb);//创建右表(维度表)tbEnv.executeSql("CREATE TEMPORARY TABLE t ( " +" a STRING, " +" b INT, " +" c BIGINT, " +" d FLOAT, " +" e DOUBLE, " +" f DATE, " +" g TIMESTAMP, " +" PRIMARY KEY(a) NOT ENFORCED) " +"WITH ( " +" 'lookup.cache.max-rows' = '2', " +" 'lookup.cache.ttl' = '30 second', " +" 'connector' = 'jdbc', " +" 'driver' = 'com.mysql.cj.jdbc.Driver', " +" 'url' = 'jdbc:mysql://localhost:3306/db0', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'table-name' = 'tb0' " +")");//执行查询,打印tbEnv.sqlQuery("SELECT * FROM v " +"JOIN t " +" FOR SYSTEM_TIME AS OF v.y " +" ON v.x=t.a").execute().print();}/** 手动输入的数据源 */public static class ManualSource implements SourceFunction {public ManualSource() {}@Overridepublic void run(SourceFunction.SourceContext sc) {Scanner scanner = new Scanner(System.in);while (true) {String str = scanner.nextLine().trim();if (str.equals("STOP")) {break;}if (!str.equals("")) {sc.collect(str);}}scanner.close();}@Overridepublic void cancel() {}}
}
测试结果