Ah I see, an unbounded source, such as the Kafka source does not work in
batch mode (which streamStreaming(false) enables). The code should work in
streaming mode if you apply some window that is compatible with the
side-input window to the main input.

I think the code in streaming still works because there cannot be cases
where the translator is null right now. The correct check should be this,
though:
if (translator == null || !applyCanTranslate(transform, node, translator))

Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 12:07 Demin Alexey <diomi...@gmail.com> wrote:

> Program for reproduce
>
> https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e
>
> 1) options.setStreaming(false);  - we have NPE and i can't understand how
> code work
> 2) options.setStreaming(true);  - pipeline can compile (he still have
> error, but it's my incorrect work with window)
>
>
> 2016-08-31 13:53 GMT+04:00 Demin Alexey <diomi...@gmail.com>:
>
> > Hi
> >
> > If we can change code on translator != null then next line (
> > applyStreamingTransform(transform, node, translator); ) will cause NPE
> >
> > It's main problem why I don't understand code:
> >
> > x = null;
> > if (x == null && f1_null_value_forbid(x)) { ..}
> > f2_null_value_forbid(x);
> >
> > change (x == null) => (x !=null) simple change point of NPE
> >
> >
> > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> Hi,
> >> I think this is more suited for the Beam dev list. Nevertheless, I think
> >> this is a coding error and the condition should be
> >> if (translator != null && !applyCanTranslate(transform, node,
> translator))
> >>
> >> With what program did you encounter an NPE, it seems to me that this
> >> should
> >> rarely happen, at least it doesn't happen in all the Beam runner tests.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <diomi...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > Sorry if i mistake with mailing list.
> >> >
> >> > After  BEAM-102 was solved in FlinkStreamingPipelineTranslator we have
> >> code
> >> > in visitPrimitiveTransform:
> >> >
> >> >
> >> > if (translator == null && applyCanTranslate(transform, node,
> >> translator)) {
> >> >   LOG.info(node.getTransform().getClass().toString());
> >> >   throw new UnsupportedOperationException(
> >> >       "The transform " + transform + " is currently not supported.");
> >> > }
> >> > applyStreamingTransform(transform, node, translator);
> >> >
> >> >
> >> > but applyCanTranslate and applyStreamingTransform always require
> NotNull
> >> > translator
> >> > as result if you try use side input in your code then you will cause
> NPE
> >> >
> >> > Maybe Aljoscha Krettek could describe how this code must work?
> >> >
> >> >
> >> > Regards,
> >> > Alexey Diomin
> >> >
> >>
> >
> >
>

Reply via email to