Hi Jigar, I'm moving your user question to the user ML.

The best place to initialize transient fields is in

    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException;

as described in [1]:

> Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.

[1] Effective Java, Item 87: Consider using a custom serialized form;
https://ahdak.github.io/blog/effective-java-part-11/

On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jigar,
>
> in order to run the Sink function on the Flink cluster, it will be
> serialized. Since you marked the repository as transient, it won't be
> shipped to the cluster. So if Repository is Serializable, you can ship it
> to the cluster. If not, then you need to reconstruct the Repository on the
> cluster (e.g. on the first invoke call or the open call on the
> RichSinkFunction).
>
> Cheers,
> Till
>
> On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar <jigargajjar2...@gmail.com>
> wrote:
>
> > Hello Devs,
> >
> >
> > Here is my custom sink code.
> >
> > `````````````````````````
> >
> > public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
> >     static HttpClient client = HttpClient.newHttpClient();
> >     private static final long serialVersionUID = 1L;
> >     NeptuneClientFactory neptuneClientFactory;
> >     JsonLDWriteContext jsonLDWriteContext;
> >     String baseURI;
> >     Map contextJsonMap;
> >     String namespaceURI;
> >
> >     public FlinkNeptuneSink(String protocol, String host, String port,
> >             String baseURI, Map contextJsonMap, String namespaceURI) {
> >         neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
> >
> >         this.baseURI = baseURI;
> >         this.contextJsonMap = contextJsonMap;
> >         this.namespaceURI = namespaceURI;
> >     }
> >
> >     @Override
> >     public void invoke(IN value, Context context) throws IOException {
> >         //neptuneClientFactory.getNeptuneClient() (repository attribute in neptuneClientFactory is null)
> >         try (RepositoryConnection conn = neptuneClientFactory.getNeptuneClient().getConnection()) {
> >         }
> >
> >     }
> > }
> >
> > public class NeptuneClientFactory implements Serializable {
> >     private transient Repository repository;
> >
> >     public NeptuneClientFactory(String protocol, String host, String port) {
> >         this.repository = createNeptuneClient(protocol, host, port);
> >     }
> >
> >     public static Repository createNeptuneClient(String protocol, String host, String port) {
> >         String sparqlEndpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
> >         Repository repo = new SPARQLRepository(sparqlEndpoint);
> >         repo.init();
> >         return repo;
> >     }
> >
> >     public Repository getNeptuneClient() {
> >         return repository;
> >     }
> > }
> >
> >
> > filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
> >         "8182", "https://localhost/entity", contextJsonMap,
> >         "https://localhost/namespaces/default"));
> >
> > `````````
> >
> > when it invokes method then only neptuneClientFactory has a repository
> > value as null.
> > not sure why, it has other attributes values properly set.
> >
> > Is flink initializing sink attributes from somewhere else?
> > When I debug then while creating sink it
> > initializes neptuneClientFactory properly but when it comes to invoke
> > method then the repository is blank.
> >
> > Please help.
> >
> > --
> > Thanks
> > Jigar Gajjar
> >
>
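
For reference, here is a minimal sketch of the readObject approach suggested at the top of this thread, using plain Java serialization only. FakeRepository and ClientFactory are hypothetical stand-ins for Repository and NeptuneClientFactory (they are not the RDF4J or Flink classes), and the sketch assumes the factory keeps protocol, host and port as serializable fields so the client can be rebuilt after deserialization. The roundTrip helper only imitates shipping an object to another JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientRestoreDemo {

    /** Hypothetical stand-in for a non-serializable client such as Repository. */
    static class FakeRepository {
        final String endpoint;

        FakeRepository(String endpoint) {
            this.endpoint = endpoint;
        }
    }

    /** Hypothetical factory: keeps the connection settings, rebuilds the client on demand. */
    static class ClientFactory implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable settings travel with the object.
        private final String protocol;
        private final String host;
        private final String port;

        // Not serialized; null after deserialization unless restored.
        private transient FakeRepository repository;

        ClientFactory(String protocol, String host, String port) {
            this.protocol = protocol;
            this.host = host;
            this.port = port;
            this.repository = createRepository();
        }

        private FakeRepository createRepository() {
            return new FakeRepository(String.format("%s://%s:%s/sparql", protocol, host, port));
        }

        // Called by Java serialization when the object is read back.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();               // restores protocol, host, port
            this.repository = createRepository(); // restores the transient field
        }

        FakeRepository getRepository() {
            return repository;
        }
    }

    /** Serializes and deserializes a value in memory. */
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ClientFactory copy = roundTrip(new ClientFactory("https", "neptunehost", "8182"));
        System.out.println(copy.getRepository().endpoint); // prints https://neptunehost:8182/sparql
    }
}
```

Without the readObject method, getRepository() on the deserialized copy would return null, which matches what Jigar sees in invoke. The alternative Till mentions, building the client in the open call of the RichSinkFunction, follows the same idea: ship only the settings and create the client on the cluster side.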