[ 
https://issues.apache.org/jira/browse/BEAM-12554?focusedWorklogId=772224&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772224
 ]

ASF GitHub Bot logged work on BEAM-12554:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/22 03:01
            Start Date: 19/May/22 03:01
    Worklog Time Spent: 10m 
      Work Description: Abacn opened a new pull request, #17708:
URL: https://github.com/apache/beam/pull/17708

   Apparently `sink_fn` does not produce a new `FileSink` instance as expected. 
This is caused by a bug in the lambda constructed in `_get_sink_fn`.
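
To illustrate how a sink factory that keeps returning the same object can produce the misrouting described in the issue, here is a minimal, Beam-free sketch. `RecordingSink`, `shared_sink_fn`, `fresh_sink_fn` and `route` are hypothetical names invented for this illustration; this is a simplified model, not the actual code of `_get_sink_fn` or of this pull request, and it assumes the sink class can be constructed without arguments.

```python
class RecordingSink:
    """Hypothetical stand-in for a stateful file sink (not Beam's FileSink)."""

    def __init__(self):
        self.fh = None

    def open(self, fh):
        # Remember the "file" (here just a list) that writes should go to.
        self.fh = fh

    def write(self, record):
        self.fh.append(record)


def shared_sink_fn(sink):
    # Problematic pattern: every call hands back the very same object.
    return lambda destination: sink


def fresh_sink_fn(sink):
    # One possible remedy: build a new sink of the same class per call.
    cls = type(sink)
    return lambda destination: cls()


def route(records, destination_fn, sink_fn):
    """Write each record through the sink created for its destination."""
    files, sinks = {}, {}
    for record in records:
        dest = destination_fn(record)
        if dest not in sinks:
            files[dest] = []
            sinks[dest] = sink_fn(dest)
            sinks[dest].open(files[dest])
        sinks[dest].write(record)
    return files


records = [str(n) for n in range(100)]
dest = lambda n: "in" if n in ["17"] else "out"

buggy = route(records, dest, shared_sink_fn(RecordingSink()))
fixed = route(records, dest, fresh_sink_fn(RecordingSink()))
```

In this model, the shared sink is re-opened on the "in" file when "17" arrives, so every later record follows it there; with a fresh sink per destination, each file only receives its own records.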
   




Issue Time Tracking
-------------------

            Worklog Id:     (was: 772224)
    Remaining Estimate: 0h
            Time Spent: 10m

> WriteToFiles destination no changing condition
> ----------------------------------------------
>
>                 Key: BEAM-12554
>                 URL: https://issues.apache.org/jira/browse/BEAM-12554
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.35.0
>            Reporter: Inigo San Jose Visiers
>            Priority: P1
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> `WriteToFiles` does not seem to work as it should. From the tests I have 
> run, my conclusion is that once the destination condition has been met, it 
> is not applied again until a new, previously unseen condition is met. This 
> results in elements being distributed to the wrong files.
> Example code 1:
> {code:python}
> import apache_beam as beam
> from apache_beam.io import fileio
>
> with beam.Pipeline() as p:
>     (p | "Create" >> beam.Create(range(100))
>        | beam.Map(lambda x: str(x))
>        | fileio.WriteToFiles(
>                   path="./dynamic/",
>                   destination=lambda n: "in" if n in ["17"] else "out",
>                   sink=fileio.TextSink(),
>                   file_naming=fileio.destination_prefix_naming("test"))
>     )
> {code}
> Here, the expected result is a file called "in-xyz" containing only "17" and 
> another called "out-xyz" containing the rest. What we actually see is that 
> "out" contains the numbers 0 to 16 and, once the condition for 17 is met, all 
> remaining numbers go to "in". So we end up with "out" holding 0 to 16 and 
> "in" holding 17 onwards, which is wrong.
> Changing the number shows the same behavior.
> ____________________
> Example code 2:
> {code:python}
> def odd_even(x):
>     value = "even" if int(x) % 2 == 0 else "odd"
>     print(value, x)
>     return value
> (p | "Create" >> beam.Create(range(100))
>    | beam.Map(lambda x: str(x))
>    | fileio.WriteToFiles(
>               path="./dynamic/",
>               destination=odd_even,
>               sink=fileio.TextSink(),
>               file_naming=fileio.destination_prefix_naming("test"))
> )
> {code}
> We can see that the `odd_even` function returns the right value, but the 
> destination is still wrong. We get "even" only for 0 and "odd" for all the 
> other numbers, since the condition changed at element "1".
> ____________________
> Example code 3:
> Trying more conditionals or a different `file_naming` doesn't fix this:
> {code:python}
> def test_15(n):
>     three = "three" if int(n) % 3 == 0 else ""
>     five = "five" if int(n) % 5 == 0 else ""
>     return f"value-{three}{five}"
> def time_format():
>     def _inner(window, pane, shard_index, total_shards, compression, destination):
>         print(window, pane, shard_index, total_shards, compression, destination)
>         return f"dest-{destination}-shards-{shard_index}-of-{total_shards}"
>     return _inner
> (p | "Create" >> beam.Create(range(N))
>    | beam.Map(lambda x: str(x))
>    | fileio.WriteToFiles(
>               path="./dynamic/",
>               destination=test_15,
>               sink=fileio.TextSink(),
>               file_naming=time_format())
> )
> {code}
> Adding shards or other variables doesn't help either.
> I have tested this with different SDK versions (2.27, 2.29, 2.30) and with 
> Dataflow, DirectRunner, and InteractiveRunner.
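
As a reference for checking the output files, the expected distribution in the examples above can be computed without Beam. The helper below, `expected_files`, is a hypothetical name introduced for this sketch; it simply groups elements by the value returned by the destination function, which is the behavior the report expects from `WriteToFiles`.

```python
from collections import defaultdict


def expected_files(elements, destination_fn):
    """Group elements by destination, preserving input order."""
    groups = defaultdict(list)
    for element in elements:
        groups[destination_fn(element)].append(element)
    return dict(groups)


def odd_even(x):
    # Same logic as in example 2, without the print call.
    return "even" if int(x) % 2 == 0 else "odd"


elements = [str(n) for n in range(100)]
by_in_out = expected_files(elements, lambda n: "in" if n in ["17"] else "out")
by_parity = expected_files(elements, odd_even)
```

Comparing these groups with the contents of the written files shows which elements ended up under the wrong destination.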



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
