??????????
??????if(stateDate.equals("") ||
stateDate.equals(date))????????????????????pv_st.clear()????1.??????????????????pv_st????????????????????????????????2.state.clear()
????????????????????????????null????????????????????????null????????????????????????pv_st.update(pv_st.value()
+ c_st)????????????????????
------------------ ???????? ------------------
??????: "Yun Tang"<[email protected]>;
????????: 2020??3??31??(??????) ????3:59
??????: "user-zh"<[email protected]>;
????: Re: ProcessWindowFunction??????????????state??
Hi
???????? if(stateDate.equals("") || stateDate.equals(date))
??????????????????????????stateDate????????????????????????????????????????????????
??????state.clear()
????????????????????????????null??????????????????????????????????????????????
????
????
________________________________
From: ???? <[email protected]>
Sent: Tuesday, March 31, 2020 12:33
To: user-zh <[email protected]>
Subject: ProcessWindowFunction??????????????state??
????????
--????
FLINK 1.10.0 ON YARN
--????
1.???????? .window(TumblingProcessingTimeWindows.of(Time.days(1)))????
2.????????new
Trigger(????.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
3.????????new
ProcessWindowFunction(),????????????????????????????????0??????????????????????????????????????????
--????
 ?? new
ProcessWindowFunction()????????ValueState????????????0????????ValueState??????????????????????????ValueState??????????????????????????????????????????.clear()??????????????????????????????????
--????????
.window(TumblingProcessingTimeWindows.of(Time.days(1)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {

  // Keyed running count of "listentime" events, registered under "pv_stCount".
  private var pv_st: ValueState[Long] = _

  // Keyed day ("yyyy-MM-dd") on which this key last produced output.
  // Kept in Flink state rather than in a plain field: a field on the function
  // object is shared by every key handled by the same subtask, so the first key
  // to notice a new day would hide the roll-over from all the others.
  private var day_st: ValueState[String] = _

  /** Registers the two keyed state handles used by `process`. */
  override def open(parameters: Configuration): Unit = {
    pv_st = getRuntimeContext.getState[Long](
      new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
    day_st = getRuntimeContext.getState[String](
      new ValueStateDescriptor[String]("pv_stDate", classOf[String]))
  }

  /**
   * Counts the elements whose second field is "listentime", adds that count to
   * the keyed total and emits one record of the form
   * `<key field 0>_<yyyy-MM-dd>&{"yesterday_foreground_play_pv":"<total>"}`.
   * If the key last emitted on a different day, the total is cleared after the
   * record has been emitted.
   *
   * @param key      grouping key; only field 0 is used, for the output prefix
   * @param context  window context (not used here)
   * @param elements elements handed over by the window for this firing
   * @param out      collector receiving the formatted record
   */
  override def process(key: Tuple,
                       context: Context,
                       elements: Iterable[(String, String, Long)],
                       out: Collector[String]): Unit = {
    // `==` is null-safe, and equality with "listentime" already implies non-empty.
    val matched = elements.count(_._2 == "listentime")

    val date = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

    // NOTE(review): relies on an unset ValueState[Long] reading as 0 once Scala
    // unboxes the null default - confirm, or give the state an explicit default.
    // NOTE(review): if the trigger fires without purging, `elements` contains the
    // whole window on every firing and this sum counts earlier elements again -
    // confirm whether `matched` alone is the intended figure.
    val total = pv_st.value() + matched
    pv_st.update(total)

    // Explicit type argument so Scala does not infer Nothing for the Java generic.
    val keyPart = key.getField[AnyRef](0)
    val payload = "{\"yesterday_foreground_play_pv\":\"" + total + "\"}"
    // '&' separates the "<key>_<date>" prefix from the JSON payload.
    out.collect(s"${keyPart}_$date&$payload")

    // Per-key day roll-over: reset the total once the stored day differs from today.
    // NOTE(review): the record above is emitted before the reset, so the first
    // record of a new day still carries the previous day's total - confirm intent.
    // NOTE(review): the window is presumably aligned to UTC days while `date` uses
    // the JVM default time zone - confirm whether a window offset is needed.
    val previousDay = Option(day_st.value()).filter(_.nonEmpty)
    if (previousDay.exists(_ != date)) {
      pv_st.clear()
    }
    if (!previousDay.contains(date)) {
      day_st.update(date)
    }
  }
})