Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
I've implemented a CustomSink with TwoPhaseCommit. To test this I've created a test using this [1] one as a baseline, and it works fine. To test the integration with S3 (and with an exponential back-off), I've tried to implement a new test, using the following code: ... val invalidWriter = writ
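
A minimal skeleton of such a sink, assuming Flink's TwoPhaseCommitSinkFunction API; the Txn holder and the S3 staging steps in the comments are hypothetical:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

    public class S3TwoPhaseSink extends TwoPhaseCommitSinkFunction<String, S3TwoPhaseSink.Txn, Void> {

        // Hypothetical transaction holder: one staging buffer per checkpoint interval.
        public static class Txn {
            public final List<String> buffer = new ArrayList<>();
        }

        public S3TwoPhaseSink() {
            super(TypeInformation.of(Txn.class).createSerializer(new ExecutionConfig()),
                  VoidSerializer.INSTANCE);
        }

        @Override
        protected Txn beginTransaction() {
            return new Txn(); // e.g. open a temporary file or staging key
        }

        @Override
        protected void invoke(Txn txn, String value, Context context) {
            txn.buffer.add(value); // buffer records for this checkpoint interval
        }

        @Override
        protected void preCommit(Txn txn) {
            // flush the buffered records to a staging location before the checkpoint completes
        }

        @Override
        protected void commit(Txn txn) {
            // atomically promote the staged file to its final S3 key
        }

        @Override
        protected void abort(Txn txn) {
            // delete the staged file
        }
    }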

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
unately, there is already a working implementation in our test bed. https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java On Thu, Mar 5, 2020 at 7:54 PM David Magalhães wrote:
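
A sketch of how FiniteTestSource might be wired into such a test; S3TwoPhaseSink is the hypothetical sink from the sketch above, and the source waits for checkpoints to complete before finishing (see the class's javadoc for the exact protocol), so an exactly-once sink gets a chance to commit its final transaction:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.FiniteTestSource;

    public class SinkIT {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(100); // checkpoints drive the source's shutdown protocol

            env.addSource(new FiniteTestSource<>("a", "b", "c"), TypeInformation.of(String.class))
               .addSink(new S3TwoPhaseSink()); // hypothetical sink sketched above
            env.execute("finite-source-test");
        }
    }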

Parallelisation of S3 write sink

2020-04-03 Thread David Magalhães
I have a scenario where multiple small files need to be written to S3. I'm using a TwoPhaseCommit sink, since I have a specific scenario where I can't use StreamingFileSink. I've noticed that because of the way the S3 write is done (sequentially), the checkpoint is timing out (10 minutes), because it t

Re: Parallelisation of S3 write sink

2020-04-03 Thread David Magalhães
tolerate at-least-once guarantees, then you could try to build an async operator which writes files to S3. But you could do the same in your custom TwoPhaseCommitSink implementations by spawning a ThreadPool and submitting multiple write operations. Cheers, Till
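
A minimal sketch of that thread-pool idea, assuming the files are staged and then uploaded per checkpoint; uploadToS3 is hypothetical and the pool size is an example value:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelUploader {
        private final ExecutorService pool = Executors.newFixedThreadPool(8);

        public void uploadAll(List<String> files) throws Exception {
            List<Future<?>> pending = new ArrayList<>();
            for (String file : files) {
                // submit all uploads so they overlap instead of running sequentially
                pending.add(pool.submit(() -> uploadToS3(file)));
            }
            for (Future<?> f : pending) {
                f.get(); // rethrows upload failures, failing the checkpoint
            }
        }

        private void uploadToS3(String file) {
            // hypothetical: PUT the staged file to S3
        }
    }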

Re: K8s native - checkpointing to S3 with RocksDBStateBackend

2020-04-24 Thread David Magalhães
I think the classloaders for the uberjar and Flink are different. Not sure if this is the right explanation, but that is why you need to add flink-s3-fs-hadoop inside the plugins folder in the cluster. On Fri, Apr 24, 2020 at 4:07 PM Averell wrote: Thank you Yun Tang. Building my own docke
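
For reference, the expected layout inside the Flink distribution looks roughly like this (the jar version is just an example):

    plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop-1.10.0.jar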

Re: Flink s3 streaming performance

2020-05-31 Thread David Magalhães
Hi Venkata. 300 requests per minute looks like 200 ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?). Have you changed the parallelization to be higher than 1? I also recommend limiting the source parallelization, be
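
A rough sketch of that setup; env, kafkaSource, s3Sink and the parallelism values are all hypothetical/example:

    // Keep the source parallelism low so it cannot outrun the sink, and give
    // the S3 sink several subtasks so uploads overlap.
    env.addSource(kafkaSource).setParallelism(1)
       .rebalance() // spread records evenly across the sink subtasks
       .addSink(s3Sink).setParallelism(8);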

User / Job Manager (permissions) for Flink

2020-06-02 Thread David Magalhães
Hi, not sure if this was discussed before (in a brief search I couldn't find anything), but I would like to know if there is an application that uses the Flink REST API to provide some kind of user management, like allowing a certain user to log in and manage some jobs running in Flink, limit the parallelizat

Kafka Rate Limit in FlinkConsumer?

2020-07-06 Thread David Magalhães
I've noticed that FLINK-11501 was implemented in flink-connector-kafka-0.10 [1], but it wasn't in the current version of flink-connector-kafka. Is there any reason for this, and what would be the best way to implement rate-limit functionality in the current Kafka consumer? Thanks,

Re: Kafka Rate Limit in FlinkConsumer?

2020-07-06 Thread David Magalhães
subtaskRateLimiter.acquire();
if (record == null) {
    consumerMetricGroup.counter("invalidRecord").inc();
}
super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
} }; }

Thanks,
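
As a simpler alternative to the fetcher-level limiter above, one could throttle inside the deserialization schema with a Guava RateLimiter; a sketch, where the 100 records per second per subtask is an example value:

    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class RateLimitedSchema implements KafkaDeserializationSchema<String> {

        // RateLimiter is not serializable, so create it lazily on the task.
        private transient RateLimiter limiter;

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            if (limiter == null) {
                limiter = RateLimiter.create(100.0); // permits per second, example value
            }
            limiter.acquire(); // blocks until a permit is available
            return new String(record.value());
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }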

Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-10 Thread David Magalhães
Hi, yesterday I was creating a savepoint (to S3, around 8 GB of state) using 2 TaskManagers (8 GB) and it failed because one of the task managers filled up the disk (it probably didn't have enough RAM to save the state to S3 directly; I don't know what the disk space was) and reached 100% usage spac

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-14 Thread David Magalhães
e quickly, maybe you can try retained checkpoints [1] and multi-threaded uploads [2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints [2] https://issues.apache.org/jira/browse/FLINK-11008 Best, Congxian
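
A sketch of enabling the retained checkpoints suggested in [1], so a cancelled job can be restarted from its last checkpoint instead of a slow savepoint; the interval is an example value:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every minute, example value
    // keep the checkpoint around when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);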

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
directly write to the target file system. If this should result in temporary files on your TM, then this might be a problem of the file system implementation. Having access to the logs could also help to better understand what's going on. Cheers, Till On Tue, Jul

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
n/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160 Best, Congxian David Magalhães wrote on Tue, Jul 21, 2020 at 4:10 PM: Hi Till, I'm using the s3:// scheme, but not sure what the default used was, s3a or s3p. then the st

Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-09 Thread David Magalhães
Hi, I'm working with Flink for the first time and I'm trying to create a solution that will store events from Kafka into Parquet files in S3. This should also support re-injection of events from Parquet files into a Kafka topic. Here
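
A minimal sketch of writing Avro records to a Parquet file with a chosen name, assuming the s3a filesystem is configured; the schema, bucket and key are examples:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class ParquetS3Writer {
        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

            // the file name is fully under our control here
            ParquetWriter<GenericRecord> writer = AvroParquetWriter
                    .<GenericRecord>builder(new Path("s3a://my-bucket/events/part-0000.parquet"))
                    .withSchema(schema)
                    .build();

            GenericRecord record = new GenericData.Record(schema);
            record.put("id", "42");
            writer.write(record);
            writer.close();
        }
    }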

Re: Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-15 Thread David Magalhães
I remembered a backfill user solution from Pinterest which is very similar to yours and uses Flink too [1]; hope that can help you. Best, Leonard [1] https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_K

Custom Metrics outside RichFunctions

2020-01-20 Thread David Magalhães
Hi, I want to create a custom metric that shows the number of messages that couldn't be deserialized using a custom deserializer inside FlinkKafkaConsumer. Looking into the Metrics page ( https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html ), that doesn't seem to be possible,

Re: Flink Performance

2020-01-21 Thread David Magalhães
I've found this ( https://stackoverflow.com/questions/50580756/flink-window-dragged-stream-performance ) post on StackOverflow, where someone complains about a performance drop with KeyBy. On Tue, Jan 21, 2020 at 1:24 PM Dharani Sudharsan <dharani.sudhar...@outlook.in> wrote: Hi All, Currently

Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
in itself is RichParallelSourceFunction, and you could call the function below to register your metrics group: getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter") Best, Yun Tang

Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
custom kafka consumer that checks the schema class, casts it to your specific class, and then calls a method on your schema that accepts a metric group). On 22/01/2020 14:33, David Magalhães wrote: Hi Yun, I'm trying to use inside a custom *Deserializat
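
A sketch of that suggestion: keep a reference to the schema in a consumer subclass and hand it the metric group once the runtime context exists. MetricAwareSchema (a KafkaDeserializationSchema with a setMetricGroup method) is hypothetical:

    import java.util.Properties;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class MetricsKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

        private final MetricAwareSchema<T> schema; // our own reference to the schema

        public MetricsKafkaConsumer(String topic, MetricAwareSchema<T> schema, Properties props) {
            super(topic, schema, props);
            this.schema = schema;
        }

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            // register the metric group before the consumer starts emitting records
            MetricGroup group = getRuntimeContext().getMetricGroup().addGroup("deserializer");
            schema.setMetricGroup(group);
            super.run(ctx);
        }
    }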

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread David Magalhães
Hi Jason, the topic is used in *FlinkKafkaConsumer* first, followed by the *KafkaDeserializationSchema* and then the *Properties*. https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html new FlinkKafkaConsumer(kafkaTopic, new M
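
The argument order spelled out; MyEvent and MyDeserializationSchema (a KafkaDeserializationSchema<MyEvent>) are hypothetical:

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "example-group");

    // topic first, then the deserialization schema, then the properties
    FlinkKafkaConsumer<MyEvent> consumer =
            new FlinkKafkaConsumer<>("my-topic", new MyDeserializationSchema(), props);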

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread David Magalhães
Does StreamingFileSink use core-site.xml? When I was using it, it didn't load any configurations from core-site.xml. On Mon, Jan 27, 2020 at 12:08 PM Mark Harris wrote: Hi Piotr, Thanks for the link to the issue. Do you know if there's a workaround? I've tried setting the following in

Re: Using s3 for checkpointing

2020-02-01 Thread David Magalhães
Did you put each one inside a different folder with its name, like /opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.9.1.jar? Check https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html On Sat, Feb 1, 2020, 07:42 Navneeth Krishnan wrote: Hi Arvid, Thanks for the

NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-05 Thread David Magalhães
I'm implementing an exponential backoff inside a custom sink that uses an AvroParquetWriter to write to S3. I've changed the number of attempts to 0 inside core-site.xml, and I'm capturing the timeout exception and doing a Thread.sleep for X seconds. This is working as intended, and when S3 is offl
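
A sketch of the backoff loop described above: retry the S3 write with an exponentially growing sleep. writeToS3 and the retry limit are hypothetical/example values:

    import java.io.IOException;

    void writeWithBackoff(String value) throws Exception {
        int maxRetries = 5; // example value
        int attempt = 0;
        while (true) {
            try {
                writeToS3(value); // hypothetical S3 write
                return;
            } catch (IOException e) {
                attempt++;
                if (attempt > maxRetries) {
                    throw e; // give up and let Flink restart the job
                }
                Thread.sleep(1000L * (1L << attempt)); // 2s, 4s, 8s, ...
            }
        }
    }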

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-06 Thread David Magalhães
probably transitively depends on org/joda/time/format/DateTimeParserBucket and it is missing on the runtime classpath of Flink. Best, Andrey On Wed, Feb 5, 2020 at 5:22 PM David Magalhães wrote: I'm implementing an exponential backoff inside

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-12 Thread David Magalhães
├── joda.jar
└── flink-s3-fs-hadoop.jar

If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that. Adding joda to your user code will unfortunately not work. Best, Arvi

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-12 Thread David Magalhães
tps://stackoverflow.com/a/5756989/568695 On Wed, Feb 12, 2020 at 12:36 PM David Magalhães wrote: Hi Arvid, I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said previously, this works normally until an exception is thrown insid

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-12 Thread David Magalhães
doop.jar I will investigate your error deeply in the next few days, but I'd like to have a final confirmation about the folder structure. On Wed, Feb 12, 2020 at 8:56 PM David Magalhães wrote: Hi Robert, I couldn't find any previous mention

Test sink behaviour

2020-02-13 Thread David Magalhães
Hi, I've created a CustomSink that writes parquet files to S3. Inside the `invoke` method I have a loop that checks if S3 is down, and if it is, it will wait exponentially until it is online again. Now I want to write a test for this, and I can execute everything and see that the Sink is doing what is

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-18 Thread David Magalhães
org.apache.flink.formats.parquet.ParquetWriterFactory, which uses your mentioned StreamOutputFile. Best, Arvid On Thu, Feb 13, 2020 at 12:04 AM David Magalhães wrote: Hi Arvid, I use a docker image. Here is the Dockerfile:

AWS Client Builder with default credentials

2020-02-20 Thread David Magalhães
I'm using org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder to create an S3 client to copy and delete objects inside a TwoPhaseCommitSinkFunction. Shouldn't there be another way to set up configurations without hardcoding them? Something like core-site.xml or flin

Re: AWS Client Builder with default credentials

2020-02-24 Thread David Magalhães
CredentialsProvider implementations, that can derive credentials from environment variables, system properties, files on the classpath and many more. Ultimately though, you're kind of asking us how to use AWS APIs, for which I would direct you
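
A sketch with the (non-shaded) AWS SDK v1: the default provider chain looks up credentials from environment variables, system properties, the profile file and instance roles, so nothing needs to be hardcoded. The region is an example value:

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    // credentials come from the environment, not from code
    AmazonS3 s3 = AmazonS3ClientBuilder.standard()
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .withRegion(Regions.EU_WEST_1) // example region
            .build();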

Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-26 Thread David Magalhães
I'm testing a custom sink that uses TwoPhaseCommit with the test harness provided by flink-streaming-java. "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test" classifier "tests" Using this, in some tests where I use Scala string interpolation, the string output has a strange beha

Re: Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-28 Thread David Magalhães
the supported version of the binaries that you have downloaded, bad things could happen. Piotrek On 26 Feb 2020, at 12:56, David Magalhães wrote: I'm testing a custom sink that uses TwoPhaseCommit with the test harness provided by flink-streaming-java.