Hi Alexey,

You don't have to set the streaming mode. The Flink Runner automatically
chooses streaming mode when it discovers UnboundedSources like Kafka.
I'm wondering why that didn't work in your case. I just ran your example;
it chose streaming mode and didn't return an error during pipeline
translation.
So I'm curious, which version of Beam are you working with?

Best,
Max

On Wed, Aug 31, 2016 at 12:34 PM, Demin Alexey <diomi...@gmail.com> wrote:
> Thanks
>
> With
> if (translator == null || !applyCanTranslate(transform, node, translator))
> everything works as expected.
>
>
> Regards,
> Alexey Diomin
>
>
> 2016-08-31 14:12 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Ah I see, an unbounded source, such as the Kafka source, does not work in
>> batch mode (which setStreaming(false) enables). The code should work in
>> streaming mode if you apply some window that is compatible with the
>> side-input window to the main input.
>>
>> I think the code in streaming still works because there cannot be cases
>> where the translator is null right now. The correct check should be this,
>> though:
>> if (translator == null || !applyCanTranslate(transform, node, translator))
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 31 Aug 2016 at 12:07 Demin Alexey <diomi...@gmail.com> wrote:
>>
>> > Program to reproduce:
>> >
>> > https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e
>> >
>> > 1) options.setStreaming(false); - we get an NPE, and I can't understand
>> > how the code works
>> > 2) options.setStreaming(true); - the pipeline compiles (it still has an
>> > error, but that is my incorrect use of windows)
>> >
>> >
>> > 2016-08-31 13:53 GMT+04:00 Demin Alexey <diomi...@gmail.com>:
>> >
>> > > Hi
>> > >
>> > > If we change the code to translator != null, then the next line (
>> > > applyStreamingTransform(transform, node, translator); ) will cause an NPE.
>> > >
>> > > This is the main reason I don't understand the code:
>> > >
>> > > x = null;
>> > > if (x == null && f1_null_value_forbid(x)) { ..}
>> > > f2_null_value_forbid(x);
>> > >
>> > > Changing (x == null) => (x != null) simply moves the point of the NPE.
>> > >
>> > >
>> > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
>> > >
>> > >> Hi,
>> > >> I think this is more suited for the Beam dev list.
>> > >> Nevertheless, I think this is a coding error and the condition
>> > >> should be
>> > >> if (translator != null && !applyCanTranslate(transform, node, translator))
>> > >>
>> > >> With what program did you encounter an NPE? It seems to me that this
>> > >> should rarely happen; at least it doesn't happen in any of the Beam
>> > >> runner tests.
>> > >>
>> > >> Cheers,
>> > >> Aljoscha
>> > >>
>> > >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <diomi...@gmail.com> wrote:
>> > >>
>> > >> > Hi
>> > >> >
>> > >> > Sorry if I picked the wrong mailing list.
>> > >> >
>> > >> > After BEAM-102 was solved, FlinkStreamingPipelineTranslator has this
>> > >> > code in visitPrimitiveTransform:
>> > >> >
>> > >> >
>> > >> > if (translator == null && applyCanTranslate(transform, node, translator)) {
>> > >> >   LOG.info(node.getTransform().getClass().toString());
>> > >> >   throw new UnsupportedOperationException(
>> > >> >       "The transform " + transform + " is currently not supported.");
>> > >> > }
>> > >> > applyStreamingTransform(transform, node, translator);
>> > >> >
>> > >> >
>> > >> > but applyCanTranslate and applyStreamingTransform always require a
>> > >> > non-null translator. As a result, if you try to use a side input in
>> > >> > your code, you will get an NPE.
>> > >> >
>> > >> > Maybe Aljoscha Krettek could describe how this code must work?
>> > >> >
>> > >> >
>> > >> > Regards,
>> > >> > Alexey Diomin
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
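
For reference, the three null checks discussed in this thread can be compared
in isolation. What follows is only a minimal sketch, not the Flink Runner code:
NullGuardDemo, Translator, Guard and visit are hypothetical names made up for
illustration, and it assumes (as Alexey reports) that both applyCanTranslate
and applyStreamingTransform dereference the translator.

```java
public class NullGuardDemo {

    /** Hypothetical stand-in for a transform translator. */
    interface Translator {
        boolean canTranslate(String transform);
    }

    /** The three conditions that appear in the thread. */
    enum Guard { ORIGINAL, NOT_NULL_AND, NULL_OR }

    // Stand-in for applyCanTranslate: assumed to dereference the translator.
    static boolean applyCanTranslate(String transform, Translator translator) {
        return translator.canTranslate(transform);
    }

    // Stand-in for applyStreamingTransform: also assumed to dereference it.
    static void applyStreamingTransform(String transform, Translator translator) {
        translator.canTranslate(transform);
    }

    /** Runs one guard variant and reports what happened as a string. */
    static String visit(Guard guard, String transform, Translator translator) {
        try {
            boolean reject;
            switch (guard) {
                case ORIGINAL:
                    // translator == null && applyCanTranslate(...)
                    reject = translator == null && applyCanTranslate(transform, translator);
                    break;
                case NOT_NULL_AND:
                    // translator != null && !applyCanTranslate(...)
                    reject = translator != null && !applyCanTranslate(transform, translator);
                    break;
                default:
                    // translator == null || !applyCanTranslate(...)
                    reject = translator == null || !applyCanTranslate(transform, translator);
            }
            if (reject) {
                throw new UnsupportedOperationException(
                    "The transform " + transform + " is currently not supported.");
            }
            applyStreamingTransform(transform, translator);
            return "translated";
        } catch (NullPointerException e) {
            return "NullPointerException";
        } catch (UnsupportedOperationException e) {
            return "UnsupportedOperationException";
        }
    }

    public static void main(String[] args) {
        for (Guard guard : Guard.values()) {
            System.out.println(guard + " with null translator -> "
                + visit(guard, "SideInput", null));
        }
    }
}
```

Under these assumptions, only the || form turns a missing translator into the
intended UnsupportedOperationException. The && form with == null fails inside
the check itself, and the && form with != null skips the check and fails on the
following call.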