
added ddata and spmd modes #112

Merged 1 commit into master on Nov 29, 2016

Conversation

@amitmurthy (Contributor) commented Nov 10, 2016

Introduces DData and support for SPMD modes. I have just copied the README changes below. Please also take a look at test/ddata.jl and test/spmd.jl for more examples on usage.

DData (Distributed non-array data)

Sometimes we just want to distribute non-array items across workers. This is where a DData comes in handy.
It is like a DArray, but without the array operations and assumptions. A DData object can be constructed in a variety of ways:

DData{T}(init::Function, pids=workers()) - executes init(pididx) on all specified workers. The type T must be specified
and must match the return type of init. pididx is the index of the worker's pid in the pids array.

DData{T}(data::Array, pids=workers(); mode=:copy|:distribute) - distributes data onto specified workers.
- Default mode is :copy, which copies the entire data onto all pids.
- With mode=:distribute, data is partitioned across all pids. data must be an array type, and length(data)
must be a multiple of length(pids). If length(data) equals length(pids) and T is a non-array type,
individual items are distributed; otherwise the localparts are the appropriate array segments of data.
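
For illustration, here is a minimal sketch of the constructors described above (not from the PR; it assumes 4 workers added via addprocs(4), and the type parameters chosen for each case are my reading of the rules above):

@everywhere using DistributedArrays

# init-function form: worker i stores its index into `pids` as a plain Int.
d1 = DData{Int}(pididx -> pididx, workers())

# mode=:distribute with length(data) == length(pids): one Symbol per worker.
d2 = DData{Symbol}([:a, :b, :c, :d], workers(); mode=:distribute)

# mode=:copy (the default): every worker holds the full 10-element vector.
d3 = DData{Vector{Int}}(collect(1:10), workers())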

A couple of helper functions are

gather{T}(d::DData{T}) returns an Array{T} consisting of all distributed elements of d

Given a DData object d, d[] returns the localpart on a worker. d[i] returns the localpart
on the ith worker that d is distributed over.
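
Continuing that sketch, the helpers would behave roughly as follows (illustrative only):

gather(d1)    # Array{Int} of all localparts; with 4 workers, [1, 2, 3, 4]
d1[3]         # localpart held by the 3rd worker in pids, i.e. 3
# On a worker (e.g. inside an spmd block), d1[] returns that worker's own localpart.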

SPMD Mode (An MPI Style SPMD mode with MPI like primitives)

We can easily run the same block of code on all workers in an SPMD mode using the @everywhere macro or
the spmd function.

For example:

@everywhere begin
  ....
  ....
end

or

# define foo() on all workers
@everywhere function foo(arg1, arg2)
    ....
end

# call foo() everywhere using the `spmd` function
d_in=DArray(.....)
d_out=DData(.....)
spmd(foo,d_in,d_out; pids=workers()) # executes on all workers

The following primitives can be used in SPMD mode.

s_sendto(pid, data; tag=nothing) - sends data to pid
s_recvfrom(pid; tag=nothing) - receives data from pid
s_recvfrom_any(; tag=nothing) - receives data from any pid
s_barrier(;pids=procs(), tag=nothing) - all tasks wait and then proceed
s_bcast(data, pid; tag=nothing, pids=procs()) - broadcasts the same data over pids from pid
s_scatter(x, pid; tag=nothing, pids=procs()) - distributes x over pids from pid
s_gather(x, pid; tag=nothing, pids=procs()) - collects data from pids onto worker pid
spmd(f, args...; pids=procs()) - Executes f(args...) on all pids

The tag keyword should be used to differentiate between consecutive calls of the same type, e.g., consecutive s_bcast calls.
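
As an illustration of the tag keyword, here is a hedged sketch (not from the PR) of an SPMD function in which each worker does two back-to-back exchanges with its neighbours and distinct tags keep the two rounds separate:

@everywhere function two_rounds()
    pids = sort(workers())
    me   = findfirst(pids, myid())
    next = pids[me == length(pids) ? 1 : me + 1]   # next worker, wrapping around
    prev = pids[me == 1 ? length(pids) : me - 1]   # previous worker, wrapping around

    s_sendto(next, myid();     tag=1)   # round 1
    s_sendto(next, myid() * 2; tag=2)   # round 2

    r1 = s_recvfrom(prev; tag=1)
    r2 = s_recvfrom(prev; tag=2)

    s_barrier(;pids=pids)
    (r1, r2)
end

spmd(two_rounds; pids=workers())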

NOTE: Instantiating DArray or DData within an SPMD function/block will result in N copies of the
object. Similarly, calling @everywhere or spmd from within an SPMD function/block will result in N*N parallel
runs. In SPMD mode the function/block is executed concurrently on all workers.

Using DData, SPMD mode and DArrays together

An example:


d_in=d=DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1])
d_out=DData{Any}(i->nothing)

# define the function everywhere
@everywhere function foo_spmd(d_in, d_out)
    pids=sort(vec(procs(d_in)))
    pididx = findfirst(pids, myid())
    mylp = localpart(d_in)
    localsum = 0

    # Have each worker exchange data with its neighbors
    n_pididx = pididx+1 > length(pids) ? 1 : pididx+1
    p_pididx = pididx-1 < 1 ? length(pids) : pididx-1

    for i in 1:length(pids)
        s_sendto(pids[n_pididx], mylp[2])
        s_sendto(pids[p_pididx], mylp[1])

        mylp[2] = s_recvfrom(pids[p_pididx])
        mylp[1] = s_recvfrom(pids[n_pididx])

        s_barrier(;pids=pids)
        localsum = localsum + mylp[1] + mylp[2]
    end

    # finally store the sum in d_out
    d_out[] = localsum
end

# run foo_spmd on all workers
spmd(foo_spmd, d_in, d_out; pids=workers())
@test Any[sum(workers())*2 for i in 1:nworkers()] == gather(d_out)

@ViralBShah (Member)

Some quick questions and comments:

  1. Does the assignment operator in d_out[] = localsum overwrite the local part of d_out? Why not just make the syntax d_out[myid] = localsum, which is then consistent with setindex?
  2. The pid stuff seems a bit too verbose. It would be nice to eventually get to a point where the pid does not figure in the computation.
  3. Bikeshedding - Can we drop the s_ prefixes?

This definitely seems easier than doing remotecalls for doing the same bit of work.

@amitmurthy (Contributor, Author)

Does the assignment operator in d_out[] = localsum overwrite the local part of d_out?

Yes.

Why not just make the syntax d_out[myid] = localsum, which is then consistent with setindex?

It would need to be d_out[myid()]. Also, the index refers to the ith processor in the participating pids array, not the pid itself.

The pid stuff seems a bit too verbose. It would be nice to eventually get to a point where the pid does not figure in the computation.

I don't see how. Even in regular MPI programs the rank of a processor is part of every API.

Bikeshedding - Can we drop the s_ prefixes?

I would rather not. It makes it clear that the s_ calls are behaviorally different and valid only within an SPMD block/function and not outside.

@ViralBShah (Member)

Why not put all the spmd calls in an SPMD submodule in that case, and say SPMD.send etc.? That way, one can even import them to avoid the extra typing should one choose to.

@amitmurthy (Contributor, Author)

That is a good suggestion.

@ViralBShah (Member)

Maybe this is only for experimentation, but this package is a strange place to have the SPMD mode and messaging API.

@amitmurthy (Contributor, Author)

SPMD mode is not very useful without DArray / DData. In regular MPI the entire program runs in SPMD mode. Here only a function / code block runs SPMD while modifying distributed data created previously.

If folks see value in this I would rather rename DistributedArrays.jl to Distributed.jl and work towards making it the default way to do parallel programming in Julia.

@ViralBShah (Member)

Using a different name for the whole thing is a good idea - but I do not like Distributed.jl. Maybe Parallel.jl, since we do expect this to become a standard way to do parallel computing in julia.

@ViralBShah (Member)

As we spoke, we should put some thought into the composability of nested SPMD mode. It does not need to be super fancy, but it should do something reasonable.

@JeffBezanson (Contributor)

Nice, I like it!

  • There doesn't seem to be much difference between DData and a distributed Vector. It might be easier to add new functions/constructors for distributed vectors instead of adding this type.
  • The worker channels could be wrapped in an SPMD context object instead of being global. Then there could be many simultaneous SPMD operations without interference.

@ViralBShah (Member) commented Nov 15, 2016

The key difference Amit explained was doing slightly different things with indexing and assignment, specifically
d_out[] = localsum

but that could easily have been something like
d_out[myid()] = localsum

@ViralBShah (Member)

Otherwise it is close enough to a distributed vector.

@rofinn commented Nov 16, 2016

Cool, I've been working on something similar. In my case, I wanted an SPMD-style interface for distributing a bunch of Tasks. This is the general interface I've been working on:

@everywhere using Hermes

println("Testing `app`")

# Start the app with 100 copies of the function.
# The function is run in 100 `Tasks` across all
# available processors.
app(100) do routine
    println(myid(routine))
    # We use the reference to our specific routine to
    # organize our communication.
    if myid(routine) == RoutineId(1, 1)
        # Loop over peers and send each a random array of 100 elements
        for peer_id in routines(routine)
            if peer_id != myid(routine)
                datum = rand(100)
                send(routine, peer_id, datum)
            end
        end

        # Collect our results.
        results = gather(routine)
        println("Result: $(sum(results))")
    else
        # Recv the data from the controller `Task`
        datum = recv(routine, RoutineId(1, 1))
        result = sum(datum)
        # Send results back.
        send(routine, RoutineId(1, 1), result)
    end
end

println("Finished Testing.")

I'll note that I don't really like the naming conventions, but a Routine is just a type providing the message passing interface for the function that'll form our Task. Also, I realize this example is rather pointless, but hopefully it demonstrates the basic interface.

@StefanKarpinski

I'll note that I don't really like the naming conventions,

To clarify: in this code or in the example code you posted?

@rofinn commented Nov 16, 2016

Sorry, in the example code I posted.

@amitmurthy (Contributor, Author)

Updated.

There doesn't seem to be much difference between DData and a distributed Vector. It might be easier to add new functions/constructors for distributed vectors instead of adding this type.

From the README:

Working with distributed non-array data
---------------------------------------

The function `ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Vector=[])` can
 be used to create a distributed vector whose localparts need not be Arrays.

It returns a `DArray{T,1,T}`, i.e., the element type and localtype of the array are the same.

`ddata()` constructs a distributed vector of length `nworkers()` where each localpart can hold 
any value, initially initialized to `nothing`.

The argument `data`, if supplied, is distributed over the `pids`. `length(data)` must be a multiple of
 `length(pids)`. If the multiple is 1, a `DArray{T,1,T}` is returned, where `T` is `eltype(data)`. If the
multiple is greater than 1, a `DArray{T,1,Array{T,1}}` is returned, i.e., it is equivalent to calling
`distribute(data)`.

`gather{T}(d::DArray{T,1,T})` returns an `Array{T,1}` consisting of all distributed elements of `d`.

Given a `DArray{T,1,T}` object `d`, `d[:L]` returns the localpart on a worker. `d[i]` returns the 
`localpart` on the ith worker that `d` is distributed over.
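
A hedged sketch of the API described above (not from the PR; it assumes 4 workers and that the keyword combinations behave exactly as documented):

d1 = ddata()                        # length-nworkers() vector, each localpart == nothing
d2 = ddata(T=Int, init=I->myid())   # each worker stores its own pid
d3 = ddata(data=collect(1:4))       # multiple == 1: one Int per worker (DArray{Int,1,Int})
d4 = ddata(data=collect(1:8))       # multiple == 2: localparts are 2-element Vector{Int}s

gather(d2)    # e.g. [2, 3, 4, 5]
d3[2]         # value held by the 2nd worker, i.e. 2
# On a worker (e.g. inside an spmd call), d2[:L] returns that worker's localpart.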

The worker channels could be wrapped in an SPMD context object instead of being global. Then there could be many simultaneous SPMD operations without interference.

Each SPMD run is implicitly executed in a different context. This allows for multiple `spmd` calls to
be active at the same time. An SPMD context can be explicitly specified via the keyword arg `context` to
 `spmd`.

`context(pids=procs())` returns a new SPMD context.

An SPMD context also provides context-local storage, a dict which can be used to store
key-value pairs between `spmd` runs under the same context.

`context_local_storage()` returns the dictionary associated with the context.
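
For example, a hedged sketch (not from the PR; it assumes `context` accepts the pids positionally as written above, and `count_runs` is a hypothetical function illustrating context local storage):

# Two contexts over the same workers; their spmd runs do not interfere.
ctx1 = context(workers())
ctx2 = context(workers())

# Counts how many times it has run under the calling context.
@everywhere function count_runs()
    store = context_local_storage()
    store[:runs] = get(store, :runs, 0) + 1
end

spmd(count_runs; pids=workers(), context=ctx1)
spmd(count_runs; pids=workers(), context=ctx1)   # store[:runs] == 2 under ctx1
spmd(count_runs; pids=workers(), context=ctx2)   # store[:runs] == 1 under ctx2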

Implementation changes:
Every message is now also tagged with a context_id. The same set of RemoteChannels is used for inter-process communication. Within a process, incoming messages are directed to different local channels, one for each execution context.

The key difference Amit explained ..............

The localpart can now be accessed as d[:L]; d[:LP], d[:l] and d[:lp] are also equivalent. localparts can also be assigned to via this syntax, as in the sketch below.
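
For instance, a minimal sketch (not from the PR) of overwriting the localpart from inside an SPMD run:

@everywhere function bump!(d)
    d[:L] = (d[:L] === nothing ? 0 : d[:L]) + 1   # overwrite this worker's localpart
end

d = ddata()                     # localparts start out as nothing
spmd(bump!, d; pids=workers())
gather(d)                       # all ones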

@amitmurthy changed the title from "RFC/WIP: added ddata and spmd modes" to "added ddata and spmd modes" on Nov 24, 2016
@amitmurthy (Contributor, Author)

I'll merge this in a day or two. We can refine it further over the next few weeks. Docstrings can be added once the API is more or less stable.

@amitmurthy merged commit c5b8afc into master on Nov 29, 2016
@amitmurthy deleted the amitm/ddata_mpistyle branch on November 29, 2016