Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Restructure cluster manager #96

Closed
wants to merge 22 commits into master from eschnett/cman
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
69a9914
Implement Comm_dup and Comm_free
eschnett Nov 25, 2015
d5e7419
Implement Testall and Waitany
eschnett Nov 25, 2015
eced9f3
Convert some MPI functions results to Int
eschnett Nov 25, 2015
c9d31da
Reformat / clean up source code
eschnett Nov 25, 2015
ca5aff2
Clean up source code
eschnett Nov 25, 2015
a454737
Clean up Monte Carlo example
eschnett Nov 25, 2015
57226be
Restructure MPI cluster manager
eschnett Nov 25, 2015
a4d454c
Disable Testany, which doesn't work on OS X in MPICH
eschnett Nov 25, 2015
739f84a
Re-enable TestAny; install mpich from source on OS X
eschnett Nov 25, 2015
0b9c910
Add --verbose to `brew --install` to avoid Travis timing out
eschnett Nov 25, 2015
399d99f
Implement Waitsome!, Testsome!
eschnett Nov 25, 2015
03bdbbf
Use Testsome! instead of Testany! in cluster manager
eschnett Nov 25, 2015
9274c1f
Don't build MPICH from source in Travis on OS X
eschnett Nov 25, 2015
59a503f
Correct handling of index arrays
eschnett Nov 25, 2015
80ec5be
Don't need to set finished requests to REQUEST_NULL
eschnett Nov 25, 2015
6bb01e3
Build MPICH and OpenMPI on OS X, replacing output with a progress marker
eschnett Nov 25, 2015
be8e4a6
Clean up index handling
eschnett Nov 25, 2015
440b204
Add MPI functions to CMakeLists.txt
eschnett Nov 25, 2015
eb9d62d
Add MPI_Scatter to CMakeLists.txt
eschnett Nov 25, 2015
9f1103c
Reformat travis-install-mpi.sh
eschnett Nov 25, 2015
f9fc07c
Re-indent auto-generated files
eschnett Nov 26, 2015
192226d
Merge branch 'master' into eschnett/cman
eschnett Nov 26, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conf/travis-install-mpi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ case "$os" in
openmpi)
# Temporarily build OpenMPI from source, since apparently our
# cmake doesn't know how to handle OpenMPI 1.10 yet
brew install openmpi --build-from-source
brew install openmpi --build-from-source --verbose |
sed -e 's/^.*$/./'
;;
*)
echo "Unknown MPI implementation: $MPI_IMPL"
Expand Down
67 changes: 57 additions & 10 deletions deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,63 @@ ENDIF(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)

include(FortranCInterface)

FortranCInterface_HEADER(jlmpi_f2c.h MACRO_NAMESPACE "JLMPI_"
SYMBOLS MPI_SEND MPI_RECV MPI_GET_COUNT MPI_BSEND MPI_SSEND MPI_RSEND
MPI_ISEND MPI_IRECV MPI_WAIT MPI_TEST MPI_REQUEST_FREE MPI_WAITANY MPI_IPROBE
MPI_PROBE MPI_CANCEL MPI_PACK MPI_UNPACK MPI_PACK_SIZE MPI_BARRIER MPI_BCAST
MPI_GATHER MPI_REDUCE MPI_ALLREDUCE MPI_COMM_SIZE MPI_COMM_RANK MPI_COMM_DUP
MPI_COMM_SPLIT MPI_COMM_FREE MPI_GET_PROCESSOR_NAME MPI_WTIME MPI_INIT
MPI_FINALIZE MPI_INITIALIZED MPI_FINALIZED MPI_ABORT MPI_ALLTOALL MPI_SEND_INIT
MPI_RECV_INIT MPI_OP_CREATE MPI_WAITALL MPI_OP_FREE MPI_SCAN MPI_SCATTER
MPI_GATHER MPI_EXSCAN MPI_SCATTERV MPI_GATHERV MPI_ALLGATHER MPI_ALLGATHERV
MPI_ALLTOALLV)
# Keep these function names sorted alphabetically
FortranCInterface_HEADER(jlmpi_f2c.h MACRO_NAMESPACE "JLMPI_" SYMBOLS
MPI_ABORT
MPI_ALLGATHER
MPI_ALLGATHERV
MPI_ALLREDUCE
MPI_ALLTOALL
MPI_ALLTOALLV
MPI_BARRIER
MPI_BCAST
MPI_BSEND
MPI_CANCEL
MPI_COMM_DUP
MPI_COMM_FREE
MPI_COMM_RANK
MPI_COMM_SIZE
MPI_COMM_SPLIT
MPI_EXSCAN
MPI_FINALIZE
MPI_FINALIZED
MPI_GATHER
MPI_GATHER
MPI_GATHERV
MPI_GET_COUNT
MPI_GET_PROCESSOR_NAME
MPI_INIT
MPI_INITIALIZED
MPI_IPROBE
MPI_IRECV
MPI_ISEND
MPI_OP_CREATE
MPI_OP_FREE
MPI_PACK
MPI_PACK_SIZE
MPI_PROBE
MPI_RECV
MPI_RECV_INIT
MPI_REDUCE
MPI_REQUEST_FREE
MPI_RSEND
MPI_SCAN
MPI_SCATTER
MPI_SCATTERV
MPI_SEND
MPI_SEND_INIT
MPI_SSEND
MPI_TEST
MPI_TESTALL
MPI_TESTANY
MPI_TESTSOME
MPI_UNPACK
MPI_WAIT
MPI_WAITALL
MPI_WAITANY
MPI_WAITSOME
MPI_WTIME
)

Include_directories(${MPI_C_INCLUDE_PATH})
include_directories(${MPI_Fortran_INCLUDE_PATH})
Expand Down
88 changes: 44 additions & 44 deletions deps/gen_constants.f90
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,58 @@ program gen_constants
implicit none
include 'mpif.h'

call output("MPI_BYTE ", MPI_BYTE)
call output("MPI_BYTE ", MPI_BYTE)
! Older versions of OpenMPI (such as those used by default by
! Travis) do not define MPI_WCHAR and the MPI_*INT*_T types for
! Fortran. We thus don't require them (yet).
! call output("MPI_WCHAR ", MPI_WCHAR)
! call output("MPI_INT8_T ", MPI_INT8_T)
! call output("MPI_UINT8_T ", MPI_UINT8_T)
! call output("MPI_INT16_T ", MPI_INT16_T)
! call output("MPI_UINT16_T ", MPI_UINT16_T)
! call output("MPI_INT32_T ", MPI_INT32_T)
! call output("MPI_UINT32_T ", MPI_UINT32_T)
! call output("MPI_INT64_T ", MPI_INT64_T)
! call output("MPI_UINT64_T ", MPI_UINT64_T)
call output("MPI_INTEGER1 ", MPI_INTEGER1)
call output("MPI_INTEGER2 ", MPI_INTEGER2)
call output("MPI_INTEGER4 ", MPI_INTEGER4)
call output("MPI_INTEGER8 ", MPI_INTEGER8)
call output("MPI_REAL4 ", MPI_REAL4)
call output("MPI_REAL8 ", MPI_REAL8)
call output("MPI_COMPLEX8 ", MPI_COMPLEX8)
call output("MPI_COMPLEX16 ", MPI_COMPLEX16)
! call output("MPI_WCHAR ", MPI_WCHAR)
! call output("MPI_INT8_T ", MPI_INT8_T)
! call output("MPI_UINT8_T ", MPI_UINT8_T)
! call output("MPI_INT16_T ", MPI_INT16_T)
! call output("MPI_UINT16_T ", MPI_UINT16_T)
! call output("MPI_INT32_T ", MPI_INT32_T)
! call output("MPI_UINT32_T ", MPI_UINT32_T)
! call output("MPI_INT64_T ", MPI_INT64_T)
! call output("MPI_UINT64_T ", MPI_UINT64_T)
call output("MPI_INTEGER1 ", MPI_INTEGER1)
call output("MPI_INTEGER2 ", MPI_INTEGER2)
call output("MPI_INTEGER4 ", MPI_INTEGER4)
call output("MPI_INTEGER8 ", MPI_INTEGER8)
call output("MPI_REAL4 ", MPI_REAL4)
call output("MPI_REAL8 ", MPI_REAL8)
call output("MPI_COMPLEX8 ", MPI_COMPLEX8)
call output("MPI_COMPLEX16 ", MPI_COMPLEX16)

call output("MPI_COMM_NULL ", MPI_COMM_NULL)
call output("MPI_COMM_SELF ", MPI_COMM_SELF)
call output("MPI_COMM_WORLD ", MPI_COMM_WORLD)
call output("MPI_COMM_NULL ", MPI_COMM_NULL)
call output("MPI_COMM_SELF ", MPI_COMM_SELF)
call output("MPI_COMM_WORLD ", MPI_COMM_WORLD)

call output("MPI_OP_NULL ", MPI_OP_NULL)
call output("MPI_BAND ", MPI_BAND)
call output("MPI_BOR ", MPI_BOR)
call output("MPI_BXOR ", MPI_BXOR)
call output("MPI_LAND ", MPI_LAND)
call output("MPI_LOR ", MPI_LOR)
call output("MPI_LXOR ", MPI_LXOR)
call output("MPI_MAX ", MPI_MAX)
call output("MPI_MAXLOC ", MPI_MAXLOC)
call output("MPI_MIN ", MPI_MIN)
call output("MPI_MINLOC ", MPI_MINLOC)
call output("MPI_PROD ", MPI_PROD)
call output("MPI_REPLACE ", MPI_REPLACE)
call output("MPI_SUM ", MPI_SUM)
call output("MPI_OP_NULL ", MPI_OP_NULL)
call output("MPI_BAND ", MPI_BAND)
call output("MPI_BOR ", MPI_BOR)
call output("MPI_BXOR ", MPI_BXOR)
call output("MPI_LAND ", MPI_LAND)
call output("MPI_LOR ", MPI_LOR)
call output("MPI_LXOR ", MPI_LXOR)
call output("MPI_MAX ", MPI_MAX)
call output("MPI_MAXLOC ", MPI_MAXLOC)
call output("MPI_MIN ", MPI_MIN)
call output("MPI_MINLOC ", MPI_MINLOC)
call output("MPI_PROD ", MPI_PROD)
call output("MPI_REPLACE ", MPI_REPLACE)
call output("MPI_SUM ", MPI_SUM)

call output("MPI_REQUEST_NULL ", MPI_REQUEST_NULL)
call output("MPI_REQUEST_NULL", MPI_REQUEST_NULL)

call output("MPI_STATUS_SIZE ", MPI_STATUS_SIZE)
call output("MPI_ERROR ", MPI_ERROR)
call output("MPI_SOURCE ", MPI_SOURCE)
call output("MPI_TAG ", MPI_TAG)
call output("MPI_STATUS_SIZE ", MPI_STATUS_SIZE)
call output("MPI_ERROR ", MPI_ERROR)
call output("MPI_SOURCE ", MPI_SOURCE)
call output("MPI_TAG ", MPI_TAG)

call output("MPI_ANY_SOURCE ", MPI_ANY_SOURCE)
call output("MPI_ANY_TAG ", MPI_ANY_TAG)
call output("MPI_TAG_UB ", MPI_TAG_UB)
call output("MPI_UNDEFINED ", MPI_UNDEFINED)
call output("MPI_ANY_SOURCE ", MPI_ANY_SOURCE)
call output("MPI_ANY_TAG ", MPI_ANY_TAG)
call output("MPI_TAG_UB ", MPI_TAG_UB)
call output("MPI_UNDEFINED ", MPI_UNDEFINED)

contains

Expand Down
16 changes: 11 additions & 5 deletions deps/gen_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#define STRING1(s) #s
#define STRING(s) STRING1(s)

int main(int argc, char *argv[])
{
printf("const WRAPPER_VERSION = \"%s\"\n", VERSION);
int main(int argc, char *argv[]) {
printf("const WRAPPER_VERSION = \"%s\"\n", VERSION);
printf("\n");
printf("const MPI_ABORT = :%s\n", STRING(MPI_ABORT));
printf("const MPI_ALLGATHER = :%s\n", STRING(MPI_ALLGATHER));
printf("const MPI_ALLGATHERV = :%s\n", STRING(MPI_ALLGATHERV));
Expand All @@ -30,7 +30,8 @@ int main(int argc, char *argv[])
printf("const MPI_GATHER = :%s\n", STRING(MPI_GATHER));
printf("const MPI_GATHERV = :%s\n", STRING(MPI_GATHERV));
printf("const MPI_GET_COUNT = :%s\n", STRING(MPI_GET_COUNT));
printf("const MPI_GET_PROCESSOR_NAME = :%s\n", STRING(MPI_GET_PROCESSOR_NAME));
printf("const MPI_GET_PROCESSOR_NAME = :%s\n",
STRING(MPI_GET_PROCESSOR_NAME));
printf("const MPI_INIT = :%s\n", STRING(MPI_INIT));
printf("const MPI_INITIALIZED = :%s\n", STRING(MPI_INITIALIZED));
printf("const MPI_IPROBE = :%s\n", STRING(MPI_IPROBE));
Expand All @@ -53,12 +54,17 @@ int main(int argc, char *argv[])
printf("const MPI_SEND_INIT = :%s\n", STRING(MPI_SEND_INIT));
printf("const MPI_SSEND = :%s\n", STRING(MPI_SSEND));
printf("const MPI_TEST = :%s\n", STRING(MPI_TEST));
printf("const MPI_TESTALL = :%s\n", STRING(MPI_TESTALL));
printf("const MPI_TESTANY = :%s\n", STRING(MPI_TESTANY));
printf("const MPI_TESTSOME = :%s\n", STRING(MPI_TESTSOME));
printf("const MPI_UNPACK = :%s\n", STRING(MPI_UNPACK));
printf("const MPI_WAIT = :%s\n", STRING(MPI_WAIT));
printf("const MPI_WAITALL = :%s\n", STRING(MPI_WAITALL));
printf("const MPI_WAITANY = :%s\n", STRING(MPI_WAITANY));
printf("const MPI_WAITSOME = :%s\n", STRING(MPI_WAITSOME));
printf("const MPI_WTIME = :%s\n", STRING(MPI_WTIME));
printf("\n");
printf("bitstype %d CComm\n", (int)(sizeof(MPI_Comm) * 8));

printf("bitstype %lu CComm\n", sizeof(MPI_Comm) * 8);
return 0;
}
15 changes: 8 additions & 7 deletions examples/05-juliacman.jl
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
# Note: Run this script without using `mpirun`

using MPI

manager=MPIManager(np=4)
manager = MPIManager(np=4)
addprocs(manager)

println("Added procs $(procs())")

println("Running 01-hello as part of a Julia cluster")
@mpi_do manager (include("01-hello-impl.jl"); do_hello())
@mpi_do manager (include("01-hello-impl.jl"); do_hello())

#Interspersed julia parallel call
# Interspersed julia parallel call
nheads = @parallel (+) for i=1:10^8
Int(rand(Bool))
end
println("@parallel nheads $nheads")

println("Running 02-broadcast as part of a Julia cluster")
@mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast())
@mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast())

M = [rand(10,10) for i=1:10]
pmap(svd, M)
println("pmap successful")

println("Running 03-reduce as part of a Julia cluster")
@mpi_do manager (include("03-reduce-impl.jl"); do_reduce())
@mpi_do manager (include("03-reduce-impl.jl"); do_reduce())

pids = [remotecall_fetch(myid, p) for p in workers()]
println("julia pids $pids")

println("Running 04-sendrecv as part of a Julia cluster")
@mpi_do manager (include("04-sendrecv-impl.jl"); do_sendrecv())
@mpi_do manager (include("04-sendrecv-impl.jl"); do_sendrecv())

println("Exiting")
exit()

84 changes: 42 additions & 42 deletions examples/06-cman-transport.jl
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
using MPI

comm, comm_size, rank = MPI.init_mpi()
MPI.Init()
rank = MPI.Comm_rank(MPI.COMM_WORLD)
size = MPI.Comm_size(MPI.COMM_WORLD)

include("01-hello-impl.jl")
include("02-broadcast-impl.jl")
include("03-reduce-impl.jl")
include("04-sendrecv-impl.jl")

if length(ARGS) == 0
print("Please specify a transport option to use [MPI|TCP]\n")
exit()
println("Please specify a transport option to use [MPI|TCP]")
MPI.Finalize()
exit(1)
elseif ARGS[1] == "TCP"
manager = MPI.start(TCP_TRANSPORT_ALL) # does not return on worker
manager = MPI.start_main_loop(TCP_TRANSPORT_ALL) # does not return on worker
elseif ARGS[1] == "MPI"
manager = MPI.start(MPI_TRANSPORT_ALL) # does not return on worker
manager = MPI.start_main_loop(MPI_TRANSPORT_ALL) # does not return on worker
else
print("Valid transport options are [MPI|TCP]\n")
exit()
println("Valid transport options are [MPI|TCP]")
MPI.Finalize()
exit(1)
end

if rank == 0
nloops = 10^2
function foo(n)
a=ones(n)
remotecall_fetch(x->x, 2, a);
# Check whether a worker accidentally returned
@assert rank == 0

@elapsed for i in 1:nloops
remotecall_fetch(x->x, 2, a)
end
end

n=10^3
foo(1)
t=foo(n)
println("$t seconds for $nloops loops of send-recv of array size $n")

n=10^6
foo(1)
t=foo(n)
println("$t seconds for $nloops loops of send-recv of array size $n")


print("EXAMPLE: HELLO\n")
@mpi_do manager do_hello()
print("EXAMPLE: BROADCAST\n")
@mpi_do manager do_broadcast()
print("EXAMPLE: REDUCE\n")
@mpi_do manager do_reduce()
print("EXAMPLE: SENDRECV\n")
@mpi_do manager do_sendrecv()

# Absence of an MPI Finalize causes the cluster to hang - don't yet know why
if ARGS[1] == "TCP"
@mpi_do manager MPI.Finalize()
elseif ARGS[1] == "MPI"
@everywhere (MPI.Finalize(); exit())
nloops = 10^2
function foo(n)
a=ones(n)
remotecall_fetch(x->x, mod1(2, size), a);
@elapsed for i in 1:nloops
remotecall_fetch(x->x, mod1(2, size), a)
end
end

n=10^3
foo(1)
t=foo(n)
println("$t seconds for $nloops loops of send-recv of array size $n")

n=10^6
foo(1)
t=foo(n)
println("$t seconds for $nloops loops of send-recv of array size $n")

# We cannot run these examples since they use MPI.Barrier and other blocking
# communication, disabling our event loop
# print("EXAMPLE: HELLO\n")
# @mpi_do manager do_hello()
# print("EXAMPLE: BROADCAST\n")
# @mpi_do manager do_broadcast()
# print("EXAMPLE: REDUCE\n")
# @mpi_do manager do_reduce()
# print("EXAMPLE: SENDRECV\n")
# @mpi_do manager do_sendrecv()

MPI.stop_main_loop(manager)
Loading