[ https://issues.apache.org/jira/browse/FLINK-25866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484969#comment-17484969 ]
Fil Karnicki edited comment on FLINK-25866 at 1/31/22, 10:25 PM:
-----------------------------------------------------------------
Hi, I tried to hack something together at home that would demonstrate reading a resource from a .jar inside a TaskManager, but I was unsuccessful. Does any of the below strike you as a possible reason? I tried to emulate our multi-tenant Cloudera setup. The steps I took were:

1. created a flink and a kafka cluster using docker-compose
{code:yaml}
version: "2.2"
services:
  jobmanager:
    image: flink:1.13.5-scala_2.12-java8
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.13.5-scala_2.12-java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
{code}
2.
added the below code to statefun-flink-core's org.apache.flink.statefun.flink.core.nettyclient.NettyClient#from
{code:java}
System.out.println("FINDME " + getResourceContentForTest("inCore.txt"));
{code}
where getResourceContentForTest returns the contents of the resource wrapped in some text depending on the success/failure to get my hands on the resource
{code:java}
public static String getResourceContentForTest(String resourceName) {
  StringBuilder stringBuilder = new StringBuilder("resource: ").append(resourceName);
  try (InputStream inputStream =
      NettyClient.class.getClassLoader().getResourceAsStream(resourceName)) {
    if (inputStream == null) {
      stringBuilder.append("inputStream is null for ").append(resourceName);
    } else {
      String text =
          new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
              .lines()
              .collect(Collectors.joining("\n"));
      stringBuilder.append("inputStream is ").append(text).append(". nice");
    }
  } catch (IOException e) {
    stringBuilder.append(e.getMessage());
  }
  return stringBuilder.toString();
}
{code}
3. put "inCore.txt" in the resources of statefun-flink-core and built the entire project
4. ran a random test and evaluated in IntelliJ:
{code:java}
NettyClient.getResourceContentForTest("inCore.txt")
{code}
which resulted in
{code:java}
resource: inCore.txtinputStream is Hello from inCore.txt. nice
{code}
5.
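As an aside, the lookup above resolves the resource through the classloader that loaded NettyClient itself. A standalone sketch of why that matters (ResourceLookupDemo and the temp-directory setup are illustrative only, standing in for the flink-dist and user-code classloaders): a class that the parent-first pattern forces onto the parent loader cannot see resources that exist only in a child (user-code) jar.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ResourceLookupDemo {

  /** Returns the resource content, or null when the classloader cannot see it. */
  public static String read(ClassLoader cl, String name) throws IOException {
    try (InputStream in = cl.getResourceAsStream(name)) {
      if (in == null) {
        return null;
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for the user jar: a directory holding inCore.txt, visible
    // only through a child URLClassLoader layered over the system loader.
    Path userJarDir = Files.createTempDirectory("userjar");
    Files.write(userJarDir.resolve("inCore.txt"),
        "Hello from inCore.txt".getBytes(StandardCharsets.UTF_8));

    ClassLoader parent = ClassLoader.getSystemClassLoader(); // stand-in for flink-dist
    try (URLClassLoader userCode =
        new URLClassLoader(new URL[] {userJarDir.toUri().toURL()}, parent)) {
      // The child (user-code) loader finds the resource; the parent returns
      // null — matching the "inputStream is null" TaskManager log below.
      System.out.println("user-code loader sees: " + read(userCode, "inCore.txt"));
      System.out.println("parent loader sees:    " + read(parent, "inCore.txt"));
    }
  }
}
```

If this is the mechanism at play, `Thread.currentThread().getContextClassLoader()` (which Flink points at the user-code classloader inside tasks) may resolve the resource where `NettyClient.class.getClassLoader()` does not.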
created an uber-jar with a module.yaml of
{code:yaml}
kind: io.statefun.endpoints.v2/http
spec:
  functions: com.example.fns/*
  urlPathTemplate: https://bar.foo.com:8080/{function.name}
  transport:
    type: io.statefun.transports.v1/async
    timeouts:
      call: 1 min
      read: 10 sec
      write: 10 sec
---
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: latest
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
{code}
and a pom of
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>statefun-jobby</artifactId>
  <version>3.1</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.5</flink.version>
    <statefun.version>3.1.1</statefun.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <dependencies>
    <!-- 3rd party -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.7.1</version>
    </dependency>
    <!-- Stateful Functions sdk -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-sdk-embedded</artifactId>
      <version>${statefun.version}</version>
    </dependency>
    <!-- statefun-flink -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-io</artifactId>
      <version>${statefun.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-io-bundle</artifactId>
      <version>${statefun.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-core</artifactId>
      <version>${statefun.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-launcher</artifactId>
      <version>${statefun.version}</version>
    </dependency>
    <!-- flink runtime metrics -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink runtime is always provided -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
                <filter>
                  <artifact>org.apache.kafka:*</artifact>
                  <excludes>
                    <exclude>kafka/kafka-version.properties</exclude>
                    <exclude>LICENSE</exclude>
                    <!-- Does not contain anything relevant. Cites a binary dependency on jersey,
                         but this is neither reflected in the dependency graph, nor are any
                         jersey files bundled. -->
                    <exclude>NOTICE</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.apache.flink.statefun.flink.core.StatefulFunctionsJob</mainClass>
                </transformer>
                <!-- required to aggregate all the META-INF/services files -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <!-- remove all duplicate licenses -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                <!-- explicitly include our LICENSE file, located at project root dir -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                  <resource>META-INF/LICENSE</resource>
                  <file>${basedir}/../../LICENSE</file>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                  <projectName>Apache Flink Stateful Functions (flink-statefun)</projectName>
                  <encoding>UTF-8</encoding>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
{code}
6. replaced only NettyClient.class in the uber-jar with my hacked core NettyClient class from 2)
{code:java}
jar uf statefun-jobby-3.1.jar org\apache\flink\statefun\flink\core\nettyclient\NettyClient.class
{code}
7.
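The `jar uf` update in step 6 only swaps one zip entry inside the uber-jar. A rough, self-contained equivalent using plain java.util.zip (JarPatcher and the entry names here are illustrative, not part of the project), useful for sanity-checking that the patched class really landed where expected:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class JarPatcher {

  /** Rewrites the jar so that entryName maps to newBytes, like `jar uf`. */
  public static void replaceEntry(Path jar, String entryName, byte[] newBytes)
      throws IOException {
    Path tmp = Files.createTempFile("patched", ".jar");
    try (ZipInputStream in = new ZipInputStream(Files.newInputStream(jar));
         ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(tmp))) {
      ZipEntry entry;
      while ((entry = in.getNextEntry()) != null) {
        if (entry.getName().equals(entryName)) {
          continue; // drop the stale copy; the patched bytes are written below
        }
        out.putNextEntry(new ZipEntry(entry.getName()));
        copy(in, out);
        out.closeEntry();
      }
      out.putNextEntry(new ZipEntry(entryName));
      out.write(newBytes);
      out.closeEntry();
    }
    Files.move(tmp, jar, StandardCopyOption.REPLACE_EXISTING);
  }

  /** Reads one entry back, or returns null if it is absent. */
  public static byte[] readEntry(Path jar, String entryName) throws IOException {
    try (ZipInputStream in = new ZipInputStream(Files.newInputStream(jar))) {
      ZipEntry entry;
      while ((entry = in.getNextEntry()) != null) {
        if (entry.getName().equals(entryName)) {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          copy(in, out); // ZipInputStream.read stops at the end of the entry
          return out.toByteArray();
        }
      }
    }
    return null;
  }

  private static void copy(InputStream in, OutputStream out) throws IOException {
    byte[] buf = new byte[8192];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
    }
  }
}
```

One caveat with the original command: on a non-Windows shell the backslashes in `org\apache\...` become literal characters in the entry name rather than path separators, so it is worth confirming the class's placement inside the jar (e.g. with `jar tf`) after the update.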
uploaded the uber-jar to my flink cluster from 1) and pushed a message onto kafka
{code:java}
~/kafka/kafka_2.12-3.1.0/bin$ ./kafka-console-producer.sh --topic messages-1 --bootstrap-server localhost:29092 --property parse.key=true --property "key.separator=:"
>hi:there
{code}
and the logs in the TaskManager showed
{code:java}
2022-01-31 22:15:03,855 INFO [some kafka log] FINDME resource: inCore.txtinputStream is null for inCore.txt
2022-01-31 22:15:16,057 INFO org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction [] - Bootstrapping function FunctionType(com.example.fns, greeter). Blocking processing until first request is completed. Successive requests will be performed asynchronously.
{code}
Many, many thanks
Fil

> Support additional TLS configuration.
> -------------------------------------
>
>                 Key: FLINK-25866
>                 URL: https://issues.apache.org/jira/browse/FLINK-25866
>             Project: Flink
>          Issue Type: Improvement
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Major
>
> Currently the default HTTP client used to invoke remote functions does not
> support customising the TLS settings as part of the endpoint spec definition.
> This includes using self-signed certificates, and providing client-side
> certificates for authentication (which is a slightly different requirement).
> This issue is about adding additional TLS settings to the default endpoint
> resource definition, and supporting them in statefun-core.
> User mailing list threads:
> * [client cert auth in remote function|https://lists.apache.org/thread/97nw245kxqp32qglwfynhhgyhgp2pxvg]
> * [endpoint self-signed certificate problem|https://lists.apache.org/thread/y2m2bpwg4n71rxfont6pgky2t8m19n7w]

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
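On the issue itself: supporting self-signed certificates largely comes down to letting the endpoint spec supply a truststore that gets wired into the HTTP client's SSLContext. A hedged sketch of that plumbing using only JDK APIs (TlsContextSketch and its method names are illustrative, not statefun API; client-certificate auth would additionally need a KeyManagerFactory over a keystore holding the client key):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class TlsContextSketch {

  /** Builds an SSLContext that trusts only the certificates in the given store. */
  public static SSLContext contextFromTrustStore(KeyStore trustStore)
      throws GeneralSecurityException {
    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);
    SSLContext ctx = SSLContext.getInstance("TLS");
    // No key managers here: this covers the self-signed-server case only.
    ctx.init(null, tmf.getTrustManagers(), null);
    return ctx;
  }

  /** Loads a PKCS12 truststore from disk, e.g. one holding a self-signed cert. */
  public static KeyStore loadTrustStore(Path path, char[] password)
      throws GeneralSecurityException, IOException {
    KeyStore store = KeyStore.getInstance("PKCS12");
    try (InputStream in = Files.newInputStream(path)) {
      store.load(in, password);
    }
    return store;
  }
}
```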