Flink学习26:触发器
创始人
2024-03-28 14:42:51
0

触发器

 作用:决定何时,触发窗口计算函数,开始计算

 

每个窗口都有一个默认触发器,也可以自定义触发器。

自定义触发器

示例1:

当流中元素达到5个以后,触发窗口计算。

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport java.util.Properties//case class StockPrice(stockId:String, timestamp: Long, price:Double)object trigger {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//set parallelismenv.setParallelism(1)//set process timeenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//    //for kafka connection
//    val kafkaProps = new Properties()
//
//    //kafka's attribute
//    kafkaProps.setProperty("bootstrap.servers","192.12.249.10:9092")
//
//    //set the consumer's group
//    kafkaProps.setProperty("group.id","groupq")
//
//    //create the consumer
//    val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)
//
//    //set offset
//    kafkaSource.setStartFromEarliest()
//
//    //auto commit offset
//    kafkaSource.setCommitOffsetsOnCheckpoints(true)
//
//    //band data source
//    val ds = env.addSource(kafkaSource)
//
//    val stockPriceStream = ds.map(s => s.split(","))
//      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))//create dsval pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23), StockPrice("stock3", 10, 888.23))val ds = env.fromCollection(pricesList)
//    ds.print()val sumedStream = ds.keyBy(s => s.stockId).timeWindow(Time.seconds(10)).trigger(new MyTrigger(3)).reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))sumedStream.print()env.execute()}class MyTrigger extends Trigger[StockPrice, TimeWindow] {//to receive the paradef this(maxCount:Int){this()this.maxCount = maxCount}//declare ( if reach max num ,then trigger windows)private var maxCount:Long = _//get trigger's stateprivate lazy val countStateDescriptor = new ReducingStateDescriptor[Long]("count", new Sum, classOf[Long])//override on elementoverride def onElement(t: StockPrice, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//get the trigger's stateval countState = triggerContext.getPartitionedState(countStateDescriptor)//state addcountState.add(1L)//judge state more than max trigger numif(countState.get() >= this.maxCount){//reach max num,then clear and trigger window compute//clear statecountState.clear()//computeTriggerResult.FIRE}else{TriggerResult.CONTINUE}}override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//do nothing, cause we don't need deal process Time window, but need to override funcTriggerResult.CONTINUE}override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//do nothingTriggerResult.CONTINUE}//clear the state, when window reach max num to trigger the computeoverride def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {println("@--now, window is closeed")triggerContext.getPartitionedState(countStateDescriptor).clear()}//update the state,class Sum extends ReduceFunction[Long]{override def reduce(t: Long, t1: Long): Long = {t+t1}}}}

相关内容

热门资讯

向上·2025湖湘经济关键词盘... 三湘都市报·新湖南客户端 全媒体记者 龙思言 “守住消费券发放时间,买了就是省钱!”近日,已有身孕的...
宁波这类车淘汰更新,补贴政策调... 此前 宁波市生态环境局等5部门联合印发了 《宁波市国四排放标准 非营运中、重型货车提前淘汰 及新能源...
成都警方通报:男子因纠纷引燃易... 12月28日,成都市公安局高新区分局发布警情通报: 12月28日下午,高新区南三环路四段一汽车销售服...
爱奇艺回应“男子充25年会员退... 近日,河南许昌黄先生充了25年爱奇艺会员后遭遇退费难一事受到关注,相关词条登上微博热搜。 据河南广播...
成都警方:一男子因纠纷引发燃爆... 12月28日,成都市公安局高新区分局发布警情通报: 12月28日下午,我区南三环路四段一汽车销售服务...
成都高新警方通报:男子因纠纷在... 本文转自【成都高新公安】; 12月28日,成都市公安局高新区分局发布警情通报:
青海一人在出租屋内遇害!犯罪嫌... 关于规范非机动车停放秩序的通告 2000年2月2日,同仁县(现同仁市)发生一起恶性刑事案件,受害人马...
成都警方:男子因纠纷于4S店外... 新京报讯 12月28日,成都警方发布警情通报,内容如下: 编辑 李忆林子
原创 被... 被苹果起诉还敢爆料?Jon Prosser这次直接甩出王炸,iPhone首款折叠屏渲染图全网刷屏!书...
财政政策如何继续“更加积极”—... 新华社北京12月28日电 题:财政政策如何继续“更加积极”——解读2026年财政工作新看点 新华社记...