FlinkSql中的窗口
创始人
2024-03-03 15:34:18
0
        窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。

窗口表值函数(Window TVF)

        从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,
Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表
进行扩展后返回
。表函数(table function)可以看作是返回一个表的函数。

        目前 Flink 提供了以下几个窗口 TVF:

  • 滚动窗口(Tumbling Windows);
  • 滑动窗口(Hop Windows,跳跃窗口);
  • 累积窗口(Cumulate Windows);
  • 会话窗口(Session Windows,目前尚未完全支持)。

        在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:窗口起始点(window_start)、窗口结束点(window_end)、窗口时间(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳

滚动窗口(TUMBLE)

滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。

应用场景:统计每小时的pv,uv。

滑动窗口(HOP)

        滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)
和滑动步长(slide)
两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

应用场景:每5分钟统计一下最近一小时的pv,uv。

累积窗口(CUMULATE)

        在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。这种特殊的窗口就叫作“累积窗口”(Cumulate Window),它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1 小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。 使用场景:每小时统计一次当天的pv,uv。(如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值)。

实际案例

每天的截⽌当前分钟的累计 money(sum(money),去重 id 数(count(distinct id))。每天代表渐进式窗⼝⼤⼩为 1 天,分钟代表渐进式窗⼝移动步⻓为分钟级别。

--数据源表
CREATE TABLE source_table (user_id BIGINT,money BIGINT,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
...
);-- 数据处理逻辑
SELECT   UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end ,window_start,sum(money) as sum_money,count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '60' SECOND,INTERVAL '1' DAY))
GROUP BY window_start,UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000

可以看到 Windowing TVF 滚动窗⼝的写法就是把 cumulate window 的声明写在了数据源的 Table ⼦句中,所以可理解为将表进行扩展后返回。

Window TVF ⽀持 Grouping Sets、Rollup、Cube

应⽤场景:实际的案例场景中,经常会有多个维度进⾏组合(cube)计算指标的场景。如果把每个维度组合 的代码写⼀遍,然后 union all 起来,这样写起来⾮常麻烦,⽽且会导致⼀个数据源读取多遍。

-- ⽤户访问明细表
CREATE TABLE source_table (age STRING,sex STRING,user_id BIGINT,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
...
);--处理逻辑
SELECT   UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,if(age is null, 'ALL', age) as age,if(sex is null, 'ALL', sex) as sex,count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '5' SECOND,INTERVAL '1' DAY))
GROUP BY window_start,window_end,GROUPING SETS ((),(age),(sex),(age, sex))
;

⽬前 Grouping Sets 只在 Window TVF 中⽀持,不⽀持 Group Window Aggregation。

相关内容

热门资讯

快手最新公告:直播功能已逐步恢... 12月23日,快手发布最新公告,公告称,公司快手应用的直播功能于2025年12月22日22:00左右...
原创 《...  鲁网12月23日讯(记者 魏萱)12月22日,烟台市人民政府新闻办公室召开《烟台市海上交通安全条例...
昆明出台条例监管学校食品安全,... 12月23日,澎湃新闻从相关渠道获悉,《昆明市学校食品安全管理条例》(以下简称条例)已审查通过,自2...
广告语被质疑“大字吹牛,小字免... 近日,因一句“10户中国家庭,7户用公牛”的广告语,国内插座行业龙头公牛集团(603195)与竞争对...
最高法院:代理人是承担商业诋毁... 最高法院:代理人是承担商业诋毁责任的主体吗? 代理行为的法律后果由被代理人承担,代理人并非被诉行为实...
一图读懂!一次性信用修复政策→ 来源:中国人民银行 博尔塔拉融媒体中心出品 监审:葛慧慧 编审:刘素针 编辑:邹小梅 责编:邹小梅
原创 诚... 近期,据媒体报道,上海的车主原价30余万元、购买仅2个月的丰田新车在湖南一高速公路被追尾,交警认定后...
人民法院报评论员:大力推动行政... 为进一步发挥典型案例指导引领作用,12月22日,最高人民法院、最高人民检察院从过去一年的全国行政公益...