// bulk_join.hpp
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/bind_back.hpp>
#include <unifex/execution_policy.hpp>
#include <unifex/get_execution_policy.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace _bulk_join {
// Holder template for the receiver adaptor: the adaptor itself is the nested
// class `type`, which is only declared here.
template <typename Receiver>
struct _join_receiver {
  class type;
};

// Shorthand that names the nested adaptor class for a given `Receiver`.
template <typename Receiver>
using join_receiver = typename _join_receiver<Receiver>::type;
// Receiver adaptor that owns a downstream `Receiver`. `set_next()` has an
// empty body, while value/error/done completions are passed through to the
// owned receiver.
template <typename Receiver>
class _join_receiver<Receiver>::type {
public:
  // Constructs the owned receiver from `r`; noexcept exactly when that
  // construction is noexcept.
  template(typename Receiver2)                            //
      (requires constructible_from<Receiver, Receiver2>)  //
      explicit type(Receiver2&& r) noexcept(
          std::is_nothrow_constructible_v<Receiver, Receiver2>)
    : receiver_(static_cast<Receiver2&&>(r)) {}

  // Empty body: nothing is forwarded to the owned receiver.
  void set_next() & noexcept {}

  // Passes the values on to the owned receiver, which is moved from.
  template(typename... Values)                     //
      (requires receiver_of<Receiver, Values...>)  //
      void set_value(Values&&... values) noexcept(
          is_nothrow_receiver_of_v<Receiver, Values...>) {
    unifex::set_value(
        std::move(receiver_), static_cast<Values&&>(values)...);
  }

  // Passes the error on to the owned receiver, which is moved from.
  template(typename Error)                  //
      (requires receiver<Receiver, Error>)  //
      void set_error(Error&& error) noexcept {
    unifex::set_error(std::move(receiver_), static_cast<Error&&>(error));
  }

  // Passes the done signal on to the owned receiver, which is moved from.
  void set_done() noexcept {
    unifex::set_done(std::move(receiver_));
  }

  // get_execution_policy customisation: always yields
  // parallel_unsequenced_policy, independent of the receiver's state.
  friend constexpr unifex::parallel_unsequenced_policy tag_invoke(
      tag_t<get_execution_policy>, [[maybe_unused]] const type& r) noexcept {
    return {};
  }

  // Any receiver-query CPO is answered by invoking it on the owned receiver.
  template(typename CPO, typename Self)  //
      (requires is_receiver_query_cpo_v<CPO> AND same_as<Self, type>)  //
      friend auto tag_invoke(CPO cpo, const Self& self) noexcept(
          std::is_nothrow_invocable_v<CPO, const Receiver&>)
          -> std::invoke_result_t<CPO, const Receiver&> {
    return cpo(self.receiver_);
  }

private:
  Receiver receiver_;
};
// Holder template for the sender adaptor: the adaptor itself is the nested
// class `type`, which is only declared here.
template <typename Source>
struct _join_sender {
  class type;
};

// Shorthand that names the nested adaptor class for a given `Source`.
template <typename Source>
using join_sender = typename _join_sender<Source>::type;
// Sender adaptor that owns a `Source` sender. Its value/error types and
// sender traits are taken directly from `Source`; connecting it wraps the
// supplied receiver in a `join_receiver` and connects that to the source.
template <typename Source>
class _join_sender<Source>::type {
public:
  // Completion signatures and traits mirror those of the wrapped source.
  template <
      template <typename...> class Variant,
      template <typename...> class Tuple>
  using value_types = sender_value_types_t<Source, Variant, Tuple>;

  template <template <typename...> class Variant>
  using error_types = sender_error_types_t<Source, Variant>;

  static constexpr bool sends_done = sender_traits<Source>::sends_done;

  static constexpr blocking_kind blocking = sender_traits<Source>::blocking;

  static constexpr bool is_always_scheduler_affine =
      sender_traits<Source>::is_always_scheduler_affine;

  // Constructs the owned source from `s`. Constrained (like the
  // join_receiver constructor) so that this forwarding constructor does not
  // outcompete the copy constructor for arguments `Source` cannot be built
  // from, e.g. a non-const lvalue of this adaptor type.
  template(typename Source2)                          //
      (requires constructible_from<Source, Source2>)  //
      explicit type(Source2&& s) noexcept(
          std::is_nothrow_constructible_v<Source, Source2>)
    : source_(static_cast<Source2&&>(s)) {}

  // connect customisation: wraps `r` in a join_receiver and connects it to
  // the (suitably cv/ref-qualified) source.
  //
  // The noexcept clause checks construction of the stored receiver *from the
  // forwarded argument* (which is what building the join_receiver does), in
  // addition to the nested connect being non-throwing.
  template(typename Self, typename Receiver)  //
      (requires same_as<remove_cvref_t<Self>, type> AND sender_to<
          member_t<Self, Source>,
          join_receiver<remove_cvref_t<Receiver>>>)  //
      friend auto tag_invoke(
          tag_t<unifex::connect>,
          Self&& self,
          Receiver&& r) noexcept(
          std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>&&
              is_nothrow_connectable_v<
                  member_t<Self, Source>,
                  join_receiver<remove_cvref_t<Receiver>>>)
          -> connect_result_t<
              member_t<Self, Source>,
              join_receiver<remove_cvref_t<Receiver>>> {
    return unifex::connect(
        static_cast<Self&&>(self).source_,
        join_receiver<remove_cvref_t<Receiver>>{static_cast<Receiver&&>(r)});
  }

  // blocking customisation: reports whatever the wrapped source reports.
  friend constexpr blocking_kind
  tag_invoke(tag_t<unifex::blocking>, const type& s) noexcept {
    return unifex::blocking(s.source_);
  }

private:
  Source source_;
};
// Function-object type behind `bulk_join`. It has three call forms:
//  * a bulk sender for which a tag_invoke customisation exists -> dispatch
//    to that customisation;
//  * any other bulk sender -> wrap it in the default `join_sender`;
//  * no arguments -> produce a bind_back object for later application.
struct _fn {
  // Customised path: defers entirely to tag_invoke(_fn{}, source).
  template(typename Source)  //
      (requires bulk_sender<Source>&& tag_invocable<_fn, Source>)  //
      auto
      operator()(Source&& source) const
      noexcept(is_nothrow_tag_invocable_v<_fn, Source>)
          -> tag_invoke_result_t<_fn, Source> {
    return tag_invoke(_fn{}, static_cast<Source&&>(source));
  }

  // Default path: stores the source inside a join_sender.
  template(typename Source)  //
      (requires bulk_sender<Source> && (!tag_invocable<_fn, Source>))  //
      auto
      operator()(Source&& source) const noexcept(
          std::is_nothrow_constructible_v<remove_cvref_t<Source>, Source>)
          -> join_sender<remove_cvref_t<Source>> {
    return join_sender<remove_cvref_t<Source>>{
        static_cast<Source&&>(source)};
  }

  // Argument-less form: returns the result of bind_back applied to this
  // function object.
  constexpr auto operator()() const
      noexcept(std::is_nothrow_invocable_v<tag_t<bind_back>, _fn>)
          -> bind_back_result_t<_fn> {
    return bind_back(*this);
  }
};
} // namespace _bulk_join
// The `bulk_join` function object: a single inline constexpr instance of
// `_bulk_join::_fn`, through which all of that type's call operators are
// reached.
inline constexpr _bulk_join::_fn bulk_join{};
} // namespace unifex
#include <unifex/detail/epilogue.hpp>