Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
patflick committed Oct 3, 2017
2 parents 605cff9 + ff675f0 commit 38674a4
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 279 deletions.
192 changes: 0 additions & 192 deletions include/mxx/algos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,120 +77,6 @@ void excl_prefix_sum(Iterator begin, const Iterator end)
}
}

#if 0

/**
* @brief Returns whether the given input range is in sorted order.
*
* @param begin An iterator to the begin of the sequence.
* @param end An iterator to the end of the sequence.
*
* @return Whether the input sequence is sorted.
*/
template <typename Iterator>
bool is_sorted(Iterator begin, Iterator end)
{
if (begin == end)
return true;
typename std::iterator_traits<Iterator>::value_type last = *(begin++);
while (begin != end)
{
if (last > *begin)
return false;
last = *(begin++);
}
return true;
}

/**
* @brief Performs a balanced partitioning.
*
* This function internally does 3-way partitioning (i.e. partitions the
* input sequence into elements of the three classes given by: <, ==, >
*
* After partioning, the elements that are in the `==` class are put into the
* class of `<` or `>`.
*
* This procedure ensures that if the pivot is part of the sequence, then
* the returned left (`<`) and right (`>`) sequences are both at least one
* element in length, i.e. neither is of length zero.
*
* @param begin An iterator to the beginning of the sequence.
* @param end An iterator to the end of the sequence.
* @param pivot A value to be used as pivot.
*
* @return An iterator pointing to the first element of the right (`>`)
* sequence.
*/
template<typename Iterator, typename T>
Iterator balanced_partitioning(Iterator begin, Iterator end, T pivot)
{
if (begin == end)
return begin;

// do 3-way partitioning and then balance the result
Iterator eq_it = begin;
Iterator le_it = begin;
Iterator ge_it = end - 1;

while (true)
{
while (le_it < ge_it && *le_it < pivot) ++le_it;
while (le_it < ge_it && *ge_it > pivot) --ge_it;

if (le_it == ge_it)
{
if (*le_it == pivot)
{
std::swap(*(le_it++), *(eq_it++));
}
else if (*le_it < pivot)
{
le_it++;
}
break;
}

if (le_it > ge_it)
break;

if (*le_it == pivot)
{
std::swap(*(le_it++), *(eq_it++));
} else if (*ge_it == pivot)
{
std::swap(*(ge_it--), *(le_it));
std::swap(*(le_it++), *(eq_it++));
}
else
{
std::swap(*(le_it++), *(ge_it--));
}
}


// if there are pivots, put them to the smaller side
if (eq_it != begin)
{
typedef typename std::iterator_traits<Iterator>::difference_type diff_t;
diff_t left_size = le_it - eq_it;
diff_t right_size = end - le_it;

// if left is bigger, put to right, otherwise just leave where they are
if (left_size > right_size)
{
// put elements to right
while (eq_it > begin)
{
std::swap(*(--le_it), *(--eq_it));
}
}
}
// no pivots, therefore simply return the partitioned sequence
return le_it;
}

#endif

/*********************************************************************
* Bucketing algorithms *
Expand Down Expand Up @@ -315,84 +201,6 @@ std::vector<size_t> bucketing_inplace(std::vector<T>& input, Func key_func, size
return bucket_counts;
}

// Tony's old implementation for bucketing:
// (needs more memory and is slower than the two versions above)

// Complexity is O(b)*O(n), where as the two version above are O(b + n)
// scales badly, same as bucketing_copy, but with a factor of 2 for large
// data.
// when fixed n, slight increase with b.. else increases with n.
template<typename T, typename Func>
std::vector<size_t> bucketing_tony(std::vector<T>& elements, Func key_func, size_t num_buckets) {

// number of elements per bucket
std::vector<size_t> send_counts(num_buckets, 0);

// if no elements, return 0 count for each bucket
if (elements.size() == 0)
return send_counts;

// for each element, track which bucket it belongs into
std::vector<long> membership(elements.size());
for (size_t i = 0; i < elements.size(); ++i)
{
membership[i] = key_func(elements[i]);
++(send_counts[membership[i]]);
}
// at this point, have target assignment for each data element, and also count for each process bucket.

// compute the offsets within the buffer
std::vector<size_t> offset = send_counts;
excl_prefix_sum(offset.begin(), offset.end());
std::vector<size_t> maxes = offset;

for (size_t i = 0; i < num_buckets; ++i) {
maxes[i] += send_counts[i];
}


//== swap elements around.
T val;
size_t tar_pos, start_pos;

long target;

// while loop will stop under 2 conditions:
// 1. returned to starting position (looped), or
// 2, tar_pos is the current pos.
// either way, we need a new starting point. instead of searching through buffer O(N), search
// for incomplete buckets via offset O(p).

for (size_t i = 0; i < num_buckets;) {
// determine the starting position.
if (offset[i] == maxes[i]) {
++i; // skip all completed buckets
continue; // have the loop check value.
}
// get the start pos.
start_pos = offset[i];

// set up the variable with the current entry.
target = membership[start_pos];
if (target > -1) {
val = ::std::move(elements[start_pos]); // value to move
membership[start_pos] = -2; // special value to indicate where we started from.

while (target > -1) { // if -1 or -2, then either visited or beginning of chain.
tar_pos = offset[target]++; // compute new position. earlier offset values for the same pid are should have final values already.
target = membership[tar_pos];

// save the info at tar_pos;
::std::swap(val, elements[tar_pos]); // put what's in src into buffer at tar_pos, and save what's at buffer[tar_pos]
membership[tar_pos] = -1; // mark as visited.

} // else already visited, so done.
}
}

return send_counts;
}

} // namespace mxx

#endif // MXX_ALGOS_H
2 changes: 1 addition & 1 deletion include/mxx/collective.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ void all2all_func(std::vector<T>& msgs, Func target_func, const mxx::comm& comm
// TODO: replace with bucketing function, implemented elsewhere
std::vector<size_t> send_counts(comm.size(), 0);
for (auto it = msgs.begin(); it != msgs.end(); ++it) {
MXX_ASSERT(0 <= target_func(*it) && target_func(*it) < comm.size());
assert(0 <= target_func(*it) && target_func(*it) < comm.size());
send_counts[target_func(*it)]++;
}
std::vector<std::size_t> offset = impl::get_displacements(send_counts);
Expand Down
12 changes: 11 additions & 1 deletion include/mxx/comm_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ class comm {
return m_rank;
}

/// Returns wether this process is rank 0 in the communicator
bool is_first() const {
return m_rank == 0;
}

/// Returns whether this process has `rank == size-1` in this communicator
bool is_last() const {
return m_rank == m_size - 1;
}

/// Collective barrier call for all processes in `this` communicator.
void barrier() const {
MPI_Barrier(this->mpi_comm);
Expand Down Expand Up @@ -326,7 +336,7 @@ class comm {
inline mxx::future<void> isend(const T& msg, int dest, int tag = 0) const {
MXX_ASSERT(sizeof(T) < mxx::max_int);
MXX_ASSERT(0 <= dest && dest < this->size());
mxx::datatype dt = mxx::get_datatype<T>;
mxx::datatype dt = mxx::get_datatype<T>();
mxx::future_builder<void> f;
MPI_Isend(const_cast<T*>(&msg), 1, dt.type(), dest, tag, this->mpi_comm, &f.add_request());
return f.get_future();
Expand Down
1 change: 0 additions & 1 deletion include/mxx/datatypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class is_builtin_type : public std::false_type {};

class datatype;

// TODO: there has to be a better way for this
} // namespace mxx

std::false_type make_datatype();
Expand Down
Loading

0 comments on commit 38674a4

Please sign in to comment.