
Allow Dask Client to be configured #86

Open
AlecThomson opened this issue Jun 28, 2022 · 2 comments

Hi there,

I'm wondering if it would be possible to add an optional configuration to tricolour that allows either a Dask Client object to be passed in, or the address of an externally running dask scheduler to be given on the command line. If specified, this would bypass the ThreadPool object that is currently created. The upside is that all Dask Client types would then be supported, such as dask-mpi and dask-jobqueue, which can run jobs over multiple nodes.

Apologies for not opening a PR or similar, but I'm not fully familiar with all the workings of this project, nor with contextlib specifically.

A simple change to app.py's main could just be:

import contextlib
from multiprocessing.pool import ThreadPool

import dask
from dask.distributed import Client
from threadpoolctl import threadpool_limits


def main():
    with contextlib.ExitStack() as stack:
        # Limit numpy/blas etc threads to 1, as we obtain
        # our parallelism with dask threads
        stack.enter_context(threadpool_limits(limits=1))

        args = create_parser().parse_args()

        if args.client_address is None:  # assuming it defaults to None
            # Configure dask pool as before
            if args.nworkers <= 1:
                log.warn("Entering single threaded mode per user request!")
                dask.config.set(scheduler='single-threaded')
            else:
                stack.enter_context(dask.config.set(
                    pool=ThreadPool(args.nworkers)))
        else:
            # Client is a context manager, so the exit stack can own it;
            # once created it becomes the default scheduler for the graph
            stack.enter_context(Client(args.client_address))

        _main(args)
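
For completeness, a minimal sketch of wiring the hypothetical --client-address option into the parser (assuming create_parser builds a standard argparse.ArgumentParser; the flag name and help text are my invention, not an existing tricolour option):

# Illustrative only: the --client-address flag is hypothetical and defaults
# to None so the current ThreadPool behaviour is preserved.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--client-address", dest="client_address", default=None,
                    help="Address of an externally running dask scheduler, "
                         "e.g. tcp://127.0.0.1:8786. If given, the internal "
                         "ThreadPool is bypassed.")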

Sorry if this had already been considered and subsequently ruled out!

sjperkins (Member) commented Jun 28, 2022

Hi @AlecThomson! Thanks for posting this.

What you're suggesting is indeed feasible. If I might restate your suggestion slightly: one would pass a scheduler address as a tricolour argument, allowing the tricolour graph to be submitted to a distributed dask scheduler for execution on dask worker nodes.
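
To illustrate the idea (a rough sketch, not tricolour code, and the scheduler address is made up): once a distributed Client is created from the address, it becomes the default scheduler, so subsequent .compute() calls are executed on the dask workers rather than in a local thread pool.

import dask.array as da
from dask.distributed import Client

# Connect to an externally running scheduler; the address is illustrative
client = Client("tcp://scheduler-host:8786")

# Any dask graph computed after this point is submitted to that scheduler
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
result = x.mean().compute()

client.close()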

Some effort would be needed to investigate whether the dask distributed scheduler successfully handles these graphs.
See dask/distributed#6360, which discusses some of the existing issues with the distributed scheduler.

In particular, tricolour packs scan data into a (baseline, time, chan, corr) chunk which is then rechunked per baseline so that flagging is parallelised over this dimension.

The packing step can be done in two ways, depending on the --window-backend option:

  1. in memory, which results in gathering all scan chunks into a single window chunk which is then rechunked.
  2. on disk using zarr, which might be a bit slower, but IIRC has an embarrassingly parallel independent graph.

I would guess (2) would work better on the distributed scheduler, for instance.
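
To make the rechunking step concrete, here is a toy dask.array sketch (the shapes are illustrative and this is not tricolour's actual code): the packed window is rechunked so each baseline becomes its own chunk, giving one flagging task per baseline.

import dask.array as da

nbl, ntime, nchan, ncorr = 351, 120, 1024, 4

# Scan data packed into a single window chunk (the in-memory backend case)
window = da.zeros((nbl, ntime, nchan, ncorr),
                  chunks=(nbl, ntime, nchan, ncorr),
                  dtype="complex64")

# Rechunk so flagging is parallelised over the baseline dimension
per_baseline = window.rechunk({0: 1})

print(per_baseline.numblocks)  # (351, 1, 1, 1): one block per baseline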

bennahugo (Collaborator) commented Jun 28, 2022 via email
