Thanks, will do.

I only want the time stamp to reset when the job comes up with no state.
Checkpoint recoveries should keep the same value.

Jacob

On Sat, Mar 21, 2020 at 10:16 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jacob,
>
> if you could create patch for updating the union state metadata
> documentation that would be great. I can help with the review and merging
> this patch.
>
> If the value stays fixed over the lifetime of the job and you know it
> before starting the job, then you could use the config mechanism. What
> won't work is if for every restart you would need a different value.
> Updating the config after a recovery is not possible.
>
> Cheers,
> Till
>
> On Fri, Mar 20, 2020 at 6:29 PM Jacob Sevart <jsev...@uber.com> wrote:
>
>> Thanks, makes sense.
>>
>> What about using the config mechanism? We're collecting and distributing
>> some environment variables at startup, would it also work to include a
>> timestamp with that?
>>
>> Also, would you be interested in a patch to note the caveat about union
>> state metadata in the documentation?
>>
>> Jacob
>>
>> On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Did I understand you correctly that you use the union state to
>>> synchronize the per partition state across all operators in order to obtain
>>> a global overview? If this is the case, then this will only work in case of
>>> a failover. Only then, all operators are being restarted with the union of
>>> all operators state. If the job would never fail, then there would never be
>>> an exchange of state.
>>>
>>> If you really need a global view over your data, then you need to create
>>> an operator with a parallelism of 1 which records all the different
>>> timestamps.
>>>
>>> Another idea could be to use the broadcast state pattern [1]. You could
>>> have an operator which extracts the java.time.Instant and outputs them as a
>>> side output and simply forwards the records on the main output. Then you
>>> could use the side output as the broadcast input and the main output as the
>>> normal input into the broadcast operator. The problem with this approach
>>> might be that you don't get order guarantees between the side and the main
>>> output.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_stream_state_broadcast-5Fstate.html&d=DwMFaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=t8gx18WI38mWMMo9o1GAUERpXwVKG5wnYdvT3gBZxo8&s=v2kbM2mYHBcsKjNzFCaaSbg_3vyfYIhoX8stFXSzRnY&e=>
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>
>>>> Thanks! That would do it. I've disabled the operator for now.
>>>>
>>>> The purpose was to know the age of the job's state, so that we could
>>>> consider its output in terms of how much context it knows. Regular state
>>>> seemed insufficient because partitions might see their first traffic at
>>>> different times.
>>>>
>>>> How would you go about implementing something like that?
>>>>
>>>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Jacob,
>>>>>
>>>>> I think you are running into some deficiencies of Flink's union state
>>>>> here. The problem is that for every entry in your list state, Flink stores
>>>>> a separate offset (a long value). The reason for this behaviour is that we
>>>>> use the same state implementation for the union state as well as for the
>>>>> split state. For the latter, the offset information is required to split
>>>>> the state in case of changing the parallelism of your job.
>>>>>
>>>>> My recommendation would be to try to get rid of union state all
>>>>> together. The union state has primarily been introduced to checkpoint some
>>>>> source implementations and might become deprecated due to performance
>>>>> problems once these sources can be checkpointed differently.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>
>>>>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions
>>>>>> it explains my 2GB.
>>>>>>
>>>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <jsev...@uber.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Running *Checkpoints.loadCheckpointMetadata *under a debugger, I
>>>>>>> found something:
>>>>>>> *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("startup-times").offsets.value
>>>>>>>  *weights
>>>>>>> 43MB (5.3 million longs).
>>>>>>>
>>>>>>> "startup-times" is an operator state of mine (union list of
>>>>>>> java.time.Instant). I see a way to end up fewer items in the list, but 
>>>>>>> I'm
>>>>>>> not sure how the actual size is related to the number of offsets. Can 
>>>>>>> you
>>>>>>> elaborate on that?
>>>>>>>
>>>>>>> Incidentally, 42.5MB is the number I got out of
>>>>>>> https://issues.apache.org/jira/browse/FLINK-14618
>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D14618&d=DwMFaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=3KZriZyZgBj7mReI9Giq9_Y59NZ6d_4KGE1RkGm2DCI&s=I6LhM2g2btCo31K3ox7TZhtHQiee95biqJf7Hbj9Dbo&e=>.
>>>>>>> So I think my two problems are closely related.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <qcx978132...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> As Gordon said, the metadata will contain the
>>>>>>>> ByteStreamStateHandle, when writing out the ByteStreamStateHandle, will
>>>>>>>> write out the handle name -- which is a path(as you saw). The
>>>>>>>> ByteStreamStateHandle will be created when state size is small than
>>>>>>>> `state.backend.fs.memory-threshold`(default is 1024).
>>>>>>>>
>>>>>>>> If you want to verify this, you can ref the unit test
>>>>>>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load 
>>>>>>>> the
>>>>>>>> metadata, you can find out that there are many 
>>>>>>>> `ByteStreamStateHandle`, and
>>>>>>>> their names are the strings you saw in the metadata.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Congxian
>>>>>>>>
>>>>>>>>
>>>>>>>> Jacob Sevart <jsev...@uber.com> 于2020年3月6日周五 上午3:57写道:
>>>>>>>>
>>>>>>>>> Thanks, I will monitor that thread.
>>>>>>>>>
>>>>>>>>> I'm having a hard time following the serialization code, but if
>>>>>>>>> you know anything about the layout, tell me if this makes sense. What 
>>>>>>>>> I see
>>>>>>>>> in the hex editor is, first, many HDFS paths. Then gigabytes of 
>>>>>>>>> unreadable
>>>>>>>>> data. Then finally another HDFS path at the end.
>>>>>>>>>
>>>>>>>>> If it is putting state in there, under normal circumstances, does
>>>>>>>>> it make sense that it would be interleaved with metadata? I would 
>>>>>>>>> expect
>>>>>>>>> all the metadata to come first, and then state.
>>>>>>>>>
>>>>>>>>> Jacob
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Jacob
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <kklou...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jacob,
>>>>>>>>>>
>>>>>>>>>> As I said previously I am not 100% sure what can be causing this
>>>>>>>>>> behavior, but this is a related thread here:
>>>>>>>>>>
>>>>>>>>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI&e=
>>>>>>>>>>
>>>>>>>>>> Which you can re-post your problem and monitor for answers.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <jsev...@uber.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Kostas and Gordon,
>>>>>>>>>> >
>>>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that
>>>>>>>>>> setting configured so it should be at the default 1024b. This is the 
>>>>>>>>>> full
>>>>>>>>>> "state.*" section showing in the JobManager UI.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Jacob
>>>>>>>>>> >
>>>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <
>>>>>>>>>> tzuli...@apache.org> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> Hi Jacob,
>>>>>>>>>> >>
>>>>>>>>>> >> Apart from what Klou already mentioned, one slightly possible
>>>>>>>>>> reason:
>>>>>>>>>> >>
>>>>>>>>>> >> If you are using the FsStateBackend, it is also possible that
>>>>>>>>>> your state is small enough to be considered to be stored inline 
>>>>>>>>>> within the
>>>>>>>>>> metadata file.
>>>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold"
>>>>>>>>>> configuration, with a default value of 1024 bytes, or can also be
>>>>>>>>>> configured with the `fileStateSizeThreshold` argument when 
>>>>>>>>>> constructing the
>>>>>>>>>> `FsStateBackend`.
>>>>>>>>>> >> The purpose of that threshold is to ensure that the backend
>>>>>>>>>> does not create a large amount of very small files, where 
>>>>>>>>>> potentially the
>>>>>>>>>> file pointers are actually larger than the state itself.
>>>>>>>>>> >>
>>>>>>>>>> >> Cheers,
>>>>>>>>>> >> Gordon
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas <
>>>>>>>>>> kklou...@gmail.com> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Hi Jacob,
>>>>>>>>>> >>>
>>>>>>>>>> >>> Could you specify which StateBackend you are using?
>>>>>>>>>> >>>
>>>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]:
>>>>>>>>>> >>>
>>>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and
>>>>>>>>>> savepoint
>>>>>>>>>> >>> state will be stored in the _metadata file. Since it is
>>>>>>>>>> >>> self-contained, you may move the file and restore from any
>>>>>>>>>> location."
>>>>>>>>>> >>>
>>>>>>>>>> >>> I am also cc'ing Gordon who may know a bit more about state
>>>>>>>>>> formats.
>>>>>>>>>> >>>
>>>>>>>>>> >>> I hope this helps,
>>>>>>>>>> >>> Kostas
>>>>>>>>>> >>>
>>>>>>>>>> >>> [1]
>>>>>>>>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.6_ops_state_savepoints.html&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=fw0c-Ct21HHJv4MzZRicIaltqHLQOrNvqchzNgCdwkA&e=
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <jsev...@uber.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >>> >
>>>>>>>>>> >>> > Per the documentation:
>>>>>>>>>> >>> >
>>>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily)
>>>>>>>>>> pointers to all files on stable storage that are part of the 
>>>>>>>>>> Savepoint, in
>>>>>>>>>> form of absolute paths."
>>>>>>>>>> >>> >
>>>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running
>>>>>>>>>> strings on it I find 962 strings, most of which look like HDFS 
>>>>>>>>>> paths, which
>>>>>>>>>> leaves a lot of that file-size unexplained. What else is in there, 
>>>>>>>>>> and how
>>>>>>>>>> exactly could this be happening?
>>>>>>>>>> >>> >
>>>>>>>>>> >>> > We're running 1.6.
>>>>>>>>>> >>> >
>>>>>>>>>> >>> > Jacob
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > --
>>>>>>>>>> > Jacob Sevart
>>>>>>>>>> > Software Engineer, Safety
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jacob Sevart
>>>>>>>>> Software Engineer, Safety
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jacob Sevart
>>>>>>> Software Engineer, Safety
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jacob Sevart
>>>>>> Software Engineer, Safety
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Jacob Sevart
>>>> Software Engineer, Safety
>>>>
>>>
>>
>> --
>> Jacob Sevart
>> Software Engineer, Safety
>>
> --
Jacob Sevart
Software Engineer, Safety

Reply via email to