I am using Flink 1.13.1 on AWS EMR 6.4. I have an existing application
using DataStream API that I would like to modify to write output to S3. I
am testing the StreamingFileSink with a bounded input. I have enabled
checkpointing.
A couple questions:
1) When the program finishes, all the files remain in an in-progress state.
Best regards,
Yuxia

From: "Xin Ma"
To: "User"
Sent: Thursday, June 30, 2022, 11:05:51 PM
Subject: StreamingFileSink & checkpoint tuning
Hi,
I recently encountered an issue while using StreamingFileSink.
I have a flink job consuming records from various sources and write to s3
with streaming file sink. But the job sometimes fails due to checkpoint
timeout, and the root cause is checkpoint alignment failure as there is
data
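Not from this thread, but since the root cause described above is checkpoint alignment timing out under backpressure, a minimal sketch of the knobs often tried first (placeholder values; assumes exactly-once checkpointing):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Let barriers overtake in-flight buffers so a backpressured channel does not
// stall alignment (only applicable to exactly-once checkpointing).
env.getCheckpointConfig().enableUnalignedCheckpoints();
// Give slow checkpoints more time before they are declared failed.
env.getCheckpointConfig().setCheckpointTimeout(15 * 60_000);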
Hello all,
In multiple jobs I'm saving data using the datastream API with
StreamingFileSink and various bulk formats (avro, parquet). As bulk formats
require a rolling policy that extends the CheckpointRollingPolicy I have
created a policy that rolls on file size additionally. Unfortunatel
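A minimal sketch of such a policy (hypothetical class name and threshold; it keeps the mandatory roll-on-checkpoint behaviour and additionally rolls once a part file exceeds a target size):

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

import java.io.IOException;

public class SizeAwareCheckpointRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public SizeAwareCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        // Roll as soon as the current part file grows beyond the configured size.
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // No time-based rolling; checkpoints and size are the only triggers here.
        return false;
    }
}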
Hi Kamil,
AFAIK, the Avro format is still not supported for StreamingFileSink in the Python
DataStream API. However, you could convert the DataStream to a Table [1] and then
use any of the connectors supported in Table & SQL. In this case, you could use
the FileSy
Hello,
I'm trying to save my data stream to an Avro file on HDFS. In Flink
documentation I can only see explanations for Java/Scala. However, I can't
seem to find a way to do it in PyFlink. Is this possible to do in PyFlink
currently?
Kind Regards
Kamil
Hello, I have been trying to use StreamingFileSink to write Parquet files into Azure Blob Storage. I am getting the following error. I did see in the ticket https://issues.apache.org/jira/browse/FLINK-17444 that support for StreamingFileSink is not yet provided.
I have a StreamingFileSink that writes to S3:
final StreamingFileSink<String> sink =
    StreamingFileSink.forRowFormat(
        new Path("s3://argo-artifacts/files"),
        new SimpleStringEncoder<String>("UTF-8"))
    .withBucke
The StreamingFileSink requires that you have checkpointing enabled. I'm
guessing that you don't have checkpointing enabled, since that would
explain the behavior you are seeing.
The relevant section of the docs [1] explains:
Checkpointing needs to be enabled when using the StreamingFileSink.
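A minimal sketch of what that looks like (the interval is an arbitrary example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Part files are only finalized (committed) on checkpoints, so without this
// the StreamingFileSink output stays in-progress/pending.
env.enableCheckpointing(60_000);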
On my kubernetes cluster when I set the StreamingFileSink to write to a
local instance of S3 (MINIO - 500 GB) it only writes the data after I
execute a savepoint
The expected behavior is to write the data in real-time. I'm guessing the
memory requirements have not been met or a configurati
If anybody is interested, I've implemented a StreamingFileSink with dynamic
paths:
https://github.com/sidfeiner/DynamicPathFileSink
Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
I’m trying to stream data to a file on an S3 compatible system (MINIO):
DataStream resultStream = tEnv.toAppendStream(log_counts,
Types.ROW(Types.STRING, Types.STRING, Types.LONG));
final StreamingFileSink sink =
StreamingFileSink.forRowFormat(
new Path("s3:/
Sorry, wrong build. It is not in the jar.
From: Jan Oelschlegel
Sent: Friday, 15 January 2021 12:52
To: Dawid Wysakowicz; user@flink.apache.org
Subject: RE: RE: StreamingFileSink with ParquetAvroWriters
Yes, after unzipping it is in the jar:
From: Dawid Wysakowicz
Sent: Friday, 15 January 2021 12:10
To: Jan Oelschlegel; user@flink.apache.org
Subject: Re: RE: StreamingFileSink with ParquetAvroWriters
Have you checked if the class (org/apache/parquet
To: Jan Oelschlegel; user@flink.apache.org
Subject: Re: StreamingFileSink with ParquetAvroWriters
Hi Jan,
Could you have a try by adding this dependency?

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.1</version>
</dependency>

Best,
Yun
--Original Mail --
Sender: Jan
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project
Best,
Jan
From: Dawid Wysakowicz
Sent: Thursday, 14 January 2021 12:42
To: Jan Oelschlegel; user@flink.apache.org
Subject: Re: StreamingFileSink with ParquetAvroWriters
Hi Jan
Could
Hi,
I'm using Flink (v1.11.2) and would like to use the StreamingFileSink for
writing into HDFS in Parquet format.
As it says in the documentation, I have added the dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

And this is my file
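For reference, a minimal sketch of how that dependency is typically used (the schema string and output path are assumptions; the specific/reflect record variants work the same way):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

Schema schema = new Schema.Parser().parse(avroSchemaJson); // hypothetical schema JSON string

StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs:///data/output"), ParquetAvroWriters.forGenericRecord(schema))
    .build(); // bulk formats roll on every checkpoint, so checkpointing must be enabled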
Hi Sidney,
Have a look at implementing a BucketAssigner for StreamingFileSink:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
Rafi
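A minimal sketch of such a bucket assigner for the use case asked about below (hypothetical; assumes String records and routes each one into a sub-directory named after its first two characters):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class PrefixBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        // The bucket id becomes a sub-directory under the sink's base path.
        return element.length() >= 2 ? element.substring(0, 2) : element;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

It is then plugged in via .withBucketAssigner(new PrefixBucketAssigner()) on the row- or bulk-format builder.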
On Sat, Dec 26, 2020 at 11:48 PM Sidney Feiner
wrote:
> Hey,
> I would like to create a d
Hey,
I would like to create a dynamic StreamingFileSink for my Streaming pipeline.
By dynamic, I mean that it will write to a different directory based on the
input.
For example, redirect the row to a different directory based on the first 2
characters of the input, so if the content I'm wr
Hi Billy,
StreamingFileSink does not expect the Encoder to close the stream passed into
the encode method. However, ObjectMapper would close it at the end of the write
method. Thus I think you could disable the close action for ObjectMapper, or
change the encode implementation to
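A minimal sketch of the former, assuming Jackson's ObjectMapper is used inside a row-format Encoder (class and field names are hypothetical):

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;

public class JsonLineEncoder<T> implements Encoder<T> {

    // ObjectMapper is not serializable, so create it lazily on the task side.
    private transient ObjectMapper mapper;

    @Override
    public void encode(T element, OutputStream stream) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper()
                    // Do not close the part-file stream; the sink owns its lifecycle.
                    .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        }
        mapper.writeValue(stream, element);
        stream.write('\n');
    }
}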
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
DataStreamSource<String> lines =
    env.readTextFile("file:///path/to/file/input.json");
SingleOutputStreamOperator android = lines.map(new
    AndroidDataMapper());
StreamingFileSink sink =
    StreamingFileSink.forRowForma
Regards,
Ravi

On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav wrote:

Hi Team,

I have tried to assign a dynamic datetime part prefix. The problem is the job
always takes the initial datetime when the job first starts and never refreshes
it later. How can I get the dynamic current datetime in the filename at sink time?

.withPartPrefix
(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

val config = OutputFileConfig
  .builder()
  .withPartPrefix("prefix")
  .withPartSuffix(".ext")
  .build()

val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new KeyBucketAssigner())
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(config)
  .build()
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials

On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse wrote:
First, let me say, Flink is super cool - thanks everyone for making my life
easier in a lot of ways! Wish I had this 10 years ago.
Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
Note that Flink is embedded in my app, not running as a standalone cluster.
I am having a few problems, which I have illustrated in the small test case below.
Flink you’re using).
From an email conversation with Kostas in January of this year:
Hi Ken, Jingsong and Li,
Sorry for the late reply.
As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() meth
Hi All,
My Flink job is using bounded input sources and writes the results to a
StreamingFileSink.
When it has processed all the input the job is finished and closes. But the
output files are still
named “-0-0..inprogress.”. I expected them to be named
“-0-0.”.
Did I forget some setting or
Thank You Andrey.
Regards,
Vijay
On Aug 29, 2020, at 3:38 AM, Andrey Zagrebin wrote:
Hi Vijay,
I would apply the same judgement. It is latency vs throughput vs spent
resources vs practical need.
The more concurrent checkpoints your system is capable of handling, the
better end-to-end result latency you will observe and see computation
results more frequently.
On the other hand yo
Hi Andrey,
Thanks,
what is the recommendation for
env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)?
1 or higher, and based on what factor?
Regards,
Vijay
On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin
wrote:
Hi Vijay,
I think it depends on your job requirements, in particular how many records
are processed per second and how much resources you have to process them.
If the checkpointing interval is short then the checkpointing overhead can
be too high and you need more resources to efficiently keep up
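A rough sketch of the settings being discussed (values are placeholders, not recommendations):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // interval: throughput vs. latency trade-off
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // raise only if overlapping checkpoints are needed
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // guarantees breathing room between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);     // fail a checkpoint that takes longer than 10 min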
Hi Team,
Bulk formats can only use `OnCheckpointRollingPolicy`, which rolls (ONLY)
on every checkpoint:
.withRollingPolicy(OnCheckpointRollingPolicy.build())
Question: What are the recommended values related to checkpointing to fsstate -
should it be more frequent checkpoints, or longer intervals,
Hi Robert,
Thanks for the answer...
From: Robert Metzger
Sent: Tuesday, 11 August 2020 3:46
To: Mariano González Núñez
Cc: user@flink.apache.org
Subject: Re: BucketingSink & StreamingFileSink
Hi Mariano,
thanks a lot for your question. The resolution on StackOverflow seems to be
that Azure Datalake is not yet (
https://issues.apache.org/jira/browse/FLINK-18568) supported by the
StreamingFileSink.
On Thu, Jul 30, 2020 at 5:34 PM Mariano González Núñez <
mariano@hotmail.com>
Hi experts,
I am trying to use S3 StreamingFileSink with a high number of active buckets
(>1000). I found that the checkpointing duration grows linearly with the number
of active buckets, which makes achieving a high number of active buckets
difficult. One reason for that is that each act
Hi Flink Team,
I'm Mariano & I'm working with Apache Flink to process data and sink from Kafka
to Azure Datalake (ADLS Gen1).
We are having problems with the sink in Parquet format in ADLS Gen1; it also
doesn't work with Gen2.
We are trying to use the StreamingFileSink to stor
val codecName = "org.apache.hadoop.io.compress.GzipCodec"

val streamingFileSink: StreamingFileSink[String] =
  StreamingFileSink.forBulkFormat(
    new Path(outputPath),
    CompressWriters.forExtractor(new DefaultExtractor[String]).withHadoopCompression(codecName))
  .build()

Regards,
Ravi

On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav wrote:
Hi Team,
Is there a way to enable compression in the StreamingFileSink API for
Row-encoded formats?
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
Regards,
Vijay
Hi Kostas,
I'm confused about the implementation of the temporary workaround. Would it
be possible to get a little more detail?
> you could simply not stop the job after the whole input is processed
How does one determine when the job has processed the whole input?
> then wait until the outp
Oh shoot, I replied privately. Let me forward the responses to the list
including the solution.
-- Forwarded message -
From: orionemail
Date: Thu, Apr 23, 2020 at 3:38 PM
Subject: Re: StreamingFileSink to a S3 Bucket on a remote account using STS
To: Arvid Heise
Hi,
Thanks
Hey,
sorry for the late response.
Can you provide the full exceptions(s) including stacktrace you are seeing?
On Mon, Apr 20, 2020 at 3:39 PM orionemail
wrote:
> Hi,
>
> New to both AWS and Flink but currently have a need to write incoming data
> into a S3 bucket managed via AWS Tempory credent
Thanks @Seth Wiesman and all.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
his line and to see what happened.

Best,
Leonard

On 21 April 2020, at 17:47, Averell <lvhu...@gmail.com> wrote:
Hello Leonard, Sivaprasanna,
But my code was working fine with Flink v1.8.
I also tried with a simple String DataStream, and got the same error.
StreamingFileSink
  .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
  .withRollingPolicy(DefaultRollingPolicy.builder().build())
  .withBucketAssigner(new DateTimeBucketAssigner)
  .build()
I agree with Leonard. I have just tried the same in Scala 2.11 with Flink
1.10.0 and it works just fine.
Cheers,
Sivaprasanna
On Tue, Apr 21, 2020 at 12:53 PM Leonard Xu wrote:
Hi, Averell
I guess it's neither `#withRollingPolicy` nor `#withBucketAssigner`; it may be
caused by the generic types: your Encoder's element type (IN) does not match the
BucketAssigner element type (IN), or you lost the generic type information when
instantiating them.
Could you post more of the code, please?
Hi,
I tried to add the following cast, and it works. Doesn't look nice though:
StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAss
Hi,
New to both AWS and Flink but currently have a need to write incoming data into
an S3 bucket managed via AWS temporary credentials.
I am unable to get this to work, but I am not entirely sure of the steps
needed. I can write to S3 buckets that are not 'remote' and managed by STS
temporary cred
Hi Sivaprasanna,
That is a compile-time error, not a runtime error:
value build is not a member of ?0
possible cause: maybe a semicolon is missing before `value build'?
There won't be any issue with either withRollingPolicy() or withBucketAssigner()
alone, but not with both.
Thanks and regards,
Av
Hi Averell,
Can you please share the complete stack trace of the error?
On Mon, Apr 20, 2020 at 4:48 PM Averell wrote:
Hi,
I have the following code:
StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner)
  .build()
This is working fine in Flink
rs!
Unfortunately there is no progress on the FLIP (or the issue).

@Austin In the meantime, what you could do -- assuming that your input is
bounded -- is simply not stop the job after the whole input is processed,
then wait until the output is committed, and then cancel the job. I know and
I agree that this is not an elegant solution, but it is a temporary workaround.

Hopefully the FLIP and related issue is going to be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch wrote:

Hi,
Hi,
This happens because StreamingFileSink does not support a finite input
stream.
In the docs it's mentioned under "Important Considerations".
This behaviour often surprises users...
There's a FLIP
<https://cwiki.apache.org/confluence/display/FL
://github.com/austince/flink-streaming-file-sink-compression
On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas wrote:
Hi Austin,
Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.
I hope this solves the problem,
Kostas
On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz wrote:
Hi Austin,
If I am not mistaken the StreamingFileSink by default flushes on
checkpoints. If you don't have checkpoints enabled it might happen that
not all data is flushed.
I think you can also adjust that behavior with:
forBulkFormat(...)
  .withRollingPolicy(/* your custom logic */)
I
Hi there,
Using Flink 1.9.1, trying to write .tgz files with the
StreamingFileSink#BulkWriter. It seems like flushing the output stream
doesn't flush all the data written. I've verified I can create valid files
using the same APIs and data on there own, so thinking it must be something
Hey Timo,
Thanks for the assignment link! Looks like most of my issues can be solved
by getting better acquainted with Java file APIs and not in Flink-land.
Best,
Austin
On Wed, Feb 19, 2020 at 6:48 AM Timo Walther wrote:
> Hi Austin,
>
> the StreamingFileSink allows bucketing t
Hi Austin,
the StreamingFileSink allows bucketing the output data.
This should help for your use case:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
Regards,
Timo
On 19.02.20 01:00, Austin Cawley-Edwards wrote:
Following up on
Following up on this -- does anyone know if it's possible to stream
individual files to a directory using the StreamingFileSink? For instance,
if I want all records that come in during a certain day to be
partitioned into daily directories:
2020-02-18/
large-file-1.txt
large-file-2.txt
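A minimal sketch of the bucket-assignment approach suggested above (paths are hypothetical; DateTimeBucketAssigner buckets by the record's processing time):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("/data/output"), new SimpleStringEncoder<String>("UTF-8"))
        // One bucket (sub-directory) per day, e.g. /data/output/2020-02-18/
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
        .build();

Note that this groups records into daily directories of part files; it does not write each incoming file as a single output file.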
Hey all,
Has anyone had success using the StreamingFileSink[1] to write CSV files?
And if so, what about compressed (Gzipped, ideally) files/ which libraries
did you use?
Best,
Austin
[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
Hi Ken, Jingsong and Li,
Sorry for the late reply.
As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF including
sink functions is called on both normal termination and
Can we distinguish "fail close" from
"success finish close" in StreamingFileSink?
Best,
Jingsong Lee
On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas <kklou...@gmail.com> wrote:
Hi Li,
This is the expected behavior. All the "exactly-once"
will move the final temp files to
output path, so the result of this job is wrong.
Do you have any idea about this? Can we distinguish "fail close" from
"success finish close" in StreamingFileSink?
Best,
Jingsong Lee
On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas wrote:
>
ut based on this
experience, StreamingFileSink.forRowFormat() requires it too! Is this the
intended behavior? If so, the docs should probably be updated.
Thanks,
Li
On Fri, Dec 6, 2019 at 2:01 PM Li Peng wrote:
Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute,
with flink-s3-fs-hadoop, and based on the default rolling policy, which is
configured to "roll" every 60 seconds, I thought that would be automatic (I
interpreted rolling to mean actually close a multipa
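For row formats the rolling can be controlled explicitly; a minimal sketch using the builder API of newer releases (older versions use DefaultRollingPolicy.create(); values are placeholders, and pending part files still only become finalized once a checkpoint completes):

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))    // roll a part file after it has been open for 1 minute
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(30)) // roll idle part files
                        .withMaxPartSize(128 * 1024 * 1024)                    // roll at 128 MB
                        .build())
        .build();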
t
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>> at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>> at
>> org.apache.flink.fs.s3base.s
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> ... 42 more
>
> Maybe I need to add the directory level as a resource?
>
> resources = [
> "arn:aws:s3:::bucket-name",
> "arn:aws:s3:::bucket-name/",
>
Hi Li,
Could you please list the permissions you see and the error message you
receive from AWS?
Li Peng-2 wrote:
Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles for
auth. Does anyone know what permissions the role should have for the
specified s3 bucket to work properly? I've been getting some auth errors,
and I suspect I'm missing some permissions:
data "aws
Hi,
StreamingFileSink would not remove committed files, so if you use a non-latest
checkpoint to restore state, you may need to perform a manual cleanup.
WRT the part id issue, StreamingFileSink will track the global max part number,
and use this value + 1 as the new id upon restoring. In this