Offline data warehouse acceleration is one of the most typical application scenarios.
Traditional offline data warehouses suffer from several well-known drawbacks, and a data-lake-based architecture resolves these pain points nicely.
Hudi, one of the original "three musketeers" of the data lake, has an active community and a feature-rich ecosystem, and has been recognized and adopted by many companies. For that reason, in 2022 we smoothly migrated most of the group's core ODS-layer jobs to Apache Hudi.
As excellent as Hudi is, in practice we gradually ran into a number of problems. We made quite a few optimizations on that front, but this was not a sustainable long-term answer.
For us at this stage, what we need is not necessarily the best option available, but the one that best fits the company's current situation.
That is when Apache Paimon came into our view. At the end of last year we ran extensive application tests with it and also learned a great deal from @之信.
We have therefore been testing the migration to Paimon of the jobs whose performance degraded most severely on Hudi. Thanks to its design, data ingestion is very smooth and write performance is pushed to the limit; Paimon's overall architecture is also comparatively simple, it consumes fewer resources, and the learning cost for users is low.
Case study: importing historical data
For tables with a large volume of historical data, we use Flink to batch-read the existing data from Hudi and write it into Paimon, then start a second job to consume the incremental data. The Flink read/write path is very fast: in our tests, a table of roughly 300 million rows finished importing in under 20 minutes. You can also read the Hive table directly into Paimon; choose whichever fits the actual scenario.
Pseudocode is shown below:
-- Batch-read the historical data from Hudi and write it into Paimon.
INSERT INTO paimon.ods.order_info
/*+ OPTIONS('sink.parallelism' = '100', 'write-buffer-size' = '1024m', 'sink.partition-shuffle' = 'true') */
SELECT
*
FROM
hudi.ods.order_info /*+ OPTIONS('read.tasks' = '100') */
;
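Once the batch backfill completes, the incremental job is started. A minimal sketch of that job, assuming the increments continue to land in the Hudi table and relying on Hudi's Flink streaming-read options; the start commit value here is only a placeholder:
-- Hedged sketch: stream-consume the Hudi table from the commit reached by the batch load.
INSERT INTO paimon.ods.order_info
SELECT
*
FROM
hudi.ods.order_info /*+ OPTIONS('read.streaming.enabled' = 'true', 'read.start-commit' = '20230201000000') */
;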
At present, we have built the following architecture.
Near-real-time processing sits between offline and real-time, and the near-real-time wide table is a common example. Where previously there was no such support, engines like Hudi and FTS now provide a Partial update capability. FTS supports it natively, enabled through the merge-engine option:
'merge-engine' = 'partial-update'
The key characteristic of Partial update: records with the same primary key are merged into one row, with each stream writing only its own subset of columns; columns arriving as NULL do not overwrite values that have already been written.
Case study: writing data
1. FlinkSQL parameter settings
SET `table.dynamic-table-options.enabled` = `true`;  -- enable /*+ OPTIONS(...) */ hints
SET `env.state.backend` = `rocksdb`;  -- use the RocksDB state backend
SET `execution.checkpointing.interval` = `60000`;  -- checkpoint every 60 s
SET `execution.checkpointing.tolerable-failed-checkpoints` = `3`;
SET `execution.checkpointing.min-pause` = `60000`;  -- at least 60 s between checkpoints
2. Create the FTS catalog
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://localhost:9083',
'warehouse' = 'hdfs://paimon',
'table.type' = 'EXTERNAL'
);
3. Create the Partial update result table
CREATE TABLE if not EXISTS paimon.dw.order_detail
(
`order_id` string,
`product_type` string,
`plat_name` string,
`ref_id` bigint,
`start_city_name` string,
`end_city_name` string,
`create_time` timestamp(3),
`update_time` timestamp(3),
`dispatch_time` timestamp(3),
`decision_time` timestamp(3),
`finish_time` timestamp(3),
`order_status` int,
`binlog_time` bigint,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'bucket' = '20', -- 20 buckets
'bucket-key' = 'order_id',
'sequence.field' = 'binlog_time', -- field used to order records with the same key
'changelog-producer' = 'full-compaction', -- emit a complete changelog after each full compaction
'changelog-producer.compaction-interval' = '2 min', -- compaction interval
'merge-engine' = 'partial-update',
'partial-update.ignore-delete' = 'true' -- ignore DELETE records
);
INSERT INTO paimon.dw.order_detail
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,
order_status,
binlog_time
FROM
paimon.ods.order_info /*+ OPTIONS ('scan.mode'='latest') */
UNION ALL
SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address /*+ OPTIONS ('scan.mode'='latest') */
;
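Because the table's changelog-producer is full-compaction, downstream jobs can stream-consume a complete changelog directly from the wide table. A minimal sketch of such a consumer; the ADS sink table paimon.ads.plat_order_cnt is an assumption, not part of the original pipeline:
-- Hedged sketch: stream-read the wide table's changelog and aggregate per platform.
-- The sink is assumed to be a primary-key table keyed on plat_name.
INSERT INTO paimon.ads.plat_order_cnt
SELECT
plat_name,
count(*) as order_cnt
FROM
paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='latest') */
GROUP BY plat_name;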
Case study: querying data
1. Spark query
select * from paimon.dw.order_detail;
2. Flink query
-- read the full table (latest snapshot)
select * from paimon.dw.order_detail;
-- stream-read only incremental changes, starting from the latest snapshot
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='latest') */;
-- stream-read starting from a specific snapshot id
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-snapshot','scan.snapshot-id'='300') */;
-- stream-read starting from a timestamp in milliseconds
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-timestamp','scan.timestamp-millis'='1675180800000') */;
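To find a concrete snapshot id or commit timestamp for the from-snapshot / from-timestamp scans above, the snapshot metadata can be inspected first. A minimal sketch, assuming the $snapshots system table that Paimon exposes for every table:
-- Hedged sketch: list snapshot ids and commit times of the wide table.
SELECT snapshot_id, commit_time FROM paimon.dw.`order_detail$snapshots`;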
Besides Binlog sources, there are also large numbers of log and event-tracking AppendOnly sources. These datasets are typically enormous, and such data is generally consumed directly onto a distributed file system.
When we use FTS to build AppendOnly tables, data can be written in real time and also read in real time, with read order consistent with write order.
This can fully replace message queues in some scenarios, achieving decoupling as well as cost reduction and efficiency gains.
Case study: writing data
CREATE TABLE if not exists paimon.ods.event_log(
uuid string,
intotime TIMESTAMP(3),
openid string,
userid string,
.......
network string,
partition_year int,
partition_month int,
partition_day int,
partition_hour int
)
PARTITIONED BY (`partition_year`,`partition_month`,`partition_day`,`partition_hour`)
WITH (
'bucket' = '100',
'bucket-key' = 'uuid',
'snapshot.time-retained' = '7 d',
'write-mode' = 'append-only' -- append-only table: no primary key, no merging
);
INSERT INTO paimon.ods.event_log
SELECT
uuid,
intotime,
openid,
userid,
.......
cast(date_format(intotime,'yyyy') as int) as partition_year,
cast(date_format(intotime,'MM') as int) as partition_month,
cast(date_format(intotime,'dd') as int) as partition_day,
cast(date_format(intotime,'HH') as int) as partition_hour
FROM realtime_event_kafka_source
;
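The realtime_event_kafka_source table referenced above is an ordinary Flink Kafka connector table. A minimal sketch for completeness; the topic, bootstrap servers, group id, and format are all assumptions, and the elided business columns are left out:
-- Hedged sketch of the Kafka source table; connection values are assumptions.
CREATE TABLE if not exists realtime_event_kafka_source (
uuid string,
intotime TIMESTAMP(3),
openid string,
userid string,
network string
) WITH (
'connector' = 'kafka',
'topic' = 'event_log', -- assumed topic name
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'paimon_event_log_sync',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);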