https://github.com/apache/flink/pull/11475

On Sat, Mar 21, 2020 at 10:37 AM Jacob Sevart <jsev...@uber.com> wrote:

> Thanks, will do.
>
> I only want the time stamp to reset when the job comes up with no state.
> Checkpoint recoveries should keep the same value.
>
> Jacob
>
> On Sat, Mar 21, 2020 at 10:16 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Jacob,
>>
>> if you could create a patch for updating the union state metadata
>> documentation, that would be great. I can help with reviewing and merging
>> the patch.
>>
>> If the value stays fixed over the lifetime of the job and you know it
>> before starting the job, then you could use the config mechanism. What
>> won't work is needing a different value for every restart, because
>> updating the config after a recovery is not possible.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 20, 2020 at 6:29 PM Jacob Sevart <jsev...@uber.com> wrote:
>>
>>> Thanks, makes sense.
>>>
>>> What about using the config mechanism? We're collecting and distributing
>>> some environment variables at startup, would it also work to include a
>>> timestamp with that?
>>>
>>> Also, would you be interested in a patch to note the caveat about union
>>> state metadata in the documentation?
>>>
>>> Jacob
>>>
>>> On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Did I understand you correctly that you use the union state to
>>>> synchronize the per-partition state across all operators in order to obtain
>>>> a global overview? If so, this will only work in case of a failover. Only
>>>> then are all operators restarted with the union of all operators' state.
>>>> If the job never fails, there is never an exchange of state.
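A minimal sketch of the restore behaviour described above, written as a plain-Java model rather than Flink code. The class and method names are hypothetical, and the model only assumes what the message states: on a restore, every subtask is handed the union of all subtasks' checkpointed lists, and without a restore nothing is exchanged.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of union redistribution; not Flink's implementation.
class UnionRestoreSketch {

    // On restore, each of the new subtasks receives the concatenation
    // of every list that was checkpointed by the old subtasks.
    static List<List<Instant>> restoreWithUnion(List<List<Instant>> checkpointed,
                                                int newParallelism) {
        List<Instant> union = new ArrayList<>();
        for (List<Instant> perSubtask : checkpointed) {
            union.addAll(perSubtask);
        }
        List<List<Instant>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union)); // every subtask sees all entries
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three subtasks, each holding only its own start-up timestamp.
        List<List<Instant>> checkpointed = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<Instant> own = new ArrayList<>();
            own.add(Instant.ofEpochSecond(1_584_000_000L + i));
            checkpointed.add(own);
        }
        // Before any restore, each subtask knows exactly one timestamp.
        // After a restore, each subtask knows all three.
        List<List<Instant>> restored = restoreWithUnion(checkpointed, 3);
        for (List<Instant> subtaskState : restored) {
            System.out.println(subtaskState.size() + " entries: " + subtaskState);
        }
    }
}
```

In this model the global view only appears inside `restoreWithUnion`, which is why a job that never restarts never sees the other subtasks' timestamps.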
>>>>
>>>> If you really need a global view over your data, then you need to
>>>> create an operator with a parallelism of 1 which records all the different
>>>> timestamps.
>>>>
>>>> Another idea could be to use the broadcast state pattern [1]. You could
>>>> have an operator which extracts the java.time.Instant and outputs them as a
>>>> side output and simply forwards the records on the main output. Then you
>>>> could use the side output as the broadcast input and the main output as the
>>>> normal input into the broadcast operator. The problem with this approach
>>>> might be that you don't get order guarantees between the side and the main
>>>> output.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>
>>>>> Thanks! That would do it. I've disabled the operator for now.
>>>>>
>>>>> The purpose was to know the age of the job's state, so that we could
>>>>> consider its output in terms of how much context it knows. Regular state
>>>>> seemed insufficient because partitions might see their first traffic at
>>>>> different times.
>>>>>
>>>>> How would you go about implementing something like that?
>>>>>
>>>>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Jacob,
>>>>>>
>>>>>> I think you are running into some deficiencies of Flink's union state
>>>>>> here. The problem is that for every entry in your list state, Flink
>>>>>> stores a separate offset (a long value). The reason for this behaviour is
>>>>>> that we use the same state implementation for the union state as well as
>>>>>> for the split state. For the latter, the offset information is required to
>>>>>> split the state in case of changing the parallelism of your job.
>>>>>>
>>>>>> My recommendation would be to try to get rid of union state
>>>>>> altogether. The union state was primarily introduced to checkpoint some
>>>>>> source implementations and might become deprecated due to performance
>>>>>> problems once these sources can be checkpointed differently.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <jsev...@uber.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Oh, I should clarify that's 43MB per partition, so with 48
>>>>>>> partitions it explains my 2GB.
>>>>>>>
>>>>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <jsev...@uber.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Running Checkpoints.loadCheckpointMetadata under a debugger, I
>>>>>>>> found something:
>>>>>>>> subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value
>>>>>>>> weighs 43MB (5.3 million longs).
>>>>>>>>
>>>>>>>> "startup-times" is an operator state of mine (union list of
>>>>>>>> java.time.Instant). I see a way to end up with fewer items in the list,
>>>>>>>> but I'm not sure how the actual size is related to the number of offsets.
>>>>>>>> Can you elaborate on that?
>>>>>>>>
>>>>>>>> Incidentally, 42.5MB is the number I got out of
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-14618,
>>>>>>>> so I think my two problems are closely related.
>>>>>>>>
>>>>>>>> Jacob
>>>>>>>>
>>>>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <qcx978132...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> As Gordon said, the metadata will contain the ByteStreamStateHandle.
>>>>>>>>> When a ByteStreamStateHandle is written out, its handle name is
>>>>>>>>> written too, and that name is a path (as you saw). A
>>>>>>>>> ByteStreamStateHandle is created when the state size is smaller than
>>>>>>>>> `state.backend.fs.memory-threshold` (default is 1024).
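The threshold rule described above can be sketched as a simple size comparison. This is an illustrative model only: the class, enum, and method names are hypothetical, and treating a state size exactly equal to the threshold as inline is an assumption, not something this thread states.

```java
// Illustrative model of the inline-vs-file decision; not Flink's code.
class InlineThresholdSketch {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    // Small state is kept inline in the _metadata file; larger state
    // goes to its own file and the metadata only stores a pointer.
    // Assumption: the boundary value itself counts as "small".
    static Placement placementFor(long stateSizeBytes, long thresholdBytes) {
        return stateSizeBytes <= thresholdBytes
                ? Placement.INLINE_IN_METADATA
                : Placement.SEPARATE_FILE;
    }

    public static void main(String[] args) {
        long threshold = 1024L; // default mentioned in this thread
        System.out.println(placementFor(200L, threshold));       // small state
        System.out.println(placementFor(5_000_000L, threshold)); // large state
    }
}
```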
>>>>>>>>>
>>>>>>>>> If you want to verify this, you can refer to the unit test
>>>>>>>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load
>>>>>>>>> the metadata. You will find many `ByteStreamStateHandle`s whose names
>>>>>>>>> are the strings you saw in the metadata.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Congxian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 6, 2020 at 3:57 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, I will monitor that thread.
>>>>>>>>>>
>>>>>>>>>> I'm having a hard time following the serialization code, but if
>>>>>>>>>> you know anything about the layout, tell me if this makes sense.
>>>>>>>>>> What I see in the hex editor is, first, many HDFS paths. Then
>>>>>>>>>> gigabytes of unreadable data. Then finally another HDFS path at the
>>>>>>>>>> end.
>>>>>>>>>>
>>>>>>>>>> If it is putting state in there, under normal circumstances, does
>>>>>>>>>> it make sense that it would be interleaved with metadata? I would
>>>>>>>>>> expect all the metadata to come first, and then state.
>>>>>>>>>>
>>>>>>>>>> Jacob
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <kklou...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jacob,
>>>>>>>>>>>
>>>>>>>>>>> As I said previously, I am not 100% sure what could be causing this
>>>>>>>>>>> behavior, but there is a related thread here:
>>>>>>>>>>>
>>>>>>>>>>> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> You can re-post your problem there and monitor it for answers.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <jsev...@uber.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Kostas and Gordon,
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that
>>>>>>>>>>> > setting configured, so it should be at the default of 1024 bytes.
>>>>>>>>>>> > This is the full "state.*" section showing in the JobManager UI.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > Jacob
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>>>>>>>>>> > wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Hi Jacob,
>>>>>>>>>>> >>
>>>>>>>>>>> >> Apart from what Klou already mentioned, one more possible reason:
>>>>>>>>>>> >>
>>>>>>>>>>> >> If you are using the FsStateBackend, it is also possible that your
>>>>>>>>>>> >> state is small enough to be stored inline within the metadata file.
>>>>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold"
>>>>>>>>>>> >> configuration, with a default value of 1024 bytes, or can also be
>>>>>>>>>>> >> configured with the `fileStateSizeThreshold` argument when
>>>>>>>>>>> >> constructing the `FsStateBackend`.
>>>>>>>>>>> >> The purpose of that threshold is to ensure that the backend does not
>>>>>>>>>>> >> create a large number of very small files, where potentially the
>>>>>>>>>>> >> file pointers are actually larger than the state itself.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Cheers,
>>>>>>>>>>> >> Gordon
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas <kklou...@gmail.com>
>>>>>>>>>>> >> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Hi Jacob,
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Could you specify which StateBackend you are using?
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>>>>>>>>>>> >>> state will be stored in the _metadata file. Since it is
>>>>>>>>>>> >>> self-contained, you may move the file and restore from any location."
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I am also cc'ing Gordon who may know a bit more about state formats.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I hope this helps,
>>>>>>>>>>> >>> Kostas
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> [1]
>>>>>>>>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <jsev...@uber.com>
>>>>>>>>>>> >>> wrote:
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > Per the documentation:
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily) pointers to
>>>>>>>>>>> >>> > all files on stable storage that are part of the Savepoint, in form
>>>>>>>>>>> >>> > of absolute paths."
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on it,
>>>>>>>>>>> >>> > I find 962 strings, most of which look like HDFS paths, which leaves
>>>>>>>>>>> >>> > a lot of that file size unexplained. What else is in there, and how
>>>>>>>>>>> >>> > exactly could this be happening?
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > We're running 1.6.
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > Jacob
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > --
>>>>>>>>>>> > Jacob Sevart
>>>>>>>>>>> > Software Engineer, Safety
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jacob Sevart
>>>>>>>>>> Software Engineer, Safety
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jacob Sevart
>>>>>>>> Software Engineer, Safety
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jacob Sevart
>>>>>>> Software Engineer, Safety
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Jacob Sevart
>>>>> Software Engineer, Safety
>>>>>
>>>>
>>>
>>> --
>>> Jacob Sevart
>>> Software Engineer, Safety
>>>
>> --
> Jacob Sevart
> Software Engineer, Safety
>


-- 
Jacob Sevart
Software Engineer, Safety
