Semaphore and other objects as task condition #4104

Open
pvanderlinden opened this issue Sep 10, 2020 · 25 comments

@pvanderlinden

When you use the built-in Semaphore to limit the concurrency of a certain type of task, there are currently two behaviours you can choose:

  • acquire the semaphore inside the task: this holds a worker thread while the semaphore is not acquirable, which means workers sit idle (running a task that is only waiting for a semaphore) because they are all waiting for the same semaphore
  • secede from the worker, acquire the semaphore, then rejoin: this still has a similar issue, although less severe/less likely (I think). It opens the possibility that a task is scheduled, secedes to wait for the semaphore, a different task starts on the same worker, and the first task then acquires the semaphore but sits waiting to rejoin the worker. In the meantime a different worker starts another task that tries to acquire the semaphore but can't, because the blocked task is holding it. I have seen this happen (although it might only be possible with two different semaphores, and due to a configuration error there were fewer workers available than intended)

Unless there is a different option, the only solution would be to make it possible to attach a semaphore or similar primitive to a task as a scheduling condition (just as the result of another task is a condition before a dependent task can be scheduled).
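
A minimal sketch of the two behaviours described above, assuming a client is already connected (`do_work` is a placeholder, not from the original post):

import dask.distributed as dd

def task_blocking(x):
    # Behaviour 1: acquire inside the task; the worker thread blocks
    # for as long as the semaphore is unavailable.
    sem = dd.Semaphore(name="db", max_leases=2)
    with sem:
        return do_work(x)

def task_seceding(x):
    # Behaviour 2: secede so the worker can pick up other tasks while
    # we wait, then rejoin once the lease is held.
    sem = dd.Semaphore(name="db", max_leases=2)
    dd.secede()
    with sem:
        dd.rejoin()
        return do_work(x)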

@pvanderlinden
Author

This actually seems to happen more often than not. New tasks keep getting scheduled on workers that aren't doing anything but that have tasks waiting for semaphores. While there are 6 workers, only 2 of them do any work because of this issue. The Semaphore also prevents task priorities from working properly (a task might get scheduled first but not get its lease first).

Is there any advice on how to get this working while limiting certain types of tasks?

@pvanderlinden
Author

Although I'm not sure this is the full story. I can also see tasks which are marked as ready but don't get run at all, while only 2 tasks are actually running with 6 workers. I'm not sure why this is.

@pvanderlinden
Author

[screenshot: task timeline]

An example of a timeline:

  • the upper half is color-coded by task type; the lower half just alternates to show task changes (when two tasks of the same type run after each other)
  • all green tasks are kicked off at the same time, but are limited by a semaphore to one at a time
  • each work item consists of a sequence (for now) of green, black, yellow
  • black and yellow tasks are limited by a semaphore to two at a time

This means there are long periods of time where other tasks are not started while there are plenty of workers and plenty of "semaphore slots" available.

Some remarks:

  • the current cluster is simply created by starting a Dask distributed Client (1 thread per worker, 5 processes, 4 GB memory limit); the plan is to spin up on-demand clusters in Kubernetes in the long run.
  • the main issue seems to be the yellow tasks. Is this task preventing Dask itself from doing any work? I don't think so; it might not get many cycles, and the scheduler is running on the client side in the above situation. This task is a CPU-heavy Python load, but also does plenty of database IO (and psycopg2 should be releasing the GIL).
  • when there are long periods of only one task running, it always restarts with a green task on the same worker as the only running task, which means my first thought from the original post is correct.

I think my issue is twofold:

  • I don't want to block all the workers with tasks waiting for a semaphore (which will happen if I don't secede), but I also don't want to block all work because the task holding the semaphore can't rejoin its worker.
  • I also want to prioritize larger workloads, but priorities and semaphores don't mix.

@pvanderlinden
Author

@mrocklin @fjetter any thoughts on the above issues with Semaphores?

@fjetter
Member

fjetter commented Sep 29, 2020

First things first: the scheduler is actually agnostic about semaphore values; task transitioning and work assignment don't know about them, and neither does work stealing or any other internal administration. So there is definitely a lot of potential for improved scheduling if the synchronization objects were implemented as task annotations or conditions, effectively only transitioning as many tasks to processing as the semaphore value allows. From what I understand, that would require a vastly different implementation, and I don't feel confident at the moment suggesting what would be required to enable it. I think a lot of groundwork is needed before this can happen.

Regarding your example, I can't really tell what's going on. Just looking at the dashboard I would assume something is broken, since during the first period of the long yellow task something else should be able to compute. Generally, the task progress view shows a task which is blocked by a lock/semaphore as "computing".
I cannot really tell how the system behaves once seceding comes into play. I could imagine a few scenarios where we cause deadlocks of some sort, or at the very least congestion. I also don't know how seceding would show up on that graph. Do you observe the same without seceding?

@pvanderlinden
Author

@fjetter I'm using the Semaphores to limit the usage of an SQL database and other resources (not per-worker resources, otherwise I would use the built-in resource feature). The issue without seceding is that I would hardly process anything in parallel, as all workers would be busy waiting for the semaphore. The scenario: I schedule 60+ tasks with the same Semaphore, and each of those tasks has follow-up tasks, with or without their own Semaphore. If I didn't secede, all workers would just sit in a task waiting on a Semaphore. I cannot just deploy more workers, as that would be a big waste (the tasks are large, so I would need 60+ workers with several GBs of memory just to get a parallelism of about 5-6 tasks).
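
A sketch of that scenario, with hypothetical function names (`gated_task`, `follow_up`, `work_items`), assuming a connected Client:

from distributed import Client, Semaphore

client = Client("scheduler-address:8786")
sem = Semaphore(name="big-stage", max_leases=5)

# 60+ large tasks all gated by the same semaphore ...
first_stage = [client.submit(gated_task, item, sem) for item in work_items]

# ... each with follow-up tasks, some gated by their own semaphore.
second_stage = [client.submit(follow_up, fut) for fut in first_stage]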

My graph is actually not from the dashboard; I can't really get a decent dashboard output for this (the full graph takes several hours). But what basically happens is: there are many seceded tasks on multiple workers trying to acquire a semaphore. When a task completes, a random task gets its semaphore. That task then tries to rejoin, but if it happens to be on a busy worker, it won't do anything until the other task finishes.

Basically I'm trying to use the semaphore to limit the number of tasks of a certain type running at the same time. But I don't see a way to do this correctly with the current Semaphores or any other feature?

@pvanderlinden
Author

Also, I do understand that the scheduler is agnostic about the Semaphore, which is basically what is causing the issue. I can't see a way of using Semaphores without the scheduler being aware of them.

I'm also trying to find a workaround, as this is a big issue.

@fjetter
Member

fjetter commented Sep 29, 2020

The thing about seceding is that on a given worker multiple threads may secede, and each acquire a lease while doing so. If only one of them can rejoin, this effectively leads to resource starvation, which is what might be happening in the case of these very heavy yellow computations.

I don't have much experience with the seceding approach. I actually have no idea how it is implemented, but from a conceptual point of view, if we could time out the rejoin and release the semaphore if the rejoin did not succeed, this might free up this deadlock situation. I can't say if that is feasible, though.

Taking a step back, a rather practical approach I would suggest is to fuse the tasks into one computation. This ensures that Green+Black+Yellow is always executed on the same worker, each stage immediately after the previous one finishes. If I understood the problem correctly, this might help and would be less complex to implement.
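
A sketch of that fusion, under the assumption that the three stages are plain functions (`green`, `black`, `yellow`, and `work_items` are placeholders, not names from this thread):

from distributed import Client

client = Client("scheduler-address:8786")

def fused(item):
    # Run all three stages back to back inside a single task, so they
    # stay on one worker and start immediately after each other.
    return yellow(black(green(item)))

futures = [client.submit(fused, item) for item in work_items]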

@pvanderlinden
Author

The main reason the stages (green-black-yellow) are separate tasks is that they have very different resource requirements, and more black and yellow tasks can run at the same time (than green). The second issue is that some yellow tasks are going to depend on black tasks (other than their "own" black task).

@pvanderlinden
Author

The other workaround (not great, though) would be to not secede and use worker resources instead. That way I can limit which workers run which tasks. It might solve my issue until Semaphores and scheduling work together, but I still need to test whether it works, and it makes the workers non-generic. Another option might be to build something more like a queuing system, like the feature Dask had before; that would make it difficult to build the graph, though.
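
A sketch of the worker-resources workaround, using a made-up resource name (`GREEN`) and hypothetical task names; abstract resources are declared when a worker starts and requested per task:

# Start each worker with an abstract resource, e.g.:
#   dask-worker scheduler-address:8786 --resources "GREEN=1"

from distributed import Client

client = Client("scheduler-address:8786")

# Only workers advertising GREEN will run this task, and the one unit of
# GREEN is held for the task's whole duration, capping concurrency.
future = client.submit(green_task, item, resources={"GREEN": 1})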

@pvanderlinden
Author

pvanderlinden commented Oct 1, 2020

@fjetter There is basically no way of limiting concurrency of tasks correctly, as I understand it now?

  • Semaphores will cause one of the two issues I described above
  • if I start using queues, I can't really construct a graph, as tasks will all be submitted from the workers, and it will cause other issues
  • after thinking it through, I don't think worker resources are an option either; they will cause issues, as they are not really designed for what I'm trying to do.

@fjetter
Member

fjetter commented Oct 1, 2020

no way of limiting concurrency of tasks correctly as I understand it now?

None of the synchronization objects (Events, Locks, Semaphores) are designed to limit the concurrency of tasks; they are about limiting concurrent execution of code within a task. I actually believe there are valid applications for both, but what you require is definitely out of scope for the current implementation.


One additional thing you might consider trying, to mitigate the "worker is blocked" issue, is the Reschedule exception.

something like

from distributed import Reschedule

def func(sem):
    try:
        sem.acquire(timeout="10s")  # or however much you're willing to sacrifice
    except TimeoutError:
        raise Reschedule
    try:
        ...  # do stuff to your DB
    finally:
        sem.release()

This way a task only waits (and blocks the worker pool) for a limited amount of time and is rescheduled later. I'm not sure how robust the rescheduling actually is, but conceptually this would avoid the "multiple threads want to rejoin" issue we discussed above.

@pvanderlinden
Author

Thanks, I will experiment with the reschedule mechanism.

@pvanderlinden
Author

Fixed an issue in the code:

from distributed import Reschedule

def func(sem):
    if not sem.acquire(timeout="1s"):  # or however much you're willing to sacrifice
        raise Reschedule
    try:
        ...  # do stuff to your DB
    finally:
        sem.release()

@fjetter Unfortunately this doesn't work. It still keeps the workers busy waiting on Semaphores while "real" tasks are still waiting for a worker.

@pvanderlinden
Author

Is it possible to modify the rejoin function to accept a timeout? Then it would work like this:

  • secede
  • try to acquire the semaphore
  • after that: rejoin
  • if the rejoin times out: release the semaphore and reschedule

That might solve the issue. @fjetter sorry, I'm just trying to get this solved, as it slows everything down so much that non-parallel processing is almost as fast. A sketch of what I mean follows.
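
To be explicit, rejoin() takes no timeout today; this is only a sketch of the proposed behaviour, with a hypothetical timeout argument and `do_work` as a placeholder:

from distributed import Reschedule, Semaphore, rejoin, secede

def func(sem: Semaphore):
    secede()
    sem.acquire()
    try:
        rejoin(timeout="10s")  # hypothetical argument; rejoin() has no timeout
    except TimeoutError:
        # Couldn't get a thread back on this worker: give up the lease
        # and let the scheduler place the task again.
        sem.release()
        raise Reschedule
    try:
        return do_work()
    finally:
        sem.release()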

@pvanderlinden
Author

I thought I had found a solution, but it makes the issue even worse, and I don't understand why. I would expect the submitted task to be scheduled on a different worker, while the task "acquiring and releasing the semaphore" secedes and is free to pick up other tasks. Instead, the functions submitted by this code don't always get scheduled while there are free workers available. Does this mean there is an even bigger scheduling issue, or am I missing something?

import functools

import dask.distributed as dd
from dask.utils import apply  # apply(func, args, kwargs); assumed import, define your own if unavailable

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        # Hold the lease in a seceded thread and resubmit the real work
        # to the cluster so the worker thread itself is not blocked.
        with dd.worker_client(separate_thread=True) as client:
            sem = dd.Semaphore(name=name, max_leases=max_running)
            with sem:
                future = client.submit(apply, func, args, kwargs)
                return future.result()

    return wrapped

@lr4d
Contributor

lr4d commented Oct 14, 2020

I schedule 60+ tasks with the same Semaphore, and each of those tasks has follow-up tasks, with or without their own Semaphore. If I didn't secede, all workers would just sit in a task waiting on a Semaphore

Have you looked into fusing? Depending on the fusing parameters, you would execute some of the (logical) tasks that follow the semaphore-using task as part of a single (actual) task.

You could also look into using futures.

@pvanderlinden
Author

@lr4d Fusing would only complicate things. The tasks are split because they have completely different scheduling requirements, and fusing would make it impossible to describe dependencies.

What do you mean by using futures? I'm already scheduling tasks like that in the example, but for some reason Dask doesn't schedule the tasks on the 5 idle workers.

@pvanderlinden
Author

The main reason to fuse is when you have many very small tasks which take more time to schedule than to compute. The graph I'm talking about packs about 8 hours into roughly 180 tasks, i.e. several minutes per task.

@fjetter
Member

fjetter commented Oct 15, 2020

I thought about the rescheduling again and realised this could never work: the task priorities would still force the same tasks to be rescheduled over and over again. Have you considered giving the follow-up tasks higher priorities (maybe in combination with rescheduling)? Different priorities could, of course, just flip the problem, such that the follow-up tasks block. Maybe there is a sweet spot when putting application knowledge into it.
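
For reference, priorities can be set per task at submit time; a sketch with hypothetical functions (`green_task`, `follow_up`, `work_items`):

# Give follow-up tasks a higher priority so that, once their inputs are
# ready, they win over starting yet another semaphore-gated task.
green_futs = [client.submit(green_task, item, priority=0) for item in work_items]
yellow_futs = [client.submit(follow_up, fut, priority=10) for fut in green_futs]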

Apart from this, I don't see any more room. What you are asking for requires task annotations, and that is currently in design, see #6701. Once these annotations arrive we can revisit the semaphore implementation, but for now this is all we have.

Is it possible to modify the rejoin function to be able to timeout?

Not sure if this helps or if it is possible. I'm actually not even sure why the rejoin blocks in the first place. To my understanding, a seceded thread should not stop working but just continue with the task it was assigned; therefore you should have more than one task running on a given machine.

@pvanderlinden
Author

Rejoin ensures you don't run multiple tasks on a single worker.

Do you have any idea why submitting a task from within a task won't schedule it on any of the idle workers, but instead puts almost all of them on a single worker, making the issue even worse? @fjetter

@lr4d
Contributor

lr4d commented Oct 15, 2020

I'm scheduling tasks like that in the example

My bad, didn't go through the whole thread.

What do you mean with using futures?

I see you're already doing this; my idea was something like the below (mostly pseudocode, don't expect it to work), e.g.

import time

data = [ ... ]  # list of futures
max_leases = n

# Submit one semaphore-using task per available lease up front.
processed_data = [
    client.submit(semaphore_using_fn, data.pop(), semaphore=semaphore)
    for _ in range(max_leases)
]

# Start running whatever tasks on `processed_data`
...

while data:
    # get_value() reports the leases currently held, so a lease should
    # be available when fewer than max_leases are taken.
    if semaphore.get_value() < max_leases:
        processed_data.append(
            client.submit(semaphore_using_fn, data.pop(), semaphore=semaphore)
        )
        # Again, you can schedule a future for the next batch of `processed_data`
        ...
    time.sleep(0.1)  # don't choke the scheduler

@pvanderlinden
Author

@lr4d I have two versions that try to limit the concurrency of certain task types.

In production:

import functools

import dask.distributed as dd

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        sem = dd.Semaphore(name=name, max_leases=max_running)
        # Leave the worker's thread pool while waiting for a lease and
        # rejoin only once the lease is held.
        dd.secede()
        with sem:
            dd.rejoin()
            return func(*args, **kwargs)

    return wrapped

This has the issue that the Semaphore is sometimes handed to a task sitting on a busy worker, which then can't rejoin, so the work is not really distributed.

Attempt 2:

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        # Acquire the lease in a seceded thread, then resubmit the real
        # work to the cluster; 'apply' calls func(*args, **kwargs).
        with dd.worker_client(separate_thread=True) as client:
            sem = dd.Semaphore(name=name, max_leases=max_running)
            with sem:
                future = client.submit(apply, func, args, kwargs)
                return future.result()

    return wrapped

This version seems to trigger a bug in the scheduler: 4 out of 5 workers do no work most of the time, while there are tasks ready to run (submitted by this wrapper). This also makes me unsure about my other usage of worker_client; won't that also schedule tasks on mostly one worker? I can try to make an isolated example if that helps.

Looking at your example, I have two questions. First, wouldn't it produce the same issue, where tasks are scheduled mostly on one worker? Second, it makes submitting the follow-up tasks difficult, as I would have to check for each task whether the futures belonging to its dependencies already exist (using multithreading, queues, etc.).

@lr4d
Contributor

lr4d commented Oct 20, 2020

@pvanderlinden As @fjetter said earlier, there is no supported way to accommodate this use case at the moment. The code example I shared was just an idea that came to mind; due to issues with state and parallel operations, I wouldn't expect it to work "properly" either.

@pvanderlinden
Author

@lr4d Do you have any idea why client.submit only/mostly seems to submit tasks to the current worker? I literally have 4/5 workers doing nothing (only running seceded tasks) and a long backlog of tasks on one worker. Do I have to give it some hint to schedule it somewhere else?
