Apache Paimon
创始人
2025-05-31 21:53:16
0

二、典型实践应用

2.1)离线数仓加速

离线数仓加速可以说是一个非常典型的应用场景了
传统离线数仓主要有以下劣势点:

  • 时效性不高,数据严重依赖于定时调度任务进行T+1或H+1更新
  • 不支持事务,数据准确性无法保障
  • 不支持增量读写

而基于数据湖架构完美解决了这些痛点

  • 时效性取决于CP时间,用户可自行设定,一般1-5分钟
  • 数据写入基于CP实现事务
  • 数据可流读流写

Hudi作为早期的数据湖三剑客之一,社区比较活跃,生态功能丰富,被许多公司所认可和应用
因此,在2022年我们也将集团ODS层大部分的核心任务平滑的迁移到了Apache Hudi中
尽管Hudi非常的优秀,但是在实践过程中也慢慢发现了一些的问题:

  • Hudi设计较为复杂,有较多的调优参数,用户上手成本高
  • 大量的随机写入导致性能骤降
  • 实时任务运行需要较多的资源(相对成本过高)

我们在这方面也做出了不少优化:

  • 根据表数据量来制定特定参数,用户无感知
  • 调整分区策略和资源,优化大量随机写情况
  • 降低部分任务资源,维持任务正常运行和资源分配的一个平衡点

但是这样的情况也不是一个长久之计
对于现阶段的我们来说,需要的不一定是目前最好的,而是最适合公司现状的
此时Apache Paimon逐渐走入我们的视线,在去年年底积极的进行了相关应用测试,也向 @之信 老师请教了不少

因此我们目前针对在Hudi中性能下降较为严重的任务进行了迁移到Paimon的测试
借助其特性,数据写入非常平滑,把写入性能发挥到了极致,并且Paimon的整体架构相对简单,资源消耗也较少,用户学习成本较低

案例实践:存量数据导入
对于存量数据较多的表,我们采用的是通过Flink批读Hudi中的存量数据写入Paimon中,然后接再启动任务消费增量数据,Flink读取和写入的过程非常快,经过测试,3亿左右的表只需要不到20分钟即可导入完成,当然也可以直接读取hive表导入Paimon,具体实践根据实际场景
下面展示伪代码:

INSERT INTO paimon.ods.order_info
/*+ OPTIONS('sink.parallelism'='100','write-buffer-size'='1024m','sink.partition-shuffle' = 'true') */
SELECT
*
FROM
hudi.ods.order_info/*+ OPTIONS('read.tasks' = '100') */
;

目前,我们构建了如下架构

基于FTS的数仓架构

  • 数据源流批一体:
    • 在批处理场景下,借助FTS表数据准实时的特性,避免了凌晨大量的任务合并以及相关资源调度,并且加快了下游任务的启动时间
    • 在流处理场景下,直接流读FTS表,流批口径相同,不需要用户再去单独消费topic,避免繁杂的工作,让用户更注重业务逻辑实现
  • 统一数据集成:
    • 一键构建全增量的进行数据写入流程,屏蔽底层实现细节,便于用户上手
    • 一键新增字段以及数据补数
    • 凌晨统一进行数据可用性检测,保障下游数据质量
    • 构建基于数据湖表的大盘指标监控,重点关注当前运行任务的CP指标(CP失败率、CP耗时等)

2.2)基于Partial update的准实时宽表

准实时是介于离线和实时之间,其中准实时宽表是一个常见的案例
在以前

  • 通过微批调度(分钟,小时)进行数据更新,但是延迟相对较高
  • 通过流式引擎构建,则会存在保留大量状态造成资源严重浪费的情况

目前像Hudi、FTS等提供了Partial update的功能,其中FTS是原生自带的,可通过merge-engine参数来指定

'merge-engine' = 'partial-update'

Partial update的特点:

  • 结果表字段由多个数据源提供组成,可使用 union all 的方式进行逻辑拼接
  • 数据在存储层进行join拼接,与计算引擎无关,不需要保留状态,节省资源

案例实践:数据写入

一、FlinkSQL参数设置
set `table.dynamic-table-options.enabled`=`true`;
SET `env.state.backend`=`rocksdb`;
SET `execution.checkpointing.interval`=`60000`;
SET `execution.checkpointing.tolerable-failed-checkpoints`=`3`;
SET `execution.checkpointing.min-pause`=`60000`;二、创建FTS catalog
CREATE CATALOG paimon WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://localhost:9083','warehouse' = 'hdfs://paimon','table.type' = 'EXTERNAL'
);三、创建Partial update结果表
CREATE TABLE if not EXISTS paimon.dw.order_detail
(`order_id` string ,`product_type` string ,`plat_name` string ,`ref_id` bigint ,`start_city_name` string ,`end_city_name` string ,`create_time` timestamp(3),`update_time` timestamp(3) ,`dispatch_time` timestamp(3) ,`decision_time` timestamp(3) ,`finish_time` timestamp(3) ,`order_status` int ,`binlog_time` bigint,PRIMARY KEY (order_id) NOT ENFORCED
) 
WITH ('bucket' = '20', -- 指定20个bucket'bucket-key' = 'order_id','sequence.field' = 'binlog_time', -- 记录排序字段'changelog-producer' = 'full-compaction',  -- 选择 full-compaction ,在compaction后产生完整的changelog'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间'merge-engine' = 'partial-update','partial-update.ignore-delete' = 'true' -- 忽略DELETE数据
);INSERT INTO paimon.dw.order_detail
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,     
order_status,
binlog_time
FROM
paimon.ods.order_info /*+ OPTIONS ('scan.mode'='latest') */union all SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,  
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address /*+ OPTIONS ('scan.mode'='latest') */
;

案例实践:数据查询

一、Spark查询
select * from paimon.dw.order_detail;二、Flink查询
select * from paimon.dw.order_detail;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='latest') */;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-snapshot','scan.snapshot-id'='300') */;
select * from paimon.dw.order_detail /*+ OPTIONS ('scan.mode'='from-timestamp','scan.timestamp-millis'='1675180800000') */

2.3)AppendOnly应用

除了Binlog数据源,还有大量日志、埋点相关的AppendOnly数据源,这类数据基本都是数据量非常大的存在,一般来说,这类数据都是直接消费落在分布式文件系统上的

当我们采用FTS来构建AppendOnly表时,数据不仅可以实时写入,还可以实时读取,读写顺序一致
完全可以替换部分消息队列的场景,达到解耦和降本增效的效果

案例实践:数据写入

CREATE TABLE if not exists paimon.ods.event_log(uuid string,intotime TIMESTAMP(3),openid string,userid string,.......network string,partition_year int,partition_month int,partition_day int,partition_hour int
) 
PARTITIONED BY (`partition_year`,`partition_month`,`partition_day`,`partition_hour`)
WITH ('bucket' = '100','bucket-key' = 'uuid','snapshot.time-retained' = '7 d','write-mode' = 'append-only'
);INSERT INTO paimon.ods.event_log
SELECT uuid,intotime,openid,userid.......,cast(date_format(intotime,'yyyy') as int) as partition_year,cast(date_format(intotime,'MM') as int) as partition_month,cast(date_format(intotime,'dd') as int) as partition_day,cast(date_format(intotime,'HH') as int) as partition_hour
FROM realtime_event_kafka_source
;

三、问题解决方案

相关内容

热门资讯

嘉兴男子与妻争吵,突然将行李箱... 近日,浙江嘉兴一对夫妻因琐事发生争吵,丈夫突然将装满衣物的行李箱从6楼扔到楼下,引发关注。11月22...
三地107家律所齐聚丰台,京津... 11月22日,京津冀律师驿站举办“党建业务深度融合 促进行业规范发展”主题活动,发布“百千万行动计划...
家装预付资金安全困局如何破解,... 家装预付资金安全困局如何破解 专家提出:建立“先验收后付款”装修资金存管制度 预交数万元甚至数十万元...
工行安康解放路支行积极开展《反... 为深入贯彻落实《国家金融监督管理总局安康监管分局办公室关于开展<反有组织犯罪法>宣传活动的通知》要求...
重庆公布育儿补贴制度实施方案 原标题:每孩每年3600元 重庆公布育儿补贴制度实施方案 11月21日,记者了解到,市卫生健康委、市...
十五运会组委会在深总结本届赛事... 深圳新闻网2025年11月22日讯(深圳报业集团记者 林炜航)11月21日,十五运会组委会在深圳市民...
中国军视网:日本妄言击沉福建舰... 本文转自【中国军视网】; 日本首相高市早苗发表涉台错误言论,公然挑战一个中国原则,甚至还有日本无知政...
重磅!东莞长安50万㎡产城发布... 在当下竞争激烈的市场环境中,中小企业如何突破成本压力,找到一片既能扎根成长又能眺望未来的理想栖息地?...
毕马威:政策、资本等多维着力 ... 由毕马威联合长三角G60科创走廊创新研究中心主办的“长三角高端装备新质领袖榜单发布仪式”于11月21...