Thanks, with if (translator == null || !applyCanTranslate(transform, node, translator)) everything works as expected.
Regards,
Alexey Diomin

2016-08-31 14:12 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:

> Ah I see, an unbounded source, such as the Kafka source, does not work in
> batch mode (which setStreaming(false) enables). The code should work in
> streaming mode if you apply some window that is compatible with the
> side-input window to the main input.
>
> I think the code in streaming still works because there cannot be cases
> where the translator is null right now. The correct check should be this,
> though:
> if (translator == null || !applyCanTranslate(transform, node, translator))
>
> Cheers,
> Aljoscha
>
> On Wed, 31 Aug 2016 at 12:07 Demin Alexey <diomi...@gmail.com> wrote:
>
> > Program to reproduce:
> >
> > https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e
> >
> > 1) options.setStreaming(false); - we get an NPE, and I can't understand
> > how the code works
> > 2) options.setStreaming(true); - the pipeline compiles (it still has an
> > error, but that is my incorrect use of windows)
> >
> >
> > 2016-08-31 13:53 GMT+04:00 Demin Alexey <diomi...@gmail.com>:
> >
> > > Hi
> > >
> > > If we change the code to translator != null, then the next line (
> > > applyStreamingTransform(transform, node, translator); ) will cause an NPE.
> > >
> > > This is the main reason I don't understand the code:
> > >
> > > x = null;
> > > if (x == null && f1_null_value_forbid(x)) { ..}
> > > f2_null_value_forbid(x);
> > >
> > > Changing (x == null) => (x != null) simply moves the point of the NPE.
> > >
> > >
> > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
> > >
> > >> Hi,
> > >> I think this is more suited for the Beam dev list. Nevertheless, I think
> > >> this is a coding error and the condition should be
> > >> if (translator != null && !applyCanTranslate(transform, node, translator))
> > >>
> > >> With what program did you encounter an NPE? It seems to me that this
> > >> should rarely happen; at least it doesn't happen in any of the Beam
> > >> runner tests.
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <diomi...@gmail.com> wrote:
> > >>
> > >> > Hi
> > >> >
> > >> > Sorry if I have the wrong mailing list.
> > >> >
> > >> > After BEAM-102 was solved, in FlinkStreamingPipelineTranslator we have
> > >> > this code in visitPrimitiveTransform:
> > >> >
> > >> >
> > >> > if (translator == null && applyCanTranslate(transform, node, translator)) {
> > >> >   LOG.info(node.getTransform().getClass().toString());
> > >> >   throw new UnsupportedOperationException(
> > >> >       "The transform " + transform + " is currently not supported.");
> > >> > }
> > >> > applyStreamingTransform(transform, node, translator);
> > >> >
> > >> >
> > >> > but applyCanTranslate and applyStreamingTransform always require a
> > >> > non-null translator; as a result, if you try to use a side input in
> > >> > your code, you will get an NPE.
> > >> >
> > >> > Maybe Aljoscha Krettek could describe how this code must work?
> > >> >
> > >> >
> > >> > Regards,
> > >> > Alexey Diomin
> > >> >
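
For anyone reading the archive later, here is a minimal, self-contained sketch of why the three variants of the condition discussed in this thread behave differently. It is not the Beam or Flink runner code: the Translator interface, the NullGuardDemo class and the one-argument applyCanTranslate / applyStreamingTransform helpers are hypothetical stand-ins, and the only assumption carried over from the thread is that both helpers reject a null translator.

```java
import java.util.Objects;

/** Hypothetical stand-in for the runner's translator type; not the real Beam API. */
interface Translator {
    boolean canTranslate();
}

final class NullGuardDemo {

    // Stand-in helper: assumed (as described in the thread) to require a non-null translator.
    static boolean applyCanTranslate(Translator translator) {
        Objects.requireNonNull(translator, "translator");
        return translator.canTranslate();
    }

    // Stand-in helper: also assumed to require a non-null translator.
    static void applyStreamingTransform(Translator translator) {
        Objects.requireNonNull(translator, "translator");
    }

    // Original condition: the right-hand side is only evaluated when translator IS null.
    static String original(Translator t) {
        return outcome(() -> {
            if (t == null && applyCanTranslate(t)) {
                throw new UnsupportedOperationException();
            }
            applyStreamingTransform(t);
        });
    }

    // First proposed fix: a null translator skips the check and reaches the apply call.
    static String firstFix(Translator t) {
        return outcome(() -> {
            if (t != null && !applyCanTranslate(t)) {
                throw new UnsupportedOperationException();
            }
            applyStreamingTransform(t);
        });
    }

    // Final condition: || short-circuits on null, so neither helper ever sees null.
    static String finalFix(Translator t) {
        return outcome(() -> {
            if (t == null || !applyCanTranslate(t)) {
                throw new UnsupportedOperationException();
            }
            applyStreamingTransform(t);
        });
    }

    // Runs the body and reports which way it ended.
    static String outcome(Runnable body) {
        try {
            body.run();
            return "translated";
        } catch (NullPointerException e) {
            return "NPE";
        } catch (UnsupportedOperationException e) {
            return "unsupported";
        }
    }

    public static void main(String[] args) {
        Translator unsupported = () -> false;
        System.out.println("original(null)        = " + original(null));
        System.out.println("original(unsupported) = " + original(unsupported));
        System.out.println("firstFix(null)        = " + firstFix(null));
        System.out.println("finalFix(null)        = " + finalFix(null));
        System.out.println("finalFix(unsupported) = " + finalFix(unsupported));
    }
}
```

In this sketch the original condition fails with an NPE inside the check itself when the translator is null, and never rejects a non-null translator that cannot translate. The != null variant moves the NPE to the apply call, which is the point made above about only changing where the NPE happens. Only the || variant reports a missing or unsuitable translator as an UnsupportedOperationException.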