Hi,
I had the same problem.
Try this:
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
Yeah... not the cleanest way. I think the cast is needed to pick the setStateBackend(StateBackend) overload over the deprecated one that takes AbstractStateBackend, so I guess the API is not that clean after all.
Hi,
I believe the example from [1] where you see DatabaseClient is just a hint that whatever library you use (db or REST based or whatever else) should be asynchronous, or at least should not block. It does not have to be non-blocking as long as it runs on its own thread pool that will return a
Hi Arvid,
thank you for the response.
Yeah, I tried to run my job shortly after posting my message and I got "State is not supported in rich async function" ;)
I came up with a solution that would solve my initial problem - the concurrent/async problem of processing messages with the same key but
unfor
Thanks Arvid,
I like your propositions. In my case I wanted to use the state value to decide if I should do the async call to the external system. The result of this call would be a state input. So having this:
Process1(calculateValue or take it from state) -> AsyncCall to External system to persist/Va
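To make it concrete, here is roughly what I have in mind - only a sketch (MyAsyncFunction stands in for my real AsyncFunction<Tuple2<String, Long>, String>): a keyed function reads the managed state and attaches it to the record, so the async stage itself stays stateless. Writing the async result back into state would still need another keyed stage after the async operator.

DataStream<Tuple2<String, Long>> enriched = events // events: DataStream<String>
        .keyBy(value -> value)
        .process(new KeyedProcessFunction<String, String, Tuple2<String, Long>>() {
            private transient ValueState<Long> lastValue;

            @Override
            public void open(Configuration parameters) {
                lastValue = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("lastValue", Long.class));
            }

            @Override
            public void processElement(String event, Context ctx,
                    Collector<Tuple2<String, Long>> out) throws Exception {
                // attach the state value so the async call can decide what to do
                out.collect(Tuple2.of(event, lastValue.value()));
            }
        });

DataStream<String> results = AsyncDataStream.unorderedWait(
        enriched, new MyAsyncFunction(), 1, TimeUnit.SECONDS, 100);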
Hi,
I faced an issue on Flink 1.11. For now it was a one-time thing and I cannot reproduce it. However, I think something is lurking there...
I cannot post the full stack trace and user code, but I will try to describe the problem.
Setup without any resource groups with only one Operator chain restri
Hi Arvid,
Thank you for your answer.
And what if a) would block the task's thread?
Let's say I'm OK with making the entire task thread wait on this third party lib.
In that case would I be safe from this exception even though I would not use AsyncIO?
Hi Mars,
Were you able to solve this problem?
I'm facing the exact same issue. I don't see logs from my operators in the TaskManager's taskmanager.out file on EMR, although when running locally from the IDE the logs are printed.
Hi,
I would like to write a few tests that would check the message flow in my Flink pipeline.
I would like to base my tests on [1].
My StreamJob class, which has the main method, has all sinks and sources pluggable. The implementations are also based on [1].
In all examples available online I can see
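For context, the kind of test I'm aiming at, following the docs pattern from [1] (a sketch; my real StreamJob wiring is simplified away here, and CollectSink is the usual static-list trick):

public class StreamJobTest {

    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void shouldPassMessagesThroughPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        CollectSink.VALUES.clear();

        // in my real test the source and sink are plugged into StreamJob instead
        env.fromElements("a", "b")
                .map(String::toUpperCase)
                .addSink(new CollectSink());

        env.execute();

        assertTrue(CollectSink.VALUES.containsAll(Arrays.asList("A", "B")));
    }

    // sink that collects into a static list, so the test can assert on it
    private static class CollectSink implements SinkFunction<String> {
        static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) {
            VALUES.add(value);
        }
    }
}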
Hi,
that helped, however there is a problem with JobStatus. Please refer to [1].
In my case JobStatus is already RUNNING but not all tasks are running.
Any idea how to get the task status from MiniCluster?
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td
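What I'm experimenting with in the meantime - with a big assumption on my side that MiniCluster#getExecutionGraph(JobID) is accessible in my version and returns a future of the (archived) graph; please correct me if this is not a supported way:

private static void waitForAllTasksRunning(MiniCluster miniCluster, JobID jobId)
        throws Exception {
    while (true) {
        AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
        boolean allRunning = true;
        for (AccessExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != ExecutionState.RUNNING) {
                allRunning = false;
                break;
            }
        }
        if (allRunning) {
            return;
        }
        Thread.sleep(50); // poll until every task reports RUNNING
    }
}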
Thanks for the clarification about NAT.
Moving the NAT issue aside for a moment,
is the process of sending the "task deployment descriptor" that you mentioned on "Feb 26, 2020; 4:18pm" - specifically the process of notifying a TaskManager about the IPs of the TaskManagers participating in the job - described somewhere? I'm
fam
Hi all,
In the AWS documentation [1] we can see that AWS provides a set of connectors for Flink. I would need to use the ActiveMQ one provided by [2]. Currently I'm using a Docker based standalone Job Cluster, not an AWS one.
What's up with those connectors provided by AWS? Will I be able to use my
c
Hi Tzu-Li,
I think you misunderstood Oskar's question.
The question was whether there are any plans to support Java's LocalDateTime in Flink's "native" de/serialization mechanism. As we can read in [1], for basic types, Flink supports all Java primitives and their boxed form, plus void, String, D
Thanks,
do you have any example of how I could use it?
Basically I have a POJO class that has a LocalDateTime field in it.
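In the meantime I sketched something like this for myself - a custom Kryo serializer for LocalDateTime registered on the execution config. Is this the intended usage? (Only a sketch; it makes the Kryo fallback explicit instead of relying on Kryo's default reflection-based handling.)

public class LocalDateTimeKryoSerializer extends Serializer<LocalDateTime> {

    @Override
    public void write(Kryo kryo, Output output, LocalDateTime value) {
        // store the date and time parts as two longs
        output.writeLong(value.toLocalDate().toEpochDay());
        output.writeLong(value.toLocalTime().toNanoOfDay());
    }

    @Override
    public LocalDateTime read(Kryo kryo, Input input, Class<LocalDateTime> type) {
        return LocalDateTime.of(
                LocalDate.ofEpochDay(input.readLong()),
                LocalTime.ofNanoOfDay(input.readLong()));
    }
}

// registration:
env.getConfig().registerTypeWithKryoSerializer(
        LocalDateTime.class, LocalDateTimeKryoSerializer.class);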
Hi all,
When I run Flink from the IDE I can see this prefix in the logs:
"Legacy Source Thread"
Running the same job as a Job Cluster on Docker, this prefix is not present.
What does this prefix mean?
Btw, I'm using [1] as the ActiveMQ connector.
Thanks.
[1]
https://github.com/apache/bahir-flink/tree/master/flink-c
Hi,
I'm trying to test my RichAsyncFunction implementation with OneInputStreamOperatorTestHarness, based on [1]. I'm using Flink 1.9.2.
My test setup is:
this.processFunction = new MyRichAsyncFunction();
this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(
I've debugged it a little bit and I found that it fails in the InstantiationUtil.readObjectFromConfig method when we execute
byte[] bytes = config.getBytes(key, (byte[]) null);
This returns null. The key it is looking for is "edgesInOrder". In the config map there are only two entries though.
For
I think I got this to work, although with a "nasty" workaround.
I found by debugging that the configuration for this testHarness operator was missing two entries:
"edgesInOrder"
"typeSerializer_in_1"
I added conditional breakpoints in the InstantiationUtil.readObjectFromConfig method for those two keys and I ran
Hi,
another update on this one.
I managed to make the workaround a little bit cleaner.
The test setup I have now is like this:
ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes);
oosStreamEd
Hi :) I have finally figured it out :)
On top of the changes from the last email, in my test method I had to wrap "testHarness.processElement" in a synchronized block, like this:
@Test
public void foo() throws Exception {
    synchronized (this.testHarness.getCheckpointLock()) {
        testHarness.proce
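For completeness, the whole test now looks more or less like this. This is from memory against Flink 1.9.x, where I believe the AsyncWaitOperator constructor arguments are (asyncFunction, timeout, capacity, outputMode) - double-check against your version:

@Test
public void foo() throws Exception {
    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
            new OneInputStreamOperatorTestHarness<>(
                    new AsyncWaitOperator<>(
                            new MyRichAsyncFunction(),
                            1000L,                             // timeout
                            4,                                 // capacity
                            AsyncDataStream.OutputMode.ORDERED));
    testHarness.open();

    // the async operator requires the checkpoint lock to be held
    synchronized (testHarness.getCheckpointLock()) {
        testHarness.processElement(new StreamRecord<>(1, 1L));
    }

    // ... wait for the async result, then assert on testHarness.getOutput()

    synchronized (testHarness.getCheckpointLock()) {
        testHarness.close();
    }
}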
Thanks,
I would suggest adding my "tutorial" about using the test harness for async operators to the documentation. Or maybe build something based on this use case that could be helpful for others in the future :)
Thanks,
Krzysztof
Hi,
sorry for the long wait.
Answering your questions:
1 - yes
2 - thx
3 - right, understood
4 - well, in general I want to understand how this works, to be able in the future to modify my job, for example extracting CPU heavy operators to separate tasks. Actually in my job some of my operators are ch
Hi,
I have a few questions regarding Flink's state.
Let's say we have:
Case 1.
stream.keyBy(...).process(myProcessFunction).parallelism(3)
MyProcessFunction uses managed state (MapState, ListState etc.) and I'm using state checkpoints.
Flink will redistribute events across the 3 instances of myProcessF
Hi,
I'm interested in using the Managed Flink service.
Does anyone have experience with hosting long running (generally 24/7) Flink jobs on AWS EMR?
I'm interested whether it is stable enough to host a long running, state-intensive job.
With EMR I have all the config, the HA ZooKeeper part handled by
Thank you for your answers.
I have one more question.
Is the managed keyed state for a keyed stream kept per key or per operator?
For example, I have a keyed stream that is processed by MyProcessFunction with parallelism = 3. So I have three instances of MyProcessFunction. The process function has a KeyMa
Hi,
according to [1], operator state and broadcast state (which is a "special" type of operator state) are not stored in RocksDB during runtime when RocksDB is chosen as the state backend.
Are there any plans to change this?
I'm working around a case where I will have a quite large control stream.
Hi Seth,
I would like to piggyback on this question :)
You wrote:
"I would strongly encourage you to create one instance of your object per ProcessFunction, inside of open. That would be one instance per slot which is not equal to the parallelism of your operator."
Especially the second part, "Tha
Hi,
I would like to ask what has the bigger memory footprint and what could be more efficient:
fewer keys with a bigger keyed state vs. many keys with a smaller keyed state.
For this use case I'm using the RocksDB state backend and the state TTL is, well... infinite. So I'm keeping the state forever in Flink.
Th
Thanks Congxian Qiu,
I'm aware of your second point. In the value state I will keep a String or a very simple POJO, without any collections inside.
I didn't get your third point, could you clarify it please?
"disk read/write is somewhat about the whole state size"
Actually what I will keep in the value s
Hi all,
I have a special use case that I'm not sure how to fulfill.
The use case is:
I have my main business processing pipeline that has an MQ source, processFunction1, processFunction2 and an MQ sink.
ProcessFunction1, apart from processing the main business message, is also emitting some side eff
Thank you very much for your answer.
I have a question regarding your first paragraph:
"it requires that a sink participates in the pipeline. So it is not located as a "leaf" operator but located somewhere in the middle."
Isn't a Sink a terminating operator? As far as I know Sinks cannot be in
My point was that, as far as I know, Sinks are "terminating" operators that end the stream, like .collect in the Java 8 Stream API. They don't emit elements further, so I cannot link them in a way:
source - process - sink - process - sink
A Sink function produces a DataStreamSink which is used for emittin
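Just to sketch what I mean (made-up names; ProcessFunction1 would emit its side effects to a side output, so both sinks remain leaves while the main stream continues):

SingleOutputStreamOperator<Msg> afterP1 = source.process(new ProcessFunction1());

afterP1.getSideOutput(sideEffectsTag).addSink(sideEffectsSink); // leaf 1
afterP1.process(new ProcessFunction2()).addSink(mainSink);      // leaf 2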
Actually, it seems there is already an ongoing discussion about installing Flink 1.10 on EMR:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/upgrade-flink-from-1-9-1-to-1-10-0-on-EMR-td34114.html
Hi all,
I'm using Flink 1.9.2 and I would like to ask about my use case and the approach I've taken to meet it.
The use case:
I have a keyed stream where I have to buffer messages with this logic:
1. Buffering should start only when a message arrives.
2. The max buffer time should not be longer than 3 seco
One addition:
in the clear method of my custom trigger I do call
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
Hi Marco Villalobos-2,
unfortunately I don't think a tumbling window will work in my case.
The reasons:
1. The window must start only when there is a new event and the previous window is closed. A new tumbling window is created just after the previous one is purged. In my case I have to use a SessionWindow wher
It seems that I'm clearing the timers in the right way, but there is a new timer created from the WindowOperator::registerCleanupTimer method. This one is called from WindowOperator::processElement at the end of both if/else branches.
How can I mitigate this? I don't want to have any "late firings" for m
Hi all,
is it possible for a custom window Trigger (extending the Trigger class) to also implement CheckpointedFunction?
In my custom Trigger I have complicated logic executed in the Trigger::onElement method.
Currently I'm using triggerCtx.getPartitionedState to do all reads and writes for data man
I think I've figured it out.
I switched to a GlobalWindow with my custom trigger. My trigger combines processing-time trigger logic and onElement trigger logic. Only one should be executed in the scope of a particular window.
I managed to do this by returning FIRE_AND_PURGE and clearing all timers and state
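A stripped-down version of the trigger, in case someone hits the same problem. This is only a sketch of the processing-time half; my onElement business logic is omitted:

public class BufferTrigger extends Trigger<Object, GlobalWindow> {

    private static final long MAX_BUFFER_MS = 3_000L; // my 3 second cap

    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("bufferTimer", Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> timerState = ctx.getPartitionedState(timerDesc);
        if (timerState.value() == null) {
            // first element of this round: start the max-buffer-time timer
            long fireAt = ctx.getCurrentProcessingTime() + MAX_BUFFER_MS;
            ctx.registerProcessingTimeTimer(fireAt);
            timerState.update(fireAt);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        ctx.getPartitionedState(timerDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> timerState = ctx.getPartitionedState(timerDesc);
        Long fireAt = timerState.value();
        if (fireAt != null) {
            ctx.deleteProcessingTimeTimer(fireAt);
        }
        timerState.clear();
    }
}

Used as: .window(GlobalWindows.create()).trigger(new BufferTrigger()). If I read the code correctly, with GlobalWindows the window's maxTimestamp() is Long.MAX_VALUE, so the WindowOperator cleanup timer I complained about earlier is never actually registered.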
Hi,
I would like to ask about Flink POJO serialization as described in [1].
I have a case where my custom event source produces events described by a POJO:
public class DataPoint {
    public long timestamp;
    public double value;
    public BadPojo badPojo = new BadPojo();
    public Data
Hi,
Any ideas about that one?
Theo,
thank you for the clarification and code examples.
I was actually suspecting that this is because of Java type erasure.
The thing that bothers me, though, is the fact that Flink was falling back to Kryo silently in my case, without any information in the logs. We actually found it just by luck.
Hi guys,
I'm using Flink 1.9.2.
I have a question about a use case where I would like to use Flink's managed keyed state with Async IO [1].
Let's take as a baseline the example below, taken from [1], and let's assume that we are executing this on a keyed stream:
final Future<String> result = client.query(key);
Hi,
I'm having the same problem now. What is your approach now, after gaining some experience?
Also, do you use Spring DI to set up/initialize your jobs/process functions?
Hi Arvid Heise-3,
Thanks for your answer. I took this approach.
I did not want to start a new thread since I wanted to avoid "subject duplication" :)
Regards,
Krzysztof
Hi,
Is it possible to use a field that does not represent a timestamp to order events in Flink's pipeline?
In other words, I will receive a stream of events that will have a sequence number (gaps are possible).
Can I maintain the order of those events based on this field, same as I would do for time r
Hi,
I was playing around with BroadcastProcessFunction and I've observed a specific behavior.
My setup:
MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        Types.VOID,
        TypeInformation.of(new TypeHint() {
Hi,
I think this would be the very basic use case for the Broadcast State Pattern, but I would like to know what the best approaches to solve this problem are.
I have an operator that extends BroadcastProcessFunction. The broadcast element is sent as a JSON format message by Kafka. It describes
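To make the question concrete, this is roughly the operator (the archive ate the generics in my earlier snippet; Rule and Event are placeholders for my actual types):

public class RuleBroadcastFunction extends BroadcastProcessFunction<Event, Rule, Event> {

    private final MapStateDescriptor<Void, Rule> ruleStateDescriptor =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    Types.VOID,
                    TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Event> out)
            throws Exception {
        // single-entry map keyed by Void: the latest broadcast rule wins
        ctx.getBroadcastState(ruleStateDescriptor).put(null, rule);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
            throws Exception {
        Rule rule = ctx.getBroadcastState(ruleStateDescriptor).get(null);
        if (rule != null) {
            // apply the rule to the business event
            out.collect(event);
        }
    }
}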
Hi Vino,
Thank you for your response and the provided links.
Just to clarify, and a small follow up:
1. The methods will be called by only one thread, right?
2. The links you provided are tackling a case where we got a "fast stream" element before we received the broadcast stream element. In my case we had Br
Hi community,
I'm trying to build a PoC pipeline for my project and I have a few questions regarding load balancing between task managers and ensuring that keyed stream events for the same key will go to the same Task Manager (hence the same task slot).
Let's assume that we have 3 task managers, 3 t
Thank you for your reply Timo.
Regarding point 2, I'm sorry for the delay. I reran my test and everything seems to be in order. The open method was called first. I guess it was a false alarm. Sorry for that.
Regards,
Krzysztof
Hi,
I have a question regarding job/operator deployment on Task Managers.
If I understand correctly, my job will be split into individual tasks, which will be "deployed and executed" on particular task slot(s) of a Task Manager (depending on the parallelism level, of course).
Let's imagine I have a Job
Hi,
thanks for the reply.
Just to clarify, I will have to have *a new Flink Cluster* (Job Manager and Task Manager) running in the secure zone, which will run the AsyncEnrich job, right?
Hi :)
Any thoughts about this?
Hi,
I've noticed that the SplitStream class is marked as deprecated, although the split method of DataStream is not.
Also, there is no alternative proposed in the SplitStream docs.
In my use case I will have a stream of events that I have to split into two separate streams based on some function. Events
Kostas, thank you for your response.
Well, although the side outputs would do the job, I was just surprised that they are the replacement for stream splitting.
The thing is, and this might be only a subjective opinion, that I would assume that side outputs should be used only to produce so
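For reference, the side-output version that would do the job in my case (a sketch over a DataStream<String> called events; the predicate is just an illustration):

final OutputTag<String> secondOutputTag = new OutputTag<String>("second-stream") {};

SingleOutputStreamOperator<String> firstStream = events
        .process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out)
                    throws Exception {
                if (value.startsWith("A")) {            // my real splitting function goes here
                    out.collect(value);                 // main output
                } else {
                    ctx.output(secondOutputTag, value); // second stream
                }
            }
        });

DataStream<String> secondStream = firstStream.getSideOutput(secondOutputTag);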
Hi Kostas,
Thank you for the answer and clarification.
If side outputs are treated in the same way and there is no significant performance penalty, then it seems that they are OK for my use case.
I can accept the name mismatch ;)
Regards,
Krzysztof
Hi all,
I'm exploring Flink for our new project.
Currently I'm playing with session windows with a dynamic gap. In short, I would like to be able to change the value of the gap on demand, for example on a config update.
So I'm having this code:
messageStream
        .keyBy(tradeKeySelector
Thank you for the answer,
the thing is that I would not like to call the external system for each window; rather I would like to keep the gap size in Flink's state, which I would be able to change from the external system, for example handling a configUpdate message from Kafka.
So if SessionWindowTimeGapExtra
So I was trying to have something like this:
PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();
messageStream
        .connect(pipelineConfigStream)
        .process(pipelineConfigOperator)
        .keyBy(tradeKeySelector)
        .wind
Hi all,
In my pipeline setup I cannot see side outputs for a session window (Flink 1.9.1).
What I have is:
messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process
OK,
I did some more tests and yes, it seems that there is no way to use Flink's state in a class that implements SessionWindowTimeGapExtractor.
Even if I implement this interface on a class that is an operator, whenever the extract method is called it does not have any access to Flink's state
After following a suggestion from SO, I added a few changes, so now I'm using event time.
Watermarks are progressing; I've checked them in Flink's metrics. The window operator is triggered, but I still don't see any late outputs for this.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.get
Hi,
thank you for your SO comment [1]. You are right. Sorry, I misunderstood the "late message" concept.
In fact, I was never sending "late events" that should match the just-ended window.
Thank you for your comments and clarification.
[1]
https://stackoverflow.com/questions/59570445/late-outpu
Hi all,
I must say I'm very impressed by Flink and what it can do.
I was playing around with Flink operator parallelism and scalability and I have a few questions regarding this subject.
My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each Task Manager has only one task slot. I'm f
Hi Aljoscha,
Thanks for the response.
This sounds OK to me. It's as if the message carries additional information that can "tell" operators how to handle it. Maybe we could use this approach for other use cases too.
I will try this approach, thanks.
Thank you David and Zhu Zhu,
this helps a lot.
I have follow-up questions though.
Having this:
"Instead the Job must be stopped via a savepoint and restarted with a new parallelism"
and the slot sharing [1] feature, I got the impression that if I started my cluster with more than 6 task slots,
Hi all,
I'm researching docker/k8s deployment possibilities for Flink 1.9.1.
I'm after reading/watching [1][2][3][4].
Currently we think that we will try to go with the Job Cluster approach, although we would like to know what the community trend is with this. We would rather not deploy more than one
Hi,
are there any plans to support Java 11?
Thanks,
Krzysztof
Hi guys,
well, we have a requirement in our project to use Java 11, although we would really like to use Flink because it seems to match our needs perfectly.
We were testing it on Java 1.8 and all looks fine.
We tried to run it on Java 11 and it also looks fine, at least for now.
We were also running
Hi Zhu Zhu,
well, in my last test I did not change the job config, so I did not change the parallelism level of any operator and I did not change the policy regarding slot sharing (it stays as the default one). Operator chaining is set to true without any extra actions like "start new chain, disable chain e
Hi,
Yangze Guo, Chesnay Schepler, thank you very much for your answers.
I actually have a funny setup.
I have a Flink job module, generated from Flink's Maven archetype.
This module has all operators and the Flink environment config and execution.
This module is compiled by Maven with "maven.compil
Hi all,
we have a use case where the order of received events matters and should be kept across the pipeline.
Our pipeline will be parallelized. We can key the stream just after the Source operator, but in order to keep the ordering among the next operators we would have to keep the stream keyed.
Obviou
Hi,
thank you for the answer.
I think I understand.
In my use case I have to keep the order of events for each key, but I don't have to process keys in the same order that I received them. At one point of my pipeline I'm also using a SessionWindow.
My Flink environment has operator chaining en
Hi Piotr,
I'm not sure about:
"Note that if you want your state (your HashMap) to be actually checkpointed, it must be either already defined as Flink managed state (like `ListState` in the example [1]), or you must copy the content of your `HashMap` to Flink managed state during the `snapshotState` call."
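For reference, my understanding of the copy-on-snapshotState approach in code - a minimal sketch, assuming an operator-level HashMap like in the discussion (word-count-ish logic just for illustration):

public class MyMapper extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    private transient Map<String, Long> cache;
    private transient ListState<Tuple2<String, Long>> checkpointedCache;

    @Override
    public void flatMap(String value, Collector<String> out) {
        cache.merge(value, 1L, Long::sum);
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // copy the plain HashMap into Flink managed state on every checkpoint
        checkpointedCache.clear();
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            checkpointedCache.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        cache = new HashMap<>();
        checkpointedCache = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "cache",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        if (context.isRestored()) {
            // restore the HashMap from managed state after a failure
            for (Tuple2<String, Long> entry : checkpointedCache.get()) {
                cache.put(entry.f0, entry.f1);
            }
        }
    }
}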
Hi,
In the documentation [1] we can read that:
"All internal connections are SSL authenticated and encrypted. The connections use mutual authentication, meaning both server and client side of each connection need to present the certificate to each other. The certificate acts effectively as a shared secr
Hi,
In [1] we can find the setup for Standalone and YARN clusters to achieve Job Manager HA.
Is the Standalone Cluster High Availability with ZooKeeper the same approach for the Docker Job Cluster approach with Kubernetes?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobma
Thank you both for the answers.
I just want to get this right:
can I achieve HA for the Job Cluster Docker config by having the zookeeper quorum configured as mentioned in [1] (with S3 and ZooKeeper)?
I assume I need to modify the default Job Cluster config to match the [1] setup.
[1]
https://ci.apach
Hi all,
well, this may be a little bit of a strange question, but are there any minimal machine requirements (memory size, CPU etc.) and non-functional requirements (number of nodes, network ports, etc.) for Flink?
I know it all boils down to what my deployed job will be, but if we could just put t
Hi all,
I have a small question regarding 1.10.
Correct me if I'm wrong, but 1.10 should support Java 11, right?
If so, then I noticed that the docker images [1] referenced in [2] are still based on openjdk8, not Java 11.
What's up with that?
P.S.
Congrats on releasing 1.10 ;)
[1]
https://github.com/a
Hi all,
Is there a way to emit a side output from a RichAsyncFunction operator, like it is possible with ProcessFunctions via ctx.output(outputTag, value)? At first glance I don't see a way to do it.
In my use case RichAsyncFunction is used to call REST services and I would like to handle REST error c
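The workaround I'm considering in case there is no direct support (a sketch; RestResult is a made-up wrapper type): never fail the element in the async function, but emit a wrapper that carries either the payload or the error, and route errors to a side output in a downstream ProcessFunction.

// inside MyRichAsyncFunction#asyncInvoke, on the HTTP callback:
resultFuture.complete(Collections.singleton(RestResult.success(body)));
// ... or on a REST error:
resultFuture.complete(Collections.singleton(RestResult.error(statusCode)));

// downstream, a plain ProcessFunction inspects the RestResult and calls
// ctx.output(errorOutputTag, value) for the error case.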
Hi,
any thoughts about this one?
Regards,
Krzysztof
Hi all,
I was wondering how the JobManager and TaskManagers find each other?
Do they use multicast for this?
Can it be configured to use domain names instead of IPs?
What do I have to do to have two Flink clusters in the same IP network?
How should I start a task manager in order to tell it to connect to cluster
Hi all,
are there any obstacles to running a Flink cluster on a PaaS like OpenShift, for example, where a task manager could be reassigned to a different physical box?
Especially when Flink is in the form of a Docker Job Cluster.
Regards,
Krzysztof
Thank you Yang Wang,
Regarding [1] and a sentence from that doc:
"This page describes deploying a standalone Flink session"
I always wonder what you guys mean by "standalone Flink session" or "standalone cluster", which can be found here [2].
I'm using the Docker Job Cluster approach, I know
Thanks all for the answers.
One more question though. In [1] we can see that task managers talk to each other - sending data streams. How does each task manager know the addresses of the other task managers?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#jo
Thank you very much,
what about if a node IP changes? Does it also support DNS, or "raw" IP addresses only?
I'm thinking about cloud deployments where the actual service/process can be rescheduled to a different box but there is a name resolving mechanism.
Also, what if there is NAT between the Task Manager a
"So do you mean the ip address changes during running or the taskmanager failed and relaunched with a same hostname, but the ip address is different?"
Well, that also, but actually I was thinking about running Flink on PaaS platforms where a process can be re-spawned during runtime on a different machin
Hi,
I had the same case but with FsStateBackend.
Cast it to the StateBackend type:
env.setStateBackend((StateBackend) new FsStateBackend("some/Path"));
I think this is some inconsistency in the API.