I guess the simplest way would be to pass the value as a constructor
parameter of your ProcessorSupplier:

> public static class CampaignProcessor
>         implements ProcessorSupplier<Windowed<String>, List<String[]>> {
>         private final String jedis_server;
> 
>         public CampaignProcessor(String jedisServer) {
>             this.jedis_server = jedisServer;
>         }
> 
>         @Override
>         public Processor<Windowed<String>, List<String[]>> get() {
>             return new Processor<Windowed<String>, List<String[]>>() {
>                 CampaignProcessorCommon campaignProcessorCommon;
> 
>                 @Override
>                 public void init(ProcessorContext context) {
> //                    // looks like Flink code over here ;)
> //                    ParameterTool parameterTool = (ParameterTool)
> //                        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> //                    parameterTool.getRequired("jedis_server");
> //                    LOG.info("Opening connection with Jedis to {}",
> //                        parameterTool.getRequired("jedis_server"));
> 
> //                    this.campaignProcessorCommon = new
> //                        CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
>                     //this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
>                     this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
>                     this.campaignProcessorCommon.prepare();
>                 }
> 
>                 @Override
>                 public void process(Windowed<String> key, List<String[]> tupleList) {
>                     tupleList.forEach(x -> {
>                         String[] tuple = x;
>                         String campaign_id = tuple[0];
>                         String event_time = tuple[2];
>                         this.campaignProcessorCommon.execute(campaign_id, event_time);
>                     });
>                 }
> 
>                 @Override
>                 public void punctuate(long timestamp) {}
> 
>                 @Override
>                 public void close() {}
>             };
>         }
>     }
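
To show the idea in isolation, here is a minimal sketch that runs without any Kafka dependency. It assumes a hypothetical `SimpleProcessor` interface and a `RedisProcessorSupplier` class (neither is part of Kafka Streams, and the `log` list exists only so the behaviour can be inspected). The point is just that the supplier stores the string given to its constructor, and every processor it creates can read that value in `init()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ConfiguredSupplierSketch {

    /** Hypothetical stand-in for a stream processor; NOT the Kafka Streams interface. */
    interface SimpleProcessor {
        void init();
        void process(String key, String value);
    }

    /** Supplier that remembers the Redis address passed to its constructor. */
    static class RedisProcessorSupplier implements Supplier<SimpleProcessor> {
        private final String redisHost;
        // Records calls so the sketch can be inspected; illustration only.
        final List<String> log = new ArrayList<>();

        RedisProcessorSupplier(String redisHost) {
            this.redisHost = redisHost;
        }

        @Override
        public SimpleProcessor get() {
            // A new processor per call; each one sees redisHost from the enclosing supplier.
            return new SimpleProcessor() {
                private String connectedTo;

                @Override
                public void init() {
                    // A real implementation would open the Redis connection here.
                    connectedTo = redisHost;
                    log.add("init:" + connectedTo);
                }

                @Override
                public void process(String key, String value) {
                    log.add(connectedTo + ":" + key + "=" + value);
                }
            };
        }
    }

    public static void main(String[] args) {
        // The address comes from the command line, defaulting to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        RedisProcessorSupplier supplier = new RedisProcessorSupplier(host);

        SimpleProcessor processor = supplier.get();
        processor.init();
        processor.process("campaign-1", "1460835300000");

        supplier.log.forEach(System.out::println);
    }
}
```

In the real topology the same pattern should apply: build the supplier once in `main` with the address read from your config, then hand it to the stream, for example via something like `stream.process(new CampaignProcessor(redisHost))`.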






On 04/16/2016 09:35 PM, rss rss wrote:
> Hello,
> 
>   I'm implementing the Yahoo streaming benchmark with Kafka Streams.
> How can I pass a string into a Processor or ValueTransformer
> instance? According to the Yahoo benchmark logic, I need to pass in the
> address of the Redis server.
> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> 
> PS: the code above is runnable but not checked on real clusters.
> 
> Best regards
> 
