[ https://issues.apache.org/jira/browse/FLINK-25866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484969#comment-17484969 ]

Fil Karnicki edited comment on FLINK-25866 at 1/31/22, 10:25 PM:
-----------------------------------------------------------------

Hi, I tried to hack together a small reproduction at home that would show I can read a resource from a .jar inside a TaskManager, but I was unsuccessful. Does any of the below strike you as a possible reason? I tried to emulate our multi-tenant Cloudera setup. The steps I took were:

1. created a Flink and a Kafka cluster using docker-compose
{code:yaml}
version: "2.2"
services:
  jobmanager:
    image: flink:1.13.5-scala_2.12-java8
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.5-scala_2.12-java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1{code}
2. added the line below to statefun-flink-core's org.apache.flink.statefun.flink.core.nettyclient.NettyClient#from
{code:java}
System.out.println("FINDME " + getResourceContentForTest("inCore.txt"));{code}
where getResourceContentForTest returns the contents of the resource, wrapped in text indicating whether the lookup succeeded or failed:
{code:java}
  public static String getResourceContentForTest(String resourceName) {
    StringBuilder stringBuilder = new StringBuilder("resource: ").append(resourceName);
    try (InputStream inputStream =
        NettyClient.class.getClassLoader().getResourceAsStream(resourceName)) {
      if (inputStream == null) {
        stringBuilder.append("inputStream is null for ").append(resourceName);
      } else {
        String text = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8))
                .lines()
                .collect(Collectors.joining("\n"));
        stringBuilder.append("inputStream is ").append(text).append(". nice");
      }
    } catch (IOException e) {
      stringBuilder.append(e.getMessage());
    }
    return stringBuilder.toString();
  }{code}
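As a sanity check I could also try resolving the resource through the thread context classloader as well as the class's own loader; my (unverified) assumption is that on a TaskManager the context classloader is the user-code classloader. A standalone sketch of that idea, where the class name and helper are mine, not Flink's:

```java
import java.io.IOException;
import java.io.InputStream;

public class ResourceLookup {
  // Hypothetical helper: try the anchor class's own classloader first, then
  // fall back to the thread context classloader. On a Flink TaskManager the
  // context classloader is usually the user-code classloader (assumption).
  public static String locate(Class<?> anchor, String resourceName) {
    ClassLoader defining = anchor.getClassLoader(); // null for bootstrap classes
    ClassLoader context = Thread.currentThread().getContextClassLoader();
    for (ClassLoader cl : new ClassLoader[] {defining, context}) {
      if (cl == null) {
        continue;
      }
      try (InputStream in = cl.getResourceAsStream(resourceName)) {
        if (in != null) {
          return "found via " + cl;
        }
      } catch (IOException ignored) {
        // closing the stream failed; treat as not found via this loader
      }
    }
    return "not found";
  }

  public static void main(String[] args) {
    System.out.println(locate(ResourceLookup.class, "inCore.txt"));
  }
}
```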
3. put "inCore.txt" in the resources of statefun-flink-core and built the entire project

4. ran an arbitrary test and evaluated the following in IntelliJ:
{code:java}
NettyClient.getResourceContentForTest("inCore.txt") {code}
which resulted in 
{code:java}
resource: inCore.txtinputStream is Hello from inCore.txt. nice {code}
5. created an uber-jar with a module.yaml of
{code:yaml}
kind: io.statefun.endpoints.v2/http
spec:
  functions: com.example.fns/*
  urlPathTemplate: https://bar.foo.com:8080/{function.name}
  transport:
    type: io.statefun.transports.v1/async
    timeouts:
      call: 1 min
      read: 10 sec
      write: 10 sec
---
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: latest
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter {code}
and a pom of 
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>statefun-jobby</artifactId>
    <version>3.1</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.5</flink.version>
        <statefun.version>3.1.1</statefun.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- 3rd party -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.7.1</version>
        </dependency>

        <!-- Stateful Functions sdk -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-sdk-embedded</artifactId>
            <version>${statefun.version}</version>
        </dependency>

        <!-- statefun-flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-flink-io</artifactId>
            <version>${statefun.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-flink-io-bundle</artifactId>
            <version>${statefun.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-flink-core</artifactId>
            <version>${statefun.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-flink-launcher</artifactId>
            <version>${statefun.version}</version>
        </dependency>

        <!-- flink runtime metrics -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink runtime is always provided -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>org.apache.kafka:*</artifact>
                                    <excludes>
                                        <exclude>kafka/kafka-version.properties</exclude>
                                        <exclude>LICENSE</exclude>
                                        <!-- Does not contain anything relevant.
                                             Cites a binary dependency on jersey, but this is neither reflected
                                             in the dependency graph, nor are any jersey files bundled. -->
                                        <exclude>NOTICE</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.flink.statefun.flink.core.StatefulFunctionsJob</mainClass>
                                </transformer>
                                <!-- required to aggregate all the META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- remove all duplicate licenses -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                                <!-- explicitly include our LICENSE file, located at project root dir -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                    <resource>META-INF/LICENSE</resource>
                                    <file>${basedir}/../../LICENSE</file>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                                    <projectName>Apache Flink Stateful Functions (flink-statefun)</projectName>
                                    <encoding>UTF-8</encoding>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>{code}
6. replaced only NettyClient.class in the uber-jar with my hacked core NettyClient class from 2)
{code:bash}
jar uf statefun-jobby-3.1.jar org\apache\flink\statefun\flink\core\nettyclient\NettyClient.class {code}
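One thing I could do to double-check that the update in step 6 actually landed (entry names inside a jar always use forward slashes, regardless of OS) is a quick standalone probe along these lines; the jar path and class name are just the ones from my example above:

```java
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class JarEntryCheck {
  // Returns the uncompressed size of an entry, or -1 if the entry is absent.
  public static long entrySize(String jarPath, String entryName) throws Exception {
    try (ZipFile zip = new ZipFile(jarPath)) {
      ZipEntry entry = zip.getEntry(entryName);
      return entry == null ? -1 : entry.getSize();
    }
  }

  public static void main(String[] args) throws Exception {
    // Compare this size before and after the `jar uf` update; a changed size
    // is a quick hint that the hacked class really made it into the jar.
    System.out.println(entrySize(
        "statefun-jobby-3.1.jar",
        "org/apache/flink/statefun/flink/core/nettyclient/NettyClient.class"));
  }
}
```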
7. uploaded the uber-jar to my flink cluster from 1) and pushed a message onto kafka
{code:bash}
~/kafka/kafka_2.12-3.1.0/bin$ ./kafka-console-producer.sh --topic messages-1 --bootstrap-server localhost:29092 --property parse.key=true --property "key.separator=:"
>hi:there {code}
and the logs in the TaskManager showed
{code:none}
2022-01-31 22:15:03,855 INFO  [some kafka log]

FINDME resource: inCore.txtinputStream is null for inCore.txt

2022-01-31 22:15:16,057 INFO  org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction [] - Bootstrapping function FunctionType(com.example.fns, greeter). Blocking processing until first request is completed. Successive requests will be performed asynchronously. {code}
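For what it's worth, the null here would be consistent with a theory of mine (unverified): with the parent-first patterns from 1), NettyClient is loaded by a classloader that simply cannot see the user jar, while the user-code classloader can. A Flink-free sketch of that mechanism, with all names my own:

```java
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ParentFirstDemo {
  public static void main(String[] args) throws Exception {
    // Simulate the user jar: a directory containing inCore.txt that only a
    // child (user-code) classloader knows about.
    Path userJarDir = Files.createTempDirectory("userjar");
    Files.write(userJarDir.resolve("inCore.txt"),
        "Hello from inCore.txt".getBytes(StandardCharsets.UTF_8));

    ClassLoader parent = ParentFirstDemo.class.getClassLoader();
    try (URLClassLoader userCode =
        new URLClassLoader(new URL[] {userJarDir.toUri().toURL()}, parent)) {
      // A class resolved parent-first asks its own (parent) loader,
      // which cannot see the user jar:
      try (InputStream viaParent = parent.getResourceAsStream("inCore.txt")) {
        System.out.println("via parent loader: " + (viaParent != null));
      }
      // The user-code loader does see the resource:
      try (InputStream viaUserCode = userCode.getResourceAsStream("inCore.txt")) {
        System.out.println("via user-code loader: " + (viaUserCode != null));
      }
    }
  }
}
```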
Many, many thanks

Fil


was (Author: JIRAUSER284249): [previous revision of the same comment, omitted]

> Support additional TLS configuration.
> -------------------------------------
>
>                 Key: FLINK-25866
>                 URL: https://issues.apache.org/jira/browse/FLINK-25866
>             Project: Flink
>          Issue Type: Improvement
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Major
>
> Currently the default HTTP client used to invoke remote functions does not 
> support customising the TLS settings as part of the endpoint spec definition. 
> This includes
> using self-signed certificates, and providing client side certificates for 
> authentication (which is a slightly different requirement).
> This issue is about including additional TLS settings to the default endpoint 
> resource definition, and supporting them in statefun-core.
> User mailing list threads:
>  * [client cert auth in remote 
> function|https://lists.apache.org/thread/97nw245kxqp32qglwfynhhgyhgp2pxvg]
>  * [endpoint self-signed certificate 
> problem|https://lists.apache.org/thread/y2m2bpwg4n71rxfont6pgky2t8m19n7w]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
