Dear Sir/Madam, This is with refence to one of the issue we are facing in deploying the java pulsar state function in the Kubernetes for more than 2 weeks. We tried taking the help from documentation as well as from net but the issue is not resolved. We are trying to deploy a simple function the code snippet is given below. We are running the Apache version 2.7.1 in our environment. Please note when we are trying the same code in the standalone docker it is working fine.
package test.example; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.LocalRunner; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import static java.nio.charset.StandardCharsets.UTF_8; public class PublishFunction implements Function<String, Void> { public Void process(String input, Context context) { String publishTopic = "persistent://public/default/my-topic-test"; String output = String.format("%s!", input); try { System.out.println(publishTopic); System.out.println(output); context.newOutputMessage(publishTopic, Schema.STRING).value(output).sendAsync(); System.out.println("Published Successfully"); context.incrCounter("ac", 1); System.out.println("incrCounter Successfully"); String currentCount = context.getTenant(); System.out.println(currentCount); ByteBuffer buf = ByteBuffer.allocate(1000); System.out.println("buffer value" + buf + "value"); buf.putInt(20); String input1 = "20"; System.out.println("buffer value" + buf + "set"); context.putState("last", ByteBuffer.wrap(input1.getBytes(UTF_8))); System.out.println("Updated Successfully"); ByteBuffer Res = context.getState("last"); System.out.println("State Value is" + Res); } catch (PulsarClientException e) { context.getLogger().error(e.toString()); System.out.println("Oops Error"); } return null; } public static void main(String[] args) throws Exception { FunctionConfig functionConfig = new FunctionConfig(); functionConfig.setName("PublishFunction"); functionConfig.setInputs(Collections.singleton("persistent://public/default/my-topic-1")); functionConfig.setClassName(PublishFunction.class.getName()); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); functionConfig.setOutput("persistent://public/default/my-topic-test"); LocalRunner localRunner = LocalRunner.builder().stateStorageServiceUrl("bk://127.0.0.1:4181").functionConfig(functionConfig).build(); localRunner.start(true); } } Error root@pulsar-mini-broker-0:/pulsar# ./bin/pulsar-admin functions create --classname test.example.PublishFunction --jar example-0.0.1-SNAPSHOT.jar --inputs persistent://public/default/my-topic-1 --tenant public --namespace default --name PublishFunction HTTP 500 Internal Server Error Reason: HTTP 500 Internal Server Error<html> <head> <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/> <title>Error 500 Request failed.</title> </head> <body><h2>HTTP ERROR 500 Request failed.</h2> <table> <tr><th>URI:</th><td>/admin/v3/functions/public/default/PublishFunction</td></tr> <tr><th>STATUS:</th><td>500</td></tr> <tr><th>MESSAGE:</th><td>Request failed.</td></tr> <tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-3a543f31</td></tr> </table> <hr><a href="https://eclipse.org/jetty">Powered by Jetty:// 9.4.35.v20201120</a><hr/> </body> </html> root@pulsar-mini-broker-0:/pulsar# Appreciate you early response on the same. Thanks and Regards, Hidayathulla.