Hi Danny, I should be able to make the contribution to add proxy support. Please let me know the contribution process.
Thanks -Saravan From: Danny Cranmer <dannycran...@apache.org> Date: Wednesday, January 19, 2022 at 3:10 AM To: Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com> Cc: user@flink.apache.org <user@flink.apache.org> Subject: Re: Flink Kinesis connector - EFO connection error with http proxy settings NOTICE: This email is from an external sender - do not click on links or attachments unless you recognize the sender and know the content is safe. Hello Saravanan, Yes you are correct. EFO uses AWS SDK v2 and the builder does not set proxy configuration [1]. The polling (non EFO) mechanism is using AWS SDK v1 which has a more general configuration deserialiser, and hence proxy is configurable. I do not believe there is a workaround for this without modifying the connector. If you are in a position to make a contribution to add support, we would appreciate this. Otherwise I can take this one. Please let me know your thoughts. [1] https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113 Thanks, Danny Cranmer. On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com<mailto:saravanan.gnanamoor...@fmr.com>> wrote: Hello, We are using Flink kinesis connector for processing the streaming data from kinesis. We are running the application behind the proxy. After the proxyhost and proxyport settings, the Connector works with default publisher type(Polling) but it doesn’t work when we enable the publisher type as Enhanced fanout (EFO). We tried with different connector version but it the behaviours is same. I am wondering if the proxy settings are ignored for EFO type. I am looking forward to your feedback/recommendations. Flink version: 1.3.5 Java version: 11 Here is the error log: 2022-01-17 18:59:20,707 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure cause: org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: a367945-consumer-stream-dit at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125) at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106) at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) Suppressed: java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Network is unreachable: kinesis.us-east-1.amazonaws.com/3.227.250.203:443<http://kinesis.us-east-1.amazonaws.com/3.227.250.203:443> at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90) at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122) ... 9 more Thanks -Saravan