I guess the simplest way would be to use a constructor parameter:

> public static class CampaignProcessor implements
>         ProcessorSupplier<Windowed<String>, List<String[]>> {
>     private final String jedis_server;
>
>     public CampaignProcessor(String jedisServer) {
>         this.jedis_server = jedisServer;
>     }
>
>     @Override
>     public Processor<Windowed<String>, List<String[]>> get() {
>         return new Processor<Windowed<String>, List<String[]>>() {
>             CampaignProcessorCommon campaignProcessorCommon;
>
>             @Override
>             public void init(ProcessorContext context) {
>                 // // looks like Flink code over here ;)
>                 // ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>                 // parameterTool.getRequired("jedis_server");
>                 // LOG.info("Opening connection with Jedis to {}", parameterTool.getRequired("jedis_server"));
>
>                 // this.campaignProcessorCommon = new CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
>                 //this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
>                 this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
>                 this.campaignProcessorCommon.prepare();
>             }
>
>             @Override
>             public void process(Windowed<String> key, List<String[]> tupleList) {
>                 tupleList.forEach(x -> {
>                     String[] tuple = x;
>                     String campaign_id = tuple[0];
>                     String event_time = tuple[2];
>                     this.campaignProcessorCommon.execute(campaign_id, event_time);
>                 });
>             }
>
>             @Override
>             public void punctuate(long timestamp) {}
>
>             @Override
>             public void close() {}
>         };
>     }
> }
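
In case it helps to see the idea in isolation, here is a minimal sketch of the same pattern without any Kafka types. Everything in it is an assumption made for illustration: `SimpleProcessor` is a hypothetical stand-in interface (not the Kafka Streams `Processor`), and `java.util.function.Supplier` plays the role of `ProcessorSupplier`. The point is only that the supplier keeps the address in a final field and every processor it creates can read it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierParamSketch {

    // Hypothetical stand-in for a stream processor; NOT the Kafka Streams interface.
    interface SimpleProcessor {
        void init();
        void process(String campaignId, String eventTime);
        List<String> log();
    }

    // The supplier receives the address once, through its constructor.
    static class CampaignProcessorSupplier implements Supplier<SimpleProcessor> {
        private final String redisAddress;

        CampaignProcessorSupplier(String redisAddress) {
            this.redisAddress = redisAddress;
        }

        @Override
        public SimpleProcessor get() {
            // Each call returns a fresh processor that can read the supplier's field.
            return new SimpleProcessor() {
                private final List<String> log = new ArrayList<>();

                @Override
                public void init() {
                    // A real processor would open its Redis connection here.
                    log.add("connect " + redisAddress);
                }

                @Override
                public void process(String campaignId, String eventTime) {
                    log.add(campaignId + "@" + eventTime);
                }

                @Override
                public List<String> log() {
                    return log;
                }
            };
        }
    }

    public static void main(String[] args) {
        Supplier<SimpleProcessor> supplier = new CampaignProcessorSupplier("localhost");
        SimpleProcessor processor = supplier.get();
        processor.init();
        processor.process("campaign-1", "1460835300000");
        System.out.println(processor.log());
    }
}
```

With the real class above, you would presumably build the supplier the same way, e.g. `new CampaignProcessor(redisHost)`, and hand it to the topology wherever a `ProcessorSupplier` is expected; `redisHost` is a placeholder name here.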

On 04/16/2016 09:35 PM, rss rss wrote:
> Hello,
>
> I'm implementing the Yahoo streaming benchmark with Kafka stream
> processing. How can I pass a string into a Processor or ValueTransformer
> instance? According to the Yahoo benchmark logic, I need to pass in the
> address of the Redis server.
> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
>
> PS: the code above is runnable but has not been checked on real clusters.
>
> Best regards
>