Thank you Fabian.
Regards,
Averell
Hi,
The CFRO (ContinuousFileReaderOperator) is not a source; only the file
monitoring function is. Barriers are injected by the FMF when the JM sends a
checkpoint message. The barriers then travel to the CFRO and trigger the
checkpointing there.
Fabian
Hello Fabian,
Thanks for the answer. However, my question is a little bit different.
Let me rephrase my example and my question:
* I have 10,000 unsplittable small files to read, which in total yield
about 10M output lines.
* From Flink's reporting web GUI, I can see that the CFMF and the
ContinuousFileReaderOperator …
Hi Averell,
Barriers are injected into the regular data flow by source functions.
In case of a file monitoring source, the barriers are injected into the
stream of file splits that are passed to the
ContinuousFileReaderOperator.
The CFRO puts the splits into a queue and processes them with a dedicated
thread …
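The pipeline described above is what env.readFile builds under the hood. A
minimal sketch, assuming the Flink 1.5 Scala API and a placeholder input
path:

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // readFile expands into a non-parallel ContinuousFileMonitoringFunction
    // (the actual source, which emits file splits and injects barriers into
    // the split stream) chained to a parallel reader operator.
    val format = new TextInputFormat(new Path("hdfs:///data/in"))
    val lines: DataStream[String] = env.readFile(
      format,
      "hdfs:///data/in",
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      10000L) // re-scan the directory every 10 seconds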
Hello Fabian, and all,
Please excuse me for digging this old thread up.
I have a question regarding the sending of "barrier" messages in Flink's
checkpointing mechanism: I want to know when those barriers are sent
when I am using a file source. Where can I find this in the source code? …
Thank you Fabian.
It is clear to me now. Thanks a lot for your help.
Regards,
Averell
Hi Averell,
Conceptually, you are right. Checkpoints are taken at every operator at the
same "logical" time.
It is not important that each operator checkpoints at the same wall-clock
time. Instead, they need to take a checkpoint when they have processed the
same input.
This is implemented with so-called checkpoint barriers.
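A minimal sketch of enabling these barrier-based checkpoints, assuming the
Scala API; the 60s interval is an arbitrary example value:

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The JM triggers a checkpoint every 60s; sources inject barriers, and
    // each operator snapshots its state when the barriers reach it, so all
    // snapshots cover the same "logical" point in the input.
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)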
Thank you Vino, Jorn, and Fabian.
Please forgive my ignorance, as I am still not able to fully
understand state/checkpointing and the statement that Fabian gave earlier:
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"
Hi Averell,
One comment regarding what you said:
> As my files are small, I think there would not be much benefit in
checkpointing file offset state.
Checkpointing is not about efficiency but about consistency.
If the position in a split is not checkpointed, your application won't
operate with exactly-once state consistency.
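A hedged sketch of what checkpointing a reading position looks like with the
CheckpointedFunction interface; the source and its offset field are
illustrative, not Flink's actual file source:

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class PositionedSource extends SourceFunction[String] with CheckpointedFunction {
      @volatile private var running = true
      private var offset: Long = 0L // records emitted so far (illustrative)
      private var offsetState: ListState[java.lang.Long] = _

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        while (running) {
          // Emit and advance the position under the checkpoint lock, so a
          // snapshot always sees an offset matching the emitted records.
          ctx.getCheckpointLock.synchronized {
            ctx.collect(s"line-$offset")
            offset += 1
          }
          Thread.sleep(10)
        }
      }

      override def cancel(): Unit = running = false

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        offsetState.clear()
        offsetState.add(offset) // the position taken at the barrier
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        offsetState = context.getOperatorStateStore.getListState(
          new ListStateDescriptor[java.lang.Long]("offset", classOf[java.lang.Long]))
        // On recovery, the reading position is reset together with the state.
        val it = offsetState.get().iterator()
        if (it.hasNext) offset = it.next()
      }
    }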
Or you write a custom file system for Flink... (for the tar part).
Unfortunately, gz files can only be processed single-threaded (there are
some multi-threaded implementations, but they don't bring a big gain).
Hi Averell,
In this case, I think you may need to extend Flink's existing source.
First, read your large tar.gz file; when it has been decompressed, use
multi-threading to read the records in the source, and then parse the
data format (map / flatMap might be a suitable operator) …
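A rough single-threaded sketch of such a source, assuming Apache Commons
Compress for the tar layer and a placeholder path; vino's suggested
multi-threading of the inner reads is left out for brevity:

    import java.io.{BufferedReader, FileInputStream, InputStreamReader}
    import java.util.zip.GZIPInputStream
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class TarGzSource(path: String) extends SourceFunction[String] {
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        // Outer layer: the big tar.gz; inner entries: the small .gz files.
        val tar = new TarArchiveInputStream(
          new GZIPInputStream(new FileInputStream(path)))
        var entry = tar.getNextTarEntry
        while (running && entry != null) {
          if (entry.isFile && entry.getName.endsWith(".gz")) {
            // TarArchiveInputStream signals EOF at the entry boundary, so a
            // per-entry GZIPInputStream cannot read past the current file.
            // Deliberately not closed: closing it would close the tar stream.
            val reader = new BufferedReader(
              new InputStreamReader(new GZIPInputStream(tar)))
            var line = reader.readLine()
            while (running && line != null) {
              ctx.collect(line) // a downstream map/flatMap parses the format
              line = reader.readLine()
            }
          }
          entry = tar.getNextTarEntry
        }
        tar.close()
      }

      override def cancel(): Unit = running = false
    }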
Hi Fabian, Vino,
I have one more question, which I initially planned to post as a new thread,
but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having a single
thread process the …
Thank you Vino and Fabian for your help in answering my questions. As my
files are small, I think there would not be much benefit in checkpointing
file offset state.
Thanks and regards,
Averell
Hi Averell,
As Vino said, checkpoints store the state of all operators of an
application.
The state of a monitoring source function is the position in the currently
read split and all splits that have been received and are currently pending.
In case of a recovery, the pending splits and the reading position in the
current split are restored.
Hi Averell,
You need to understand that Flink's checkpointing restores state, not
individual records.
Of course, sometimes your records are the state, but sometimes the
intermediate result of your records is the state.
It depends on your business logic and your operators.
Thanks, vino.
Thank you Fabian.
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"
I do not quite understand this statement. If I have read 30 lines since the
checkpoint and sent those 30 records …
Hi Averell,
please find my answers inlined.
Best, Fabian
Hi Fabian,
Thanks for the information. I will try to look at the change to that complex
logic that you mentioned when I have time. That would save one more shuffle
(from 1 to 0), wouldn't it?
BTW, regarding fault tolerance in the file reader task, could you help
explain what would happen if the …
Hi Averell,
The records emitted by the monitoring tasks are "just" file splits, i.e.,
meta information that defines which data to read from where.
The reader tasks receive these splits and process them by reading the
corresponding files.
You could of course partition the splits based on the file …
Hi Averell,
Actually, performing a key partition inside the source function is the same
as DataStream[Source].keyBy(custom partitioner), because keyBy is not a
real operator, but a virtual node in the DAG, which does not correspond to a
physical operator.
Thanks, vino.
Hi Vino,
I'm a little bit confused.
If I can do the partitioning from within the source function, using the same
hash function on the key to identify the partition, would that be sufficient
to avoid shuffling in the next keyBy call?
Thanks.
Averell
Hi Averell,
The keyBy transformation triggers key partitioning, one of the various
partitioning schemes supported by Flink, which causes the data to be
shuffled.
It routes records whose keys have the same hash value to the same node,
based on the hash of the key you passed (or generated by the custom key
selector) …
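A minimal sketch of that routing, assuming illustrative record and field
names:

    import org.apache.flink.streaming.api.scala._

    case class Reading(nodeId: String, value: Double)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val readings: DataStream[Reading] = env.fromElements(
      Reading("node-1", 0.5), Reading("node-2", 1.5))

    // keyBy hashes the extracted key; records with the same hash are routed
    // to the same downstream subtask. This routing is the shuffle.
    val perNodeMax = readings
      .keyBy(_.nodeId)
      .maxBy("value")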
Oh, thank you Vino. I was not aware of that reshuffling after every custom
partitioning. Why would that be needed, though?
Thanks and regards,
Averell
Hi Averell,
As far as I know, a custom partitioner will inevitably lead to a shuffle of
the data.
Even if it is bundled into the logic of the source function, would the
behavior really be different?
Thanks, vino.
Thanks Vino.
Yes, I can do that after the source function. But that means the data would
be shuffled, sent from every source to the right partition.
I think that doing the partitioning from within the file source would help
to avoid that shuffle.
Thanks.
Averell.
Hi Averell,
Yes, you cannot do it in the source function. I think you can call keyBy
with a partitioner (based on NodeID) after the source.
Why do you need to use a custom partitioner in the source function?
Thanks, vino.
Thank you Vino.
Yes, I went through that official guide before posting this question. The
problem was that I could not see any call to any of the mentioned
partitioning methods (partitionCustom, shuffle, rebalance, rescale, or
broadcast) in the original readFile function. I'm still trying to look into
it …
Hi Averell,
Did you know Flink allows you to customize a partitioner?
Some resources:
Official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/#physical-partitioning
Discussion on the mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ …
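A minimal sketch of the physical partitioning described in those links,
assuming the Scala API; the partitioner logic and names are illustrative:

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.streaming.api.scala._

    // Decides which parallel subtask receives each record.
    class NodePartitioner extends Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int =
        (key.hashCode & Integer.MAX_VALUE) % numPartitions
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[(String, Long)] = env.fromElements(("node-1", 1L))

    // partitionCustom only moves the data; unlike keyBy, it does not create
    // a KeyedStream, so keyed state is not available afterwards.
    val partitioned = events.partitionCustom(new NodePartitioner, _._1)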