Apache Paimon
创始人
2025-05-31 21:53:16
0

二、典型实践应用

2.1)离线数仓加速

离线数仓加速可以说是一个非常典型的应用场景了
传统离线数仓主要有以下劣势点:

  • 时效性不高,数据严重依赖于定时调度任务进行T+1或H+1更新
  • 不支持事务,数据准确性无法保障
  • 不支持增量读写

而基于数据湖架构完美解决了这些痛点

  • 时效性取决于CP时间,用户可自行设定,一般1-5分钟
  • 数据写入基于CP实现事务
  • 数据可流读流写

Hudi作为早期的数据湖三剑客之一,社区比较活跃,生态功能丰富,被许多公司所认可和应用
因此,在2022年我们也将集团ODS层大部分的核心任务平滑的迁移到了Apache Hudi中
尽管Hudi非常的优秀,但是在实践过程中也慢慢发现了一些的问题:

  • Hudi设计较为复杂,有较多的调优参数,用户上手成本高
  • 大量的随机写入导致性能骤降
  • 实时任务运行需要较多的资源(相对成本过高)

我们在这方面也做出了不少优化:

  • 根据表数据量来制定特定参数,用户无感知
  • 调整分区策略和资源,优化大量随机写情况
  • 降低部分任务资源,维持任务正常运行和资源分配的一个平衡点

但是这样的情况也不是一个长久之计
对于现阶段的我们来说,需要的不一定是目前最好的,而是最适合公司现状的
此时Apache Paimon逐渐走入我们的视线,在去年年底积极的进行了相关应用测试,也向 @之信 老师请教了不少

因此我们目前针对在Hudi中性能下降较为严重的任务进行了迁移到Paimon的测试
借助其特性,数据写入非常平滑,把写入性能发挥到了极致,并且Paimon的整体架构相对简单,资源消耗也较少,用户学习成本较低

案例实践:存量数据导入
对于存量数据较多的表,我们采用的是通过Flink批读Hudi中的存量数据写入Paimon中,然后接再启动任务消费增量数据,Flink读取和写入的过程非常快,经过测试,3亿左右的表只需要不到20分钟即可导入完成,当然也可以直接读取hive表导入Paimon,具体实践根据实际场景
下面展示伪代码:

INSERT INTO paimon.ods.order_info
/*+ OPTIONS('sink.parallelism'='100','write-buffer-size'='1024m','sink.partition-shuffle' = 'true') */
SELECT
*
FROM
hudi.ods.order_info/*+ OPTIONS('read.tasks' = '100') */
;

目前,我们构建了如下架构

基于FTS的数仓架构

  • 数据源流批一体:
    • 在批处理场景下,借助FTS表数据准实时的特性,避免了凌晨大量的任务合并以及相关资源调度,并且加快了下游任务的启动时间
    • 在流处理场景下,直接流读FTS表,流批口径相同,不需要用户再去单独消费topic,避免繁杂的工作,让用户更注重业务逻辑实现
  • 统一数据集成:
    • 一键构建全增量的进行数据写入流程,屏蔽底层实现细节,便于用户上手
    • 一键新增字段以及数据补数
    • 凌晨统一进行数据可用性检测,保障下游数据质量
    • 构建基于数据湖表的大盘指标监控,重点关注当前运行任务的CP指标(CP失败率、CP耗时等)

2.2)基于Partial update的准实时宽表

准实时是介于离线和实时之间,其中准实时宽表是一个常见的案例
在以前

  • 通过微批调度(分钟,小时)进行数据更新,但是延迟相对较高
  • 通过流式引擎构建,则会存在保留大量状态造成资源严重浪费的情况

目前像Hudi、FTS等提供了Partial update的功能,其中FTS是原生自带的,可通过merge-engine参数来指定

'merge-engine' = 'partial-update'

Partial update的特点:

  • 结果表字段由多个数据源提供组成,可使用 union all 的方式进行逻辑拼接
  • 数据在存储层进行join拼接,与计算引擎无关,不需要保留状态,节省资源

案例实践:数据写入

一、FlinkSQL参数设置
set `table.dynamic-table-options.enabled`=`true`;
SET `env.state.backend`=`rocksdb`;
SET `execution.checkpointing.interval`=`60000`;
SET `execution.checkpointing.tolerable-failed-checkpoints`=`3`;
SET `execution.checkpointing.min-pause`=`60000`;二、创建FTS catalog
CREATE CATALOG paimon WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://localhost:9083','warehouse' = 'hdfs://paimon','table.type' = 'EXTERNAL'
);三、创建Partial update结果表
CREATE TABLE if not EXISTS paimon.dw.order_detail
(`order_id` string ,`product_type` string ,`plat_name` string ,`ref_id` bigint ,`start_city_name` string ,`end_city_name` string ,`create_time` timestamp(3),`update_time` timestamp(3) ,`dispatch_time` timestamp(3) ,`decision_time` timestamp(3) ,`finish_time` timestamp(3) ,`order_status` int ,`binlog_time` bigint,PRIMARY KEY (order_id) NOT ENFORCED
) 
WITH ('bucket' = '20', -- 指定20个bucket'bucket-key' = 'order_id','sequence.field' = 'binlog_time', -- 记录排序字段'changelog-producer' = 'full-compaction',  -- 选择 full-compaction ,在compaction后产生完整的changelog'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间'merge-engine' = 'partial-update','partial-update.ignore-delete' = 'true' -- 忽略DELETE数据
);INSERT INTO paimon.dw.order_detail
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,     
order_status,
binlog_time
FROM
paimon.ods.order_info /*+ OPTIONS ('scan.mode'='latest') */union all SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,  
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address /*+ OPTIONS ('scan.mode'='latest') */
;

案例实践:数据查询

一、Spark查询
select * from paimon.dw.order_detail;二、Flink查询
select * from paimon.dw.order_detail;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='latest') */;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-snapshot','scan.snapshot-id'='300') */;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-timestamp','scan.timestamp-millis'='1675180800000') */

2.3)AppendOnly应用

除了Binlog数据源,还有大量日志、埋点相关的AppendOnly数据源,这类数据基本都是数据量非常大的存在,一般来说,这类数据都是直接消费落在分布式文件系统上的

当我们采用FTS来构建AppendOnly表时,数据不仅可以实时写入,还可以实时读取,读写顺序一致
完全可以替换部分消息队列的场景,达到解耦和降本增效的效果

案例实践:数据写入

CREATE TABLE if not exists paimon.ods.event_log(uuid string,intotime TIMESTAMP(3),openid string,userid string,.......network string,partition_year int,partition_month int,partition_day int,partition_hour int
) 
PARTITIONED BY (`partition_year`,`partition_month`,`partition_day`,`partition_hour`)
WITH ('bucket' = '100','bucket-key' = 'uuid','snapshot.time-retained' = '7 d','write-mode' = 'append-only'
);INSERT INTO paimon.ods.event_log
SELECT uuid,intotime,openid,userid.......,cast(date_format(intotime,'yyyy') as int) as partition_year,cast(date_format(intotime,'MM') as int) as partition_month,cast(date_format(intotime,'dd') as int) as partition_day,cast(date_format(intotime,'HH') as int) as partition_hour
FROM realtime_event_kafka_source
;

三、问题解决方案

相关内容

热门资讯

懵圈了!西亚卡姆捧杯东决MVP... 懵圈了!西亚卡姆捧杯东决MVP竟不知有奖?步行者时隔25年再进总决赛! 导语:篮球场上什么奇事没有?...
没悬念了?金球奖最新赔率:登贝... 北京时间6月2日消息,相关机构更新了2025年金球奖的赔率,巴黎前锋登贝莱领跑。2024-25赛季,...
乌克兰最大胆无人机攻击,预示着... (一) 不得不说,这是迄今乌克兰最大胆也是最成功的一次无人机集群攻击。 看了一下,攻击目标甚至远到俄...
加紧实施更加积极有为的宏观政策 金观平 近期召开的中央政治局会议强调,要加紧实施更加积极有为的宏观政策,用好用足更加积极的财政政策和...
新修订的《快递暂行条例》施行 央视网消息(新闻联播):6月1日起,《国务院关于修改〈快递暂行条例〉的决定》正式施行。此次修改,专门...
here we go!罗马诺:... 直播吧06月01日讯 here we go!罗马诺:凯莱赫总价1800万英镑加盟布伦特福德
美国又一州出手:内布拉斯加州拟... IT之家 6 月 2 日消息,据外媒 The Verge 5 月 31 日报道,美国内布拉斯加州州长...
“法援护苗”专项行动一年来 未... 本报北京5月29日电(记者张璁)记者从司法部获悉:“法援护苗”专项行动开展一年来,各级司法行政机关从...