Re: StreamingFileSink question

2022-08-31 Thread David Anderson
MR 6.4. I have an existing application using DataStream API that I would like to modify to write output to S3. I am testing the StreamingFileSink with a bounded input. I have enabled checkpointing. A couple questions: 1) When the program finishes, all the files remain

StreamingFileSink question

2022-08-31 Thread David Clutter
I am using Flink 1.13.1 on AWS EMR 6.4. I have an existing application using DataStream API that I would like to modify to write output to S3. I am testing the StreamingFileSink with a bounded input. I have enabled checkpointing. A couple questions: 1) When the program finishes, all the files
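A minimal sketch of the direction this thread points to, assuming Flink 1.12+ where the unified FileSink is available; the bucket path and source are placeholders. In BATCH execution mode the FileSink finalizes all part files when the bounded job finishes, instead of leaving them in-progress:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedFileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode commits all pending part files when the bounded job ends,
        // so nothing is left behind in the .inprogress state.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("a", "b", "c") // placeholder for the real bounded source
           .sinkTo(FileSink
               .forRowFormat(new Path("s3://my-bucket/output"), // hypothetical bucket
                             new SimpleStringEncoder<String>("UTF-8"))
               .build());

        env.execute("bounded FileSink sketch");
    }
}
```

With the older StreamingFileSink in STREAMING mode, records arriving after the last successful checkpoint stay in in-progress files when a bounded job ends.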

Re: StreamingFileSink & checkpoint tuning

2022-07-02 Thread Weihua Hu
Best regards, Yuxia -- From: "Xin Ma" To: "User" Sent: Thursday, June 30, 2022 11:05:51 PM Subject: StreamingFileSink & checkpoint tuning Hi, I recently encountered an issue while using

Re: StreamingFileSink & checkpoint tuning

2022-06-30 Thread yuxia
Regards, Yuxia From: "Xin Ma" To: "User" Sent: Thursday, June 30, 2022 11:05:51 PM Subject: StreamingFileSink & checkpoint tuning Hi, I recently encountered an issue while using StreamingFileSink. I have a flink job consuming records from various sources and write to s3 w

StreamingFileSink & checkpoint tuning

2022-06-30 Thread Xin Ma
Hi, I recently encountered an issue while using StreamingFileSink. I have a flink job consuming records from various sources and writing to s3 with the streaming file sink. But the job sometimes fails due to checkpoint timeout, and the root cause is checkpoint alignment failure as there is data

StreamingFileSink bulk formats - small files

2022-03-03 Thread Kamil ty
Hello all, In multiple jobs I'm saving data using the DataStream API with StreamingFileSink and various bulk formats (Avro, Parquet). As bulk formats require a rolling policy that extends the CheckpointRollingPolicy, I have created a policy that additionally rolls on file size. Unfortunately
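A sketch of such a policy, assuming the CheckpointRollingPolicy base class from flink-streaming-java; the size threshold is illustrative. Note that for some bulk writers (Parquet buffers rows in memory) PartFileInfo.getSize() only reflects bytes already flushed, so size-based rolling is approximate:

```java
import java.io.IOException;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

// Rolls on every checkpoint (required for bulk formats) and additionally
// once a part file grows past maxPartSize bytes.
public class SizeAwareCheckpointRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSize; // e.g. 128 MB

    public SizeAwareCheckpointRollingPolicy(long maxPartSize) {
        this.maxPartSize = maxPartSize;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
            throws IOException {
        return partFileState.getSize() >= maxPartSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false;
    }
}
```

It would be plugged in with something like `.withRollingPolicy(new SizeAwareCheckpointRollingPolicy<>(128 * 1024 * 1024))` on the bulk-format builder.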

Re: PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Dian Fu
Hi Kamil, AFAIK, Avro format is still not supported in the Python StreamingFileSink in the Python DataStream API. However, I guess you could convert the DataStream to a Table [1] and then you could use all the connectors supported in Table & SQL. In this case, you could use the FileSy

PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Kamil ty
Hello, I'm trying to save my data stream to an Avro file on HDFS. In Flink documentation I can only see explanations for Java/Scala. However, I can't seem to find a way to do it in PyFlink. Is this possible to do in PyFlink currently? Kind Regards Kamil

Re: Issue with writing to Azure blob storage using StreamingFileSink and FileSink

2021-08-03 Thread Robert Metzger
t 10:29 AM Sudhanva Purushothama <sudha...@coffeebeans.io> wrote: Hello, I have been trying to use StreamingFileSink to write Parquet files into Azure blob storage. I am getting the following error. I did see in the ticket https://issues.apache.org/jira/browse/

Issue with writing to Azure blob storage using StreamingFileSink and FileSink

2021-07-30 Thread Sudhanva Purushothama
Hello, I have been trying to use StreamingFileSink to write Parquet files into Azure blob storage. I am getting the following error. I did see in the ticket https://issues.apache.org/jira/browse/FLINK-17444 that support for StreamingFileSink is not yet provided. code.java Description

Re: StreamingFileSink output formatting to CSV

2021-06-03 Thread Chesnay Schepler
last character. On 6/3/2021 3:45 PM, Robert Cullen wrote: I have a StreamingFileSink that writes to S3: final StreamingFileSink> sink = StreamingFileSink.forRowFormat( new Path("s3://argo-artifacts/files"), new SimpleStringEncoder>("UTF-8")) .withBucke

StreamingFileSink output formatting to CSV

2021-06-03 Thread Robert Cullen
I have a StreamingFileSink that writes to S3: final StreamingFileSink> sink = StreamingFileSink.forRowFormat( new Path("s3://argo-artifacts/files"), new SimpleStringEn

Re: StreamingFileSink only writes data to MINIO during savepoint

2021-05-31 Thread David Anderson
The StreamingFileSink requires that you have checkpointing enabled. I'm guessing that you don't have checkpointing enabled, since that would explain the behavior you are seeing. The relevant section of the docs [1] explains: Checkpointing needs to be enabled when using the Streami
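A minimal sketch of the fix David describes, with an illustrative interval and placeholder source and bucket:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class MinioSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpointing, part files stay in-progress indefinitely and
        // only a savepoint (which acts like a checkpoint) flushes them out.
        env.enableCheckpointing(60_000);

        env.socketTextStream("localhost", 9999) // placeholder unbounded source
           .addSink(StreamingFileSink
               .forRowFormat(new Path("s3://my-minio-bucket/out"), // hypothetical bucket
                             new SimpleStringEncoder<String>("UTF-8"))
               .build());

        env.execute("MINIO sink sketch");
    }
}
```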

StreamingFileSink only writes data to MINIO during savepoint

2021-05-28 Thread Robert Cullen
On my kubernetes cluster when I set the StreamingFileSink to write to a local instance of S3 (MINIO - 500 GB) it only writes the data after I execute a savepoint. The expected behavior is to write the data in real-time. I'm guessing the memory requirements have not been met or a configurati

Re: Dynamic StreamingFileSink

2021-02-06 Thread Sidney Feiner
If anybody is interested, I've implemented a StreamingFileSink with dynamic paths: https://github.com/sidfeiner/DynamicPathFileSink Sidney Feiner / Data Platform Developer M: +972.528197720 / Skype: sidney.feiner.startapp From: Rafi

File not generated using StreamingFileSink path 1.12.0

2021-01-22 Thread Robert Cullen
I’m trying to stream data to a file on an S3 compatible system (MINIO): DataStream resultStream = tEnv.toAppendStream(log_counts, Types.ROW(Types.STRING, Types.STRING, Types.LONG)); final StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path("s3:/

Re: Re: StreamingFileSink with ParquetAvroWriters

2021-01-15 Thread Jan Oelschlegel
Sorry, wrong build. It is not in the jar. From: Jan Oelschlegel Sent: Friday, January 15, 2021 12:52 To: Dawid Wysakowicz; user@flink.apache.org Subject: Re: Re: StreamingFileSink with ParquetAvroWriters Yes, after unzipping it is in the jar: From

Re: Re: StreamingFileSink with ParquetAvroWriters

2021-01-15 Thread Jan Oelschlegel
Yes, after unzipping it is in the jar: From: Dawid Wysakowicz Sent: Friday, January 15, 2021 12:10 To: Jan Oelschlegel; user@flink.apache.org Subject: Re: Re: StreamingFileSink with ParquetAvroWriters Have you checked if the class (org/apache/parquet

Re: StreamingFileSink with ParquetAvroWriters

2021-01-15 Thread Jan Oelschlegel
:34 To: Jan Oelschlegel; user@flink.apache.org Subject: Re: StreamingFileSink with ParquetAvroWriters Hi Jan, Could you have a try by adding this dependency? org.apache.parquet parquet-avro 1.11.1 Best, Yun --Original Mail -- Sender: Jan

Re: Re: StreamingFileSink with ParquetAvroWriters

2021-01-15 Thread Dawid Wysakowicz
rg/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project Best, Jan From: Dawid Wysakowicz Sent: Thursday, January 14, 2021 12:42 To: Jan Oelschlegel; user@flink.apache.org

Re: StreamingFileSink with ParquetAvroWriters

2021-01-15 Thread Jan Oelschlegel
] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project Best, Jan From: Dawid Wysakowicz Sent: Thursday, January 14, 2021 12:42 To: Jan Oelschlegel; user@flink.apache.org Subject: Re: StreamingFileSink with ParquetAvroWriters Hi Jan Could

Re: StreamingFileSink with ParquetAvroWriters

2021-01-14 Thread Dawid Wysakowicz
would like to use the StreamingFileSink for writing into HDFS in Parquet format. As it says in the documentation I have added the dependencies: org.apache.flink flink-parquet_${scala.binary.version} ${flink.version}

Re: StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Yun Gao
Subject: StreamingFileSink with ParquetAvroWriters Hi, I'm using Flink (v.1.11.2) and would like to use the StreamingFileSink for writing into HDFS in Parquet format. As it says in the documentation I have added the dependencies: org.apache.flink flink-parquet_${scala.binary.version} ${flink.version} And

StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Jan Oelschlegel
Hi, I'm using Flink (v.1.11.2) and would like to use the StreamingFileSink for writing into HDFS in Parquet format. As it says in the documentation I have added the dependencies: org.apache.flink flink-parquet_${scala.binary.version} ${flink.version} And this is my file
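A sketch of the sink this thread is setting up, assuming flink-parquet plus parquet-avro are on the classpath as discussed in the thread; MyEvent is a hypothetical POJO standing in for the real record type:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {
    // Hypothetical record type; forReflectRecord derives an Avro schema from it.
    public static class MyEvent {
        public String id;
        public long timestamp;
    }

    public static void attachSink(DataStream<MyEvent> events) {
        events.addSink(StreamingFileSink
            .forBulkFormat(new Path("hdfs:///data/events"), // hypothetical target dir
                           ParquetAvroWriters.forReflectRecord(MyEvent.class))
            .build()); // bulk formats roll part files on every checkpoint
    }
}
```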

Re: Dynamic StreamingFileSink

2020-12-26 Thread Rafi Aroch
Hi Sidney, Have a look at implementing a BucketAssigner for StreamingFileSink: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment Rafi On Sat, Dec 26, 2020 at 11:48 PM Sidney Feiner wrote: Hey, I would like to create a d
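A sketch of what such a BucketAssigner could look like for the first-two-characters example from the question below; the routing rule is illustrative:

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Routes each record to a directory named after its first two characters.
public class PrefixBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        return element.length() >= 2 ? element.substring(0, 2) : element;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

It would be passed with `.withBucketAssigner(new PrefixBucketAssigner())` on the row-format builder.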

Dynamic StreamingFileSink

2020-12-26 Thread Sidney Feiner
Hey, I would like to create a dynamic StreamingFileSink for my Streaming pipeline. By dynamic, I mean that it will write to a different directory based on the input. For example, redirect the row to a different directory based on the first 2 characters of the input, so if the content I'm wr

Re: StreamingFileSink closed file exception

2020-12-24 Thread Yun Gao
Hi Billy, StreamingFileSink does not expect the Encoder to close the stream passed into the encode method. However, ObjectMapper would close it at the end of the write method. Thus I think you could disable the close action for ObjectMapper, or change the encode implementation to
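A sketch of an Encoder following that advice, assuming Jackson's ObjectMapper; the newline delimiter is a choice, not a requirement:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.flink.api.common.serialization.Encoder;

// The Encoder must NOT close the stream it is handed; Jackson closes the
// target by default at the end of writeValue, so that feature is disabled.
public class JsonLineEncoder<T> implements Encoder<T> {

    private transient ObjectMapper mapper; // not serializable, so created lazily

    @Override
    public void encode(T element, OutputStream stream) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper()
                .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        }
        mapper.writeValue(stream, element);
        stream.write('\n'); // one JSON document per line
    }
}
```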

StreamingFileSink closed file exception

2020-12-24 Thread Billy Bain
= StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); DataStreamSource lines = env.readTextFile("file:///path/to/file/input.json"); SingleOutputStreamOperator android = lines.map(new AndroidDataMapper()); StreamingFileSink sink = StreamingFileSink.forRowForma

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
Regards, Ravi On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav wrote: Hi Team, I have tried to assign a dyna

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Vijayendra Yadav
The problem is the job always takes the initial datetime when the job first starts and never refreshes it later. How can I get dynamic current datetime in filename at sink time? .withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS"))) https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html val config = OutputFileConfig .builder() .withPartPrefix("prefix") .withPartSuffix(".ext") .build() val sink = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) .withBucketAssigner(new KeyBucketAssigner()) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .withOutputFileConfig(config) .build()

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
How can I get dynamic current datetime in filename at sink time? .withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))

Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Vijayendra Yadav
current datetime in filename at sink time? .withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS"))) https://ci.apache.org

Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Ravi Bhushan Ratnakar
)).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS"))) https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html val config = OutputFileConfig .builder() .withPartPrefix("prefix")

Dynamic file name prefix - StreamingFileSink

2020-10-12 Thread Vijayendra Yadav
") .withPartSuffix(".ext") .build() val sink = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) .withBucketAssigner(new KeyBucketAssigner()) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .withOutputFileConfig(config) .build()

Re: S3 StreamingFileSink issues

2020-10-07 Thread Dan Diephouse
link-docs-release-1.11/dev/connectors/streamfile_sink.html#general [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s

Re: S3 StreamingFileSink issues

2020-10-07 Thread David Anderson
Fs [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse wrote: First, let me say, Flink is super cool - thanks everyone f

Re: S3 StreamingFileSink issues

2020-10-07 Thread Dan Diephouse
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse wrote: First, let me say, Flink is super cool - thanks everyone for making my life easie

Re: S3 StreamingFileSink issues

2020-10-07 Thread David Anderson
Wish I had this 10 years ago. Onto the fun stuff: I am attempting to use the StreamingFileSink with S3. Note that Flink is embedded in my app, not running as a standalone cluster. I am having a few problems, which I have illustrated in the small test case below.

S3 StreamingFileSink issues

2020-10-06 Thread Dan Diephouse
First, let me say, Flink is super cool - thanks everyone for making my life easier in a lot of ways! Wish I had this 10 years ago. Onto the fun stuff: I am attempting to use the StreamingFileSink with S3. Note that Flink is embedded in my app, not running as a standalone cluster. I am having

Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-08 Thread Aljoscha Krettek
Flink you’re using). From an email conversation with Kostas in January of this year: Hi Ken, Jingsong and Li, Sorry for the late reply. As Jingsong pointed out, upon calling close() the StreamingFileSink does not commit the in-progress/pending files. The reason for this is that the close() meth

Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-07 Thread Teunissen, F.G.J. (Fred)
Hi All, My flink-job is using bounded input sources and writes the results to a StreamingFileSink. When it has processed all the input the job is finished and closes. But the output files are still named “-0-0..inprogress.”. I expected them to be named ““-0-0.”. Did I forget some setting or

Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-07 Thread Ken Krugler
n calling close() the StreamingFileSink does not commit the in-progress/pending files. The reason for this is that the close() method of any UDF including sink functions is called on both normal termination and termination due to failure. Given this, we cannot commit the fil

Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-30 Thread Vijayendra Yadav
Thank You Andrey. Regards, Vijay On Aug 29, 2020, at 3:38 AM, Andrey Zagrebin wrote: Hi Vijay, I would apply the same judgement. It is latency vs throughput vs spent resources vs practical need. The more concurrent checkpoints your system is capable of handling, the b

Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-29 Thread Andrey Zagrebin
Hi Vijay, I would apply the same judgement. It is latency vs throughput vs spent resources vs practical need. The more concurrent checkpoints your system is capable of handling, the better end-to-end result latency you will observe and see computation results more frequently. On the other hand yo

Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-28 Thread Vijayendra Yadav
Hi Andrey, Thanks, what is the recommendation for env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)? 1 or higher, based on what factor? Regards, Vijay On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin wrote: Hi Vijay, I think it depends on your job requirements, in

Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-25 Thread Andrey Zagrebin
Hi Vijay, I think it depends on your job requirements, in particular how many records are processed per second and how much resources you have to process them. If the checkpointing interval is short then the checkpointing overhead can be too high and you need more resources to efficiently keep up

Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-24 Thread Vijayendra Yadav
Hi Team, Bulk formats can only have `OnCheckpointRollingPolicy`, which rolls (only) on every checkpoint: .withRollingPolicy(OnCheckpointRollingPolicy.build()) Question: what are the recommended values related to checkpointing to fsstate? Should it be more frequent checkpoints, or longer intervals,
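For concreteness, the knobs under discussion in this thread, with illustrative values only; the right settings depend on throughput, state size, and available resources:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(120_000); // example: checkpoint (and roll) every 2 minutes

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMinPauseBetweenCheckpoints(30_000); // breathing room between checkpoints
        cfg.setMaxConcurrentCheckpoints(1);        // usually 1 unless overlap is needed
        cfg.setCheckpointTimeout(600_000);         // fail checkpoints stuck for 10 minutes
    }
}
```

With OnCheckpointRollingPolicy the checkpoint interval directly controls output file granularity: shorter intervals give fresher, smaller files at a higher checkpointing cost.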

RE: BucketingSink & StreamingFileSink

2020-08-11 Thread Mariano González Núñez
Hi Robert, Thanks for the answer... From: Robert Metzger Sent: Tuesday, August 11, 2020 3:46 To: Mariano González Núñez Cc: user@flink.apache.org Subject: Re: BucketingSink & StreamingFileSink Hi Mariano, thanks a lot for your question. The resolu

Re: BucketingSink & StreamingFileSink

2020-08-11 Thread Robert Metzger
Hi Mariano, thanks a lot for your question. The resolution on StackOverflow seems to be that Azure Datalake is not yet supported by the StreamingFileSink (https://issues.apache.org/jira/browse/FLINK-18568). On Thu, Jul 30, 2020 at 5:34 PM Mariano González Núñez <mariano@hotmail.com>

Re: StreamingFileSink: any risk parallelizing active buckets checkpointing?

2020-08-03 Thread Till Rohrmann
I am trying to use the S3 StreamingFileSink with a high number of active buckets (>1000). I found that checkpointing duration will grow linearly with the number of active buckets, which makes achieving a high number of active buckets difficult. One reason for that is the

StreamingFileSink: any risk parallelizing active buckets checkpointing?

2020-07-30 Thread Paul Bernier
Hi experts, I am trying to use the S3 StreamingFileSink with a high number of active buckets (>1000). I found that checkpointing duration will grow linearly with the number of active buckets, which makes achieving a high number of active buckets difficult. One reason for that is that each act

BucketingSink & StreamingFileSink

2020-07-30 Thread Mariano González Núñez
Hi Flink Team, I'm Mariano and I'm working with Apache Flink to process data and sink from Kafka to Azure Datalake (ADLS Gen1). We are having problems with the sink in Parquet format in ADLS Gen1; it also doesn't work with Gen2. We tried to use the StreamingFileSink to stor

Re: Compression Streamingfilesink ROW-encoded format

2020-07-29 Thread Vijayendra Yadav
adoop.io.compress.GzipCodec" >>>>> >>>>> val streamingFileSink:StreamingFileSink[String] = >>>>> StreamingFileSink.forBulkFormat(new >>>>> Path(outputPath),CompressWriters.forExtractor(new >>>>> D

Re: Compression Streamingfilesink ROW-encoded format

2020-07-29 Thread Ravi Bhushan Ratnakar
"org.apache.hadoop.io.compress.GzipCodec" val streamingFileSink: StreamingFileSink[String] = StreamingFileSink.forBulkFormat(new Path(outputPath), CompressWriters.forExtractor(new DefaultExtractor[String]

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
odecName = "org.apache.hadoop.io.compress.GzipCodec" >>> >>> val streamingFileSink:StreamingFileSink[String] = >>> StreamingFileSink.forBulkFormat(new >>> Path(outputPath),CompressWriters.forExtractor(new >>> DefaultExtractor[String]).withHadoopComp

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
ds, Ravi On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav wrote: Hi Team, Is there a way to enable compression in the StreamingFileSink API for row-encoded formats? val sink: StreamingFileSink[String] = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) Regards, Vijay

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
codecName)).build() Regards, Ravi On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav wrote: Hi Team, Is there a way to enable compression in the StreamingFileSink API for row-encoded formats? val sink: StreamingFil

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
tor[String]).withHadoopCompression(codecName)).build() Regards, Ravi On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav wrote: Hi Team, Is there a way to enable compression in the StreamingFileSink API for row-encoded formats? val sink: StreamingFileSink[String] = StreamingFileSink

Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
Hi Team, Is there a way to enable compression in the StreamingFileSink API for row-encoded formats? val sink: StreamingFileSink[String] = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) Regards, Vijay
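A cleaned-up sketch of the approach the replies in this thread describe, assuming the flink-compress module (and hadoop-common for the codec) is on the classpath; the thread's snippets are Scala, the same calls in Java:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class GzipSinkSketch {
    public static void attachSink(DataStream<String> lines) {
        lines.addSink(StreamingFileSink
            .forBulkFormat(
                new Path("s3://my-bucket/compressed"), // hypothetical path
                // DefaultExtractor writes each record's toString() plus a newline,
                // wrapped in the named Hadoop compression codec.
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                    .withHadoopCompression("org.apache.hadoop.io.compress.GzipCodec"))
            .build()); // as a bulk format, this rolls on every checkpoint
    }
}
```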

Re: StreamingFileSink Not Flushing All Data

2020-07-05 Thread hubertchen
Hi Kostas, I'm confused about the implementation of the temporary workaround. Would it be possible to get a little more detail? > you could simply not stop the job after the whole input is processed How does one determine when the job has processed the whole input? > then wait until the outp

Fwd: StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-29 Thread Arvid Heise
Oh shoot, I replied privately. Let me forward the responses to the list including the solution. -- Forwarded message - From: orionemail Date: Thu, Apr 23, 2020 at 3:38 PM Subject: Re: StreamingFileSink to a S3 Bucket on a remote account using STS To: Arvid Heise Hi, Thanks

Re: StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-29 Thread Robert Metzger
Hey, sorry for the late response. Can you provide the full exception(s) including the stacktrace you are seeing? On Mon, Apr 20, 2020 at 3:39 PM orionemail wrote: Hi, New to both AWS and Flink but currently have a need to write incoming data into a S3 bucket managed via AWS temporary credent

Re: Change to StreamingFileSink in Flink 1.10

2020-04-22 Thread Averell
Thanks @Seth Wiesman and all. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Leonard Xu
his line and to see what happened. Best, Leonard On Apr 21, 2020, at 17:47, Averell <lvhu...@gmail.com> wrote: Hello Leonard, Sivaprasanna, But my code was working fine with Flink v1.8.

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Seth Wiesman
17:47, Averell wrote: Hello Leonard, Sivaprasanna, But my code was working fine with Flink v1.8. I also tried with a simple String DataStream, and got the same error. StreamingFileSink .forRowFormat(new Path(path), new Sim

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Leonard Xu
got the same error. StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder[String]()) .withRollingPolicy(DefaultRollingPolicy.builder().build()) .withBucketAssigner(new DateTimeBucketAssigner) .build() (screensho

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Averell
Hello Leonard, Sivaprasanna, But my code was working fine with Flink v1.8. I also tried with a simple String DataStream, and got the same error. StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder[String]()) .withRollingPolicy

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Sivaprasanna
I agree with Leonard. I have just tried the same in Scala 2.11 with Flink 1.10.0 and it works just fine. Cheers, Sivaprasanna On Tue, Apr 21, 2020 at 12:53 PM Leonard Xu wrote: Hi Averell, I guess it's neither `#withRollingPolicy` nor `#withBucketAssigner`; it may be caused by generics typ

Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Leonard Xu
Hi Averell, I guess it's neither `#withRollingPolicy` nor `#withBucketAssigner`; it may be caused by generics: your Encoder's element type (IN) does not match the BucketAssigner's element type (IN), or you lost the generic type information when instantiating them. Could you post more of the code?

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi, I tried to add the following cast, and it works. Doesn't look nice though. StreamingFileSink .forRowFormat(new Path(path), myEncoder) .withRollingPolicy(DefaultRollingPolicy.create().build()) .withBucketAssigner(myBucketAss

StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-20 Thread orionemail
Hi, New to both AWS and Flink but currently have a need to write incoming data into a S3 bucket managed via AWS temporary credentials. I am unable to get this to work, but I am not entirely sure on the steps needed. I can write to S3 buckets that are not 'remote' and managed by STS temporary cred

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi Sivaprasanna, That is a compile-time error, not a runtime error: value build is not a member of ?0; possible cause: maybe a semicolon is missing before `value build'?. There is no issue with either withRollingPolicy() or withBucketAssigner() on its own, just not with both together. Thanks and regards, Av

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Sivaprasanna
Hi Averell, Can you please share the complete stacktrace of the error? On Mon, Apr 20, 2020 at 4:48 PM Averell wrote: Hi, I have the following code: StreamingFileSink .forRowFormat(new Path(path), myEncoder)

Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi, I have the following code: StreamingFileSink .forRowFormat(new Path(path), myEncoder) .withRollingPolicy(DefaultRollingPolicy.create().build()) .withBucketAssigner(myBucketAssigner) .build() This is working fine in Flink

Re: StreamingFileSink Not Flushing All Data

2020-03-05 Thread Kostas Kloudas
rs! Unfortunately there is no progress on the FLIP (or the issue). @Austin In the meantime, what you could do, assuming that your input is bounded: you could simply not stop the job after the whole input is proces

Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Austin Cawley-Edwards
you could simply not stop the job after the whole input is processed, then wait until the output is committed, and then cancel the job. I know and I agree that this is not an elegant solution but it is a temporary workaround. Hopefully th

Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Kostas Kloudas
solution but it is a temporary workaround. Hopefully the FLIP and related issue is going to be prioritised soon. Cheers, Kostas On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch wrote: Hi,

Re: StreamingFileSink Not Flushing All Data

2020-03-03 Thread Austin Cawley-Edwards
the job. I know and I agree that this is not an elegant solution but it is a temporary workaround. Hopefully the FLIP and related issue is going to be prioritised soon. Cheers, Kostas On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch wrote: Hi,

Re: StreamingFileSink Not Flushing All Data

2020-03-03 Thread Kostas Kloudas
This happens because StreamingFileSink does not support a finite input stream. In the docs it's mentioned under "Important Considerations". This behaviour often surprises users... There's a FLIP <https://cwiki.apach

Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Rafi Aroch
Hi, This happens because StreamingFileSink does not support a finite input stream. In the docs it's mentioned under "Important Considerations". This behaviour often surprises users... There's a FLIP <https://cwiki.apache.org/confluence/display/FL

Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Austin Cawley-Edwards
://github.com/austince/flink-streaming-file-sink-compression On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas wrote: Hi Austin, Dawid is correct in that you need to enable checkpointing for the StreamingFileSink to work. I hope this solves the problem, Kostas

Re: StreamingFileSink Not Flushing All Data

2020-02-24 Thread Kostas Kloudas
Hi Austin, Dawid is correct in that you need to enable checkpointing for the StreamingFileSink to work. I hope this solves the problem, Kostas On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz wrote: Hi Austin, If I am not mistaken the StreamingFileSink by defaul

Re: StreamingFileSink Not Flushing All Data

2020-02-24 Thread Dawid Wysakowicz
Hi Austin, If I am not mistaken the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled, it might happen that not all data is flushed. I think you can also adjust that behavior with: forBulkFormat(...) .withRollingPolicy(/* your custom logic */) I

StreamingFileSink Not Flushing All Data

2020-02-21 Thread Austin Cawley-Edwards
Hi there, Using Flink 1.9.1, trying to write .tgz files with the StreamingFileSink#BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified I can create valid files using the same APIs and data on their own, so thinking it must be something

Re: CSV StreamingFileSink

2020-02-19 Thread Austin Cawley-Edwards
Hey Timo, Thanks for the assignment link! Looks like most of my issues can be solved by getting better acquainted with Java file APIs and not in Flink-land. Best, Austin On Wed, Feb 19, 2020 at 6:48 AM Timo Walther wrote: Hi Austin, the StreamingFileSink allows bucketing t

Re: CSV StreamingFileSink

2020-02-19 Thread Timo Walther
Hi Austin, the StreamingFileSink allows bucketing the output data. This should help for your use case: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment Regards, Timo On 19.02.20 01:00, Austin Cawley-Edwards wrote: Following up on
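A sketch of the per-day bucketing Timo suggests, assuming the built-in DateTimeBucketAssigner (which buckets by processing time, not record time); the base path is a placeholder:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class DailyBucketSketch {
    public static StreamingFileSink<String> build(String basePath) {
        return StreamingFileSink
            .forRowFormat(new Path(basePath), new SimpleStringEncoder<String>("UTF-8"))
            // One directory per day, e.g. <basePath>/2020-02-18/part-0-0
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
            .build();
    }
}
```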

Re: CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Following up on this -- does anyone know if it's possible to stream individual files to a directory using the StreamingFileSink? For instance, if I want all records that come in during a certain day to be partitioned into daily directories: 2020-02-18/ large-file-1.txt large-file-2.txt

CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Hey all, Has anyone had success using the StreamingFileSink[1] to write CSV files? And if so, what about compressed (Gzipped, ideally) files/ which libraries did you use? Best, Austin [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-15 Thread Kostas Kloudas
Hi Ken, Jingsong and Li, Sorry for the late reply. As Jingsong pointed out, upon calling close() the StreamingFileSink does not commit the in-progress/pending files. The reason for this is that the close() method of any UDF including sink functions is called on both normal termination and

Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-10 Thread Ken Krugler
l close" from > "success finish close" in StreamingFileSink? > > Best, > Jingsong Lee > > On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas <mailto:kklou...@gmail.com>> wrote: > Hi Li, > > This is the expected behavior. All the "exactly-once&q

Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-09 Thread Jingsong Li
will move the final temp files to output path, so the result of this job is wrong. Do you have any idea about this? Can we distinguish "fail close" from "success finish close" in StreamingFileSink? Best, Jingsong Lee On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas wrote: >

Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-09 Thread Kostas Kloudas
behavior? If so, the docs should probably be updated. Thanks, Li On Fri, Dec 6, 2019 at 2:01 PM Li Peng wrote: Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default r

Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-06 Thread Li Peng
ut based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated. Thanks, Li On Fri, Dec 6, 2019 at 2:01 PM Li Peng wrote: Hey folks, I'm trying to get StreamingFileSink to write to s3 every

StreamingFileSink doesn't close multipart uploads to s3?

2019-12-06 Thread Li Peng
Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipa
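For reference, a sketch of the default policy's knobs in the 1.9/1.10-era API (DefaultRollingPolicy.create() in 1.9, builder() later; long millis here, Duration in newer versions); values are illustrative. Note that even after a part file rolls to pending, row-format files only become visible as finished once a checkpoint commits them, which matches what this thread discovered:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicySketch {
    public static DefaultRollingPolicy<String, String> build() {
        return DefaultRollingPolicy
            .builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))    // roll at least every minute
            .withInactivityInterval(TimeUnit.SECONDS.toMillis(30)) // roll idle files sooner
            .withMaxPartSize(128 * 1024 * 1024)                    // or once a file hits 128 MB
            .build();
    }
}
```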

Re: What S3 Permissions does StreamingFileSink need?

2019-12-06 Thread Li Peng
t org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) at org.apache.flink.fs.s3base.s

Re: What S3 Permissions does StreamingFileSink need?

2019-12-06 Thread Khachatryan Roman
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ... 42 more Maybe I need to add the directory level as a resource? resources = [ "arn:aws:s3:::bucket-name", "arn:aws:s3:::bucket-name/",

Re: What S3 Permissions does StreamingFileSink need?

2019-12-05 Thread Li Peng
receive from AWS? Li Peng-2 wrote: Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles for auth. Does anyone know what permissions the role should have for the specified s3 bucket to work properly? I've been gett

Re: What S3 Permissions does StreamingFileSink need?

2019-12-05 Thread r_khachatryan
Hi Li, Could you please list the permissions you see and the error message you receive from AWS? Li Peng-2 wrote: Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles for auth. Does anyone know what permissions the role should have for the specified

What S3 Permissions does StreamingFileSink need?

2019-12-04 Thread Li Peng
Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles for auth. Does anyone know what permissions the role should have for the specified s3 bucket to work properly? I've been getting some auth errors, and I suspect I'm missing some permissions: data "aws
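For what it's worth, a sketch of the kind of IAM policy that tends to work for the S3A-based StreamingFileSink, which uses multipart uploads under the hood; the exact minimal action set is an assumption and bucket-name is a placeholder:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket", "s3:ListBucketMultipartUploads"],
      "Resource": "arn:aws:s3:::bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject",
                 "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"],
      "Resource": "arn:aws:s3:::bucket-name/*"
    }
  ]
}
```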

Re: StreamingFileSink duplicate data

2019-11-21 Thread Paul Lam
Hi, StreamingFileSink would not remove committed files, so if you use a non-latest checkpoint to restore state, you may need to perform a manual cleanup. WRT the part id issue, StreamingFileSink will track the global max part number, and use this value + 1 as the new id upon restoring. In this
