Hi Jigar,

I'm moving your user question to the user ML.

The best place to initialize transient fields is in

 private void readObject(java.io.ObjectInputStream in)
     throws IOException, ClassNotFoundException;

as described in [1]:

Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.

[1] Effective Java, Item 87 : Consider using a custom serialized form;
https://ahdak.github.io/blog/effective-java-part-11/


On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jigar,
>
> in order to run the Sink function on the Flink cluster, it will be
> serialized. Since you marked the repository as transient, it won't be
> shipped to the cluster. So if Repository is Serializable, you can ship it
> to the cluster. If not, then you need to reconstruct the Repository on the
> cluster (e.g. on the first invoke call or the open call on the
> RichSinkFunction).
>
> Cheers,
> Till
>
> On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar <jigargajjar2...@gmail.com>
> wrote:
>
> > Hello Devs,
> >
> >
> > Here is my custom sink code.
> >
> > `````````````````````````
> >
> > public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
> >     static HttpClient client = HttpClient.newHttpClient();
> >     private static final long serialVersionUID = 1L;
> >     NeptuneClientFactory neptuneClientFactory;
> >     JsonLDWriteContext jsonLDWriteContext;
> >     String baseURI;
> >     Map contextJsonMap;
> >     String namespaceURI;
> >
> >     public FlinkNeptuneSink(String protocol, String host, String port,
> > String baseURI, Map contextJsonMap, String namespaceURI) {
> >         neptuneClientFactory = new NeptuneClientFactory(protocol, host,
> > port);
> >
> >         this.baseURI = baseURI;
> >         this.contextJsonMap = contextJsonMap;
> >         this.namespaceURI = namespaceURI;
> >     }
> >
> >     @Override
> >     public void invoke(IN value, Context context) throws IOException {
> >     //neptuneClientFactory.getNeptuneClient()   (repository  attribute in
> > neptuneClientFactory   is null)
> >     try (RepositoryConnection conn =
> > neptuneClientFactory.getNeptuneClient().getConnection())      {
> >     }
> >
> >     }
> > }
> >
> > public class NeptuneClientFactory implements Serializable {
> >         private transient Repository repository;
> >
> >         public NeptuneClientFactory(String protocol, String host, String
> > port) {
> >             this.repository = createNeptuneClient(protocol, host, port);
> >         }
> >
> >     public static Repository createNeptuneClient(String protocol, String
> > host, String port) {
> >         String sparqlEndpoint = String.format("%s://%s:%s/sparql",
> > protocol, host, port);
> >         Repository repo = new SPARQLRepository(sparqlEndpoint);
> >         repo.init();
> >         return repo;
> >     }
> >
> >         public Repository getNeptuneClient() {
> >             return repository;
> >         }
> > }
> >
> >
> > filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
> > "8182", "https://localhost/entity";, contextJsonMap, "
> > https://localhost/namespaces/default";));
> >
> > `````````
> >
> > when  it invokes method then only neptuneClientFactory has a repository
> > value as null. not sure why, it has other attributes values properly set.
> >
> > Is flink initializing sink attributes from somewhere else?
> > When I debug  then while creating sink  it
> > initializes neptuneClientFactory  properly but when it comes to invoke
> > method then the repository is blank.
> >
> > Please help.
> >
> > --
> > Thanks
> > Jigar Gajjar
> >
>

Reply via email to