Hi Danny,
I should be able to make the contribution to add proxy support. Please let me 
know the contribution process.

Thanks
-Saravan

From: Danny Cranmer <dannycran...@apache.org>
Date: Wednesday, January 19, 2022 at 3:10 AM
To: Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Flink Kinesis connector - EFO connection error with http proxy 
settings
NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Hello Saravanan,

Yes you are correct. EFO uses AWS SDK v2 and the builder does not set proxy 
configuration [1]. The polling (non EFO) mechanism is using AWS SDK v1 which 
has a more general configuration deserialiser, and hence proxy is configurable. 
I do not believe there is a workaround for this without modifying the connector.

If you are in a position to make a contribution to add support, we would 
appreciate this. Otherwise I can take this one. Please let me know your 
thoughts.

[1] 
https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113

Thanks,
Danny Cranmer.

On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan 
<saravanan.gnanamoor...@fmr.com<mailto:saravanan.gnanamoor...@fmr.com>> wrote:
Hello,
We are using Flink kinesis connector for processing the streaming data from 
kinesis. We are running the application behind the proxy. After the proxyhost 
and proxyport settings, the Connector works with default publisher 
type(Polling) but it doesn’t work when we enable the publisher type as Enhanced 
fanout (EFO). We tried with different connector version but it the behaviours 
is same. I am wondering if the proxy settings are ignored for EFO type. I am 
looking forward to your feedback/recommendations.

Flink version: 1.3.5
Java version: 11

Here is the error log:

2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 
(fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure 
cause: 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException:
 Error registering stream: a367945-consumer-stream-dit

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)

        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)

        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)

        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

        Suppressed: java.lang.NullPointerException

                at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)

                at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)

                at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)

                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException:
 Unable to execute HTTP request: Network is unreachable: 
kinesis.us-east-1.amazonaws.com/3.227.250.203:443<http://kinesis.us-east-1.amazonaws.com/3.227.250.203:443>

        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)

        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)

        ... 9 more



Thanks
-Saravan

Reply via email to