OK... Does this really distribute the field to all instances of a
cluster? I expected to see some way of doing this via ProcessorContext...
But maybe that expectation is a result of my Flink experience...

Thanks
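
For what it's worth, here is a minimal sketch of why a constructor parameter would reach every processor, assuming each instance of the application is started as an ordinary Java program that builds its own supplier from its own arguments or config. The names RedisWriter and RedisWriterSupplier are hypothetical, and java.util.function.Supplier only stands in for ProcessorSupplier so that the sketch runs without Kafka on the classpath:

```java
import java.util.function.Supplier;

public class ConstructorParamSketch {

    /** Hypothetical stand-in for a processor; NOT the Kafka Streams Processor interface. */
    static final class RedisWriter {
        final String redisHost;

        RedisWriter(String redisHost) {
            this.redisHost = redisHost;
        }
    }

    /** Same shape as CampaignProcessor: the supplier keeps the value and hands it to each new processor. */
    static final class RedisWriterSupplier implements Supplier<RedisWriter> {
        private final String redisHost;

        RedisWriterSupplier(String redisHost) {
            this.redisHost = redisHost;
        }

        @Override
        public RedisWriter get() {
            // A fresh processor per call, each one carrying the captured host.
            return new RedisWriter(redisHost);
        }
    }

    public static void main(String[] args) {
        // Assumption: every instance in the cluster runs this same main(),
        // so each one reads the address from its own arguments.
        String redisHost = args.length > 0 ? args[0] : "localhost";
        Supplier<RedisWriter> supplier = new RedisWriterSupplier(redisHost);

        RedisWriter first = supplier.get();
        RedisWriter second = supplier.get();
        System.out.println(first.redisHost + " " + second.redisHost);
    }
}
```

Under that assumption nothing has to be shipped between machines: the value is distributed simply because every instance is launched with the same argument or config file.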

2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> I guess the simplest way would be to use a constructor parameter:
>
> > public static class CampaignProcessor implements ProcessorSupplier<Windowed<String>, List<String[]>>
> > {
> >         private final String jedis_server;
> >
> >         public CampaignProcessor(String jedisServer) {
> >             this.jedis_server = jedisServer;
> >         }
> >
> >         @Override
> >         public Processor<Windowed<String>, List<String[]>> get() {
> >             return new Processor<Windowed<String>, List<String[]>>() {
> >                 CampaignProcessorCommon campaignProcessorCommon;
> >
> >                 @Override
> >                 public void init(ProcessorContext context) {
> > //                    // looks like Flink code over here ;)
> > //                    ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> > //                    parameterTool.getRequired("jedis_server");
> > //                    LOG.info("Opening connection with Jedis to {}", parameterTool.getRequired("jedis_server"));
> >
> > //                    this.campaignProcessorCommon = new CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
> >                     //this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
> >                     this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
> >                     this.campaignProcessorCommon.prepare();
> >                 }
> >
> >                 @Override
> >                 public void process(Windowed<String> key, List<String[]> tupleList) {
> >                     tupleList.forEach(x -> {
> >                         String[] tuple = x;
> >                         String campaign_id = tuple[0];
> >                         String event_time = tuple[2];
> >
> >                         this.campaignProcessorCommon.execute(campaign_id, event_time);
> >                     });
> >                 }
> >
> >                 @Override
> >                 public void punctuate(long timestamp) {}
> >
> >                 @Override
> >                 public void close() {}
> >             };
> >         }
> >     }
>
> On 04/16/2016 09:35 PM, rss rss wrote:
> > Hello,
> >
> >   I'm implementing the Yahoo streaming benchmark with Kafka stream
> > processing. How can I pass a string into a Processor or ValueTransformer
> > instance? According to the Yahoo benchmark logic, I need to pass in the
> > address of a Redis server.
> >
> > https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> >
> > PS: The code above runs, but it has not been tested on a real cluster.
> >
> > Best regards
> >
>
>
