Hello Devs,
Here is my custom sink code.

```
public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
    static HttpClient client = HttpClient.newHttpClient();
    private static final long serialVersionUID = 1L;
    NeptuneClientFactory neptuneClientFactory;
    JsonLDWriteContext jsonLDWriteContext;
    String baseURI;
    Map contextJsonMap;
    String namespaceURI;

    public FlinkNeptuneSink(String protocol, String host, String port, String baseURI,
                            Map contextJsonMap, String namespaceURI) {
        neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
        this.baseURI = baseURI;
        this.contextJsonMap = contextJsonMap;
        this.namespaceURI = namespaceURI;
    }

    @Override
    public void invoke(IN value, Context context) throws IOException {
        // neptuneClientFactory.getNeptuneClient() (repository attribute in neptuneClientFactory is null)
        try (RepositoryConnection conn = neptuneClientFactory.getNeptuneClient().getConnection()) {
        }
    }
}

public class NeptuneClientFactory implements Serializable {
    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public static Repository createNeptuneClient(String protocol, String host, String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
        Repository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }

    public Repository getNeptuneClient() {
        return repository;
    }
}

filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost", "8182",
        "https://localhost/entity", contextJsonMap, " https://localhost/namespaces/default"));
```

When the invoke method is called, only the repository attribute of neptuneClientFactory is null. I am not sure why, because all the other attributes have their values set properly. Is Flink initializing the sink's attributes from somewhere else?
When I debug, I can see that neptuneClientFactory is initialized properly while the sink is being created, but by the time the invoke method is called the repository is null. Please help.

--
Thanks
Jigar Gajjar
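
The symptom described above can be reproduced outside Flink with plain Java serialization, which is one likely explanation: Flink serializes the sink function and deserializes it on the task side, and Java serialization skips `transient` fields and does not rerun the constructor. The sketch below is only an illustration under that assumption. `TransientFieldDemo`, `ClientFactory`, `eagerClient`, `lazyClient` and `roundTrip` are hypothetical names, and a `StringBuilder` stands in for the RDF4J `Repository`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical stand-in for NeptuneClientFactory. */
    static class ClientFactory implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String endpoint;          // serialized along with the object
        private transient StringBuilder client; // stand-in for Repository, NOT serialized

        ClientFactory(String endpoint) {
            this.endpoint = endpoint;
            // Eager initialization in the constructor, as in the original factory.
            this.client = new StringBuilder(endpoint);
        }

        /** Returns the field as-is; null on a deserialized copy. */
        StringBuilder eagerClient() {
            return client;
        }

        /** Recreates the client on first use if the transient field was lost. */
        StringBuilder lazyClient() {
            if (client == null) {
                client = new StringBuilder(endpoint);
            }
            return client;
        }
    }

    /** Serializes and deserializes an object in memory. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ClientFactory original = new ClientFactory("https://neptunehost:8182/sparql");
        ClientFactory copy = roundTrip(original);

        System.out.println("original has client: " + (original.eagerClient() != null));
        System.out.println("copy has client:     " + (copy.eagerClient() != null));
        System.out.println("copy lazy client:    " + (copy.lazyClient() != null));
    }
}
```

If this is indeed what happens in the sink, one option would be to create the repository lazily inside `getNeptuneClient()`, or to build it in the `open()` lifecycle method that `RichSinkFunction` provides, since `open()` runs after the function has been deserialized on the task side.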