[
https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510883#comment-17510883
]
Valentyn Tymofieiev commented on BEAM-4132:
-------------------------------------------
Ok, looks like the issue is reproducible only on the default output after
Partitioning, but not on other outputs. Although inference there also seems
suboptimal. Repro:
{noformat}
import apache_beam as beam
PLAYER_1 = 'Team A Player 1'
PLAYER_2 = 'Team B Player 2'
PLAYER_3 = 'Player 3'
def split_fn(kv):
k, v = kv
if 'Team A' in k:
yield beam.pvalue.TaggedOutput('TeamA', kv)
elif 'Team B' in k:
yield beam.pvalue.TaggedOutput('TeamB', kv)
else:
yield kv
with beam.Pipeline() as p:
scores = p | beam.Create([(PLAYER_1, 15), (PLAYER_2, 10), (PLAYER_1, 100),
(PLAYER_3, 25), (PLAYER_2, 75)])
partitioned = scores | beam.FlatMap(split_fn).with_outputs('TeamA', 'TeamB',
main='UnknownTeam')
team_a = partitioned['TeamA']
print("Partitioned output element type:", team_a.element_type)
unknown_team = partitioned['UnknownTeam']
print("Default / main output element type:", unknown_team.element_type)
team_a | "GBK passes" >> beam.GroupByKey()
unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()
{noformat}
Output:
{noformat}
$ python -m pipeline
Partitioned output element type: Any <-- loss of typing info
Default / main output element type: Union[TaggedOutput, Tuple[str, int]] <--
Having TaggedOutput here is likely not helpful.
# Second GBK fails with:
Traceback (most recent call last):
File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 197,
in _run_module_as_main
return _run_code(code, main_globals, None,
File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 87,
in _run_code
exec(code, run_globals)
File "/tmp/pipe1.py", line 34, in <module>
unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pvalue.py",
line 137, in __or__
return self.pipeline.apply(ptransform, self)
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py",
line 651, in apply
return self.apply(
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py",
line 662, in apply
return self.apply(transform, pvalueish)
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py",
line 706, in apply
transform.type_check_inputs(pvalueish)
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py",
line 457, in type_check_inputs
self.type_check_inputs_or_outputs(pvalueish, 'input')
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py",
line 486, in type_check_inputs_or_outputs
raise TypeCheckError(
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at
GBK fails with type inference error: expected Tuple[TypeVariable[K],
TypeVariable[V]], got Union[TaggedOutput, Tuple[str, int]]
Full type hint:
IOTypeHints[inputs=((Tuple[TypeVariable[K], TypeVariable[V]],), {}),
outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})]
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py",
line 2546, in <module>
class GroupByKey(PTransform):
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py",
line 775, in annotate_input_types
th = getattr(f, '_type_hints', IOTypeHints.empty()).with_input_types(
based on:
IOTypeHints[inputs=None, outputs=((Tuple[TypeVariable[K],
Iterable[TypeVariable[V]]],), {})]
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py",
line 2546, in <module>
class GroupByKey(PTransform):
File
"/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py",
line 863, in annotate_output_types
f._type_hints = th.with_output_types(return_type_hint) # pylint:
disable=protected-access
{noformat}
> Element type inference doesn't work for multi-output DoFns
> ----------------------------------------------------------
>
> Key: BEAM-4132
> URL: https://issues.apache.org/jira/browse/BEAM-4132
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.4.0
> Reporter: Chuan Yu Foo
> Priority: P3
> Labels: types
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> TLDR: if you have a multi-output DoFn, then the non-main PCollections will
> incorrectly have their element types set to None. This affects type checking
> for pipelines involving these PCollections.
> Minimal example:
> {code}
> import apache_beam as beam
> class TripleDoFn(beam.DoFn):
> def process(self, elem):
> yield elem
> if elem % 2 == 0:
> yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
> if elem % 3 == 0:
> yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)
>
> @beam.typehints.with_input_types(int)
> @beam.typehints.with_output_types(int)
> class MultiplyBy(beam.DoFn):
> def __init__(self, multiplier):
> self._multiplier = multiplier
> def process(self, elem):
> return elem * self._multiplier
>
> def main():
> with beam.Pipeline() as p:
> x, a, b = (
> p
> | 'Create' >> beam.Create([1, 2, 3])
> | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
> 'ten_times', 'hundred_times', main='main_output'))
> _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))
> if __name__ == '__main__':
> main()
> {code}
> Running this yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
> 'MultiplyBy2': requires <type 'int'> but got None for elem
> {noformat}
> Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}}
> instead yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
> 'MultiplyBy2': requires <type 'int'> but got Union[TaggedOutput, int] for elem
> {noformat}
> I would expect Beam to correctly infer that {{a}} and {{b}} have element
> types of {{int}} rather than {{None}}, and I would also expect Beam to
> correctly figure out that the element types of {{x}} are compatible with
> {{int}}.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)