Hello Devs,
Here is my custom sink code.
```
public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
    static HttpClient client = HttpClient.newHttpClient();
    private static final long serialVersionUID = 1L;
    NeptuneClientFactory neptuneClientFactory;
    JsonLDWriteContext jsonLDWriteContext;
    String baseURI;
    Map contextJsonMap;
    String namespaceURI;

    public FlinkNeptuneSink(String protocol, String host, String port,
            String baseURI, Map contextJsonMap, String namespaceURI) {
        neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
        this.baseURI = baseURI;
        this.contextJsonMap = contextJsonMap;
        this.namespaceURI = namespaceURI;
    }

    @Override
    public void invoke(IN value, Context context) throws IOException {
        // neptuneClientFactory.getNeptuneClient() (repository attribute in neptuneClientFactory is null)
        try (RepositoryConnection conn =
                neptuneClientFactory.getNeptuneClient().getConnection()) {
        }
    }
}

public class NeptuneClientFactory implements Serializable {
    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public static Repository createNeptuneClient(String protocol, String host,
            String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql",
                protocol, host, port);
        Repository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }

    public Repository getNeptuneClient() {
        return repository;
    }
}

filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
        "8182", "https://localhost/entity", contextJsonMap,
        "https://localhost/namespaces/default"));
```
When the invoke method is called, the repository attribute of
neptuneClientFactory is null. I am not sure why, because all the other
attributes have their values set properly.
Is Flink initializing the sink's attributes from somewhere else?
When I debug, neptuneClientFactory is initialized properly while the sink
is being created, but by the time the invoke method runs, the repository
is null.
Please help.
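One possible explanation, sketched on the assumption that the sink instance is serialized with standard Java serialization before invoke runs: a field marked transient (as repository is in NeptuneClientFactory) is skipped during serialization, so it comes back as null in the deserialized copy. The standalone sketch below reproduces that behaviour without Flink or RDF4J. TransientFieldDemo, Factory, roundTrip and getOrCreateClient are hypothetical names used only for illustration, and a plain String stands in for the real Repository.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for NeptuneClientFactory.
    // A String replaces the real Repository to keep the sketch dependency-free.
    public static class Factory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;
        private transient String client; // transient: skipped by Java serialization

        public Factory(String endpoint) {
            this.endpoint = endpoint;
            // Eager initialization in the constructor, as in the code from the post.
            this.client = "client for " + endpoint;
        }

        // Returns the field as-is; null on a deserialized copy.
        public String getClient() {
            return client;
        }

        // Lazy variant: rebuilds the client when the transient field is missing.
        public String getOrCreateClient() {
            if (client == null) {
                client = "client for " + endpoint;
            }
            return client;
        }
    }

    // Serializes the object to bytes and reads it back, producing a deserialized copy.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Factory original = new Factory("https://neptunehost:8182/sparql");
        Factory copy = roundTrip(original);

        System.out.println(original.getClient());     // set by the constructor
        System.out.println(copy.getClient());         // null: constructor did not run again
        System.out.println(copy.getOrCreateClient()); // rebuilt lazily after deserialization
    }
}
```

If this is what is happening in the sink, one option might be to create the Repository lazily inside getNeptuneClient(), or in the sink's open() method, rather than in the constructor. This is a guess based on the code above, not a confirmed diagnosis.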
--
Thanks
Jigar Gajjar