Hi Gen,

*Unaligned checkpoint & AsyncIO*
Let's talk about a concrete example: DataFileRewrite. The task has 3 steps:
1. Planning - creates multiple RewriteFileGroups, each of which contains the list of small files which should be compacted into a single new file
2. Rewriting data files - rewrites a single RewriteFileGroup
3. Updater - updates the Iceberg table to replace the old files with the new ones
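A rough sketch of how these steps could be chained as Flink operators - PlannerFunction, RewriteFunction, UpdaterFunction and the record types here are placeholders for illustration, not the actual implementation:

    static void wireRewriteFlow(DataStream<Trigger> triggers) {
      SingleOutputStreamOperator<RewriteFileGroup> groups = triggers
          .process(new PlannerFunction())    // Step 1: plan the file groups
          .name("Planner")
          .forceNonParallel();               // planning runs on a single subtask

      SingleOutputStreamOperator<DataFile> rewritten = groups
          .rebalance()                       // spread the groups across the subtasks
          .process(new RewriteFunction())    // Step 2: compact one group per record
          .name("Rewriter");

      rewritten
          .process(new UpdaterFunction())    // Step 3: collect the results, commit once
          .name("Updater")
          .forceNonParallel();               // a single subtask updates the table
    }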
Step 1 happens once every trigger - AsyncIO could help here (we will not have a queue).
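With Flink's AsyncIO, the planning step could look something like the sketch below (planFileGroups() is a placeholder that returns a Collection<RewriteFileGroup>; AsyncDataStream, AsyncFunction and ResultFuture are the standard Flink classes):

    DataStream<RewriteFileGroup> groups = AsyncDataStream.unorderedWait(
        triggers,
        new AsyncFunction<Trigger, RewriteFileGroup>() {
          @Override
          public void asyncInvoke(Trigger trigger, ResultFuture<RewriteFileGroup> future) {
            CompletableFuture
                .supplyAsync(() -> planFileGroups(trigger)) // plan outside the main task thread
                .thenAccept(future::complete);              // emit the planned groups
          }
        },
        10, TimeUnit.MINUTES, // generous timeout, since planning can be slow
        1);                   // capacity 1: at most one planning call in flight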
Step 2 could happen multiple times - this is where the job will be backpressured. The queue will be full, so we need to prepare for this. The Flink documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/) suggests 2 things:
- Buffer debloating - I think it is hard to tune the buffers correctly when there are so many different messages in the queue, as in the maintenance tasks
- Unaligned checkpoints - we still need to make sure that the individual processing time is limited, since the inbound queue will be full, or the mentioned AsyncIO issue will bite us
Step 3 should happen only once, and fast - we do not need AsyncIO here.

So I think we should use AsyncIO in the maintenance tasks where it helps, but we still need a solution for handling the backpressure during checkpointing. Unaligned checkpoints seem like a good choice here.
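For reference, a minimal sketch of enabling them, assuming a reasonably recent Flink version (the interval and timeout values are only examples):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L); // example: checkpoint every minute
    // Barriers overtake the buffered records, which are persisted as part of
    // the state, so a full inbound queue no longer delays the checkpoint.
    env.getCheckpointConfig().enableUnalignedCheckpoints();
    // Optionally stay aligned by default and switch only under backpressure:
    env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));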
*PostCommit or Separate job*
Let's hear what the community thinks about the use-cases we would like to support.

Thanks,
Peter

On Fri, Apr 19, 2024 at 6:29 AM Gen Luo <luogen...@gmail.com> wrote:

> Hi Peter,
>
> Thanks for the reply and explanation!
>
> > Unaligned checkpoint & AsyncIO
> The issue only occurs when the inflight queue is full. Synchronous operators cannot do checkpointing until the processing request is done, while async operators are always ready to do checkpointing, unless the inflight queue is full, in which case it also only needs to wait for one request to get finished, just like the synchronous operators. So I suppose this should not be a critical issue. On the other hand, as the maintenance tasks are probably IO bound, concurrent execution using async IO may achieve higher performance.
>
> By the way, async operators don't need to record the output, but record the input of all ongoing tasks, including those that are finished but not yet sent.
>
> > PostCommit or Separate job
> I agree that it's better to combine small files inside the job. The File and Hive sinks do this in the PreCommitTopology, so that the intermediate small files will not be committed at all. But as Iceberg can properly handle metadata and data file changes, it doesn't need to do this.
>
> > OperatorCoordinator
> I have to say sorry that I found OperatorCoordinator itself is also annotated Internal. And in an internal discussion I was also reminded that combining multiple operators by coordinators may cause unexpected design complexity. Though proposing to make it PublicEvolving can be a choice, maybe it's better to find another way.
>
> In fact the main idea here is to make a callback from the executors to the scheduler/monitor or across executors, so they can decide when the next task can be started. But there's no 'legal' way to do this in Flink, excluding the coordinators. I also thought about combining all kinds of executors in one operator, so it can schedule how the tasks are executed by itself, but it cannot properly handle split and distributed tasks, and as you said may not be efficient enough. So maybe we have to go back to the original plan, to have a tag or lock in Iceberg.
>
> > CALL statements
> I meant to provide an application mode monitoring job, which doesn't start a Flink job immediately, but monitors the table and submits a maintenance job or a CALL statement when necessary. So the resources are applied only when a maintenance job is actually started, and get released once the job is done. But I have to say this may not be a suitable plan if the maintenance jobs would be frequently submitted.
>
> Thanks,
> Gen
>
> On Fri, Apr 19, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Thanks Zhu and Gen for your reply!
>>
>> *Unaligned checkpoints*
>> The maintenance tasks are big and complex. There are many parts which can be parallelized to multiple tasks, so I think we should implement them with chains of Flink operators to exploit the advantages of Flink.
>> A single maintenance task could take a serious amount of time, even with the parallelization, and most probably will cause backpressure somewhere in the flow. This backpressure will cause delays in checkpointing, which we can fight one-by-one with buffer settings/AsyncIO, or use unaligned checkpoints, which are designed for exactly these cases.
>>
>> *PostCommit or Separate job*
>> I agree with both of you that in most cases separating out the maintenance pipelines from the actual jobs is the way to go. However, there are 2 cases highlighted by the Iceberg community which would not be available then:
>> - A Flink job doing a quick small file reduction after collecting several commits
>> - A single, self-contained Flink job for sinking data to an Iceberg table
>>
>> Sooner or later we need to move to the new Flink SinkV2 API anyway. If we plan the architecture correctly, it is not too much work to add the maintenance tasks to the end of the sink (if the user decides that this serves their case best).
>>
>> *OperatorCoordinator*
>> Both of you mention OperatorCoordinator.Context#getCoordinatorStore(), which might serve our needs. I checked it again, and it is marked as Internal by the Flink community. We are not supposed to use it outside of Flink, as it could be broken any time in the future. Do you know about plans to make this a Public interface?
>>
>> *AsyncIO*
>> Even with unaligned checkpoints there are issues with this. See: https://issues.apache.org/jira/browse/FLINK-34704
>> So I do not think AsyncIO can help with checkpoints until this issue is fixed. After that, we can use it to execute long tasks outside of the main thread, and maybe eventually relax the unaligned checkpoint requirement. (AsyncIO operators are unaligned checkpoints behind the scenes, as the input and output records are serialized in the state.)
>>
>> *CALL statements*
>> You mentioned that by using CALL statements, we do not reserve the resources when we do not use them. I might be missing something, but I think that the CALL procedure will run in the same application where it was called, and we need to provide enough resources to this application to be able to finish the maintenance tasks. When there are no maintenance tasks, this application doesn't do anything (essentially idling the resources). I don't see yet how this is different from scheduling the tasks by a specific operator. What do I miss?
>>
>> Thanks,
>> Peter
>>
>> On Thu, Apr 18, 2024 at 2:32 PM Gen Luo <luogen...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply, Peter.
>>> And thanks Zhu for joining the discussion.
>>>
>>> > In the current design, using unaligned checkpoints, and splitting heavy tasks to smaller ones could avoid maintenance tasks blocking the checkpointing.
>>> > How do you suggest executing asynchronous tasks
>>>
>>> As far as I know, the unaligned checkpoint is not widely used (or maybe that's just my impression). And in some cases, it may slow down the checkpointing and restarting, since more data (of the original job) is snapshotted. So the solution may not be able to cover all scenarios. So yes, I would suggest using AsyncIO, or something like it, i.e. we can implement our own async operator if we need to coordinate the async tasks. We can also give the operator a separate slot sharing group, and this should be possible too if we use the AsyncIO.
>>>
>>> > CALL statements are one-time calls, while the goal here is to continuously schedule the tasks.
>>>
>>> Yes it is. I meant to suggest that we can start up a separate job in application mode, which can monitor the table and submit CALL statements continuously. This is proposed because both using the post commit topology and using a separate maintenance job would occupy the resources forever, while in this way the maintenance job is started only when necessary. But if the maintenance tasks are triggered frequently, holding the resources would not be an issue any longer.
>>>
>>> > Are the OperatorCoordinators for different Operators able to communicate with each other?
>>> > Is it possible for the Update Operator to communicate with the Planner Operator through the OperatorCoordinator?
>>>
>>> OperatorCoordinator.Context#getCoordinatorStore() can do this. You may put messages or even a reference to a coordinator for other coordinators to get.
>>>
>>> On Thu, Apr 18, 2024 at 3:08 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Gen,
>>>>
>>>> Thanks for your interest and thoughts!
>>>> See my answers and questions below:
>>>>
>>>> On Wed, Apr 17, 2024 at 8:32 PM Gen Luo <luogen...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry for joining the discussion so late. I'm from the Flink community and did some work on SinkV2. I'd like to share some thoughts from the view of Flink. While I'm quite new to Iceberg, please feel free to correct me if I'm making any mistakes.
>>>>>
>>>>> 1. checkpointing
>>>>> I think the maintenance tasks should preferably not disturb the original job. So it's better not to block the checkpointing, or the checkpoints may time out and fail when a heavy task is running, which may cause the job to fail. To achieve this, the task executing operators need to split the heavy task into small ones, or execute the tasks asynchronously in an executor pool instead of the main task thread and store the task requests in the snapshots. In this way the snapshotting can be done in a guaranteed short time. However, if the tasks are done with complex topologies, it may be impossible to do this. If so, I think it's better to start a separate job instead of using the post commit topology.
>>>>>
>>>>
>>>> In the current design, using unaligned checkpoints, and splitting heavy tasks to smaller ones could avoid maintenance tasks blocking the checkpointing.
>>>> Also the design makes it possible to define different slot sharing groups (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/finegrained_resource/), which makes sure the main job threads are not blocked by the maintenance tasks.
>>>>
>>>> That said, you are right that in some situations it is better to start the job in a separate thread.
>>>>
>>>> How do you suggest executing asynchronous tasks, and how could these asynchronous tasks communicate with the main thread? Are you suggesting to use the Flink AsyncIO (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/)? While it runs in a separate thread, it still uses the same resources (nodes) as the default jobs.
>>>>
>>>>> 2. resources
>>>>> Occupying lots of resources when tasks are idle may be an issue, but to execute the tasks periodically in the job, it seems that there's no perfect solution. All we can do is to reduce the resource requirements.
>>>>>
>>>>> On the other hand, Flink recently supported the CALL statement in 1.18. I have to say I'm not really familiar with it, but it seems to be a solution to execute the tasks one time. Maybe we can create a monitoring job (or do it in the main function), which can trigger tasks by executing CALL statements. In this way resource wasting can be avoided, but this is quite a different direction indeed.
>>>>>
>>>>
>>>> CALL statements are one-time calls, while the goal here is to continuously schedule the tasks.
>>>> Happily, the plan includes separate flows for the specific tasks, so if we decide that we want to schedule the tasks by hand, then it is possible to start them from the CALL statement implementations.
>>>>
>>>>> Besides, I remember that at the meeting it was proposed to adjust the resources for maintenance tasks by auto scaling. I'm afraid it's not a good idea to rely on it. As far as I know, rescaling, when triggered, needs to restart the job, which may not be acceptable to most of the users.
>>>>
>>>> Also in the meeting I tried to explain that it should only work when the "in-place" rescaling would work, so we are absolutely aligned here.
>>>>
>>>>> 3. concurrency
>>>>> If the table is only operated by one Flink job, we can use OperatorCoordinator to coordinate the tasks. OperatorCoordinators may also contact each other to ensure different kinds of tasks are executed in expected ways.
>>>>
>>>> Are the OperatorCoordinators for different Operators able to communicate with each other?
>>>> In the planned RewriteDataFiles flow (data file compaction), we have 3 chained operators:
>>>> 1. Planner - to plan the groups we want to compact
>>>> 2. Executor - to read the small files, and create a single big file
>>>> 3. Updater - to collect the new files and update the Iceberg table
>>>>
>>>> Is it possible for the Update Operator to communicate with the Planner Operator through the OperatorCoordinator?
>>>>
>>>>> While I suppose it's necessary to have a lock mechanism if we want to coordinate the operation from different engines.
>>>>>
>>>>> (Am I replying to the correct mail?)
>>>>
>>>> Yes :)
>>>>
>>>>> Thanks,
>>>>> Gen
>>>>>
>>>>> On Tue, Apr 9, 2024 at 12:29 AM Brian Olsen <bitsondata...@gmail.com> wrote:
>>>>>
>>>>>> Hey Iceberg nation,
>>>>>>
>>>>>> I would like to share about the meeting this Wednesday to further discuss details of Péter's proposal on Flink Maintenance Tasks.
>>>>>> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>>>>>>
>>>>>> List discussion: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>>>>
>>>>>> Design Doc: Flink table maintenance: https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>
>>>>>> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Peter,
>>>>>>>
>>>>>>>> Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>>>>>>>
>>>>>>> Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs, and we can check in-progress jobs with the Airflow API. A Hive metastore lock might be another option we can implement for users.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manu
>>>>>>>
>>>>>>> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Ajantha,
>>>>>>>>
>>>>>>>> I thought about enabling post commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1]. I think it will be hard to do it in a user friendly way - because of the high number of parameters - but I think it is a possible solution with sensible defaults.
>>>>>>>>
>>>>>>>> There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
>>>>>>>>
>>>>>>>> Writing to a branch using Flink SQL should be another thread, but by my first guess, it shouldn't be hard to implement using options, like: /*+ OPTIONS('branch'='b1') */
>>>>>>>> Since writing to a branch is already working through the Java API [3].
>>>>>>>>
>>>>>>>> Thanks, Peter
>>>>>>>>
>>>>>>>> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>>>>>>> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>>>>>>> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>>>>>>>
>>>>>>>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the proposal Peter.
>>>>>>>>>
>>>>>>>>> I just wanted to know whether we have any plans for supporting SQL syntax for table maintenance (like CALL procedures) for pure Flink SQL users? I didn't see any custom SQL parser plugin support in Flink.
>>>>>>>>> I also saw that branch writes don't have SQL support (only branch reads use options), so I am not sure about the roadmap of Iceberg SQL support in Flink. Was there any discussion before?
>>>>>>>>>
>>>>>>>>> - Ajantha
>>>>>>>>>
>>>>>>>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Manu,
>>>>>>>>>>
>>>>>>>>>> Just to clarify:
>>>>>>>>>> - Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>>>>>>>>>>
>>>>>>>>>> I think we shouldn't add locking to Iceberg's user facing scope at this stage. A fully featured locking system has many more features than we need (priorities, fairness, timeouts etc.). I could be tempted when we are talking about the REST catalog, but I think that should be further down the road, if ever...
>>>>>>>>>>
>>>>>>>>>> About using the tags:
>>>>>>>>>> - I whole-heartedly agree that using tags is not intuitive, and I see your points in most of your arguments. OTOH, introducing a new requirement (a locking mechanism) seems like a wrong direction to me.
>>>>>>>>>> - We already defined a requirement (atomic changes on the table) for the Catalog implementations which could be used to achieve our goal here.
>>>>>>>>>> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
>>>>>>>>>>
>>>>>>>>>> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg, and shouldn't be exposed on the external API. My concern is:
>>>>>>>>>> - Would it be used often enough to be worth the additional complexity?
>>>>>>>>>>
>>>>>>>>>> Knowing that Spark compaction is struggling with the same issue is a good indicator, but we would probably need more use cases before introducing a new feature with this complexity, or a simpler solution.
>>>>>>>>>>
>>>>>>>>>> Thanks, Peter
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>>>>>>>>>>>
>>>>>>>>>>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check in-progress maintenance jobs for the same table, multiple jobs will run concurrently and have conflicts.
>>>>>>>>>>>
>>>>>>>>>>> Basically, I think we need a lock mechanism like the metastore lock <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration> if we want to handle it for users. However, using TAG doesn't look intuitive to me. We are also mixing user data with system metadata. Maybe we can define some general interfaces and leave the implementation to users in the first version.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Manu
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>>>>>>>>>>>>
>>>>>>>>>>>> The issue:
>>>>>>>>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make sure not to run tasks started by a single trigger concurrently, by serializing them, but there are no loops in Flink, so we can't synchronize tasks started by the next trigger.
>>>>>>>>>>>>
>>>>>>>>>>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
>>>>>>>>>>>>
>>>>>>>>>>>> Proposal:
>>>>>>>>>>>> When firing a trigger, the RateLimiter could check and create an Iceberg table tag [1] for the current table snapshot, with the name '__flink_maintenance'. When the execution finishes, we remove this tag. The RateLimiter keeps accumulating changes, and doesn't fire new triggers as long as it finds this tag on the table.
>>>>>>>>>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
>>>>>>>>>>>>
>>>>>>>>>>>> This does not use tags as intended, but it seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
>>>>>>>>>>>>
>>>>>>>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Peter
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As discussed on yesterday's community sync, I am working on adding a possibility to the Flink Iceberg connector to run maintenance tasks on the Iceberg tables. This will fix the small files issues and, in the long run, help compact the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without the need for Spark in the infrastructure.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I did some planning and prototyping, and I am currently trying out the solution on a larger scale.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I put together a document on how my current solution looks: https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would love to hear your thoughts and feedback on this to find a good final solution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Peter