inside my function and I don’t want to generate a
default time or an error time, how should I deal with it?
Looking forward to your answer, thank you very much.
Best,
forideal
update the file.
Best,
Forideal
At 2021-05-07 19:41:45, "forideal" wrote:
Hi My friends:
I use FileSystem in Flink SQL, and I found that my success file was
submitted late, probably dozens of minutes late.
Here I provide some information:
1. Flink version is 1.11.1.
, watermarks));
    }
    // Watermark recorded for this checkpoint; older entries are no longer needed.
    long watermark = watermarks.get(checkpointId);
    watermarks.headMap(checkpointId, true).clear();
    List<String> needCommit = new ArrayList<>();
    Iterator<String> iter = pendingPartitions.iterator();
    while (iter.hasNext()) {
        String partition = iter.next();
        LocalDateTime partTime = extractor.extract(
                partitionKeys, extractPartitionValues(new Path(partition)));
        // Commit only once the watermark has passed partition time plus the delay.
        if (watermark > toMills(partTime) + commitDelay) {
            needCommit.add(partition);
            iter.remove();
        }
    }
    return needCommit;
}
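To make the commit condition in the snippet above concrete, here is a minimal standalone sketch with hypothetical names (`PartitionCommitSketch`, `toMillis`, `canCommit`). It assumes the partition time is converted to epoch milliseconds as if it were UTC; that is an assumption for illustration, not a statement about what Flink's `toMills` does.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical illustration of the commit check; not Flink's actual class.
final class PartitionCommitSketch {

    // Assumption: the zone-less partition time is interpreted as UTC.
    static long toMillis(LocalDateTime partTime) {
        return partTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Same shape as the condition in the snippet: the partition is committed
    // only when the watermark is strictly greater than partition time + delay.
    static boolean canCommit(long watermark, LocalDateTime partTime, long commitDelayMillis) {
        return watermark > toMillis(partTime) + commitDelayMillis;
    }
}
```

Judging by the snippet, the check uses the watermark stored for a given checkpoint, so a partition would only be committed at a checkpoint whose recorded watermark has already passed the partition time plus the delay. If the watermark and the partition time are interpreted in different time zones, the condition could stay false longer than expected.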
Best,
Forideal
Hi My friends:
The watermark displayed in the Flink web UI is 8 hours ahead of the
timestamp. What is the reason for this? The data itself looks correct.
I don't know where the problem occurs. Is it because of the time zone?
Flink 1.11.1
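One possible explanation, offered as an assumption rather than a diagnosis: the event time is a zone-less local time that is converted to epoch milliseconds as if it were UTC, and the web UI renders those milliseconds in the viewer's time zone. The sketch below uses hypothetical names (`WatermarkZoneSketch`, `asUtcMillis`, `render`) and a viewer at UTC+8 to show how an 8-hour shift can appear.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical illustration of a display-only time zone shift.
final class WatermarkZoneSketch {

    // Assumption: the local event time is treated as UTC when turned into epoch millis.
    static long asUtcMillis(LocalDateTime eventTime) {
        return eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Renders epoch millis as wall-clock time in the given zone, as a UI might do.
    static LocalDateTime render(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }
}
```

With an event time of 2020-11-05T10:00, `render(asUtcMillis(t), ZoneOffset.ofHours(8))` gives 2020-11-05T18:00, while rendering in `ZoneOffset.UTC` gives back 10:00. In that case the data is unchanged and only the displayed value differs.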
Best Wishes!!!
forideal
Hi Arvid Heise,
Thank you for your reply.
Yes, my connection to the JM is bad!
Best wishes, forideal
At 2020-11-04 15:32:38, "Arvid Heise" wrote:
A jar upload shouldn't take minutes. There are two possibilities that likely
co-occurred:
- your jar is m
re.java:1595)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thank you very much for your reply, forideal
-fire.delay = 60 s
[1]http://apache-flink.147419.n8.nabble.com/FLINKSQL1-10-UV-td4003.html
Best, forideal
At 2020-08-16 13:21:25, "Chengcheng Zhang" <274522...@qq.com> wrote:
Hi, forideal
Thank you so much, it does help a lot.
The approach you mentioned earlier
BY time_str;
In this SQL, time_str is an hour value such as 2020081600, 2020081601, ..., 2020081623.
[1]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html
[2]http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/
Hope this helps.
Best, forideal
At 2020
ts Arraylist
public class ConcatString extends ArrayList<String> {
    // Accept at most 1000 elements; further adds are rejected.
    @Override
    public boolean add(String toString) {
        if (this.size() < 1000) {
            super.add(toString);
            return true;
        }
        return false;
    }
    public List<String> getList() {
        return this;
    }
}
Best forideal
At 2020-08-14 21:46:45,
catString createAccumulator() {
    return new ConcatString();
}
@Override
public void open(FunctionContext context)
        throws Exception {
}
Best forideal
me = t1.event_time;
When I execute this SQL, I get the following exception.
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be
cast to java.sql.Timestamp, field index: 1, field value: 2020-05-22T14:00.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236
Why don't we support LocalDateTime?
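As a workaround sketch for the exception above, the field value could be coerced before it is used as a `java.sql.Timestamp`. The helper name `toSqlTimestamp` and the class `TimestampCoercionSketch` are hypothetical and are not part of Flink's JDBC connector; only the standard `Timestamp.valueOf(LocalDateTime)` call is relied on.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical helper; shows one way to accept both representations.
final class TimestampCoercionSketch {

    static Timestamp toSqlTimestamp(Object fieldValue) {
        if (fieldValue == null) {
            return null;
        }
        if (fieldValue instanceof Timestamp) {
            return (Timestamp) fieldValue;
        }
        if (fieldValue instanceof LocalDateTime) {
            // Keeps the same wall-clock fields (year, month, day, hour, ...).
            return Timestamp.valueOf((LocalDateTime) fieldValue);
        }
        throw new IllegalArgumentException(
                "Unsupported timestamp type: " + fieldValue.getClass().getName());
    }
}
```

For the value in the error message, `toSqlTimestamp(LocalDateTime.of(2020, 5, 22, 14, 0))` yields a `Timestamp` whose `toLocalDateTime()` is again 2020-05-22T14:00.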
Best wishes.
forideal
Hello, my friend.
I have a dimension table.
create table dim_u_score (u_id bigint, varchar, score_a double, score_b double) with {xxx}
In one scenario
The condition of lookup is filter score_a > 0.9
In another scenario
The condition of lookup is filter score_b > 1
In Flink, at present, lookup join can use on to
atas.put(key, value);
    }
    @Override
    public String getValue(Map acc) {
        return JSON.toJSONString(acc);
    }
    @Override
    public TypeInformation getResultType() {
        return Types.STRING;
    }
}
Best forideal
At 2020-04-21 10:05:05, "Kurt Young" wrote:
Thanks, once you can reproduce this issu
Best forideal
At 2020-04-18 21:51:13, "Jark Wu" wrote:
Hi,
Which state backend are you using? Is it the heap state backend?
Best,
Jark
On Sat, 18 Apr 2020 at 07:06, tison wrote:
Hi,
Could you share the stack traces?
Best,
tison.
On Sat, 18 Apr 2020 at 00:33, forideal wrote
roup_key
Article Link:https://zhuanlan.zhihu.com/p/132764573
Picture Link:
https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg
https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg
Best, forideal
ate(sql);
});
env.execute(jobName);
Best Wishes
At 2020-04-10 16:35:33, "Jark Wu" wrote:
Hi forideal,
Are you using `StreamTableEnvironment` or SQL CLI?
Currently, only `TableEnvironment` with the Blink planner has the multi-sink
optimization (reuse shared upstream operators).
Be
Hello
There are 3 SQL statements all querying the same table, but the generated DAG
consists of 3 independent topologies. I think a better result would be one
Source and 3 Sinks.
create table good_sink (data varchar) with (
'connector.type'='console',
'connector.dry-run'='false','connector.property
the job is fully initiated?
No, the job can't initialize.
Topology
op1-hash->op2-hash->op3-hash->op4
|
|-hash->op5
op1 parallelism is 200
op2 parallelism is 400
op3 parallelism is 400
op4 parallelism is 400
op5 parallelism is 400
Best Wishes
forideal
At 2020-03-20 15:20:07, "
emory.min', and 'taskmanager.network.memory.max'.
But this actually wastes too many resources.
Memory Segments
| Type | Count |
| Available | 698,838 |
| Total | 700,000 |
| Direct | 700,103 | 21.4 GB | 21.4 GB |
| Mapped | 0 | 0 B | 0 B |
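As a rough consistency check of the figures above, the sketch below multiplies the segment count by a segment size. It assumes the default segment size of 32 KB (`taskmanager.memory.segment-size`), which this message does not state, and that the UI reports binary gigabytes. The class and method names are hypothetical.

```java
// Hypothetical arithmetic check; the 32 KB segment size is an assumption.
final class NetworkBufferMath {

    static final long SEGMENT_BYTES = 32L * 1024;

    // Total size of the given number of segments, in GiB (2^30 bytes).
    static double gib(long segments) {
        return segments * SEGMENT_BYTES / (1024.0 * 1024 * 1024);
    }
}
```

Under those assumptions, 700,000 segments come to roughly 21.4 GB, which is in line with the Direct memory row, while only about 1,162 segments (700,000 minus 698,838 available) are actually in use.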
Best Wishes
forideal
One is the Source operator, parallelism is 3
Two is the GroupWindowAggregate operator, parallelism is 3
Three is the LookupJoin operator, parallelism is 3
I want to change the parallelism of GroupWindowAggregate, but I can't.
Best wishes
forideal
Hello everyone
I have a job with large state in RocksDB. The job's source is Kafka. If I
want to replay data, the job crashes.
One of the motivations of FLIP-27 is event time alignment; however, it is
not ready for me yet.
How can I work around this?
Here is an immature solution, I don