Hi,

You wrote to the Apache Flink development mailing list.
I think your question should go to the Apache Beam user mailing list:
u...@beam.apache.org

Best, Fabian

2018-02-22 14:35 GMT+01:00 shankara <shankara....@gmail.com>:

> I am new to Apache Beam and Spring Cloud Data Flow, and I am trying to
> integrate Apache Beam into Spring Cloud Data Flow. How do I use a
> spring-kafka message as a source in a Beam pipeline? How do I add
> spring-kafka as a sink in a Beam pipeline? I want the pipeline to run
> forever (waitUntilFinish). Please suggest how I can integrate the two.
>
> Example (WordCount):
>
> PipelineOptions options = PipelineOptionsFactory.create();
>
> Pipeline p = Pipeline.create(options);
>
> p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
>     // ----> instead of TextIO.read().from(...) I want to trigger from the
>     // message channel INPUT in Spring Cloud Data Flow
>     .apply(FlatMapElements
>         .into(TypeDescriptors.strings())
>         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
>     .apply(Filter.by((String word) -> !word.isEmpty()))
>     .apply(Count.<String>perElement())
>     .apply(MapElements
>         .into(TypeDescriptors.strings())
>         .via((KV<String, Long> wordCount) ->
>             wordCount.getKey() + ": " + wordCount.getValue()))
>     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
>     // ----> send the result to message channel OUTPUT
>
> p.run().waitUntilFinish();
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
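
Regarding the KafkaIO question above: as far as I know, Beam has no connector
that binds directly to Spring Cloud Data Flow message channels, but if the
INPUT and OUTPUT channels are backed by Kafka topics, the pipeline can read
and write those topics directly with KafkaIO (beam-sdks-java-io-kafka).
A minimal sketch along those lines; the broker address, topic names, and
class name below are placeholder assumptions, not values from this thread:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

public class KafkaWordCount {  // hypothetical class name
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.<String, String>read()               // unbounded Kafka source
            .withBootstrapServers("localhost:9092")      // placeholder broker
            .withTopic("input-topic")                    // placeholder input topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())                          // PCollection<KV<String, String>>
        .apply(Values.<String>create())                  // keep only the message values
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // an unbounded source needs windowing before an aggregation like Count
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.<String>perElement())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wc) -> wc.getKey() + ": " + wc.getValue()))
        .apply(KafkaIO.<Void, String>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("output-topic")                   // placeholder output topic
            .withValueSerializer(StringSerializer.class)
            .values());                                  // write only the values

    p.run().waitUntilFinish();                           // blocks until the job is cancelled
  }
}

Because Kafka is an unbounded source, the fixed window before Count is what
lets the aggregation emit results, and waitUntilFinish() keeps the streaming
job running until it is cancelled.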
