Small update to that - with 2 machines that would not work perfectly (I
leave the exercise for others to think about); you'd **really** need a
YuniKorn Executor for that.
But it would work with 1 machine with 10 GPUs and 20 CPUs.

And another comment - I used to work with a similar setup when I worked
at NoMagic.ai - a pretty successful robotics/AI startup. We had a dedicated,
big GPU + CPU on-prem machine (which was 3x cheaper than cloud) and we were
strapped for money.
We ran most of our machine learning workflows there (that was
long before LLMs), but that machine could also be used for CPU workflows -
it had plenty of unused CPUs. Back then we did not choose Airflow to run
our workflows (there were other reasons for that), but if we had known that
thanks to Airflow we could heavily optimise what we had to do, maybe the
decision would have been different.

Again - GPU is just an example - there are other cases where the resource is
actually "outside" and we want to limit it only while the task is running,
not while it is deferred.

Another example (you can replace GCP with another service that might
potentially have limits):

1) we do not want to have more than 10 parallel downloads from GCP - the
files are huge, so we do not want to download more than 10 of them in
parallel
2) we use asyncio triggers to listen to Pub/Sub messages - to check if
there are new GCP files in the bucket - and we have 100 tasks waiting in
10 DAGs - each subscribed to a different file pattern
3) when deferred in the triggerer those tasks are waiting on Pub/Sub; they
do not have an open GCP connection at all
4) then suddenly 50 of those files appear in the bucket at about the same
time
5) if we do not have a "gcp" pool -> all of the deferred tasks are suddenly
brought back from the deferred state and start to download 50 files from
GCP in parallel -> this is not what we want
6) so we add a "gcp" pool with size 10 and mark all the 100 tasks to use
that pool (rough sketch below)

Effect -> 50 files appear, all 50 tasks are schedulable, but only 10 of
them are actually scheduled - because the pool limits the number of tasks
across multiple DAGs accessing the same GCP instance -> only 10 of them
start pulling files from GCP. Every time one of those tasks completes,
one of the remaining ones immediately gets scheduled and starts pulling
its file.
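
To make it concrete - a rough sketch of one of those 10 DAGs. The
WaitForGcsFileThenDownloadOperator is a made-up deferrable operator that
first defers on a Pub/Sub notification and then downloads the matching
file, and "use_pool_when_deferred" is only the flag proposed in this
thread, not an existing parameter:

# the shared pool is created once, e.g. via the CLI:
#   airflow pools set gcp 10 "max parallel GCP downloads"

import pendulum
from airflow.decorators import dag

# illustrative placeholder: 10 file patterns per DAG, 10 such DAGs = 100 tasks
FILE_PATTERNS = [f"incoming/partner_{i}/*.csv" for i in range(10)]

@dag(schedule=None, start_date=pendulum.datetime(2024, 10, 1), catchup=False)
def wait_and_download_partner_files():
    for i, pattern in enumerate(FILE_PATTERNS):
        # defers on Pub/Sub until a file matching the pattern appears,
        # then downloads it in the non-deferred part of execute()
        WaitForGcsFileThenDownloadOperator(
            task_id=f"download_{i}",
            file_pattern=pattern,
            pool="gcp",          # shared by all 100 tasks across the 10 DAGs
            # proposed flag: do NOT hold a "gcp" slot while deferred on Pub/Sub
            use_pool_when_deferred=False,
        )

wait_and_download_partner_files()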

I think it's pretty realistic too.


J.


On Sun, Oct 20, 2024 at 11:46 PM Jarek Potiuk <ja...@potiuk.com> wrote:

> Also just to anticipate one of the possible counter-solutions. The same
> scenario applies when there are several DAGs - each using the same GPU
> operator - with "pool" being the only limiting factor (in one DAG you
> could have max_active_tasks etc.). But the customer has several different
> DAGs - each of them with the same GPU-based deferrable operator. There is
> currently no other way than pools in Airflow to limit the maximum total
> number of "scheduled" GPU-based tasks to max=x across several DAGs.
>
> But this customer actually wants to optimize their hardware usage even
> more, and added several more DAGs to run on the same hardware.
>
> J.
>
>
> On Sun, Oct 20, 2024 at 11:42 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Sure.
>>
>> Say the company has 2 workers with 5 GPUs and 10 CPUs each and wants to
>> optimize the load so that they can run up to 20 CPU-bound tasks (very short
>> ones, not doing much) and up to 10 GPU-bound tasks.
>>
>> When GPU-bound tasks start - they request (and wait for) a GPU to become
>> available. Airflow has no built-in way to know if a GPU is available, so it
>> will schedule all eligible tasks that need a GPU for running.
>> Having a GPU pool = 10 for GPU-only tasks (queued with high priority, to
>> maximize GPU usage) will achieve what I think such a user would want - i.e.
>> optimize their hardware usage.
>>
>> - when they have only CPU-bound tasks running, they will have up to 20 of
>> them running in parallel, so far, so good
>> - the GPU processing operator has "use_pool_when_deferred=False" - so the
>> deferrable part of the GPU-bound tasks runs on a separate Triggerer
>> running on other (very small) hardware (neither queue nor pool has any
>> effect on the triggers running there)
>> - the deferrable "sensor" part of the task will only let the "worker"
>> part of the operator do its work when the "model" the task is waiting for
>> is available. There are 100 mapped tasks - each waiting for a different
>> model.
>> - say 100 GPU tasks are eligible for running but their models are not
>> available (yet)
>> - so we still have 20 CPUs to run CPU-bound tasks, as all the GPU tasks
>> are deferred in the triggerer
>> - then, at about the same time, the models for 50 of those GPU tasks
>> become ready -> this makes 50 GPU tasks move out of their deferred state,
>> ready to be scheduled on the 2 machines
>> - if we have no pool limiting the number of GPU-bound tasks, 20 of them
>> would actually start running and 10 of them would wait for a GPU to become
>> available - they would do nothing - but they would be blocking any of the
>> CPU-bound tasks from running
>> - with the GPU pool = 10, only 10 of those GPU tasks would start running,
>> while the remaining 10 worker slots are free to run CPU-bound tasks
>> (rough sketch below)
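>>
>> A rough sketch of how the mapped GPU tasks could be declared
>> (RunMyGPUFineTuningOperator is the made-up operator from my earlier
>> example, and "use_pool_when_deferred" is only the proposed flag, not
>> something that exists today):
>>
>> # "gpu" pool created with 10 slots = total number of GPUs on the 2 workers
>> MODEL_URIS = [f"gs://models/model_{i}.bin" for i in range(100)]  # placeholder
>>
>> finetune = RunMyGPUFineTuningOperator.partial(
>>     task_id="finetune",
>>     pool="gpu",
>>     priority_weight=10,   # queued with high priority to maximize GPU usage
>>     # proposed flag: don't hold a "gpu" slot while deferred, waiting for the model
>>     use_pool_when_deferred=False,
>> ).expand(model_uri=MODEL_URIS)   # the 100 mapped tasks - one per model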
>>
>> Surely you could add another deferral - "defer when GPU is not available" -
>> but then it's extremely difficult for the triggerer to know when to bring
>> such a task back from the deferred state - because effectively you'd have
>> to get the triggerer to know when a GPU is available on the worker.
>>
>> Yes. It's entirely made up, but also entirely practical, I think. And
>> it's mostly about optimizing the hardware - which I think (and that's part
>> of the YuniKorn discussion) is one of the most important things everyone is
>> after when optimizing their workflows.
>>
>> J.
>>
>>
>>
>> On Sun, Oct 20, 2024 at 8:56 PM Daniel Standish
>> <daniel.stand...@astronomer.io.invalid> wrote:
>>
>>> With your GPU example, that seems like a bit of a stretch.  Can you flesh
>>> out the example a little more, get more into the details of how it
>>> actually works (and I understand it's made up anyway)?
>>>
>>> Sounds like something other than this task is spinning up a GPU resource?
>>> And this is just a "wait then run" operator?  If something else is
>>> controlling the resource spin-up, then why does this task need a pool at
>>> all?  It's not controlling the increase in load.
>>>
>>> No example needed with use_pool_when_deferred=True because that's how I
>>> think it makes sense to be universally.
>>>
>>>
>>> On Sun, Oct 20, 2024 at 10:05 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>> > Yeah... Off-by-one... Good eye - I lied too :) (noticed it after I sent
>>> > the message. I wish email had the ability to correct typos).
>>> >
>>> > 2) -> yes, we agree, but to clarify a bit - we need it at the OPERATOR
>>> > level, not the TASK level. The difference comes from who defines it: it
>>> > should be the Operator's Author, not the DAG Author. I.e. we should be
>>> > able to define "use_pool_when_deferred" (working name) when you define
>>> > the operator, not when you create the operator as a task in a DAG. So
>>> > basically IMHO the operator should have the ability to internally set
>>> > this property of the BaseOperator, but it's not necessary to expose it
>>> > via the `__init__` method of the actual
>>> > CustomDeferrableOperator(BaseOperator). We still CAN expose it via
>>> > __init__, but I'd not say it's desired.
>>> >
>>> > Examples:
>>> >
>>> > 1) example 1. RunMyGPUFineTuningOperator. Pool = number of shared GPUs.
>>> > The operator does: a) wait, deferred, for a MODEL to appear, b) upload
>>> > the model and fine-tune it (non-deferrable, uses the GPU).
>>> > "use_pool_when_deferred" = False (sketch below)
>>> > 2) example 2. UpdateNewSalesforceUsersOperator. Pool = number of
>>> > Salesforce connections (protect the Salesforce API from being
>>> > overloaded - our licence allows only 10 parallel connections). The
>>> > operator does: a) check if new users are defined (by polling the
>>> > Salesforce API) - deferred, b) update the users with new fields via the
>>> > Salesforce API. "use_pool_when_deferred" = True
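>>> >
>>> > Roughly what I mean by "operator level" for example 1 - a sketch only,
>>> > assuming BaseOperator grew such an attribute as proposed:
>>> >
>>> > from airflow.models.baseoperator import BaseOperator
>>> >
>>> > class RunMyGPUFineTuningOperator(BaseOperator):
>>> >     def __init__(self, *, model_uri: str, **kwargs):
>>> >         super().__init__(**kwargs)
>>> >         self.model_uri = model_uri
>>> >         # set by the Operator's Author - the DAG Author never passes it
>>> >         self.use_pool_when_deferred = False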
>>> >
>>> > Enough.
>>> >
>>> > J.
>>> >
>>> > On Sun, Oct 20, 2024 at 4:45 PM Daniel Standish
>>> > <daniel.stand...@astronomer.io.invalid> wrote:
>>> >
>>> > > So yeah hopefully we all agree that if we keep it, we should move it
>>> > > to task.
>>> > >
>>> > > I guess we can think of this thread as two items:
>>> > >
>>> > >    1. if we keep the ability to have tasks not occupy a pool slot,
>>> > >    shouldn't it be configured at task level?  I think so.
>>> > >    2. but should we keep the ability to have tasks not be considered
>>> > >    at task level?
>>> > >    3. if tasks are to stay in pools when deferred, ideally they should
>>> > >    do so continuously (e.g. including when in between worker and
>>> > >    triggerer)
>>> > >
>>> > >
>>> > > Ok I lied, three items. But 3 is more like a reminder that there is
>>> > > this bad behavior :)
>>> > >
>>> > > Anyway, let's move on to focus on number 2, whether we should provide
>>> > > users a configuration option to make tasks "drop out" of the pool when
>>> > > deferred.
>>> > >
>>> > > After reading your message Jarek, I did not come away with an
>>> > > understanding of a practical use case for having the task vacate the
>>> > > pool slot when deferred.  Can you offer an example or two?
>>> > >
>>> > >
>>> > >
>>> > > On Sun, Oct 20, 2024 at 7:29 AM Daniel Standish <
>>> > > daniel.stand...@astronomer.io> wrote:
>>> > >
>>> > > > Totally agree that if we *must* make it configurable, then at task
>>> > > > is the right place.  Will read the rest of it later :)
>>> > >
>>> >
>>>
>>
