Hi Jigar,
I'm moving your user question to the user mailing list.
The best place to initialize transient fields is the readObject method,

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException;

as described in [1]:
> Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.
[1] Effective Java, Item 87: Consider using a custom serialized form,
https://ahdak.github.io/blog/effective-java-part-11/
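A minimal plain-Java sketch of the readObject approach applied to the NeptuneClientFactory from the quoted code, assuming the factory keeps the (serializable) endpoint string so it can rebuild the repository after deserialization. To keep it runnable without RDF4J or Flink on the classpath, `StubRepository` is a hypothetical, deliberately non-serializable stand-in for SPARQLRepository, and `endpoint`, `roundTrip` and `ReadObjectDemo` are illustrative names that are not part of the original code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ReadObjectDemo {

    // Hypothetical stand-in for SPARQLRepository: deliberately NOT Serializable.
    static class StubRepository {
        final String endpoint;
        StubRepository(String endpoint) { this.endpoint = endpoint; }
    }

    static class NeptuneClientFactory implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable state that is enough to rebuild the repository later.
        private final String endpoint;
        // Skipped by serialization; rebuilt in readObject below.
        private transient StubRepository repository;

        NeptuneClientFactory(String protocol, String host, String port) {
            this.endpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
            this.repository = new StubRepository(endpoint);
        }

        StubRepository getNeptuneClient() { return repository; }

        // Invoked by Java serialization on deserialization; the constructor is NOT re-run.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();                          // restores 'endpoint'
            this.repository = new StubRepository(endpoint);  // restores the transient field
        }
    }

    // Serialize and deserialize an object, similar to shipping it to another JVM.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        NeptuneClientFactory copy =
                roundTrip(new NeptuneClientFactory("https", "neptunehost", "8182"));
        System.out.println(copy.getNeptuneClient().endpoint); // https://neptunehost:8182/sparql
    }
}
```

In a Flink job, Till's alternative (creating the repository in the open method of the RichSinkFunction) avoids a custom serialization hook altogether and is arguably the more Flink-idiomatic choice.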
On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann wrote:
> Hi Jigar,
>
> in order to run the sink function on the Flink cluster, Flink serializes
> it. Since you marked the repository as transient, it won't be shipped to
> the cluster. So if Repository is Serializable, you can drop the transient
> modifier and ship it to the cluster. If not, then you need to reconstruct
> the Repository on the cluster (e.g. in the first invoke call or in the open
> method of the RichSinkFunction).
>
> Cheers,
> Till
>
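Till's second option, reconstructing the client on the cluster, can be sketched without a Flink dependency. The hypothetical `SketchSink` below only mirrors the shape of a RichSinkFunction, assuming (as in Flink) that the function is deserialized on the worker and open() is called before any invoke(). Only serializable configuration travels with the function; the client is built in open(). `StubClient`, `SketchSink` and `roundTrip` are illustrative names, and in a real job these would be the open() and invoke() overrides of RichSinkFunction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class OpenLifecycleDemo {

    // Hypothetical stand-in for a non-serializable client such as SPARQLRepository.
    static class StubClient {
        final String endpoint;
        final List<String> written = new ArrayList<>();
        StubClient(String endpoint) { this.endpoint = endpoint; }
        void write(String value) { written.add(value); }
    }

    // Mirrors a RichSinkFunction: configuration is serialized, the client is not.
    static class SketchSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;        // shipped with the function
        private transient StubClient client;  // created on the worker in open()

        SketchSink(String protocol, String host, String port) {
            // Only store configuration here; the client is not created on the job client side.
            this.endpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
        }

        // Counterpart of RichSinkFunction#open: runs after deserialization, before invoke.
        void open() { client = new StubClient(endpoint); }

        // Counterpart of RichSinkFunction#invoke.
        void invoke(String value) { client.write(value); }

        StubClient client() { return client; }
    }

    // Serialize and deserialize, similar to shipping the function to a task manager.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchSink shipped = roundTrip(new SketchSink("https", "neptunehost", "8182"));
        shipped.open();
        shipped.invoke("record-1");
        System.out.println(shipped.client().written); // [record-1]
    }
}
```

In a real sink, close() is the matching place to release the client created in open().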
> On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar wrote:
>
> > Hello Devs,
> >
> >
> > Here is my custom sink code.
> >
> > ```
> > public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
> >     static HttpClient client = HttpClient.newHttpClient();
> >     private static final long serialVersionUID = 1L;
> >     NeptuneClientFactory neptuneClientFactory;
> >     JsonLDWriteContext jsonLDWriteContext;
> >     String baseURI;
> >     Map contextJsonMap;
> >     String namespaceURI;
> >
> >     public FlinkNeptuneSink(String protocol, String host, String port,
> >             String baseURI, Map contextJsonMap, String namespaceURI) {
> >         neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
> >
> >         this.baseURI = baseURI;
> >         this.contextJsonMap = contextJsonMap;
> >         this.namespaceURI = namespaceURI;
> >     }
> >
> >     @Override
> >     public void invoke(IN value, Context context) throws IOException {
> >         // neptuneClientFactory.getNeptuneClient() (repository attribute in
> >         // neptuneClientFactory is null)
> >         try (RepositoryConnection conn =
> >                 neptuneClientFactory.getNeptuneClient().getConnection()) {
> >         }
> >     }
> > }
> >
> > public class NeptuneClientFactory implements Serializable {
> >     private transient Repository repository;
> >
> >     public NeptuneClientFactory(String protocol, String host, String port) {
> >         this.repository = createNeptuneClient(protocol, host, port);
> >     }
> >
> >     public static Repository createNeptuneClient(String protocol, String host,
> >             String port) {
> >         String sparqlEndpoint = String.format("%s://%s:%s/sparql",
> >                 protocol, host, port);
> >         Repository repo = new SPARQLRepository(sparqlEndpoint);
> >         repo.init();
> >         return repo;
> >     }
> >
> >     public Repository getNeptuneClient() {
> >         return repository;
> >     }
> > }
> >
> > filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
> >         "8182", "https://localhost/entity", contextJsonMap,
> >         "https://localhost/namespaces/default"));
> > ```
> >
> > When the invoke method is called, only the repository attribute of
> > neptuneClientFactory is null. I'm not sure why; the other attributes are
> > set properly.
> >
> > Is Flink initializing the sink's attributes from somewhere else?
> > When I debug, neptuneClientFactory is initialized properly while the sink
> > is being created, but by the time invoke is called, the repository is null.
> >
> > Please help.
> >
> > --
> > Thanks
> > Jigar Gajjar
> >
>
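To answer the question in the original message directly: the sink is not rebuilt on the cluster by re-running its constructor. It is deserialized, and Java deserialization leaves transient fields at their default value (null) while restoring the regular ones. The sketch below reproduces the symptom with a plain ObjectOutputStream/ObjectInputStream round trip, assuming that is a fair approximation of how the function is shipped; `Factory` and `roundTrip` are hypothetical names, and a plain Object stands in for the repository.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientResetDemo {

    static class Factory implements Serializable {
        private static final long serialVersionUID = 1L;
        String baseURI;              // regular field: serialized and restored
        transient Object repository; // transient field: skipped, comes back as null

        Factory(String baseURI) {
            this.baseURI = baseURI;
            // Set in the constructor, like createNeptuneClient(...) in the quoted code.
            this.repository = new Object();
        }
    }

    // Serialize and deserialize, similar to shipping the object to another JVM.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Factory original = new Factory("https://localhost/entity");
        Factory copy = roundTrip(original);
        System.out.println(original.repository != null); // true
        System.out.println(copy.baseURI);                 // https://localhost/entity
        System.out.println(copy.repository);              // null
    }
}
```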