Hi, Can you share more details what do you mean that you loose some records? Can you share what data are you ingesting what are the expected results and what are the actual results you are getting. Without that it's impossible to help you. So far your code looks rather correct.
Best, Dawid On 26/03/2020 08:52, Jim Chen wrote: > Hi, All > > When i use the Tumbling Windows, find lost some record. My code as > follow > > /env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);/ > /env.addSource(FlinkKafkaConsumer011......) > / > /.assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) { > @Override > public long extractTimestamp(JSONObject jsonObject) { > long logTime = jsonObject.getLongValue("logTime"); > return logTime; > } > }) > / > /.keyBy(jsonObject -> { > return jsonObject.getString("userId"); > })/ > /.timeWindow(Time.seconds(30)) > / > /.process(new ProcessWindowFunction<JSONObject, JSONObject, String, > TimeWindow>() { > / > / public void process(String key, Context context, > Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws > Exception {/ > / SimpleDateFormat sdf = new > SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); > String start = sdf.format(new > Date(context.window().getStart())); > String end = sdf.format(new > Date(context.window().getEnd())); > System.out.println(start + "----" + end); > / > / for (JSONObject jsonObject : iterable) {/ > / collector.collect(jsonObject);/ > /}}}/ > /.print("====");/ > / > / > From the print result, i found lost some record in the tumbling > window. I can't figure out, any one can help me ?
signature.asc
Description: OpenPGP digital signature