Merge branch 'master' of github.com:patflick/mxx
patflick committed Mar 18, 2016
2 parents f3a3fc8 + f967540 commit c6ddac5
Showing 9 changed files with 576 additions and 123 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
@@ -68,9 +68,9 @@ before_script:
script:
# build mxx and run tests
- make
- ./bin/test-all # run without mpi (= -np 1)
- mpiexec -np 4 ./bin/test-all
- mpiexec -np 13 ./bin/test-all
- ./bin/mxx-test-all # run without mpi (= -np 1)
- mpiexec -np 4 ./bin/mxx-test-all
- mpiexec -np 13 ./bin/mxx-test-all

after_success:
# only collect coverage if compiled with gcc
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -74,3 +74,4 @@ include_directories("${PROJECT_SOURCE_DIR}")
# build tests
add_subdirectory(gtest)
add_subdirectory(test)
add_subdirectory(src)
440 changes: 364 additions & 76 deletions include/mxx/benchmark.hpp

Large diffs are not rendered by default.

137 changes: 110 additions & 27 deletions include/mxx/collective.hpp
@@ -1214,6 +1214,7 @@ std::vector<T> all2all(const std::vector<T>& msgs, const mxx::comm& comm = mxx::
* All-to-all-V *
*********************************************************************/

#define MXX_CHAR_ALL2ALL_ALIGN 1
#define MXX_BENCHMARK_ALL2ALL 1

/**
@@ -1230,40 +1231,44 @@ std::vector<T> all2all(const std::vector<T>& msgs, const mxx::comm& comm = mxx::
*
* @see MPI_Alltoallv
*
* @tparam T The data type of the elements.
* @param msgs Pointer to memory containing the elements to be sent.
* @tparam T The data type of the elements.
* @param msgs Pointer to memory containing the elements to be sent.
* @param send_sizes A `std::vector` of size `comm.size()`, this contains
* the number of elements to be sent to each of the processes.
* @param out Pointer to memory for writing the received messages.
* @param send_displs Offsets (in number of elements) of where each
* message starts for each process.
* @param out Pointer to memory for writing the received messages.
* @param recv_sizes A `std::vector` of size `comm.size()`, this contains
* the number of elements to be received from each process.
* @param comm The communicator (`comm.hpp`). Defaults to `world`.
* @param recv_displs Offsets (in number of elements) of where each
* message should be received for each process.
* @param comm The communicator (`comm.hpp`). Defaults to `world`.
*/
template <typename T>
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, T* out, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, const std::vector<size_t>& send_displs, T* out, const std::vector<size_t>& recv_sizes, const std::vector<size_t>& recv_displs, const mxx::comm& comm = mxx::comm()) {
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), static_cast<size_t>(0));
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
size_t local_max_size = std::max(total_send_size, total_recv_size);
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
size_t max;
MPI_Allreduce(&local_max_size, &max, 1, mpi_sizet.type(), MPI_MAX, comm);
if (max >= mxx::max_int) {
impl::all2allv_big(msgs, send_sizes, out, recv_sizes, comm);
impl::all2allv_big(msgs, send_sizes, send_displs, out, recv_sizes, recv_displs, comm);
} else {
// convert vectors to integer counts
std::vector<int> send_counts(send_sizes.begin(), send_sizes.end());
std::vector<int> recv_counts(recv_sizes.begin(), recv_sizes.end());
// get displacements
std::vector<int> send_displs = impl::get_displacements(send_counts);
std::vector<int> recv_displs = impl::get_displacements(recv_counts);
std::vector<int> send_dis(send_displs.begin(), send_displs.end());
std::vector<int> recv_dis(recv_displs.begin(), recv_displs.end());
// call regular alltoallv
mxx::datatype dt = mxx::get_datatype<T>();
#if MXX_BENCHMARK_ALL2ALL
comm.barrier();
auto start = std::chrono::steady_clock::now();
#endif
MPI_Alltoallv(const_cast<T*>(msgs), &send_counts[0], &send_displs[0], dt.type(),
out, &recv_counts[0], &recv_displs[0], dt.type(), comm);
MPI_Alltoallv(const_cast<T*>(msgs), &send_counts[0], &send_dis[0], dt.type(),
out, &recv_counts[0], &recv_dis[0], dt.type(), comm);
#if MXX_BENCHMARK_ALL2ALL
auto end = std::chrono::steady_clock::now();
// time in microseconds
@@ -1294,30 +1299,108 @@ void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, T* out, cons
}
}
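As an illustration of the new explicit-displacement overload, here is a minimal self-contained sketch; the choice of one `int` per destination rank is an assumption made only for the example, and the displacements are simply the exclusive prefix sums of the counts.

#include <mxx/env.hpp>
#include <mxx/comm.hpp>
#include <mxx/collective.hpp>
#include <numeric>
#include <vector>

int main(int argc, char* argv[]) {
    mxx::env e(argc, argv);
    mxx::comm comm;
    // hypothetical payload: each rank sends one int (its rank) to every rank
    std::vector<size_t> send_sizes(comm.size(), 1), send_displs(comm.size(), 0);
    for (int i = 1; i < comm.size(); ++i)   // exclusive prefix sum of the send counts
        send_displs[i] = send_displs[i-1] + send_sizes[i-1];
    std::vector<size_t> recv_sizes = mxx::all2all(send_sizes, comm);
    std::vector<size_t> recv_displs(comm.size(), 0);
    for (int i = 1; i < comm.size(); ++i)   // exclusive prefix sum of the recv counts
        recv_displs[i] = recv_displs[i-1] + recv_sizes[i-1];
    std::vector<int> sendbuf(comm.size(), comm.rank());
    std::vector<int> recvbuf(std::accumulate(recv_sizes.begin(), recv_sizes.end(), size_t(0)));
    mxx::all2allv(sendbuf.data(), send_sizes, send_displs,
                  recvbuf.data(), recv_sizes, recv_displs, comm);
    return 0;
}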

/**
* @brief Character All-to-all which first fixes alignment for better performance.
*
* This version is a workaround for poorly performing character all2alls
* in cases where the individual messages are not aligned identically on
* the sending and the receiving side.
*
* @see MPI_Alltoallv
*
* @tparam T The data type of the elements.
* @param in Pointer to memory containing the elements to be sent.
* @param send_counts A `std::vector` of size `c.size()`, this contains
* the number of elements to be sent to each of the processes.
* @param out Pointer to memory for writing the received messages.
* @param recv_counts A `std::vector` of size `c.size()`, this contains
* the number of elements to be received from each process.
* @param c The communicator (`comm.hpp`).
*/
template <typename T>
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, const std::vector<size_t>& send_displs, T* out, const std::vector<size_t>& recv_sizes, const std::vector<size_t>& recv_displs, const mxx::comm& comm = mxx::comm()) {
void char_all2allv(const T* in, const std::vector<size_t>& send_counts, T* out, const std::vector<size_t>& recv_counts, const mxx::comm& c) {
std::vector<size_t> displs = impl::get_displacements(send_counts);
std::vector<size_t> counts(send_counts);
std::vector<unsigned int> offsets(c.size(), 0);
// increase the size of the message and decrease the displacements
// so that each message is aligned to `align`
size_t align = 8;
for (int i = 0; i < c.size(); ++i) {
// round displs down to the previous multiple of `align`
if (i > 0 && displs[i] % align != 0) {
offsets[i] = displs[i] % align;
displs[i] -= offsets[i];
counts[i] += offsets[i];
}
// round up to next align
if (i+1 < c.size() && counts[i] % align != 0) {
counts[i] += align - (counts[i] % align);
}
}

// calculate new all2all parameters
std::vector<unsigned int> recv_offsets = mxx::all2all(offsets, c);
std::vector<size_t> aligned_recv_counts = mxx::all2all(counts, c);
std::vector<size_t> aligned_recv_displs = impl::get_displacements(aligned_recv_counts);
size_t aligned_recv_size = std::accumulate(aligned_recv_counts.begin(), aligned_recv_counts.end(), static_cast<size_t>(0));
std::vector<T> recv_buffer(aligned_recv_size);

// call all2allv with custom displacements
mxx::all2allv(in, counts, displs, &recv_buffer[0], aligned_recv_counts, aligned_recv_displs, c);

// fix offsets and copy into real recv buffer
T* o = out;
for (int i = 0; i < c.size(); ++i) {
const T* it = &recv_buffer[0]+aligned_recv_displs[i]+recv_offsets[i];
std::copy(it, it + recv_counts[i], o);
o += recv_counts[i];
}
}
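To make the rounding concrete, consider a single hypothetical message that starts at displacement 13 and holds 10 characters with `align = 8`: the start is rounded down to 8 (offset 5), the count grows to 15 and is then padded up to 16, and the receiver later skips the 5 leading elements when copying into the real output buffer. A tiny standalone sketch of just that arithmetic (all numbers are made up):

#include <cassert>
#include <cstddef>

int main() {
    const std::size_t align = 8;
    std::size_t displ = 13, count = 10, offset = 0;   // hypothetical message parameters
    if (displ % align != 0) {          // round the start down to an aligned position
        offset = displ % align;        // -> 5
        displ -= offset;               // -> 8
        count += offset;               // -> 15
    }
    if (count % align != 0)            // pad the count up to a multiple of align
        count += align - (count % align);              // -> 16
    assert(displ == 8 && count == 16 && offset == 5);
    return 0;
}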

/**
* @brief All-to-all message exchange between all processes in the communicator.
*
* This function is for the case that the number of elements sent to and received
* from each process is not always the same, such that `all2all()` cannot
* be used.
*
* The number of elements sent from this process to each process is given
* by the parameter `send_sizes`.
* The number of elements received by this process from each process is given
* by the parameter `recv_sizes`.
*
* @see MPI_Alltoallv
*
* @tparam T The data type of the elements.
* @param msgs Pointer to memory containing the elements to be sent.
* @param send_sizes A `std::vector` of size `comm.size()`, this contains
* the number of elements to be sent to each of the processes.
* @param out Pointer to memory for writing the received messages.
* @param recv_sizes A `std::vector` of size `comm.size()`, this contains
* the number of elements to be received from each process.
* @param comm The communicator (`comm.hpp`). Defaults to `world`.
*/
template <typename T>
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, T* out, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
#if MXX_CHAR_ALL2ALL_ALIGN
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), static_cast<size_t>(0));
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
size_t local_max_size = std::max(total_send_size, total_recv_size);
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
size_t max;
MPI_Allreduce(&local_max_size, &max, 1, mpi_sizet.type(), MPI_MAX, comm);
if (max >= mxx::max_int) {
impl::all2allv_big(msgs, send_sizes, send_displs, out, recv_sizes, recv_displs, comm);
bool use_align = mxx::any_of(total_send_size/comm.size() >= 100, comm);
if (sizeof(T) == 1 && use_align) {
// align before sending
char_all2allv(msgs, send_sizes, out, recv_sizes, comm);
} else {
// convert vectors to integer counts
std::vector<int> send_counts(send_sizes.begin(), send_sizes.end());
std::vector<int> recv_counts(recv_sizes.begin(), recv_sizes.end());
#endif
// get displacements
std::vector<int> send_dis(send_displs.begin(), send_displs.end());
std::vector<int> recv_dis(recv_displs.begin(), recv_displs.end());
// call regular alltoallv
mxx::datatype dt = mxx::get_datatype<T>();
MPI_Alltoallv(const_cast<T*>(msgs), &send_counts[0], &send_dis[0], dt.type(),
out, &recv_counts[0], &recv_dis[0], dt.type(), comm);
std::vector<size_t> send_displs = impl::get_displacements(send_sizes);
std::vector<size_t> recv_displs = impl::get_displacements(recv_sizes);
// call more specialized all2allv
all2allv(msgs, send_sizes, send_displs, out, recv_sizes, recv_displs, comm);
#if MXX_CHAR_ALL2ALL_ALIGN
}
#endif
}
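In user code, the counts-only overload above is the one typically called. The fragment below is an illustrative sketch (assuming an existing `mxx::comm comm` and `#include <numeric>`) in which each rank sends a 200-character block to every rank; since `sizeof(char) == 1` and the average per-destination size is above the 100-element threshold, the exchange may be routed through `char_all2allv`:

// sizes are illustrative; displacements are computed internally as prefix sums
std::vector<size_t> send_sizes(comm.size(), 200);
std::vector<char> sendbuf(200 * comm.size(), 'x');
std::vector<size_t> recv_sizes = mxx::all2all(send_sizes, comm);
std::vector<char> recvbuf(std::accumulate(recv_sizes.begin(), recv_sizes.end(), size_t(0)));
mxx::all2allv(sendbuf.data(), send_sizes, recvbuf.data(), recv_sizes, comm);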


/**
* @brief All-to-all message exchange between all processes in the communicator.
*
3 changes: 2 additions & 1 deletion include/mxx/comm_fwd.hpp
@@ -56,7 +56,8 @@ class comm {
comm(const MPI_Comm& c) {
mpi_comm = c;
do_free = false;
init_ranksize();
if (c != MPI_COMM_NULL)
init_ranksize();
}

// disable copying
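The new null check guards against a legitimate situation: a process excluded from a communicator split receives MPI_COMM_NULL, and querying rank or size on that handle is invalid. A hypothetical sketch of the case the guard protects against (`in_group` stands for whatever membership condition applies):

// ranks with color MPI_UNDEFINED receive MPI_COMM_NULL from MPI_Comm_split
MPI_Comm newcomm;
int color = in_group ? 0 : MPI_UNDEFINED;
MPI_Comm_split(MPI_COMM_WORLD, color, 0, &newcomm);
mxx::comm c(newcomm);   // with the guard, rank/size are only queried on valid handles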
3 changes: 3 additions & 0 deletions include/mxx/env.hpp
@@ -24,6 +24,9 @@
#define MXX_ENV_HPP

#include <mpi.h>
#include <exception>
#include <stdexcept>
#include <string>

namespace mxx {

7 changes: 7 additions & 0 deletions src/CMakeLists.txt
@@ -0,0 +1,7 @@
cmake_minimum_required(VERSION 2.6)

# project settings
project(mxx-bm)

add_executable(mxx-bm-vote-off vote_off.cpp)
target_link_libraries(mxx-bm-vote-off ${MPI_LIBRARIES})
70 changes: 70 additions & 0 deletions src/vote_off.cpp
@@ -0,0 +1,70 @@

#include <iostream>   // std::cerr, std::cout
#include <cstdlib>    // atoi
#include <mxx/env.hpp>
#include <mxx/comm.hpp>
#include <mxx/benchmark.hpp>

std::string exec_name;

void print_usage() {
std::cerr << "Usage: " << exec_name << " <n> <out-node-filename>" << std::endl;
std::cerr << "where" << std::endl;
std::cerr << " <n> Number of nodes to vote off." << std::endl;
std::cerr << " <out-node-filename> Filename for the new nodefile, output by this program." << std::endl;
}

int main(int argc, char* argv[]) {
mxx::env e(argc, argv);
mxx::comm comm;

// create shared-mem MPI+MPI hybrid communicator
mxx::hybrid_comm hc(comm);

// assert the same number of processors per node
int proc_per_node = hc.local.size();
if (!mxx::all_same(proc_per_node, comm)) {
std::cerr << "Error: this benchmark assumes the same number of processors per node" << std::endl;
MPI_Abort(comm, -1);
}

// assert we have an even number of nodes
int num_nodes = hc.num_nodes();
if (num_nodes % 2 != 0) {
std::cerr << "Error: this benchmark assumes an even number of nodes" << std::endl;
MPI_Abort(comm, -1);
}

// parse input arguments
exec_name = argv[0];
if (argc < 3) {
print_usage();
MPI_Abort(comm, -1);
}
int n_vote_off = atoi(argv[1]);
std::string output_nodefile(argv[2]);
if (n_vote_off < 0) {
print_usage();
MPI_Abort(comm, -1);
}

bool benchmark_char_align = false;

std::vector<double> bw_row = mxx::pairwise_bw_matrix(hc);
mxx::print_bw_matrix_stats(hc, bw_row);
bool part = mxx::vote_off(hc, n_vote_off, bw_row);
if (hc.global.rank() == 0)
std::cout << "Before vote off: " << std::endl;
mxx::bw_all2all(hc.global, hc.local);
if (hc.global.rank() == 0)
std::cout << "After vote off: " << std::endl;
hc.with_nodes(part, [&](const mxx::hybrid_comm& subhc) {
mxx::bw_all2all(subhc.global, subhc.local);
if (benchmark_char_align) {
mxx::bw_all2all_char(subhc.global, subhc.local);
mxx::bw_all2all_unaligned_char(subhc.global, subhc.local, false);
if (subhc.global.rank() == 0)
std::cout << "== With re-alignment" << std::endl;
mxx::bw_all2all_unaligned_char(subhc.global, subhc.local, true);
}
});
mxx::write_new_nodefile(hc, part, output_nodefile);
}
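As a usage note, the program is launched under MPI like the other benchmarks, e.g. something along the lines of `mpiexec -np 64 ./bin/mxx-bm-vote-off 2 nodefile.new` on a hypothetical 8-node, 8-process-per-node allocation; this measures the pairwise bandwidth matrix, votes off two nodes based on the measured bandwidths, and writes the surviving hosts to `nodefile.new`. The process count, binary path, and file name here are only illustrative.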
32 changes: 16 additions & 16 deletions test/CMakeLists.txt
@@ -3,30 +3,30 @@ cmake_minimum_required(VERSION 2.6)
# project settings
project(mxx-test)

add_executable(test-collective test_collective.cpp)
target_link_libraries(test-collective mxx-gtest-main)
add_executable(mxx-test-collective test_collective.cpp)
target_link_libraries(mxx-test-collective mxx-gtest-main)

add_executable(test-reductions test_reductions.cpp)
target_link_libraries(test-reductions mxx-gtest-main)
add_executable(mxx-test-reductions test_reductions.cpp)
target_link_libraries(mxx-test-reductions mxx-gtest-main)

add_executable(test-send test_send.cpp)
target_link_libraries(test-send mxx-gtest-main)
add_executable(mxx-test-send test_send.cpp)
target_link_libraries(mxx-test-send mxx-gtest-main)

add_executable(test-sort test_sort.cpp)
target_link_libraries(test-sort mxx-gtest-main)
add_executable(mxx-test-sort test_sort.cpp)
target_link_libraries(mxx-test-sort mxx-gtest-main)

add_executable(test-distribution test_distribution.cpp)
target_link_libraries(test-distribution mxx-gtest-main)
add_executable(mxx-test-distribution test_distribution.cpp)
target_link_libraries(mxx-test-distribution mxx-gtest-main)

add_executable(benchmarks benchmarks.cpp)
target_link_libraries(benchmarks mxx-gtest-main)
add_executable(mxx-benchmarks benchmarks.cpp)
target_link_libraries(mxx-benchmarks mxx-gtest-main)

# TODO: sequential tests should use different main
add_executable(test-bucketing test_bucketing.cpp)
target_link_libraries(test-bucketing mxx-gtest-main)
add_executable(mxx-test-bucketing test_bucketing.cpp)
target_link_libraries(mxx-test-bucketing mxx-gtest-main)

add_executable(test-all test_collective.cpp test_reductions.cpp test_send.cpp test_sort.cpp test_distribution.cpp)
target_link_libraries(test-all mxx-gtest-main)
add_executable(mxx-test-all test_collective.cpp test_reductions.cpp test_send.cpp test_sort.cpp test_distribution.cpp)
target_link_libraries(mxx-test-all mxx-gtest-main)

################
# Old tests: #
