https://github.com/apache/flink/pull/11475
On Sat, Mar 21, 2020 at 10:37 AM Jacob Sevart <jsev...@uber.com> wrote:
> Thanks, will do.
>
> I only want the timestamp to reset when the job comes up with no state. Checkpoint recoveries should keep the same value.
>
> Jacob
>
> On Sat, Mar 21, 2020 at 10:16 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Jacob,
>>
>> If you could create a patch for updating the union state metadata documentation, that would be great. I can help with reviewing and merging this patch.
>>
>> If the value stays fixed over the lifetime of the job and you know it before starting the job, then you could use the config mechanism. What won't work is if you need a different value for every restart. Updating the config after a recovery is not possible.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 20, 2020 at 6:29 PM Jacob Sevart <jsev...@uber.com> wrote:
>>
>>> Thanks, makes sense.
>>>
>>> What about using the config mechanism? We're collecting and distributing some environment variables at startup; would it also work to include a timestamp with that?
>>>
>>> Also, would you be interested in a patch to note the caveat about union state metadata in the documentation?
>>>
>>> Jacob
>>>
>>> On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Did I understand you correctly that you use the union state to synchronize the per-partition state across all operators in order to obtain a global overview? If this is the case, then this will only work in case of a failover. Only then are all operators restarted with the union of all operators' state. If the job never failed, there would never be an exchange of state.
>>>>
>>>> If you really need a global view over your data, then you need to create an operator with a parallelism of 1 which records all the different timestamps.
>>>>
>>>> Another idea could be to use the broadcast state pattern [1].
>>>> You could have an operator which extracts the java.time.Instant values, outputs them as a side output, and simply forwards the records on the main output. Then you could use the side output as the broadcast input and the main output as the normal input into the broadcast operator. The problem with this approach might be that you don't get order guarantees between the side and the main output.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>
>>>>> Thanks! That would do it. I've disabled the operator for now.
>>>>>
>>>>> The purpose was to know the age of the job's state, so that we could consider its output in terms of how much context it knows. Regular state seemed insufficient because partitions might see their first traffic at different times.
>>>>>
>>>>> How would you go about implementing something like that?
>>>>>
>>>>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Hi Jacob,
>>>>>>
>>>>>> I think you are running into some deficiencies of Flink's union state here. The problem is that for every entry in your list state, Flink stores a separate offset (a long value). The reason for this behaviour is that we use the same state implementation for the union state as well as for the split state. For the latter, the offset information is required to split the state in case of changing the parallelism of your job.
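Till's two points (union state is only exchanged when the job restores, and Flink keeps one offset per list entry) can be illustrated with a small simulation. The sketch below is plain Java, not Flink code, and `UnionStateSketch`, `restoreUnion` and `entriesAfterRestores` are hypothetical names. It assumes an operator that, on every checkpoint, writes back everything it received on restore; under that assumption the total number of entries, and therefore the number of offsets, is multiplied by the parallelism on each restore. The thread does not confirm that Jacob's operator behaved exactly this way.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (plain Java, not Flink code) of union list state redistribution:
 * on restore, every parallel subtask receives the union of all subtasks' lists.
 */
public class UnionStateSketch {

    /** Union redistribution: each of the n subtasks gets a copy of every snapshotted entry. */
    static List<List<Long>> restoreUnion(List<List<Long>> snapshots) {
        List<Long> union = new ArrayList<>();
        for (List<Long> s : snapshots) {
            union.addAll(s);
        }
        List<List<Long>> restored = new ArrayList<>();
        for (int i = 0; i < snapshots.size(); i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    /**
     * Total entries across all subtasks after a number of restores, assuming an operator that
     * snapshots everything it was restored with. Per the thread, each entry also costs one
     * long offset in the checkpoint metadata.
     */
    static long entriesAfterRestores(int parallelism, int restores) {
        List<List<Long>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<Long> own = new ArrayList<>();
            own.add(System.currentTimeMillis()); // one startup timestamp per subtask
            state.add(own);
        }
        for (int r = 0; r < restores; r++) {
            state = restoreUnion(state);
        }
        long total = 0;
        for (List<Long> s : state) {
            total += s.size();
        }
        return total;
    }

    public static void main(String[] args) {
        for (int r = 0; r <= 3; r++) {
            System.out.println("restores=" + r + " entries=" + entriesAfterRestores(4, r));
        }
    }
}
```

With a parallelism of 4 and one timestamp per subtask, the totals are 4, 16, 64 and 256 entries after 0 to 3 restores. In this model, an operator that keeps only the value it needs (for example, the earliest timestamp) before snapshotting would stay at one entry per subtask.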
>>>>>>
>>>>>> My recommendation would be to try to get rid of union state altogether. The union state has primarily been introduced to checkpoint some source implementations and might become deprecated due to performance problems once these sources can be checkpointed differently.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>
>>>>>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions it explains my 2GB.
>>>>>>>
>>>>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>>
>>>>>>>> Running `Checkpoints.loadCheckpointMetadata` under a debugger, I found something: `subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value` weighs 43MB (5.3 million longs).
>>>>>>>>
>>>>>>>> "startup-times" is an operator state of mine (a union list of java.time.Instant). I see a way to end up with fewer items in the list, but I'm not sure how the actual size is related to the number of offsets. Can you elaborate on that?
>>>>>>>>
>>>>>>>> Incidentally, 42.5MB is the number I got out of https://issues.apache.org/jira/browse/FLINK-14618. So I think my two problems are closely related.
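Jacob's figures can be cross-checked with simple arithmetic. Assuming each offset is stored as a raw 8-byte `long` and ignoring any serialization framing, 5.3 million offsets come to about 42.4 MB per subtask, and 48 subtasks come to about 2.04 GB, which is roughly 1.9 GiB and consistent with the 1.9GB `_metadata` file from the original question. `OffsetSizeEstimate` and `offsetBytes` are hypothetical names used only for this estimate.

```java
/** Back-of-the-envelope estimate; assumes 8 bytes per offset and ignores serialization framing. */
public class OffsetSizeEstimate {

    /** Bytes needed to store the given number of offsets as raw longs. */
    static long offsetBytes(long offsets) {
        return offsets * Long.BYTES;
    }

    public static void main(String[] args) {
        long offsetsPerSubtask = 5_300_000L; // approximate count from the debugger session
        int subtasks = 48;                   // number of partitions mentioned in the thread

        long perSubtask = offsetBytes(offsetsPerSubtask);
        long total = perSubtask * subtasks;

        System.out.printf("per subtask: %.1f MB%n", perSubtask / 1e6);
        System.out.printf("all subtasks: %.2f GB (%.2f GiB)%n",
                total / 1e9, total / (double) (1L << 30));
    }
}
```

Running it prints roughly 42.4 MB per subtask and 2.04 GB (1.90 GiB) in total, so the offsets alone plausibly account for almost the entire file.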
>>>>>>>>
>>>>>>>> Jacob
>>>>>>>>
>>>>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> As Gordon said, the metadata will contain the ByteStreamStateHandle. When the ByteStreamStateHandle is written out, its handle name is written as well, and that name is a path (as you saw). A ByteStreamStateHandle is created when the state size is smaller than `state.backend.fs.memory-threshold` (default 1024).
>>>>>>>>>
>>>>>>>>> If you want to verify this, you can refer to the unit test `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the metadata. You will find that there are many `ByteStreamStateHandle`s, and their names are the strings you saw in the metadata.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Congxian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Jacob Sevart <jsev...@uber.com> wrote on Fri, Mar 6, 2020 at 3:57 AM:
>>>>>>>>>
>>>>>>>>>> Thanks, I will monitor that thread.
>>>>>>>>>>
>>>>>>>>>> I'm having a hard time following the serialization code, but if you know anything about the layout, tell me if this makes sense. What I see in the hex editor is, first, many HDFS paths. Then gigabytes of unreadable data. Then finally another HDFS path at the end.
>>>>>>>>>>
>>>>>>>>>> If it is putting state in there, under normal circumstances, does it make sense that it would be interleaved with metadata? I would expect all the metadata to come first, and then state.
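Congxian's description also suggests why paths and binary data appear interleaved in the hex editor: if each inline handle is written as its name followed by its bytes, a reader scanning the file sees path, data, path, data rather than all metadata first. The sketch below uses a made-up, simplified layout built on `java.io.DataOutputStream`; `InlineHandleLayout`, `writeHandle`, `serializeExample` and the example paths are hypothetical, and this is not Flink's actual savepoint serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical, simplified metadata layout (NOT Flink's real serializer): every handle is
 * written as a name/path, and inline handles are immediately followed by their raw bytes.
 */
public class InlineHandleLayout {

    /** Writes one handle; inlineData == null models a file-backed handle (pointer only). */
    static void writeHandle(DataOutputStream out, String name, byte[] inlineData) throws IOException {
        out.writeUTF(name); // 2-byte length prefix followed by the encoded name
        if (inlineData == null) {
            out.writeInt(-1); // marker: the state itself lives in a separate file
        } else {
            out.writeInt(inlineData.length);
            out.write(inlineData); // inline state bytes sit right after the path
        }
    }

    /** Serializes three example handles; the paths are made up for illustration. */
    static byte[] serializeExample() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeHandle(out, "hdfs:///checkpoints/a", new byte[] {1, 2, 3});
            writeHandle(out, "hdfs:///checkpoints/b", null);
            writeHandle(out, "hdfs:///checkpoints/c", new byte[] {4, 5});
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = serializeExample();
        System.out.println("serialized " + data.length + " bytes");
    }
}
```

In this layout the first path starts at byte 2, its inline data starts at byte 27, and the next path begins right after that data, which is the kind of interleaving Jacob observed.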
>>>>>>>>>>
>>>>>>>>>> Jacob
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jacob,
>>>>>>>>>>>
>>>>>>>>>>> As I said previously, I am not 100% sure what can be causing this behavior, but here is a related thread:
>>>>>>>>>>>
>>>>>>>>>>> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> where you can re-post your problem and monitor for answers.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Kostas and Gordon,
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that setting configured, so it should be at the default of 1024 bytes. This is the full "state.*" section showing in the JobManager UI.
>>>>>>>>>>> >
>>>>>>>>>>> > Jacob
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Hi Jacob,
>>>>>>>>>>> >>
>>>>>>>>>>> >> Apart from what Klou already mentioned, one other, less likely, reason:
>>>>>>>>>>> >>
>>>>>>>>>>> >> If you are using the FsStateBackend, it is also possible that your state is small enough to be stored inline within the metadata file.
>>>>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold" configuration, with a default value of 1024 bytes, and can also be configured with the `fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
>>>>>>>>>>> >> The purpose of that threshold is to ensure that the backend does not create a large number of very small files, where the file pointers could actually be larger than the state itself.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Cheers,
>>>>>>>>>>> >> Gordon
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Hi Jacob,
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Could you specify which StateBackend you are using?
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint state will be stored in the _metadata file. Since it is self-contained, you may move the file and restore from any location."
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I am also cc'ing Gordon, who may know a bit more about state formats.
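The memory threshold that Gordon and Congxian describe can be put into a small cost model of what one operator-state chunk adds to `_metadata`. In the sketch below (plain Java; `MemoryThresholdSketch`, `metadataBytes` and the 60-byte path size are hypothetical, and the exact boundary comparison Flink uses may differ), a chunk smaller than the threshold is embedded inline, a larger chunk contributes only a path, and, consistent with what Jacob found in the debugger, the per-entry offsets of operator state are counted in the metadata either way.

```java
/**
 * Hypothetical cost model of what one operator-state chunk adds to _metadata.
 * Not Flink code: the inline rule follows the thread's description ("smaller than the
 * threshold"), and the exact boundary comparison used by Flink may differ.
 */
public class MemoryThresholdSketch {

    static final int DEFAULT_THRESHOLD_BYTES = 1024; // default cited in the thread (Flink 1.6)

    /**
     * @param stateBytes  serialized size of the state chunk
     * @param threshold   memory threshold in bytes
     * @param pathBytes   size of the file pointer recorded when the chunk goes to its own file
     * @param offsetCount number of list entries, each keeping one 8-byte offset
     */
    static long metadataBytes(long stateBytes, int threshold, int pathBytes, long offsetCount) {
        long payload = stateBytes < threshold ? stateBytes : pathBytes; // inline vs. pointer
        return payload + offsetCount * Long.BYTES;                      // offsets counted either way
    }

    public static void main(String[] args) {
        // Small chunk: stored inline, with a single offset.
        System.out.println(metadataBytes(500, DEFAULT_THRESHOLD_BYTES, 60, 1));
        // Large chunk: only a path is stored, but 5.3 million offsets still land in _metadata.
        System.out.println(metadataBytes(50_000_000L, DEFAULT_THRESHOLD_BYTES, 60, 5_300_000L));
    }
}
```

The second call shows why lowering or raising the threshold would not help in this case: the offsets term dominates regardless of where the state bytes themselves are stored.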
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I hope this helps,
>>>>>>>>>>> >>> Kostas
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > Per the documentation:
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths."
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running `strings` on it, I find 962 strings, most of which look like HDFS paths, which leaves a lot of that file size unexplained. What else is in there, and how exactly could this be happening?
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > We're running 1.6.
>>>>>>>>>>> >>> >
>>>>>>>>>>> >>> > Jacob
>>>>>>>>>>> >
>>>>>>>>>>> > --
>>>>>>>>>>> > Jacob Sevart
>>>>>>>>>>> > Software Engineer, Safety
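Jacob's `strings` check only surfaces runs of printable characters, which would explain why it found the HDFS paths but not the bulk of the file. The sketch below approximates the tool's default behavior (runs of at least 4 printable ASCII characters; this is a simplification, not a reimplementation of GNU `strings`) and shows that a block of serialized `long` offsets, like the ones discussed in the thread, yields no strings at all. `PrintableRuns`, `printableRuns` and `offsetsAsBytes` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Simplified approximation of `strings`: collects runs of >= minLength printable ASCII bytes. */
public class PrintableRuns {

    static List<String> printableRuns(byte[] data, int minLength) {
        List<String> runs = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (byte b : data) {
            int c = b & 0xFF;
            if (c >= 0x20 && c <= 0x7E) { // printable ASCII (space through '~')
                current.append((char) c);
            } else {
                if (current.length() >= minLength) {
                    runs.add(current.toString());
                }
                current.setLength(0);
            }
        }
        if (current.length() >= minLength) {
            runs.add(current.toString());
        }
        return runs;
    }

    /** Serializes offsets 0, 8, 16, ... as big-endian longs, imitating a block of stored offsets. */
    static byte[] offsetsAsBytes(int count) {
        ByteBuffer buf = ByteBuffer.allocate(count * Long.BYTES);
        for (int i = 0; i < count; i++) {
            buf.putLong((long) i * Long.BYTES);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] offsets = offsetsAsBytes(100_000);
        System.out.println("bytes: " + offsets.length
                + ", printable runs: " + printableRuns(offsets, 4).size());
    }
}
```

Because small offsets are mostly zero bytes, no run of four printable characters ever forms, so megabytes of offsets are invisible to this kind of scan while every embedded path shows up.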