Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
patflick committed Mar 7, 2016
2 parents 24a6ae5 + 445656f commit 627e134
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 40 deletions.
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,22 @@ development, prototyping, and deployment.

### Planned / TODO

- Parallel random number engines (for use with `C++11` standard library distributions)
- More parallel (standard) algorithms
- Wrappers for non-blocking collectives
- (maybe) serialization/de-serialization of non contiguous data types
- macros for easy datatype creation and handling for custom/own structs and classes
- Implementing and tuning different sorting algorithms
- Communicator classes for different topologies
- `mxx::env` similar to `boost::mpi::env` for wrapping `MPI_Init` and `MPI_Finalize`
- Increase test coverage:
- [ ] Parallel random number engines (for use with `C++11` standard library distributions)
- [ ] More parallel (standard) algorithms
- [ ] Wrappers for non-blocking collectives
- [ ] serialization/de-serialization of non-contiguous data types (maybe)
- [x] ~~macros for easy datatype creation and handling for custom/own structs and classes~~
- [ ] Implementing and tuning different sorting algorithms
- [ ] Communicator classes for different topologies
- [x] ~~`mxx::env` similar to `boost::mpi::env` for wrapping `MPI_Init` and `MPI_Finalize`~~
- [ ] full code documentation and introductory documentation
- [ ] Increase test coverage:
![codecov.io](https://codecov.io/github/patflick/mxx/branch.svg?branch=master)

### Status

Currently `mxx` is just a small personal project at very early stages.
Currently, `mxx` is a small personal project in its early stages, with lots of
changes still going on. However, feel free to contribute.

### Examples

Expand Down Expand Up @@ -151,10 +153,9 @@ your project, and you'll be all set.

### Dependencies

At this point, `mxx` requires a `C++11` compatible version of `gcc`. Some
functions still rely on `gcc` specific function calls. `mxx` currently works
with `MPI-2` and `MPI-3`. However, some collective operations and sorting will
work on data sizes `>= 2 GB` only with `MPI-3`.
`mxx` requires a `C++11` compatible compiler.
`mxx` currently works with `MPI-2` and `MPI-3`.
However, some collective operations and sorting will work on data sizes `>= 2 GB` only with `MPI-3`.

### Compiling

Expand Down
24 changes: 12 additions & 12 deletions include/mxx/collective.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ template <typename T>
void scatterv(const T* msgs, const std::vector<size_t>& sizes, T* out, size_t recv_size, int root, const mxx::comm& comm = mxx::comm()) {
MXX_ASSERT(root != comm.rank() || sizes.size() == static_cast<size_t>(comm.size()));
// get total send size
size_t send_size = std::accumulate(sizes.begin(), sizes.end(), 0);
size_t send_size = std::accumulate(sizes.begin(), sizes.end(), static_cast<size_t>(0));
mxx::datatype sizedt = mxx::get_datatype<size_t>();
MPI_Bcast(&send_size, 1, sizedt.type(), root, comm);
// check if we need to use the custom BIG scatterv
Expand Down Expand Up @@ -701,7 +701,7 @@ std::vector<T> gather(const T& x, int root, const mxx::comm& comm = mxx::comm())
*/
template <typename T>
void gatherv(const T* data, size_t size, T* out, const std::vector<size_t>& recv_sizes, int root, const mxx::comm& comm = mxx::comm()) {
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
// tell everybody about the total size
MPI_Bcast(&total_size, 1, mpi_sizet.type(), root, comm);
Expand Down Expand Up @@ -756,7 +756,7 @@ void gatherv(const T* data, size_t size, T* out, const std::vector<size_t>& recv
template <typename T>
std::vector<T> gatherv(const T* data, size_t size, const std::vector<size_t>& recv_sizes, int root, const mxx::comm& comm = mxx::comm()) {
std::vector<T> result;
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
if (comm.rank() == root)
result.resize(total_size);
gatherv(data, size, &result[0], recv_sizes, root, comm);
Expand Down Expand Up @@ -995,7 +995,7 @@ std::vector<T> allgather(const T& x, const mxx::comm& comm = mxx::comm()) {
*/
template <typename T>
void allgatherv(const T* data, size_t size, T* out, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
MXX_ASSERT(recv_sizes.size() == static_cast<size_t>(comm.size()));
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
if (total_size >= mxx::max_int) {
Expand Down Expand Up @@ -1040,7 +1040,7 @@ void allgatherv(const T* data, size_t size, T* out, const std::vector<size_t>& r
*/
template <typename T>
std::vector<T> allgatherv(const T* data, size_t size, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
std::vector<T> result(total_size);
allgatherv(data, size, &result[0], recv_sizes, comm);
return result;
Expand Down Expand Up @@ -1241,8 +1241,8 @@ std::vector<T> all2all(const std::vector<T>& msgs, const mxx::comm& comm = mxx::
*/
template <typename T>
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, T* out, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), 0);
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), static_cast<size_t>(0));
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
size_t local_max_size = std::max(total_send_size, total_recv_size);
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
size_t max;
Expand Down Expand Up @@ -1296,8 +1296,8 @@ void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, T* out, cons

template <typename T>
void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, const std::vector<size_t>& send_displs, T* out, const std::vector<size_t>& recv_sizes, const std::vector<size_t>& recv_displs, const mxx::comm& comm = mxx::comm()) {
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), 0);
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t total_send_size = std::accumulate(send_sizes.begin(), send_sizes.end(), static_cast<size_t>(0));
size_t total_recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
size_t local_max_size = std::max(total_send_size, total_recv_size);
mxx::datatype mpi_sizet = mxx::get_datatype<size_t>();
size_t max;
Expand Down Expand Up @@ -1338,7 +1338,7 @@ void all2allv(const T* msgs, const std::vector<size_t>& send_sizes, const std::v
*/
template <typename T>
std::vector<T> all2allv(const T* msgs, const std::vector<size_t>& send_sizes, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
size_t recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
size_t recv_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), static_cast<size_t>(0));
std::vector<T> result(recv_size);
all2allv(msgs, send_sizes, &result[0], recv_sizes, comm);
return result;
Expand All @@ -1365,7 +1365,7 @@ std::vector<T> all2allv(const T* msgs, const std::vector<size_t>& send_sizes, co
*/
template <typename T>
std::vector<T> all2allv(const std::vector<T>& msgs, const std::vector<size_t>& send_sizes, const std::vector<size_t>& recv_sizes, const mxx::comm& comm = mxx::comm()) {
MXX_ASSERT(msgs.size() == std::accumulate(send_sizes.begin(), send_sizes.end(), (size_t)0));
MXX_ASSERT(msgs.size() == std::accumulate(send_sizes.begin(), send_sizes.end(), static_cast<size_t>(0)));
return all2allv(&msgs[0], send_sizes, recv_sizes, comm);
}

Expand Down Expand Up @@ -1447,7 +1447,7 @@ void all2all_func(std::vector<T>& msgs, Func target_func, const mxx::comm& comm
std::vector<size_t> recv_counts = all2all(send_counts, comm);

// resize messages to fit recv
std::size_t recv_size = std::accumulate(recv_counts.begin(), recv_counts.end(), 0);
std::size_t recv_size = std::accumulate(recv_counts.begin(), recv_counts.end(), static_cast<size_t>(0));
msgs.clear();
msgs.shrink_to_fit();
msgs.resize(recv_size);
Expand Down
81 changes: 81 additions & 0 deletions include/mxx/datatypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,89 @@ datatype get_datatype(const T&) {
return dt;
}


/*********************************************************************
* Custom struct datatypes *
*********************************************************************/

// for each macros from: https://stackoverflow.com/questions/1872220/is-it-possible-to-iterate-over-arguments-in-variadic-macros
// Make a FOREACH macro
#define FE_1(WHAT, X) WHAT(X)
#define FE_2(WHAT, X, ...) WHAT(X)FE_1(WHAT, __VA_ARGS__)
#define FE_3(WHAT, X, ...) WHAT(X)FE_2(WHAT, __VA_ARGS__)
#define FE_4(WHAT, X, ...) WHAT(X)FE_3(WHAT, __VA_ARGS__)
#define FE_5(WHAT, X, ...) WHAT(X)FE_4(WHAT, __VA_ARGS__)
#define FE_6(WHAT, X, ...) WHAT(X)FE_5(WHAT, __VA_ARGS__)
#define FE_7(WHAT, X, ...) WHAT(X)FE_6(WHAT, __VA_ARGS__)
#define FE_8(WHAT, X, ...) WHAT(X)FE_7(WHAT, __VA_ARGS__)
#define FE_9(WHAT, X, ...) WHAT(X)FE_8(WHAT, __VA_ARGS__)
#define FE_10(WHAT, X, ...) WHAT(X)FE_9(WHAT, __VA_ARGS__)

//... repeat as needed
#define GET_MACRO(_1,_2,_3,_4,_5,_6,_7,_8,_9,_10,NAME,...) NAME
#define FOR_EACH(action,...) \
GET_MACRO(__VA_ARGS__,FE_10,FE_9,FE_8,FE_7,FE_6,FE_5,FE_4,FE_3,FE_2,FE_1)(action,__VA_ARGS__)

// First part of a generated get_type() body for the struct BASE_TYPE.
// Expands to local declarations that the member and postamble macros rely on:
//   _type    - the MPI_Datatype that will eventually be returned
//   p, pt    - a default-initialized BASE_TYPE instance and a pointer to it
//              (BASE_TYPE therefore has to be default-constructible)
//   p_adr    - the address of `p` as reported by MPI_Get_address
//   type_map - maps a member's displacement (MPI_Aint) to its mxx::datatype
#define MXX_DT_PREAMBLE(BASE_TYPE) \
MPI_Datatype _type; \
BASE_TYPE p; \
BASE_TYPE* pt = &p; \
MPI_Aint p_adr; \
MPI_Get_address(pt, &p_adr); \
std::map<MPI_Aint, mxx::datatype> type_map;

// Registers one member of the struct in `type_map`.
// Declares `<member>_adr`, fills it with the address of `pt-><member>` via
// MPI_Get_address, and stores get_datatype<decltype(pt-><member>)>() under the
// key `<member>_adr - p_adr`, i.e. the member's displacement from the start of
// the instance. Uses the names `pt`, `p_adr` and `type_map`, which
// MXX_DT_PREAMBLE declares.
#define MXX_DT_MEMBER_DISPLS(member) \
MPI_Aint member ## _adr; \
MPI_Get_address(&pt-> member, & member ## _adr); \
type_map[member ## _adr - p_adr] = get_datatype<decltype(pt-> member )>();

// Final part of a generated get_type() body for the struct BASE_TYPE.
// Flattens `type_map` into the parallel arrays MPI expects (block lengths are
// all 1; displacements and types are taken in the map's iteration order, which
// for std::map is ascending displacement), creates a struct datatype from
// them, and resizes it to lower bound 0 and extent sizeof(BASE_TYPE). The
// resized type is committed, the intermediate struct type is freed, and the
// committed type is returned.
// The expansion ends in a `return` statement, so it has to be the last thing
// in the enclosing function. Uses `_type` and `type_map` as declared by
// MXX_DT_PREAMBLE.
#define MXX_DT_POSTAMBLE(BASE_TYPE) \
int num_members = type_map.size(); \
std::vector<int> blocklen(num_members, 1); \
std::vector<MPI_Datatype> types(num_members); \
std::vector<MPI_Aint> displs(num_members); \
int i = 0; \
for (auto& t : type_map) { \
displs[i] = t.first; \
types[i] = t.second.type(); \
i++; \
} \
MPI_Datatype struct_type; \
MPI_Type_create_struct((num_members), &blocklen[0], &displs[0], &types[0], &struct_type); \
MPI_Type_create_resized(struct_type, 0, sizeof( BASE_TYPE ), &_type); \
MPI_Type_commit(&_type); \
MPI_Type_free(&struct_type); \
return _type;

// Complete get_type() body: preamble, one MXX_DT_MEMBER_DISPLS expansion per
// listed member, then the postamble (which returns the committed datatype).
#define MXX_DT_STRUCT_MEMBERS_GET_TYPE(BASE_TYPE, ...) MXX_DT_PREAMBLE(BASE_TYPE); FOR_EACH(MXX_DT_MEMBER_DISPLS, __VA_ARGS__); MXX_DT_POSTAMBLE(BASE_TYPE);
// Expression yielding datatype_builder<...>::num_basic_elements() for the type
// of member MEMBER; expects a variable named `p` of the struct type in scope.
#define MXX_DT_STRUCT_MEMBER_NUM_BASIC(MEMBER) datatype_builder<decltype(p. MEMBER)>::num_basic_elements()
// Same expression with a leading `+`, so that several expansions can be
// appended after a first term to form a sum.
#define MXX_DT_STRUCT_MEMBER_ADD_NUM_BASIC(MEMBER) + datatype_builder<decltype(p. MEMBER)>::num_basic_elements()

// Defines `static size_t num_basic_elements()` for the struct BASE_TYPE.
// The function returns the sum of datatype_builder<...>::num_basic_elements()
// over all listed members: FIRST_MEMBER supplies the first term and every
// remaining member is appended as a `+ ...` term through FOR_EACH.
// The local `BASE_TYPE p;` exists so that `decltype(p.member)` can name each
// member's type; it requires BASE_TYPE to be default-constructible.
// NOTE(review): FOR_EACH has no zero-argument case, so a struct registered
// with only a single member does not appear to be supported - confirm.
#define MXX_DT_STRUCT_MEMBERS_NUM_BASIC(BASE_TYPE, FIRST_MEMBER, ...) \
static size_t num_basic_elements() { \
BASE_TYPE p; \
return MXX_DT_STRUCT_MEMBER_NUM_BASIC(FIRST_MEMBER) \
FOR_EACH(MXX_DT_STRUCT_MEMBER_ADD_NUM_BASIC, __VA_ARGS__) ;\
}

// Expands to the definition of `struct datatype_builder<BASE_TYPE>` with two
// static members: get_type(), which builds and returns the MPI datatype for
// the listed members, and num_basic_elements().
// The expansion starts at the `struct` keyword, so the `template <...>` header
// of the specialization has to be written in front of it by the user of this
// macro (MXX_CUSTOM_STRUCT does so with `template <>`).
#define MXX_CUSTOM_STRUCT_(BASE_TYPE, ...) \
struct datatype_builder<BASE_TYPE> { \
static MPI_Datatype get_type() { \
MXX_DT_STRUCT_MEMBERS_GET_TYPE(BASE_TYPE, __VA_ARGS__); \
} \
MXX_DT_STRUCT_MEMBERS_NUM_BASIC(BASE_TYPE, __VA_ARGS__); \
};

// Declares the MPI datatype mapping for a non-template struct: expands to a
// full specialization `template <> struct datatype_builder<BASE_TYPE>` wrapped
// in `namespace mxx { ... }`. The arguments after BASE_TYPE are the names of
// the data members to include.
// NOTE(review): because the expansion opens `namespace mxx` itself, this is
// presumably meant to be invoked at global namespace scope - confirm.
#define MXX_CUSTOM_STRUCT(BASE_TYPE, ...) \
namespace mxx { \
template <> \
MXX_CUSTOM_STRUCT_(BASE_TYPE, __VA_ARGS__); \
} // namespace mxx


// Variant for templated structs: expands to MXX_CUSTOM_STRUCT_ only, without
// the `namespace mxx { ... }` wrapper and without a `template <...>` header.
// NOTE(review): the caller presumably has to provide both (its own template
// parameter list, inside namespace mxx) around the invocation - confirm.
#define MXX_CUSTOM_TEMPLATE_STRUCT(BASE_TYPE, ...) MXX_CUSTOM_STRUCT_(BASE_TYPE, __VA_ARGS__)

} // namespace mxx


#endif // MXX_DATATYPES_HPP
18 changes: 9 additions & 9 deletions include/mxx/distribution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,19 @@ void distribute(_InIterator begin, _InIterator end, _OutIterator out, const mxx:
// allgather surpluses
// TODO figure out how to do surplus send pairing without requiring allgather
std::vector<impl::signed_size_t> surpluses = mxx::allgather(surplus, comm);
MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), (impl::signed_size_t)0) == 0);
MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), static_cast<impl::signed_size_t>(0)) == 0);

// get send counts
std::vector<size_t> send_counts = impl::surplus_send_pairing(surpluses, comm.size(), comm.rank(), false);
std::vector<size_t> recv_counts = mxx::all2all(send_counts, comm);

if (surplus > 0) {
MXX_ASSERT(std::accumulate(recv_counts.begin(), recv_counts.end(), (size_t)0) == 0);
MXX_ASSERT(std::accumulate(recv_counts.begin(), recv_counts.end(), static_cast<size_t>(0)) == 0);
// TODO: use iterators not pointers
mxx::all2allv(&(*(begin+((impl::signed_size_t)local_size-surplus))), send_counts, (T*)nullptr, recv_counts, comm);
std::copy(begin, begin+(local_size-surplus), out);
} else {
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), (size_t)0) == 0);
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), static_cast<size_t>(0)) == 0);
mxx::all2allv((const T*)nullptr, send_counts, &(*(out+local_size)), recv_counts, comm);
std::copy(begin, end, out);
}
Expand Down Expand Up @@ -316,19 +316,19 @@ struct distribute_container {
// allgather surpluses
// TODO figure out how to do surplus send pairing without requiring allgather
std::vector<impl::signed_size_t> surpluses = mxx::allgather(surplus, comm);
MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), (impl::signed_size_t)0) == 0);
MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), static_cast<impl::signed_size_t>(0)) == 0);

// get send counts
std::vector<size_t> send_counts = impl::surplus_send_pairing(surpluses, comm.size(), comm.rank(), false);
std::vector<size_t> recv_counts = mxx::all2all(send_counts, comm);

if (surplus > 0) {
MXX_ASSERT(std::accumulate(recv_counts.begin(), recv_counts.end(), (size_t)0) == 0);
MXX_ASSERT(std::accumulate(recv_counts.begin(), recv_counts.end(), static_cast<size_t>(0)) == 0);
// TODO: use iterators not pointers
mxx::all2allv(&(*(std::begin(c)+((impl::signed_size_t)local_size-surplus))), send_counts, (T*)nullptr, recv_counts, comm);
c.resize(part.local_size());
} else {
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), (size_t)0) == 0);
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), static_cast<size_t>(0)) == 0);
c.resize(part.local_size());
mxx::all2allv((T*)nullptr, send_counts, &(*(std::begin(c)+local_size)), recv_counts, comm);
}
Expand Down Expand Up @@ -437,7 +437,7 @@ void redo_arbit_decomposition(_InIterator begin, _InIterator end, _OutIterator o
// NOTE: this all-gather is what makes the arbitrary decomposition worse
// in terms of complexity than when assuming a block decomposition
std::vector<std::size_t> new_local_sizes = mxx::allgather(new_local_size, comm);
MXX_ASSERT(std::accumulate(new_local_sizes.begin(), new_local_sizes.end(), 0ull) == total_size);
MXX_ASSERT(std::accumulate(new_local_sizes.begin(), new_local_sizes.end(), static_cast<size_t>(0)) == total_size);

// calculate where to send elements
std::vector<size_t> send_counts(comm.size(), 0);
Expand Down Expand Up @@ -522,12 +522,12 @@ _Iterator block_decompose_partitions(_Iterator begin, _Iterator mid, _Iterator e
return mid;
std::vector<impl::signed_size_t> surpluses = mxx::allgather(surplus, comm);

MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), 0) == 0);
MXX_ASSERT(std::accumulate(surpluses.begin(), surpluses.end(), static_cast<size_t>(0)) == 0);

// get send counts
std::vector<size_t> send_counts = impl::surplus_send_pairing(surpluses, comm.size(), comm.rank(), true);

std::size_t send_size = std::accumulate(send_counts.begin(), send_counts.end(), 0);
std::size_t send_size = std::accumulate(send_counts.begin(), send_counts.end(), static_cast<size_t>(0));
std::vector<T> buffer;
if (send_size > 0)
buffer.resize(send_size);
Expand Down
6 changes: 3 additions & 3 deletions include/mxx/samplesort.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ std::vector<size_t> split(_Iterator begin, _Iterator end, _Compare comp, const s
// send last elements to last processor
std::size_t out_size = std::distance(pos, end);
send_counts[comm.size() - 1] += out_size;
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), 0ull) == local_size);
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), static_cast<size_t>(0)) == local_size);
return send_counts;
}

Expand Down Expand Up @@ -323,7 +323,7 @@ std::vector<size_t> stable_split(_Iterator begin, _Iterator end, _Compare comp,
// send last elements to last processor
std::size_t out_size = std::distance(pos, end);
send_counts[comm.size() - 1] += out_size;
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), 0ull) == local_size);
MXX_ASSERT(std::accumulate(send_counts.begin(), send_counts.end(), static_cast<size_t>(0)) == local_size);
return send_counts;
}

Expand Down Expand Up @@ -399,7 +399,7 @@ void samplesort(_Iterator begin, _Iterator end, _Compare comp, MPI_Datatype mpi_


std::vector<size_t> recv_counts = mxx::all2all(send_counts, comm);
std::size_t recv_n = std::accumulate(recv_counts.begin(), recv_counts.end(), 0ull);
std::size_t recv_n = std::accumulate(recv_counts.begin(), recv_counts.end(), static_cast<size_t>(0));
// TODO: use different approach if there are less than p local elements
MXX_ASSERT(!_AssumeBlockDecomp || (local_size <= (size_t)p || recv_n <= 2* local_size));
std::vector<value_type> recv_elements(recv_n);
Expand Down
4 changes: 2 additions & 2 deletions test/test_collective.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ TEST(MxxColl, All2allvGeneral) {
msgs.push_back(std::make_pair(c.rank()*i, 1.0 / rand()));
}
}
size_t recv_size = std::accumulate(recv_counts.begin(), recv_counts.end(), 0);
size_t recv_size = std::accumulate(recv_counts.begin(), recv_counts.end(), static_cast<size_t>(0));

// send one way
std::vector<std::pair<int, double> > result = mxx::all2allv(msgs, send_counts, recv_counts, c);
Expand Down Expand Up @@ -657,7 +657,7 @@ TEST(MxxColl, All2allvUnknownSize) {

// get the expected receive counts
std::vector<size_t> actual_recv_count = mxx::all2all(send_counts);
size_t recv_size = std::accumulate(actual_recv_count.begin(), actual_recv_count.end(), 0);
size_t recv_size = std::accumulate(actual_recv_count.begin(), actual_recv_count.end(), static_cast<size_t>(0));

ASSERT_EQ(recv_size, result.size());
std::vector<char>::iterator it = result.begin();
Expand Down

0 comments on commit 627e134

Please sign in to comment.