diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index b103d869da9..b2b715e8845 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -55,6 +55,15 @@ covering_index => 'maybe'(#idx{}) }. +-type mrargs_extra_item() :: + {callback, {atom(), atom()}} + | {selector, any()} + | {callback_args, viewcbargs()} + | {ignore_partition_query_limit, boolean()} + | {execution_stats_map, boolean()} + | {execution_stats_rolling, boolean()}. +-type mrargs_extra() :: [mrargs_extra_item()]. + -spec viewcbargs_new(Selector, Fields, CoveringIndex) -> ViewCBArgs when Selector :: selector(), Fields :: fields(), @@ -199,10 +208,13 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) - {ignore_partition_query_limit, true}, - % Request execution statistics in a map. The purpose of this option is - % to maintain interoperability on version upgrades. - % TODO remove this option in a later version. - {execution_stats_map, true} + % The purpose of the following options is to maintain + % interoperability on version upgrades: + % TODO: Remove them in a later version + % - Return execution statistics in a map + {execution_stats_map, true}, + % - Stream execution statistics + {execution_stats_rolling, true} ] }. @@ -336,19 +348,66 @@ choose_best_index(IndexRanges) -> {SelectedIndex, SelectedIndexRanges, _} = hd(SortedIndexRanges), {{SelectedIndex, SelectedIndexRanges}, SortedIndexRanges}. +-spec maybe_init_stats(Options) -> ok when + Options :: mrargs_extra(). +maybe_init_stats(Options) -> + case couch_util:get_value(execution_stats_rolling, Options, false) of + true -> ok; + false -> mango_execution_stats:shard_init() + end. + +-spec roll_stats(Stats, Options) -> ok when + Stats :: shard_stats_v2(), + Options :: mrargs_extra(). +roll_stats(Stats, Options) -> + case couch_util:get_value(execution_stats_rolling, Options, false) of + true -> + ok = rexi:stream2({execution_stats, format_stats(Stats, Options)}); + false -> + KeysExamined = maps:get(keys_examined, Stats), + DocsExamined = maps:get(docs_examined, Stats), + mango_execution_stats:shard_incr_keys_examined(KeysExamined), + mango_execution_stats:shard_incr_docs_examined(DocsExamined) + end. + +-spec format_stats(RawStats, Options) -> FormattedStats when + RawStats :: shard_stats_v2(), + Options :: mrargs_extra(), + FormattedStats :: shard_stats_v1() | shard_stats_v2(). +format_stats(Stats, Options) -> + case couch_util:get_value(execution_stats_map, Options, false) of + true -> + Stats; + false -> + DocsExamined = maps:get(docs_examined, Stats), + {docs_examined, DocsExamined} + end. + +-spec maybe_submit_stats(Options) -> ok when + Options :: mrargs_extra(). +maybe_submit_stats(Options) -> + case couch_util:get_value(execution_stats_rolling, Options, false) of + true -> + ok; + false -> + ShardStats = mango_execution_stats:shard_get_stats(), + Stats = format_stats(ShardStats, Options), + % Send execution stats in batch (shard-level) + ok = rexi:stream2({execution_stats, Stats}) + end. + -spec view_cb (Message, #mrargs{}) -> Response when Message :: {meta, any()} | {row, row_properties()} | complete, Response :: {ok, #mrargs{}}; (ok, ddoc_updated) -> any(). -view_cb({meta, Meta}, Acc) -> +view_cb({meta, Meta}, #mrargs{extra = Options} = Acc) -> % Map function starting - mango_execution_stats:shard_init(), + maybe_init_stats(Options), set_mango_msg_timestamp(), ok = rexi:stream2({meta, Meta}), {ok, Acc}; view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> - mango_execution_stats:shard_incr_keys_examined(), couch_stats:increment_counter([mango, keys_examined]), ViewRow = #view_row{ id = couch_util:get_value(id, Row), @@ -389,32 +448,25 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> end, case {ViewRow#view_row.doc, CoveringIndex} of {null, _} -> + roll_stats(#{keys_examined => 1, docs_examined => 0}, Options), maybe_send_mango_ping(); {undefined, Index = #idx{}} -> Doc = derive_doc_from_index(Index, ViewRow), - Process(Doc); + Process(Doc), + roll_stats(#{keys_examined => 1, docs_examined => 0}, Options); {undefined, _} -> % include_docs=false. Use quorum fetch at coordinator ok = rexi:stream2(ViewRow), + roll_stats(#{keys_examined => 1, docs_examined => 0}, Options), set_mango_msg_timestamp(); {Doc, _} -> - mango_execution_stats:shard_incr_docs_examined(), couch_stats:increment_counter([mango, docs_examined]), - Process(Doc) + Process(Doc), + roll_stats(#{keys_examined => 1, docs_examined => 1}, Options) end, {ok, Acc}; view_cb(complete, #mrargs{extra = Options} = Acc) -> - ShardStats = mango_execution_stats:shard_get_stats(), - Stats = - case couch_util:get_value(execution_stats_map, Options, false) of - true -> - ShardStats; - false -> - DocsExamined = maps:get(docs_examined, ShardStats), - {docs_examined, DocsExamined} - end, - % Send shard-level execution stats - ok = rexi:stream2({execution_stats, Stats}), + maybe_submit_stats(Options), % Finish view output ok = rexi:stream_last(complete), {ok, Acc}; @@ -744,7 +796,8 @@ base_opts_test() -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {execution_stats_rolling, true} ], MRArgs = #mrargs{ @@ -1063,7 +1116,8 @@ t_execute_ok_all_docs(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {execution_stats_rolling, true} ], Args = #mrargs{ @@ -1146,7 +1200,8 @@ t_execute_ok_query_view(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {execution_stats_rolling, true} ], Args = #mrargs{ @@ -1239,7 +1294,8 @@ t_execute_ok_all_docs_with_execution_stats(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {execution_stats_rolling, true} ], Args = #mrargs{ @@ -1308,22 +1364,43 @@ view_cb_test_() -> end, [ ?TDEF_FE(t_view_cb_meta), + ?TDEF_FE(t_view_cb_meta_rolling), ?TDEF_FE(t_view_cb_row_matching_regular_doc), + ?TDEF_FE(t_view_cb_row_matching_regular_doc_rolling), ?TDEF_FE(t_view_cb_row_non_matching_regular_doc), + ?TDEF_FE(t_view_cb_row_non_matching_regular_doc_rolling), ?TDEF_FE(t_view_cb_row_null_doc), + ?TDEF_FE(t_view_cb_row_null_doc_rolling), ?TDEF_FE(t_view_cb_row_missing_doc_triggers_quorum_fetch), + ?TDEF_FE(t_view_cb_row_missing_doc_triggers_quorum_fetch_rolling), ?TDEF_FE(t_view_cb_row_matching_covered_doc), + ?TDEF_FE(t_view_cb_row_matching_covered_doc_rolling), ?TDEF_FE(t_view_cb_row_non_matching_covered_doc), + ?TDEF_FE(t_view_cb_row_non_matching_covered_doc_rolling), ?TDEF_FE(t_view_cb_row_backwards_compatible), ?TDEF_FE(t_view_cb_complete_shard_stats_v1), ?TDEF_FE(t_view_cb_complete_shard_stats_v2), + ?TDEF_FE(t_view_cb_complete_shard_stats_v2_rolling), ?TDEF_FE(t_view_cb_ok) ] }. t_view_cb_meta(_) -> + Accumulator = #mrargs{extra = []}, + meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)), + ?assertEqual({ok, Accumulator}, view_cb({meta, meta}, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')). + +t_view_cb_meta_rolling(_) -> + Accumulator = + #mrargs{ + extra = [ + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)), - ?assertEqual({ok, accumulator}, view_cb({meta, meta}, accumulator)), + ?assertEqual({ok, Accumulator}, view_cb({meta, meta}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). t_view_cb_row_matching_regular_doc(_) -> @@ -1344,6 +1421,26 @@ t_view_cb_row_matching_regular_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). +t_view_cb_row_matching_regular_doc_rolling(_) -> + Row = [{id, id}, {key, key}, {doc, doc}], + Result = #view_row{id = id, key = key, doc = doc}, + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 1}}, + meck:expect(rexi, stream2, [{[Result], ok}, {[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[]}, + fields => all_fields, + covering_index => undefined + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 2). + t_view_cb_row_non_matching_regular_doc(_) -> Doc = {[]}, Row = [{id, id}, {key, key}, {doc, Doc}], @@ -1363,6 +1460,27 @@ t_view_cb_row_non_matching_regular_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). +t_view_cb_row_non_matching_regular_doc_rolling(_) -> + Doc = {[]}, + Row = [{id, id}, {key, key}, {doc, Doc}], + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 1}}, + meck:expect(rexi, stream2, [{[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]}, + fields => all_fields, + covering_index => undefined + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + put(mango_last_msg_timestamp, os:timestamp()), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 1). + t_view_cb_row_null_doc(_) -> Row = [{id, id}, {key, key}, {doc, null}], meck:expect(rexi, stream2, ['_'], undefined), @@ -1381,6 +1499,26 @@ t_view_cb_row_null_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). +t_view_cb_row_null_doc_rolling(_) -> + Row = [{id, id}, {key, key}, {doc, null}], + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}}, + meck:expect(rexi, stream2, [{[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[]}, + fields => all_fields, + covering_index => undefined + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + put(mango_last_msg_timestamp, os:timestamp()), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 1). + t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> Row = [{id, id}, {key, key}, {doc, undefined}], ViewRow = #view_row{id = id, key = key, doc = undefined}, @@ -1399,6 +1537,26 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). +t_view_cb_row_missing_doc_triggers_quorum_fetch_rolling(_) -> + Row = [{id, id}, {key, key}, {doc, undefined}], + ViewRow = #view_row{id = id, key = key, doc = undefined}, + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}}, + meck:expect(rexi, stream2, [{[ViewRow], ok}, {[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[]}, + fields => all_fields, + covering_index => undefined + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 2). + t_view_cb_row_matching_covered_doc(_) -> Keys = [key1, key2], Row = [{id, id}, {key, Keys}, {doc, undefined}], @@ -1425,6 +1583,34 @@ t_view_cb_row_matching_covered_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). +t_view_cb_row_matching_covered_doc_rolling(_) -> + Keys = [key1, key2], + Row = [{id, id}, {key, Keys}, {doc, undefined}], + Doc = {[{<<"field1">>, key1}, {<<"field2">>, key2}]}, + Result = #view_row{id = id, key = Keys, doc = Doc}, + Fields = [<<"field1">>, <<"field2">>], + Index = + #idx{ + type = <<"json">>, + def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]} + }, + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}}, + meck:expect(rexi, stream2, [{[Result], ok}, {[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[]}, + fields => Fields, + covering_index => Index + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 2). + t_view_cb_row_non_matching_covered_doc(_) -> Row = [{id, id}, {key, [key1, key2]}, {doc, undefined}], Fields = [<<"field1">>, <<"field2">>], @@ -1449,6 +1635,32 @@ t_view_cb_row_non_matching_covered_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). +t_view_cb_row_non_matching_covered_doc_rolling(_) -> + Row = [{id, id}, {key, [key1, key2]}, {doc, undefined}], + Fields = [<<"field1">>, <<"field2">>], + Index = + #idx{ + type = <<"json">>, + def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]} + }, + ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}}, + meck:expect(rexi, stream2, [{[ExecutionStats], ok}]), + Accumulator = + #mrargs{ + extra = [ + {callback_args, #{ + selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]}, + fields => Fields, + covering_index => Index + }}, + {execution_stats_map, true}, + {execution_stats_rolling, true} + ] + }, + put(mango_last_msg_timestamp, os:timestamp()), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:num_calls(rexi, stream2, '_') == 1). + t_view_cb_row_backwards_compatible(_) -> Row = [{id, id}, {key, key}, {doc, null}], meck:expect(rexi, stream2, ['_'], undefined), @@ -1477,6 +1689,14 @@ t_view_cb_complete_shard_stats_v2(_) -> ?assert(meck:called(rexi, stream2, '_')), ?assert(meck:called(rexi, stream_last, '_')). +t_view_cb_complete_shard_stats_v2_rolling(_) -> + meck:expect(rexi, stream2, ['_'], undefined), + meck:expect(rexi, stream_last, [complete], meck:val(ok)), + Accumulator = #mrargs{extra = [{execution_stats_map, true}, {execution_stats_rolling, true}]}, + ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)), + ?assertNot(meck:called(rexi, stream2, '_')), + ?assert(meck:called(rexi, stream_last, '_')). + t_view_cb_ok(_) -> meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)), view_cb(ok, ddoc_updated), diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl index dce7c087b29..2056cfa54c9 100644 --- a/src/mango/src/mango_execution_stats.erl +++ b/src/mango/src/mango_execution_stats.erl @@ -26,8 +26,8 @@ log_stats/1, maybe_add_stats/4, shard_init/0, - shard_incr_keys_examined/0, - shard_incr_docs_examined/0, + shard_incr_keys_examined/1, + shard_incr_docs_examined/1, shard_get_stats/0 ]). @@ -123,19 +123,19 @@ shard_init() -> InitialState = #{docs_examined => 0, keys_examined => 0}, put(?SHARD_STATS_KEY, InitialState). --spec shard_incr_keys_examined() -> any(). -shard_incr_keys_examined() -> - incr(keys_examined). +-spec shard_incr_keys_examined(integer()) -> any(). +shard_incr_keys_examined(N) -> + incr(keys_examined, N). --spec shard_incr_docs_examined() -> any(). -shard_incr_docs_examined() -> - incr(docs_examined). +-spec shard_incr_docs_examined(integer()) -> any(). +shard_incr_docs_examined(N) -> + incr(docs_examined, N). --spec incr(atom()) -> any(). -incr(Key) -> +-spec incr(atom(), integer()) -> any(). +incr(Key, N) -> case get(?SHARD_STATS_KEY) of #{} = Stats0 -> - Stats = maps:update_with(Key, fun(X) -> X + 1 end, Stats0), + Stats = maps:update_with(Key, fun(X) -> X + N end, Stats0), put(?SHARD_STATS_KEY, Stats); _ -> ok diff --git a/src/mango/test/15-execution-stats-test.py b/src/mango/test/15-execution-stats-test.py index caa542d7a82..729b13d9246 100644 --- a/src/mango/test/15-execution-stats-test.py +++ b/src/mango/test/15-execution-stats-test.py @@ -73,6 +73,19 @@ def test_covering_json_index(self): self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 0) self.assertEqual(resp["execution_stats"]["results_returned"], 3) + def test_reporting_consistency(self): + resp = self.db.find( + {"age": {"$lte": 42}}, + fields=["name", "email", "age"], + limit=3, + return_raw=True, + executionStats=True, + ) + executionStats = resp["execution_stats"] + self.assertEqual(executionStats["total_keys_examined"], 3) + self.assertEqual(executionStats["total_docs_examined"], 3) + self.assertEqual(executionStats["results_returned"], 3) + @unittest.skipUnless(mango.has_text_service(), "requires text service") class ExecutionStatsTests_Text(mango.UserDocsTextTests):