
作用:决定何时,触发窗口计算函数,开始计算
每个窗口都有一个默认触发器,也可以自定义触发器。

示例1:
当流中元素达到5个以后,触发窗口计算。
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport java.util.Properties//case class StockPrice(stockId:String, timestamp: Long, price:Double)object trigger {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//set parallelismenv.setParallelism(1)//set process timeenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// //for kafka connection
// val kafkaProps = new Properties()
//
// //kafka's attribute
// kafkaProps.setProperty("bootstrap.servers","192.12.249.10:9092")
//
// //set the consumer's group
// kafkaProps.setProperty("group.id","groupq")
//
// //create the consumer
// val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)
//
// //set offset
// kafkaSource.setStartFromEarliest()
//
// //auto commit offset
// kafkaSource.setCommitOffsetsOnCheckpoints(true)
//
// //band data source
// val ds = env.addSource(kafkaSource)
//
// val stockPriceStream = ds.map(s => s.split(","))
// .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))//create dsval pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23), StockPrice("stock3", 10, 888.23))val ds = env.fromCollection(pricesList)
// ds.print()val sumedStream = ds.keyBy(s => s.stockId).timeWindow(Time.seconds(10)).trigger(new MyTrigger(3)).reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))sumedStream.print()env.execute()}class MyTrigger extends Trigger[StockPrice, TimeWindow] {//to receive the paradef this(maxCount:Int){this()this.maxCount = maxCount}//declare ( if reach max num ,then trigger windows)private var maxCount:Long = _//get trigger's stateprivate lazy val countStateDescriptor = new ReducingStateDescriptor[Long]("count", new Sum, classOf[Long])//override on elementoverride def onElement(t: StockPrice, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//get the trigger's stateval countState = triggerContext.getPartitionedState(countStateDescriptor)//state addcountState.add(1L)//judge state more than max trigger numif(countState.get() >= this.maxCount){//reach max num,then clear and trigger window compute//clear statecountState.clear()//computeTriggerResult.FIRE}else{TriggerResult.CONTINUE}}override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//do nothing, cause we don't need deal process Time window, but need to override funcTriggerResult.CONTINUE}override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {//do nothingTriggerResult.CONTINUE}//clear the state, when window reach max num to trigger the computeoverride def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {println("@--now, window is closeed")triggerContext.getPartitionedState(countStateDescriptor).clear()}//update the state,class Sum extends ReduceFunction[Long]{override def reduce(t: Long, t1: Long): Long = {t+t1}}}}