Thanks, will do. I only want the time stamp to reset when the job comes up with no state. Checkpoint recoveries should keep the same value.
Jacob On Sat, Mar 21, 2020 at 10:16 AM Till Rohrmann <trohrm...@apache.org> wrote: > Hi Jacob, > > if you could create patch for updating the union state metadata > documentation that would be great. I can help with the review and merging > this patch. > > If the value stays fixed over the lifetime of the job and you know it > before starting the job, then you could use the config mechanism. What > won't work is if for every restart you would need a different value. > Updating the config after a recovery is not possible. > > Cheers, > Till > > On Fri, Mar 20, 2020 at 6:29 PM Jacob Sevart <jsev...@uber.com> wrote: > >> Thanks, makes sense. >> >> What about using the config mechanism? We're collecting and distributing >> some environment variables at startup, would it also work to include a >> timestamp with that? >> >> Also, would you be interested in a patch to note the caveat about union >> state metadata in the documentation? >> >> Jacob >> >> On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Did I understand you correctly that you use the union state to >>> synchronize the per partition state across all operators in order to obtain >>> a global overview? If this is the case, then this will only work in case of >>> a failover. Only then, all operators are being restarted with the union of >>> all operators state. If the job would never fail, then there would never be >>> an exchange of state. >>> >>> If you really need a global view over your data, then you need to create >>> an operator with a parallelism of 1 which records all the different >>> timestamps. >>> >>> Another idea could be to use the broadcast state pattern [1]. You could >>> have an operator which extracts the java.time.Instant and outputs them as a >>> side output and simply forwards the records on the main output. Then you >>> could use the side output as the broadcast input and the main output as the >>> normal input into the broadcast operator. The problem with this approach >>> might be that you don't get order guarantees between the side and the main >>> output. >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html >>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_stream_state_broadcast-5Fstate.html&d=DwMFaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=t8gx18WI38mWMMo9o1GAUERpXwVKG5wnYdvT3gBZxo8&s=v2kbM2mYHBcsKjNzFCaaSbg_3vyfYIhoX8stFXSzRnY&e=> >>> >>> Cheers, >>> Till >>> >>> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <jsev...@uber.com> wrote: >>> >>>> Thanks! That would do it. I've disabled the operator for now. >>>> >>>> The purpose was to know the age of the job's state, so that we could >>>> consider its output in terms of how much context it knows. Regular state >>>> seemed insufficient because partitions might see their first traffic at >>>> different times. >>>> >>>> How would you go about implementing something like that? >>>> >>>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <trohrm...@apache.org> >>>> wrote: >>>> >>>>> Hi Jacob, >>>>> >>>>> I think you are running into some deficiencies of Flink's union state >>>>> here. The problem is that for every entry in your list state, Flink stores >>>>> a separate offset (a long value). The reason for this behaviour is that we >>>>> use the same state implementation for the union state as well as for the >>>>> split state. For the latter, the offset information is required to split >>>>> the state in case of changing the parallelism of your job. >>>>> >>>>> My recommendation would be to try to get rid of union state all >>>>> together. The union state has primarily been introduced to checkpoint some >>>>> source implementations and might become deprecated due to performance >>>>> problems once these sources can be checkpointed differently. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <jsev...@uber.com> wrote: >>>>> >>>>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions >>>>>> it explains my 2GB. >>>>>> >>>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <jsev...@uber.com> >>>>>> wrote: >>>>>> >>>>>>> Running *Checkpoints.loadCheckpointMetadata *under a debugger, I >>>>>>> found something: >>>>>>> *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("startup-times").offsets.value >>>>>>> *weights >>>>>>> 43MB (5.3 million longs). >>>>>>> >>>>>>> "startup-times" is an operator state of mine (union list of >>>>>>> java.time.Instant). I see a way to end up fewer items in the list, but >>>>>>> I'm >>>>>>> not sure how the actual size is related to the number of offsets. Can >>>>>>> you >>>>>>> elaborate on that? >>>>>>> >>>>>>> Incidentally, 42.5MB is the number I got out of >>>>>>> https://issues.apache.org/jira/browse/FLINK-14618 >>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D14618&d=DwMFaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=3KZriZyZgBj7mReI9Giq9_Y59NZ6d_4KGE1RkGm2DCI&s=I6LhM2g2btCo31K3ox7TZhtHQiee95biqJf7Hbj9Dbo&e=>. >>>>>>> So I think my two problems are closely related. >>>>>>> >>>>>>> Jacob >>>>>>> >>>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <qcx978132...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi >>>>>>>> >>>>>>>> As Gordon said, the metadata will contain the >>>>>>>> ByteStreamStateHandle, when writing out the ByteStreamStateHandle, will >>>>>>>> write out the handle name -- which is a path(as you saw). The >>>>>>>> ByteStreamStateHandle will be created when state size is small than >>>>>>>> `state.backend.fs.memory-threshold`(default is 1024). >>>>>>>> >>>>>>>> If you want to verify this, you can ref the unit test >>>>>>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load >>>>>>>> the >>>>>>>> metadata, you can find out that there are many >>>>>>>> `ByteStreamStateHandle`, and >>>>>>>> their names are the strings you saw in the metadata. >>>>>>>> >>>>>>>> Best, >>>>>>>> Congxian >>>>>>>> >>>>>>>> >>>>>>>> Jacob Sevart <jsev...@uber.com> 于2020年3月6日周五 上午3:57写道: >>>>>>>> >>>>>>>>> Thanks, I will monitor that thread. >>>>>>>>> >>>>>>>>> I'm having a hard time following the serialization code, but if >>>>>>>>> you know anything about the layout, tell me if this makes sense. What >>>>>>>>> I see >>>>>>>>> in the hex editor is, first, many HDFS paths. Then gigabytes of >>>>>>>>> unreadable >>>>>>>>> data. Then finally another HDFS path at the end. >>>>>>>>> >>>>>>>>> If it is putting state in there, under normal circumstances, does >>>>>>>>> it make sense that it would be interleaved with metadata? I would >>>>>>>>> expect >>>>>>>>> all the metadata to come first, and then state. >>>>>>>>> >>>>>>>>> Jacob >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Jacob >>>>>>>>> >>>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <kklou...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Jacob, >>>>>>>>>> >>>>>>>>>> As I said previously I am not 100% sure what can be causing this >>>>>>>>>> behavior, but this is a related thread here: >>>>>>>>>> >>>>>>>>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI&e= >>>>>>>>>> >>>>>>>>>> Which you can re-post your problem and monitor for answers. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Kostas >>>>>>>>>> >>>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <jsev...@uber.com> >>>>>>>>>> wrote: >>>>>>>>>> > >>>>>>>>>> > Kostas and Gordon, >>>>>>>>>> > >>>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that >>>>>>>>>> setting configured so it should be at the default 1024b. This is the >>>>>>>>>> full >>>>>>>>>> "state.*" section showing in the JobManager UI. >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > Jacob >>>>>>>>>> > >>>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai < >>>>>>>>>> tzuli...@apache.org> wrote: >>>>>>>>>> >> >>>>>>>>>> >> Hi Jacob, >>>>>>>>>> >> >>>>>>>>>> >> Apart from what Klou already mentioned, one slightly possible >>>>>>>>>> reason: >>>>>>>>>> >> >>>>>>>>>> >> If you are using the FsStateBackend, it is also possible that >>>>>>>>>> your state is small enough to be considered to be stored inline >>>>>>>>>> within the >>>>>>>>>> metadata file. >>>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold" >>>>>>>>>> configuration, with a default value of 1024 bytes, or can also be >>>>>>>>>> configured with the `fileStateSizeThreshold` argument when >>>>>>>>>> constructing the >>>>>>>>>> `FsStateBackend`. >>>>>>>>>> >> The purpose of that threshold is to ensure that the backend >>>>>>>>>> does not create a large amount of very small files, where >>>>>>>>>> potentially the >>>>>>>>>> file pointers are actually larger than the state itself. >>>>>>>>>> >> >>>>>>>>>> >> Cheers, >>>>>>>>>> >> Gordon >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas < >>>>>>>>>> kklou...@gmail.com> wrote: >>>>>>>>>> >>> >>>>>>>>>> >>> Hi Jacob, >>>>>>>>>> >>> >>>>>>>>>> >>> Could you specify which StateBackend you are using? >>>>>>>>>> >>> >>>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]: >>>>>>>>>> >>> >>>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and >>>>>>>>>> savepoint >>>>>>>>>> >>> state will be stored in the _metadata file. Since it is >>>>>>>>>> >>> self-contained, you may move the file and restore from any >>>>>>>>>> location." >>>>>>>>>> >>> >>>>>>>>>> >>> I am also cc'ing Gordon who may know a bit more about state >>>>>>>>>> formats. >>>>>>>>>> >>> >>>>>>>>>> >>> I hope this helps, >>>>>>>>>> >>> Kostas >>>>>>>>>> >>> >>>>>>>>>> >>> [1] >>>>>>>>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.6_ops_state_savepoints.html&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=fw0c-Ct21HHJv4MzZRicIaltqHLQOrNvqchzNgCdwkA&e= >>>>>>>>>> >>> >>>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <jsev...@uber.com> >>>>>>>>>> wrote: >>>>>>>>>> >>> > >>>>>>>>>> >>> > Per the documentation: >>>>>>>>>> >>> > >>>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily) >>>>>>>>>> pointers to all files on stable storage that are part of the >>>>>>>>>> Savepoint, in >>>>>>>>>> form of absolute paths." >>>>>>>>>> >>> > >>>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running >>>>>>>>>> strings on it I find 962 strings, most of which look like HDFS >>>>>>>>>> paths, which >>>>>>>>>> leaves a lot of that file-size unexplained. What else is in there, >>>>>>>>>> and how >>>>>>>>>> exactly could this be happening? >>>>>>>>>> >>> > >>>>>>>>>> >>> > We're running 1.6. >>>>>>>>>> >>> > >>>>>>>>>> >>> > Jacob >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > -- >>>>>>>>>> > Jacob Sevart >>>>>>>>>> > Software Engineer, Safety >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Jacob Sevart >>>>>>>>> Software Engineer, Safety >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Jacob Sevart >>>>>>> Software Engineer, Safety >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Jacob Sevart >>>>>> Software Engineer, Safety >>>>>> >>>>> >>>> >>>> -- >>>> Jacob Sevart >>>> Software Engineer, Safety >>>> >>> >> >> -- >> Jacob Sevart >> Software Engineer, Safety >> > -- Jacob Sevart Software Engineer, Safety