Hi Danny,
I should be able to make the contribution to add proxy support. Please let me
know the contribution process.
Thanks
-Saravan
From: Danny Cranmer
Date: Wednesday, January 19, 2022 at 3:10 AM
To: Gnanamoorthy, Saravanan
Cc: user@flink.apache.org
Subject: Re: Flink Kinesis connector - EFO connection error with http proxy
settings
NOTICE: This email is from an external sender - do not click on links or
attachments unless you recognize the sender and know the content is safe.
Hello Saravanan,
Yes you are correct. EFO uses AWS SDK v2 and the builder does not set proxy
configuration [1]. The polling (non EFO) mechanism is using AWS SDK v1 which
has a more general configuration deserialiser, and hence proxy is configurable.
I do not believe there is a workaround for this without modifying the connector.
If you are in a position to make a contribution to add support, we would
appreciate this. Otherwise I can take this one. Please let me know your
thoughts.
[1]
https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113
Thanks,
Danny Cranmer.
On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan
mailto:saravanan.gnanamoor...@fmr.com>> wrote:
Hello,
We are using Flink kinesis connector for processing the streaming data from
kinesis. We are running the application behind the proxy. After the proxyhost
and proxyport settings, the Connector works with default publisher
type(Polling) but it doesn’t work when we enable the publisher type as Enhanced
fanout (EFO). We tried with different connector version but it the behaviours
is same. I am wondering if the proxy settings are ignored for EFO type. I am
looking forward to your feedback/recommendations.
Flink version: 1.3.5
Java version: 11
Here is the error log:
2022-01-17 18:59:20,707 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0
(fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure
cause:
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException:
Error registering stream: a367945-consumer-stream-dit
at
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
at
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
at
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:429)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:365)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Suppressed: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.kinesis.shaded.software.amazo