??????????????????midStream????????????????????????????????????????????
|
//6???????????????????????? ????????????????
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//????????????1?? ??????????????????
??????????????????????????????????????????????????
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"),
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"),
$("eventTime").rowtime().as("et"));
// Table tableRequest = tableEnv.fromDataStream(outRequestDataStream,
Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"),
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"),
$("eventTime").rowtime());
// Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream,
Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("??????resultTable" + result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"),
$("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);
//????TVF????????????????????????????
Table resulTable = tableEnv.sqlQuery("SELECT
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
|
????????????????????
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Author ??
* @Time 2023/5/10 8:32
* ???? flink process????????????????????
????????????????????????????????????????????????
*
??????????????????????????????????????????????????????????????????????row_time
as cast(CURRENT_TIMESTAMP AS timestamp(3) )??
* ????????????????????????????
*/
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//????????????
OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
};
OutputTag<BaseInfo> answerStream = new
OutputTag<BaseInfo>("answerStream") {
};
//1??????????????kafka??????
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new
FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"????: -- 14:28:05.111 --
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
//2????????????????????????baseInfo??????????
SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new
MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//??????????????????IP
String serverIp = jsonObject.getString("ip");
//????????????data??????
String datas = jsonObject.getString("data");
String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//??time??????????????????????????????????num??????????????????
String time = splits[0].substring(7, 19);
//??subData??????????????????????????????????????????
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});
baseInfoStream.print("baseInfoStream");
//3??????process????????baseInfoStream??????
SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new
MyProcessFunction(requestStream, answerStream));
//4????????????tag????????????????????
DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
DataStream<BaseInfo> answerDataStream =
tagStream.getSideOutput(answerStream);
requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");
//5????????????????????????action????????????????action????????????????????????MySQL??????
//5.1 ??????????????????
SingleOutputStreamOperator<OutInfo> outRequestDataStream =
requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//??????????????????????action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//??????????action??MySQL??????????????????????
try {
String sql = "select action_name from ActionType where
action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet??" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(),
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(),
actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(),
value.getHandleSerialNo(), value.getInfo(), actionName,
System.currentTimeMillis() );
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
element.getEventTime()));;
outRequestDataStream.print("outRequestDataStream");
//5.2 ????????????????
SingleOutputStreamOperator<OutInfo> outAnswerDataStream =
answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//??????????????????????action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//??????????action??MySQL??????????????????????
try {
String sql = "select action_name from ActionType where
action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet??" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(),
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,
DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(),
value.getHandleSerialNo(), value.getInfo(), actionName,
System.currentTimeMillis() );
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
element.getEventTime()));
outAnswerDataStream.print("outAnswerDataStream");
//6???????????????????????? ????????????????
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//????????????1?? ??????????????????
??????????????????????????????????????????????????
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"),
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"),
$("eventTime").rowtime().as("et"));
// Table tableRequest = tableEnv.fromDataStream(outRequestDataStream,
Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"),
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"),
$("eventTime").rowtime());
// Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream,
Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("??????resultTable" + result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"),
$("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);
//????TVF????????????????????????????
Table resulTable = tableEnv.sqlQuery("SELECT
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
//tableEnv.executeSql("select * from "+resulTable).print();
// DataStream<Tuple2<Boolean, ResultInfo>> resultStream =
tableEnv.toRetractStream(resulTable, ResultInfo.class);
// resultStream.print("resultStream");
// SingleOutputStreamOperator<ResultInfo> resultInfoStream =
resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
// @Override
// public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws
Exception {
// return value.f1;
// }
// });
// resultInfoStream.print("resultInfoStream");
// resultInfoStream.addSink(new Sink2Mysql());
env.execute();
}
}
|
| |
????????
|
|
[email protected]
|
---- ???????????? ----
| ?????? | L Y<[email protected]> |
| ???????? | 2023??5??20?? 01:10 |
| ?????? | user-zh<[email protected]> |
| ???? | ??????table api????rowtime?????? |
HI??????????
??????????????????midStream??????????????????????????????????????????????midStream????????????????????????????????????????????????????????????????????????
??????
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
??????????????????????midStream????????????????flink??????
LY
[email protected]
L Y
[email protected]
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2023??5??17??(??????) ????9:28
??????: "user-zh"<[email protected]>;
????: table api????rowtime??????
????????????????????????????
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("eventTime").rowtime());
tableEnv.createTemporaryView("midTable1",midTable); Table resulTable =
tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as
minTime\n"
+
"FROM TABLE(CUMULATE(\n"
+
" TABLE
midTable1"+
//" TABLE "+ midTable
+
" , DESCRIPTOR(eventTime)\n"
+
" , INTERVAL '60' SECOND\n"
+
" , INTERVAL '1' DAY))\n"
+
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
??????????????????????????????????????eventTime??rowtime,????????????????sqlQuery??????????????????????Rowtime
timestamp is not defined. Please make sure that a proper TimestampAssigner is
defined and the stream environment uses the EventTime time characteristic
??????????????????????????
| |
????????
|
|
[email protected]
|