Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement message expiry in broker #731

Merged
merged 1 commit into from
Nov 27, 2020

Conversation

kleunen
Copy link
Contributor

@kleunen kleunen commented Nov 21, 2020

This is what I have so far:

Retained message expiry (tested)
Will expiry (tested)
Offline message expiry (no test)
The message expiry is updated when the message is finally transmitted, this is also untested yet.

The unit tests for this are a bit of a hassle, is it maybe better to use synchronous clients, much easier to write the test. Also, when the unit test stalls, there is no real timeout on the unit test. That is also why the CI hangs sometimes.

@codecov
Copy link

codecov bot commented Nov 21, 2020

Codecov Report

Merging #731 (6357928) into master (e155f05) will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master     #731   +/-   ##
=======================================
  Coverage   84.57%   84.57%           
=======================================
  Files          46       46           
  Lines        7020     7020           
=======================================
  Hits         5937     5937           
  Misses       1083     1083           

@redboltz
Copy link
Owner

Test for offline message is mandatory to merge.

The unit tests for this are a bit of a hassle, is it maybe better to use synchronous clients, much easier to write the test.

mqtt_cpp has sync and async APIs. See https://github.com/redboltz/mqtt_cpp/wiki/Sync-vs-Async
Even if you use sync APIs, event handlers are async.
There is no sync event handler because it is not fit for practical usecases. In addition, it is not fit for pubsub model.
Consider one subscription multiple publish received case.

The current test scenario framework helps developing tests.

mqtt_cpp used to use order counting based test scenario.
See https://github.com/redboltz/mqtt_cpp/blob/v1.0.7/test/pubsub.cpp#L140

It was very difficult to maintain. So I introduced https://github.com/redboltz/mqtt_cpp/blob/master/test/checker.hpp

We can write test scenario based on graph structure.
And we can use topological sorting base match() function.

Unfortunately it is not well documented but I could answer if you have question when you writing tests.

If you want to write non socket communication based tests, modulization is a solution.
You already write https://github.com/redboltz/mqtt_cpp/blob/master/test/subscription_map.cpp . It is a kind of pure unit test.

I'm thinking re-structure source tree.

The current source tree is as follows:

        include point for SDK users
         |
         V
mqtt_cpp/include/mqtt/*.hpp (SDK headers)
        /example/*.cpp      (SDK examples)
        /test/*.hpp         (test utilities, broker, and broker parts)
              *.cpp         (tests. system_test and unit_test are mixed)

I'm thinking updating the tree as follows:

        include point for SDK users
         |
         V
mqtt_cpp/include/mqtt/*.hpp        (SDK headers)
                     /broker/*.hpp (broker and modulized broker headers (subscription_map, retained_topic_map, etc)
        /example/*.cpp             (SDK examples)
        /test/common/*.hpp         (test utilities)
              unit_test/*.cpp      (unit tests)
              system_test/*.cpp    (system tests)

Then, most the tests forcused on Message Expiry Interval is placed on unit_test.
Only some of typical case placed on system_test.

I think that you have two choices.
One is writing offline messages tests using the current (system_test) way.
The other is waiting until I would finish re-structuring the tree.
I need a couple of weeks to do that.
If you choose the latter, please don't send new PRs until I would finish the work.

Also, when the unit test stalls, there is no real timeout on the unit test. That is also why the CI hangs sometimes.

If you know a good solution, please write a PR.
Maybe github actions and azure build pipelines has timeout setting.
For example if no output some duration, then abort test as error. But some tests e.g. connect, doesn't output anything a long time but it is not stuck.
We can control log_level using --log_level=all but all is too much. Maybe we need to adjust log_level and CI's timeout.
It is not my priority but PR is welcome :)

if (!ec) {
auto& idx = sessions_.get<tag_tim>();
idx.erase(sp);
sessions_.get<tag_tim>().erase(wp.lock());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that checking wp lifetime first is better.
It's not introduced by your PR. The original code is also not good.

erase(nullptr); doesn't match anything in our design so works expectedly but I don't like this kinds of design.

Please use the following pattern.

if (auto sp = wp.lock()) { // first check wp and acquire sp
    if (!ec) { // timer fired, not cancelled
        // do erase operation
    }
}

@kleunen
Copy link
Contributor Author

kleunen commented Nov 22, 2020

If there is already an existing test for offline message, i could reuse this. But I have not found this yet.

Regarding the unit test framework, i understand a bit what the CHECK is doing, but not fully. I was thinking for the callback, maybe it is possible to use the async interface, but then have some "wait" function, which waits for a callback to complete (or maybe throw exception on timeout).

For example if you call "async_connect" on the client. You then call a "wait" method, which takes the client, and waits for the callback to occur. That way, the ordering of which events should occur in the test is clear, it becomes a sequential process. Any deviation of the sequential process will just result in an exception.

@redboltz
Copy link
Owner

If there is already an existing test for offline message, i could reuse this. But I have not found this yet.

Oh, really. I just added #732. It is test for broker side offline_messages. In addition, it demonstrates how to write multiple client test.
The order of the handler is the same order as the scenario. If already written handler is called 2nd time, then introduce chk.match() order control mechanism.
This is a way to easy to understand. I noticed recently so not all tests are written the way.

@redboltz
Copy link
Owner

@kleunen
I got the following memory error.
It seems that the error is related to subscription_map.
#732 (comment)

Could you debug it ?

I guess that you can reproduce the error on your local environment.
100% happens on my environment.

After the problem would be fixed, then I will add mqtt v5 version test case.
And then, I will merge #732. Then you can rebase the master and use the test case for the base of your message expiry interval test for offline_messages.

@kleunen
Copy link
Contributor Author

kleunen commented Nov 22, 2020

I had the following idea to run callbacks in a blocking/synchronous way:
https://wandbox.org/permlink/l0jWXUQpVMSvy7fy

But I can not get it to function properly.

@redboltz
Copy link
Owner

redboltz commented Nov 22, 2020

I had the following idea to run callbacks in a blocking/synchronous way:
https://wandbox.org/permlink/l0jWXUQpVMSvy7fy

But I can not get it to function properly.

I think that it too complicated. Not calling ioc.run() is some of Boost Asio expert only functionality.
In addition system_test (actual socket communication test) should use SDK the same way as the common users.
As far as I know, The common users call ioc.run() including me.

I think that async with scenario and handlers the same order is simple enough.
See https://github.com/redboltz/mqtt_cpp/pull/732/files

@redboltz
Copy link
Owner

redboltz commented Nov 22, 2020

Let me explain how to write tests.
The example is broker_offline_message.cpp.

Design the scenario.

BOOST_AUTO_TEST_CASE( offline_pubsub_v3_1_1 ) {

    //
    // c1 ---- broker ----- c2 (CleanSession: false)
    //
    // 1. c2 subscribe t1 QoS2
    // 2. c2 disconnect
    // 3. c1 publish t1 QoS0
    // 4. c1 publish t1 QoS1
    // 5. c1 publish t1 QoS2
    // 6. c2 connect again
    //

Reflect to chk array.

    checker chk = {
        cont("c1_h_connack"),
        cont("c2_h_connack1"),

        // c2 subscribe t1 qos2
        cont("c2_h_suback"),
        cont("c2_h_close1"),

        // c1 publish t1 qos0
        // c1 publish t1 qos1
        // c1 publish t1 qos2
        cont("c1_h_puback"),
        cont("c1_h_pubrec"),
        cont("c1_h_pubcomp"),

        // c2 connect again
        cont("c2_h_connack2"),
        cont("c2_h_publish1"),
        cont("c2_h_publish2"),
        cont("c2_h_publish3"),

        cont("c1_h_close"),
        cont("c2_h_close2"),
    };

Write handlers

The order is the same as chk array. It is very simple.

    c1->set_connack_handler(
        [&chk, &c2]
        (bool sp, MQTT_NS::connect_return_code connack_return_code) {
            MQTT_CHK("c1_h_connack");
            BOOST_TEST(sp == false);
            BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
            c2->connect();
            return true;
        }
    );
    c2->set_connack_handler(
        [&chk, &c2]
        (bool sp, MQTT_NS::connect_return_code connack_return_code) {
        MQTT_CHK("c2_h_connack1");
        BOOST_TEST(sp == false);
        BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
        c2->subscribe("topic1", MQTT_NS::qos::exactly_once);
        return true;
    );
    c2->set_suback_handler(
        [&chk, &c2]
        (packet_id_t, std::vector<MQTT_NS::suback_return_code> results) {
            MQTT_CHK("c2_h_suback");
            BOOST_TEST(results.size() == 1U);
            BOOST_TEST(results[0] == MQTT_NS::suback_return_code::success_maximum_qos_2);
            c2->disconnect();
            return true;
        }
    );
    c2->set_close_handler(
        [&chk, &c1, &finish]
        () {
        MQTT_CHK("c2_h_close1");
        c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once);
        c1->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_least_once);
        c1->publish("topic1", "topic1_contents3", MQTT_NS::qos::exactly_once);
    );
    c1->set_puback_handler(
        [&chk]
        (std::uint16_t) {
            MQTT_CHK("c1_h_puback");
            return true;
        }
    );
    c1->set_pubrec_handler(
        [&chk]
        (std::uint16_t) {
            MQTT_CHK("c1_h_pubrec");
            return true;
        }
    );
    c1->set_pubcomp_handler(
        [&chk, &c2]
        (std::uint16_t) {
            MQTT_CHK("c1_h_pubcomp");
            c2->connect();
            return true;
        }
    );

Now, c2->connect() requires c2->set_connack_handler() but it is already implemented.
Then, introduce match().

Update

    c2->set_connack_handler(
        [&chk, &c2]
        (bool sp, MQTT_NS::connect_return_code connack_return_code) {
        MQTT_CHK("c2_h_connack1");
        BOOST_TEST(sp == false);
        BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
        c2->subscribe("topic1", MQTT_NS::qos::exactly_once);
        return true;
    );

to

    c2->set_connack_handler(
        [&chk, &c2]
        (bool sp, MQTT_NS::connect_return_code connack_return_code) {
            auto ret = chk.match(
                "c1_h_connack",
                [&] {
                    // existing code is move here
                    MQTT_CHK("c2_h_connack1");
                    BOOST_TEST(sp == false);
                    BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
                    c2->subscribe("topic1", MQTT_NS::qos::exactly_once);
                },
                "c2_h_connack1",
                [&] {
                    // new 2nd call is here
                    MQTT_CHK("c2_h_connack2");
                    BOOST_TEST(sp == true);
                    BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
                }
            );
            BOOST_TEST(ret);
            return true;
        }
    );

Add continuous event handling

After connack is received, 3 times publishes are expected.

    c2->set_publish_handler(
        [&chk, &c1]
        (MQTT_NS::optional<packet_id_t> packet_id,
         MQTT_NS::publish_options pubopts,
         MQTT_NS::buffer topic,
         MQTT_NS::buffer contents) {
            auto ret = chk.match(
                "c2_h_connack2", // just above of "c2_h_publish1" at the chk array
                [&] {
                    MQTT_CHK("c2_h_publish1");
                    BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
                    BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once);
                    BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
                    BOOST_CHECK(!packet_id);
                    BOOST_TEST(topic == "topic1");
                    BOOST_TEST(contents == "topic1_contents1");
                },
                "c2_h_publish1", // previous call chk point
                [&] {
                    MQTT_CHK("c2_h_publish2");
                    BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
                    BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once);
                    BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
                    BOOST_CHECK(packet_id);
                    BOOST_TEST(topic == "topic1");
                    BOOST_TEST(contents == "topic1_contents2");
                },
                "c2_h_publish2", // previous call chk point
                [&] {
                    MQTT_CHK("c2_h_publish3");
                    BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
                    BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
                    BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
                    BOOST_CHECK(packet_id);
                    BOOST_TEST(topic == "topic1");
                    BOOST_TEST(contents == "topic1_contents3");

                    c1->disconnect(); // all tests are finished then close clients.
                }
            );
            BOOST_TEST(ret);
            return true;
        }
    );

Add closing code

    c1->set_close_handler(
        [&chk, &c2]
        () {
            MQTT_CHK("c1_h_close");
            c2->disconnect();
        }
    );

c2->set_close_handler() is expected and it has already been implemented.
So update it as folows.

    c2->set_close_handler(
        [&chk, &c1, &finish]
        () {
            auto ret = chk.match(
                "c2_h_suback",
                [&] {
                    MQTT_CHK("c2_h_close1");
                    c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once);
                    c1->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_least_once);
                    c1->publish("topic1", "topic1_contents3", MQTT_NS::qos::exactly_once);
                },
                "c2_h_close1",
                [&] {
                    MQTT_CHK("c2_h_close2");
                    finish(); // all clients are closed then shutdown the broker
                }
            );
            BOOST_TEST(ret);

        }
    );

Shutdown the test_broker

Finally call finish() to shutdown the broker.

Note

This test creates clients manually because the test requires two client those are publisher and subscriber. But if you use only one client, you can use do_combi_test.

BOOST_AUTO_TEST_CASE( qos0_sub_string_single ) {
    auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
        // write test body (scenario and handlers here)
    };
    do_combi_test_sync(test);
}

finish() function is provided as the parameter.

@redboltz
Copy link
Owner

#732 has been merged. Please rebase #731 from the new master.
Then you get offline_messages tests. You can update it for message expiry interval.

@kleunen
Copy link
Contributor Author

kleunen commented Nov 22, 2020

I added the test case, it is combined with the offline test case: one where the published messages are received, other, the published messages should be timeout. Actually these:

        cont("c2_h_publish1"),
        cont("c2_h_publish2"),
        cont("c2_h_publish3"),

Should not be part of the checker when the messages timeout, but I could not figure out how to handle this nicely.

Also when timeouts are used in the test cases, you actually have to wait for 1 or 2 seconds before the messages timeout. This does slowdown the unit test.


if(i.tim_message_expiry) {
set_property<MQTT_NS::v5::property::message_expiry_interval>(props,
MQTT_NS::v5::property::message_expiry_interval(static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(i.tim_message_expiry->expires_from_now()).count())));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expires_from_now() is deprecated.
See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/steady_timer.html

Try compile with -DBOOST_ASIO_NO_DEPRECATED.

@redboltz
Copy link
Owner

redboltz commented Nov 24, 2020

I added the test case, it is combined with the offline test case: one where the published messages are received, other, the published messages should be timeout. Actually these:

        cont("c2_h_publish1"),
        cont("c2_h_publish2"),
        cont("c2_h_publish3"),

Should not be part of the checker when the messages timeout, but I could not figure out how to handle this nicely.

Also when timeouts are used in the test cases, you actually have to wait for 1 or 2 seconds before the messages timeout. This does slowdown the unit test.

I will take a look it.
Could you separate timeout version and no timeout version as different test case?

You introduced a common function:

static void impl_offline_pubsub_v5(bool wait_for_timeout)

In normal module design, it is good practice. But this test case, I think that it is not good. Because expected test scenario is different. When I try edit the function, I need to think the two test cases.
Many if (wait_for_timeout) make difficult to understand the test scenario.

After the separation, I guess that you remove

        cont("c2_h_publish1"),
        cont("c2_h_publish2"),
        cont("c2_h_publish3"),

from the scenario.

Please keep test case failure (or blocked) .

NOTE:
I will merge #733, after I would get your response. Rebasing the PR from #733 merged master would handles well the blocking case.

Then I will check the test case.

@redboltz
Copy link
Owner

master has been updated.

@redboltz
Copy link
Owner

Thank you for updating.
It seems that all CI has passed without cont("c2_h_publish1"), 2, and 3.

Should not be part of the checker when the messages timeout, but I could not figure out how to handle this nicely.

Is this solved?

@kleunen
Copy link
Contributor Author

kleunen commented Nov 24, 2020

Thank you for updating.
It seems that all CI has passed without cont("c2_h_publish1"), 2, and 3.

Should not be part of the checker when the messages timeout, but I could not figure out how to handle this nicely.

Is this solved?

yes, this is solved now. Because they are separate tests

test/will.cpp Outdated
std::uint16_t pid_unsub2;


std::vector<std::string> const expected2 = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the variable is not used.

template <typename T>
static void set_property(MQTT_NS::v5::properties const &props, T&& v) {
auto visitor = MQTT_NS::make_lambda_visitor(
[v = std::forward<T>(v)](T& t) mutable { t = std::forward<T>(v); },
Copy link
Owner

@redboltz redboltz Nov 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[v = std::forward<T>(v)](T& t) mutable { t = std::forward<T>(v); },
[&v](T& t) mutable { t = std::forward<T>(v); },

It is a little bit complicated.

  1. visit() is sync function. So you don't need to care about the lifetime of v.
  2. The left hand side of v in the capture list is l-value. So the lambda body should be { t = MQTT_NS::force_move(v); because captured v is l-value.
  3. But you don't need do that. Simply capture v as reference then use std::forward in the body of lambda expression.

See https://wandbox.org/permlink/PupJ8O8Yx0ulxbjF

NOTE: If the lambda expression called asynchronously e.g. as::post(ioc, [] {}), you need to do as follows:

as::post(ioc, [t = std::forward<T>(t)] (auto& v) mutable { v = std::move(t); }

}

std::shared_ptr<as::steady_timer> tim_message_expiry;
if(message_expiry_interval) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(message_expiry_interval) {
if (message_expiry_interval) {

tim_message_expiry = std::make_shared<as::steady_timer>(ioc, message_expiry_interval.value());
tim_message_expiry->async_wait(
[&, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](MQTT_NS::error_code ec) mutable {
if(auto sp = wp.lock()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(auto sp = wp.lock()) {
if (auto sp = wp.lock()) {

);
}

auto &seq_idx = offline_messages.get<tag_seq>();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto &seq_idx = offline_messages.get<tag_seq>();
auto& seq_idx = offline_messages.get<tag_seq>();

tim_will_expiry.reset();
will_value = MQTT_NS::force_move(will);

if(will_value && will_expiry_interval) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(will_value && will_expiry_interval) {
if (will_value && will_expiry_interval) {

will_value = MQTT_NS::nullopt;
}

MQTT_NS::optional<MQTT_NS::will> &will() { return will_value; }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MQTT_NS::optional<MQTT_NS::will> &will() { return will_value; }
MQTT_NS::optional<MQTT_NS::will>& will() { return will_value; }

}

MQTT_NS::optional<MQTT_NS::will> &will() { return will_value; }
MQTT_NS::optional<MQTT_NS::will> const &will() const { return will_value; }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MQTT_NS::optional<MQTT_NS::will> const &will() const { return will_value; }
MQTT_NS::optional<MQTT_NS::will> const& will() const { return will_value; }

MQTT_NS::optional<MQTT_NS::will> &will() { return will_value; }
MQTT_NS::optional<MQTT_NS::will> const &will() const { return will_value; }

std::shared_ptr<as::steady_timer> &get_tim_will_expiry() { return tim_will_expiry; }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::shared_ptr<as::steady_timer> &get_tim_will_expiry() { return tim_will_expiry; }
std::shared_ptr<as::steady_timer>& get_tim_will_expiry() { return tim_will_expiry; }

MQTT_NS::optional<MQTT_NS::will> const &will() const { return will_value; }

std::shared_ptr<as::steady_timer> &get_tim_will_expiry() { return tim_will_expiry; }
protected:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why protected? I think that it should be private.

:topic(MQTT_NS::force_move(topic)),
contents(MQTT_NS::force_move(contents)),
props(MQTT_NS::force_move(props)),
qos_value(qos_value)
qos_value(qos_value),
tim_message_expiry(tim_message_expiry)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tim_message_expiry(tim_message_expiry)
tim_message_expiry(MQTT_NS::force_move(tim_message_expiry))

if(will_value && will_expiry_interval) {
tim_will_expiry = std::make_shared<as::steady_timer>(ioc, will_expiry_interval.value());
tim_will_expiry->async_wait(
[&](MQTT_NS::error_code ec) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use [&] capture with asynchronous callback.
I just added the coding rule. See https://github.com/redboltz/mqtt_cpp/wiki/Coding-Rules#details-of-lambda-expression

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another comment.
Please capture

[wp = std::weak_ptr<as::steady_timer>(tim_will_expiry)]

And use it as follows:

if (auto sp = wp.lock()) {
    ....
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I actually use reset_will on shared_ptr<session_state> , but I can assume shared_ptr<session_state> still exists if tim_will_expiry shared_ptr still exist, correct ? tim_will_expiry is a member of session_state, timer will get cancelled when shared_ptr<session_state> is destroyed.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I actually use reset_will on shared_ptr<session_state> , but I can assume shared_ptr<session_state> still exists if tim_will_expiry shared_ptr still exist, correct ?

Yes.

tim_will_expiry is a member of session_state, timer will get cancelled when shared_ptr<session_state> is destroyed.

Yes, but as I mentioned several times, the following scenario could happen.

  1. session_state starts destroying.
  2. The member variables of session_state starts destroying.
  3. tim_will_expiry is fired. Boost.Asio remove timer entry and post timer fired event (handler) to ioc.
  4. tim_will_expiry starts destroying.
  5. tim_will_expiry timer.cancel() called internally but no timer entry matched.
  6. destroy other members and finish session_state destoroy.
  7. timer handler is invoked. And access deallocated session_state. It's bad.

if (!ec) {
auto& idx = sessions_.get<tag_tim>();
idx.erase(sp);
[&, wp = std::weak_ptr<as::steady_timer>(e.tim_session_expiry)](MQTT_NS::error_code ec) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[&, wp = std::weak_ptr<as::steady_timer>(e.tim_session_expiry)](MQTT_NS::error_code ec) {
[this, wp = std::weak_ptr<as::steady_timer>(e.tim_session_expiry)](MQTT_NS::error_code ec) {

It is the original code problem. Implicit captiore & shouldn't be used with async function's callback.
See https://github.com/redboltz/mqtt_cpp/wiki/Coding-Rules#details-of-lambda-expression

auto& idx = sessions_.get<tag_tim>();
idx.erase(sp);
[&, wp = std::weak_ptr<as::steady_timer>(e.tim_session_expiry)](MQTT_NS::error_code ec) {
if(auto sp = wp.lock()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(auto sp = wp.lock()) {
if (auto sp = wp.lock()) {

@@ -1305,6 +1356,11 @@ class test_broker {
if (sid) {
props.push_back(MQTT_NS::v5::property::subscription_identifier(*sid));
}
if(r.tim_message_expiry) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(r.tim_message_expiry) {
if (r.tim_message_expiry) {

if(message_expiry_interval) {
tim_message_expiry = std::make_shared<as::steady_timer>(ioc_, message_expiry_interval.value());
tim_message_expiry->async_wait(
[&, topic = MQTT_NS::force_move(topic)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[&, topic = MQTT_NS::force_move(topic)]
[this, topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]

topic is used after this point.

tim_message_expiry = std::make_shared<as::steady_timer>(ioc_, message_expiry_interval.value());
tim_message_expiry->async_wait(
[&, topic = MQTT_NS::force_move(topic)]
(boost::system::error_code const& ec) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the following pattern to prevent the lifetime problem.

if (auto sp = wp.lock()) {

if(message_expiry_interval) {
tim_message_expiry = std::make_shared<as::steady_timer>(ioc, message_expiry_interval.value());
tim_message_expiry->async_wait(
[&, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](MQTT_NS::error_code ec) mutable {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[&, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](MQTT_NS::error_code ec) mutable {
[this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](MQTT_NS::error_code ec) mutable {

tim_message_expiry->async_wait(
[this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
(boost::system::error_code const& ec) {
if (wp.lock()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (wp.lock()) {
if (auto sp = wp.lock()) {

Please do as I mentioned way.
shared_ptr is temporary variable. It lose lock immediately. lvalue is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. But you will warning variable is unused?

Copy link
Owner

@redboltz redboltz Nov 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. See https://wandbox.org/permlink/1CpMqC1i6Yq7hBrw
Only f is warned because foo has trivial destructor. But b is not warned because it has user defined destructor. Even if the destructor is empty, it is non trivial.

std::shared_ptr has non trivial destructor. So if compiler is smart enough, no unuserd variable warning outputs.

@redboltz redboltz merged commit 091098a into redboltz:master Nov 27, 2020
@redboltz
Copy link
Owner

Merged. Thank you!

@kleunen
Copy link
Contributor Author

kleunen commented Nov 27, 2020

Merged. Thank you!

You are welcome and also @jonesmz for the initial code, thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants