-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented #756. #757
Implemented #756. #757
Conversation
Codecov Report
@@ Coverage Diff @@
## master #757 +/- ##
==========================================
+ Coverage 85.50% 85.55% +0.04%
==========================================
Files 60 60
Lines 8521 8549 +28
==========================================
+ Hits 7286 7314 +28
Misses 1235 1235 |
@kleunen , please check this PR. |
I will check in moment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some feedback on the implementation. Please consider the feedback, i think this is important feature of the broker.
idx.begin(), | ||
[&](auto& e) { | ||
auto qos_value = e.pubopts_.get_qos(); | ||
e.send(ep); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Send is const method, so you don't need idx.modify ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
return; | ||
} | ||
catch (packet_id_exhausted_error const& e) { | ||
MQTT_LOG("mqtt_broker", warning) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here again, the result of the publish is handled by an exception. This will probably be called less often, but I don't think it is exceptional that the packet_id can run out. Also a warning is generated, but the broker is able to handle this behaviour now. So nothing to warn about.
Ideally you want to generate the packet_id here by the broker, and call publish with the pre-generated packet id. This should be possible with optional acquire_unique_packet_id (see also comment below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think packet_id_exhausted is exceptional case. I understand it actually happens. Even if it handles well in the broker, finally run out memory and got bad_alloc. And in the broker exception happens just one time across threshold, then the message is go to offline_messages.
If packet_id_exhausted frequently happens, the network capacity design is broken.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But as you mentioned, optional version of packet_id acquisition is worth considering. Please wait a moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that network design is broken. If client becomes unresponsive or disconnected, message may accumulate. Or if broker is very slow, also messages accumulate. Ideally the connection should timeout soon. Otherwise the broker may accumulate a lot of memory. But this is use case optimization i think. User should select qos and connection keepalive timeouts to prevent this. Or make option to set maximum of offline messages per session?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that network design is broken. If client becomes unresponsive or disconnected, message may accumulate. Or if broker is very slow, also messages accumulate. Ideally the connection should timeout soon. Otherwise the broker may accumulate a lot of memory. But this is use case optimization i think. User should select qos and connection keepalive timeouts to prevent this. Or make option to set maximum of offline messages per session?
I'm not sure I answer your question but,
- User can use Message Expiry Interval
- User can set Receive Maximum https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901049, and broker also set it https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901083. It is bidirectional. (Not implemented yet)
- User can use some kind of application level flow control
I think that there are many existing options.
So far, I don't have a plan to implement broker option to limit offline message (size count). But in the future, I will add it if it is really needed.
// used with some hypothetical "async_server" in the future. | ||
con_->publish( | ||
publish( | ||
ioc, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove comment about async ? I was wondering if this could be an async operation or not ? Is there some reason why this is not async ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remove it. It moved to publish()
idx.pop_front(); | ||
} | ||
} | ||
catch (packet_id_exhausted_error const& e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop will run until exception is thrown (packet id exhausted). In my opinion, this is not "exceptional" behaviour. Also, handling an exception is more expensive / performance intensive operation than handling a return value.
Maybe have an optional acquire_unique_packet_id (called: acquire_unique_packet_id_no_except)
/**
* @brief Acquire the new unique packet id.
* If all packet ids are already in use, then throw packet_id_exhausted_error exception.
* After acquiring the packet id, you can call acquired_* functions.
* The ownership of packet id is moved to the library.
* Or you can call release_packet_id to release it.
* @return packet id
*/
std::optional<packet_id_t> acquire_unique_packet_id_no_except() {
LockGuard<Mutex> lck (store_mtx_);
if (packet_id_.size() == std::numeric_limits<packet_id_t>::max()) std::optional<packet_id_t>();
if (packet_id_master_ == std::numeric_limits<packet_id_t>::max()) {
packet_id_master_ = 1U;
}
else {
++packet_id_master_;
}
....
And have existing acquire_packet_unique_id call this method:
/**
* @brief Acquire the new unique packet id.
* If all packet ids are already in use, then throw packet_id_exhausted_error exception.
* After acquiring the packet id, you can call acquired_* functions.
* The ownership of packet id is moved to the library.
* Or you can call release_packet_id to release it.
* @return packet id
*/
packet_id_t acquire_unique_packet_id() {
auto result = acquire_unique_packet_id_no_except();
if (!result) throw packet_id_exhausted_error();
return result.value();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done :)
Fix minior bugs.
3dcbae3
to
704dcb7
Compare
I forgot send_will fix. I just force pushed it. |
Ah yes. I forget. There is also a cout in system test of broker. Can you remove? |
Ok, I just done. |
04bc9de
to
89a6401
Compare
Removed cout messages. Used loop counter variable.
89a6401
to
c108bb8
Compare
@@ -96,25 +96,85 @@ class offline_messages { | |||
catch (packet_id_exhausted_error const& e) { | |||
MQTT_LOG("mqtt_broker", warning) | |||
<< MQTT_ADD_VALUE(address, &ep) | |||
<< e.what(); | |||
<< e.what() | |||
<< " : send the rest messages later."; | |||
} | |||
for (auto const& oflm : messages_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this loop ? message_ is empty here, correct ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't clearly define, but I mean this loop:
for (auto const& oflm : messages_) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yes.
I forgot removing it at the earlier commit. But I think that it has been removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that all comments are reflected to the code.
If it works on yourr environment, I will merge the PR.
yes, i have no further comments. Please give me some time to test, i have some other duties i have to do before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, please let me know if you finish your test.
I think that all comments are reflected to the code. |
I am testing now, but I still periodicaly see this assertion: I am publishing now with QoS 2. This is unrelated to this PR, so maybe I should make an issue ? I have no idea how to reproduce yet. It only happens rarely |
Yes, Please create the new issue. Can I merge the PR ? |
Yes, merge PR and I create separate issue for other assertion |
No description provided.