OK... Does this really distribute the field to several instances of a cluster? I expected to see some way to do it via ProcessorContext... But maybe that is a result of my experience with Flink...
Thanks

2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> I guess the simplest way would be to use a constructor parameter:
>
> > public static class CampaignProcessor implements ProcessorSupplier<Windowed<String>, List<String[]>>
> > {
> >     private final String jedis_server;
> >
> >     public CampaignProcessor(String jedisServer) {
> >         this.jedis_server = jedisServer;
> >     }
> >
> >     @Override
> >     public Processor<Windowed<String>, List<String[]>> get() {
> >         return new Processor<Windowed<String>, List<String[]>>() {
> >             CampaignProcessorCommon campaignProcessorCommon;
> >
> >             @Override
> >             public void init(ProcessorContext context) {
> >                 // // looks like Flink code over here ;)
> >                 // ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> >                 // parameterTool.getRequired("jedis_server");
> >                 // LOG.info("Opening connection with Jedis to {}", parameterTool.getRequired("jedis_server"));
> >
> >                 // this.campaignProcessorCommon = new CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
> >                 // this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
> >                 this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
> >                 this.campaignProcessorCommon.prepare();
> >             }
> >
> >             @Override
> >             public void process(Windowed<String> key, List<String[]> tupleList) {
> >                 tupleList.forEach(x -> {
> >                     String[] tuple = x;
> >                     String campaign_id = tuple[0];
> >                     String event_time = tuple[2];
> >                     this.campaignProcessorCommon.execute(campaign_id, event_time);
> >                 });
> >             }
> >
> >             @Override
> >             public void punctuate(long timestamp) {}
> >
> >             @Override
> >             public void close() {}
> >         };
> >     }
> > }
>
> On 04/16/2016 09:35 PM, rss rss wrote:
> > Hello,
> >
> > I'm implementing the Yahoo streaming benchmark with Kafka Streams.
> > How do I pass a string into a Processor or ValueTransformer instance?
> > According to the Yahoo benchmark logic, I need to pass in the address
> > of the Redis server.
> >
> > https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> >
> > PS: the code above is runnable but has not been checked on real clusters.
> >
> > Best regards
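
On the question at the top of the thread (does the constructor parameter reach every instance?): Kafka Streams is a library, so each application instance runs the same main() and builds its own topology locally. The supplier object is therefore created in every JVM, and get() is called there to make a fresh Processor per task. Nothing is serialized and shipped to workers as in Flink. Below is a minimal sketch of that behaviour in plain Java. The Processor and ProcessorSupplier interfaces are simplified, hypothetical stand-ins for the Kafka Streams ones (no ProcessorContext, no Redis), and initLog exists only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SupplierParameterSketch {

    // Hypothetical, reduced stand-ins for the Kafka Streams interfaces.
    interface Processor<K, V> {
        void init();
        void process(K key, V value);
    }

    interface ProcessorSupplier<K, V> {
        Processor<K, V> get();
    }

    // Illustration only: records the server each processor was initialized with.
    static final List<String> initLog = new ArrayList<>();

    static class CampaignProcessor implements ProcessorSupplier<String, String> {
        private final String jedisServer;

        CampaignProcessor(String jedisServer) {
            this.jedisServer = jedisServer;
        }

        @Override
        public Processor<String, String> get() {
            // A new Processor is returned on every call; each one
            // captures the value held by the supplier.
            return new Processor<String, String>() {
                @Override
                public void init() {
                    // The real code would open the Redis connection here.
                    initLog.add(jedisServer);
                }

                @Override
                public void process(String key, String value) {
                    // The real code would call campaignProcessorCommon.execute(...).
                }
            };
        }
    }

    public static void main(String[] args) {
        // Every application instance runs this same main(), so every instance
        // reads the address from its own arguments (or config) and builds
        // its own supplier.
        String jedisServer = args.length > 0 ? args[0] : "localhost";
        ProcessorSupplier<String, String> supplier = new CampaignProcessor(jedisServer);

        // Simulate the runtime creating one processor per task.
        Processor<String, String> task0 = supplier.get();
        Processor<String, String> task1 = supplier.get();
        task0.init();
        task1.init();

        System.out.println("Processors initialized with: " + initLog);
    }
}
```

If every instance is started with the same argument, all processors on all instances see the same Redis address. The value only differs between instances if they are launched with different arguments or configuration.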