Hello Devs,

Here is my custom sink code.

```

public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
    static HttpClient client = HttpClient.newHttpClient();
    private static final long serialVersionUID = 1L;
    NeptuneClientFactory neptuneClientFactory;
    JsonLDWriteContext jsonLDWriteContext;
    String baseURI;
    Map contextJsonMap;
    String namespaceURI;

    public FlinkNeptuneSink(String protocol, String host, String port,
            String baseURI, Map contextJsonMap, String namespaceURI) {
        neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);

        this.baseURI = baseURI;
        this.contextJsonMap = contextJsonMap;
        this.namespaceURI = namespaceURI;
    }

    @Override
    public void invoke(IN value, Context context) throws IOException {
        // neptuneClientFactory.getNeptuneClient() returns null here
        // (the repository attribute in neptuneClientFactory is null)
        try (RepositoryConnection conn =
                neptuneClientFactory.getNeptuneClient().getConnection()) {
        }
    }
}

public class NeptuneClientFactory implements Serializable {
    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public static Repository createNeptuneClient(String protocol, String host,
            String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql",
                protocol, host, port);
        Repository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }

    public Repository getNeptuneClient() {
        return repository;
    }
}


filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
        "8182", "https://localhost/entity", contextJsonMap,
        "https://localhost/namespaces/default"));

`````````

When the invoke method is called, the repository field inside
neptuneClientFactory is null. I am not sure why, because all the other
attributes of the sink have their values set properly.

Is Flink initializing the sink's attributes from somewhere else?
When I debug, neptuneClientFactory is initialized properly while the sink
is being created, but by the time execution reaches the invoke method the
repository is null.
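
One thing I suspect but have not confirmed: the repository field is marked
transient, and if Flink ships the sink instance to the workers via Java
serialization, that field would not survive the trip. Below is a minimal
sketch that reproduces the same symptom outside Flink using only the JDK.
TransientFieldDemo, Factory and roundTrip are hypothetical names made up for
this illustration; Factory is just a stand-in for NeptuneClientFactory, with
a plain Object in place of the real Repository.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for NeptuneClientFactory:
    // one regular field and one transient field set in the constructor.
    public static class Factory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;
        private transient Object repository;

        public Factory(String endpoint) {
            this.endpoint = endpoint;
            this.repository = new Object(); // placeholder for the real client
        }

        public String getEndpoint() {
            return endpoint;
        }

        public Object getRepository() {
            return repository;
        }
    }

    // Serialize to a byte array and read the object back. This is an
    // assumption about what happens to the sink between job construction
    // and the first call to invoke.
    public static Factory roundTrip(Factory original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Factory) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Factory before = new Factory("https://neptunehost:8182/sparql");
        Factory after = roundTrip(before);

        // The constructor is not re-run on deserialization, so the
        // transient field is left at its default value.
        System.out.println("repository set before: " + (before.getRepository() != null));
        System.out.println("repository set after:  " + (after.getRepository() != null));
        System.out.println("endpoint after:        " + after.getEndpoint());
    }
}
```

If this is indeed what is happening, then creating the repository lazily, or
in the open() method of RichSinkFunction rather than in the constructor,
might be the way to go, but I would appreciate confirmation.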

Please help.

-- 
Thanks
Jigar Gajjar
