From ea6975b1f15a0ecfc7ae6022eaa52b8654f2549b Mon Sep 17 00:00:00 2001 From: Sergei Golubchik Date: Mon, 4 Dec 2023 18:49:39 +0100 Subject: [PATCH] MDEV-30366 Permit bulk implementation to return ALL individual results COM_STMT_BULK_STMT new flag to server to returns all unitary results --- include/mysql_com.h | 9 ++-- libmariadb | 2 +- sql/protocol.cc | 30 +++++++++++- sql/protocol.h | 6 +++ sql/sql_class.cc | 112 ++++++++++++++++++++++++++++++++++++++++++++ sql/sql_class.h | 8 ++++ sql/sql_delete.cc | 18 +++---- sql/sql_insert.cc | 25 +++++++++- sql/sql_prepare.cc | 56 +++++++++++++--------- sql/sql_update.cc | 3 ++ 10 files changed, 231 insertions(+), 38 deletions(-) diff --git a/include/mysql_com.h b/include/mysql_com.h index 0d50981ecd532..f530018c106a4 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -124,8 +124,7 @@ enum enum_indicator_type bulk PS flags */ #define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128 -#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64 - +#define STMT_BULK_FLAG_SEND_UNIT_RESULTS 64 /* sql type stored in .frm files for virtual fields */ #define MYSQL_TYPE_VIRTUAL 245 @@ -288,6 +287,9 @@ enum enum_indicator_type /* Do not resend metadata for prepared statements, since 10.6*/ #define MARIADB_CLIENT_CACHE_METADATA (1ULL << 36) +/* permit sending unit result-set for BULK commands */ +#define MARIADB_CLIENT_BULK_UNIT_RESULTS (1ULL << 37) + #ifdef HAVE_COMPRESS #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS #else @@ -328,7 +330,8 @@ enum enum_indicator_type MARIADB_CLIENT_STMT_BULK_OPERATIONS |\ MARIADB_CLIENT_EXTENDED_METADATA|\ MARIADB_CLIENT_CACHE_METADATA |\ - CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS) + CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS |\ + MARIADB_CLIENT_BULK_UNIT_RESULTS) /* Switch off the flags that are optional and depending on build flags If any of the optional flags is supported by the build it will be switched diff --git a/libmariadb b/libmariadb index 1e2968ade732d..b4d75e78c4872 160000 --- a/libmariadb +++ b/libmariadb @@ -1 +1 @@ -Subproject commit 1e2968ade732d320e074e89c3e9d39a4a57cd70c +Subproject commit b4d75e78c487254cab9c42e259c3522e9cd0c7c4 diff --git a/sql/protocol.cc b/sql/protocol.cc index d2ef52e0887e5..1e27c996687ec 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -593,6 +593,7 @@ void Protocol::end_statement() switch (thd->get_stmt_da()->status()) { case Diagnostics_area::DA_ERROR: + thd->stop_collecting_unit_results(); /* The query failed, send error to log and abort bootstrap. */ error= send_error(thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message(), @@ -600,12 +601,36 @@ void Protocol::end_statement() break; case Diagnostics_area::DA_EOF: case Diagnostics_area::DA_EOF_BULK: - error= send_eof(thd->server_status, + if (thd->need_report_unit_results()) { + // bulk returning result-set, like INSERT ... RETURNING + // result is already send, needs an EOF with MORE_RESULT_EXISTS + // before sending unit result-set + error= send_eof(thd->server_status | SERVER_MORE_RESULTS_EXISTS, + thd->get_stmt_da()->statement_warn_count()); + if (thd->report_collected_unit_results() && thd->is_error()) + error= send_error(thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message(), + thd->get_stmt_da()->get_sqlstate()); + else + error= send_eof(thd->server_status, + thd->get_stmt_da()->statement_warn_count()); + } + else + error= send_eof(thd->server_status, thd->get_stmt_da()->statement_warn_count()); break; case Diagnostics_area::DA_OK: case Diagnostics_area::DA_OK_BULK: - error= send_ok(thd->server_status, + if (thd->report_collected_unit_results()) + if (thd->is_error()) + error= send_error(thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message(), + thd->get_stmt_da()->get_sqlstate()); + else + error= send_eof(thd->server_status, + thd->get_stmt_da()->statement_warn_count()); + else + error= send_ok(thd->server_status, thd->get_stmt_da()->statement_warn_count(), thd->get_stmt_da()->affected_rows(), thd->get_stmt_da()->last_insert_id(), @@ -615,6 +640,7 @@ void Protocol::end_statement() break; case Diagnostics_area::DA_EMPTY: default: + thd->stop_collecting_unit_results(); DBUG_ASSERT(0); error= send_ok(thd->server_status, 0, 0, 0, NULL); break; diff --git a/sql/protocol.h b/sql/protocol.h index 09dbdfbde2b8f..d1096b041af3f 100644 --- a/sql/protocol.h +++ b/sql/protocol.h @@ -34,6 +34,12 @@ struct TABLE_LIST; typedef struct st_mysql_field MYSQL_FIELD; typedef struct st_mysql_rows MYSQL_ROWS; +struct unit_results_desc +{ + ulonglong generated_id; + ulonglong affected_rows; +}; + class Protocol { protected: diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 0dee58184dd1c..f8cadf4ca3ae0 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1375,6 +1375,7 @@ void THD::init() apc_target.init(&LOCK_thd_kill); gap_tracker_data.init(); + unit_results= NULL; DBUG_VOID_RETURN; } @@ -8325,6 +8326,117 @@ bool Discrete_intervals_list::append(Discrete_interval *new_interval) DBUG_RETURN(0); } +/* + indicate that unit result has to be reported +*/ +bool THD::need_report_unit_results() +{ + return unit_results; +} + +/* + Initialize unit result array +*/ +bool THD::init_collecting_unit_results() +{ + if (!unit_results) + { + void *buff; + + if (!(my_multi_malloc(PSI_NOT_INSTRUMENTED, MYF(MY_WME), &unit_results, sizeof(DYNAMIC_ARRAY), + &buff, sizeof(unit_results_desc) * 10, + NullS)) || + my_init_dynamic_array2(PSI_INSTRUMENT_ME, unit_results, sizeof(unit_results_desc), + buff, 10, 100, MYF(MY_WME))) + { + if (unit_results) + my_free(unit_results); + unit_results= NULL; + return TRUE; + } + } + return FALSE; +} + +/* + remove unit result array +*/ +void THD::stop_collecting_unit_results() +{ + if (unit_results) + { + delete_dynamic(unit_results); + my_free(unit_results); + unit_results= NULL; + } +} + + +/* + Add a unitary result to collection +*/ +bool THD::collect_unit_results(ulonglong id, ulonglong affected_rows) +{ + if (unit_results) + { + unit_results_desc el; + el.generated_id= id; + el.affected_rows= affected_rows; + if (insert_dynamic(unit_results, &el)) + { + return TRUE; + } + } + return FALSE; +} + +/* + Write unitary result result-set WITHOUT ending EOF/OK_Packet to socket. +*/ +bool THD::report_collected_unit_results() +{ + if (unit_results) + { + List field_list; + MEM_ROOT tmp_mem_root; + Query_arena arena(&tmp_mem_root, Query_arena::STMT_INITIALIZED), backup; + + init_alloc_root(PSI_NOT_INSTRUMENTED, arena.mem_root, 2048, 4096, MYF(0)); + set_n_backup_active_arena(&arena, &backup); + DBUG_ASSERT(mem_root == &tmp_mem_root); + + field_list.push_back(new (mem_root) + Item_int(this, "Id", 0, MY_INT64_NUM_DECIMAL_DIGITS), + mem_root); + field_list.push_back(new (mem_root) + Item_int(this, "Affected_rows", 0, MY_INT64_NUM_DECIMAL_DIGITS), + mem_root); + + if (protocol_binary.send_result_set_metadata(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + goto error; + + for (ulonglong i= 0; i < unit_results->elements; i++) + { + unit_results_desc *last= + (unit_results_desc *)dynamic_array_ptr(unit_results, i); + protocol_binary.prepare_for_resend(); + protocol_binary.store_longlong(last->generated_id, TRUE); + protocol_binary.store_longlong(last->affected_rows, TRUE); + if (protocol_binary.write()) + goto error; + } +error: + restore_active_arena(&arena, &backup); + DBUG_ASSERT(arena.mem_root == &tmp_mem_root); + // no need free Items because they was only constants + free_root(arena.mem_root, MYF(0)); + stop_collecting_unit_results(); + return TRUE; + } + return FALSE; + +} void AUTHID::copy(MEM_ROOT *mem_root, const LEX_CSTRING *user_name, const LEX_CSTRING *host_name) diff --git a/sql/sql_class.h b/sql/sql_class.h index cc72cf44e951d..ee6547a9d8e4d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5953,6 +5953,14 @@ class THD: public THD_count, /* this must be first */ return (lex->sphead != 0 && !(in_sub_stmt & (SUB_STMT_FUNCTION | SUB_STMT_TRIGGER))); } + + /* Data and methods for bulk multiple unit result reporting */ + DYNAMIC_ARRAY *unit_results; + void stop_collecting_unit_results(); + bool collect_unit_results(ulonglong id, ulonglong affected_rows); + bool need_report_unit_results(); + bool report_collected_unit_results(); + bool init_collecting_unit_results(); }; diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index 6b442da858dd6..1a2f127227e3a 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -841,7 +841,7 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd) if (likely(!error)) { - deleted++; + deleted++; if (!delete_history && table->triggers && table->triggers->process_triggers(thd, TRG_EVENT_DELETE, TRG_ACTION_AFTER, FALSE)) @@ -849,15 +849,15 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd) error= 1; break; } - if (!--limit && using_limit) - { - error= -1; - break; - } + if (!--limit && using_limit) + { + error= -1; + break; + } } else { - table->file->print_error(error, + table->file->print_error(error, MYF(thd->lex->ignore ? ME_WARNING : 0)); if (thd->is_error()) { @@ -947,7 +947,7 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd) if (log_result > 0) { - error=1; + error=1; } } } @@ -959,6 +959,8 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd) if (thd->lex->analyze_stmt) goto send_nothing_and_leave; + thd->collect_unit_results(0, deleted); + if (returning) result->send_eof(); else diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index d904732036f90..4ef791aa9fd17 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -470,6 +470,7 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, the statement indirectly via a stored function or trigger: if it is used, that will lead to a deadlock between the client connection and the delayed thread. + - client explicitly ask to retrieve unitary changes */ if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) || thd->variables.max_insert_delayed_threads == 0 || @@ -480,6 +481,14 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, *lock_type= TL_WRITE; return; } + + /* client explicitly asked to retrieved each affected rows and insert ids */ + if (thd->need_report_unit_results()) + { + *lock_type= TL_WRITE; + return; + } + if (thd->slave_thread) { /* Try concurrent insert */ @@ -717,6 +726,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, uint value_count; /* counter of iteration in bulk PS operation*/ ulonglong iteration= 0; + ulonglong last_affected_rows= 0; ulonglong id; COPY_INFO info; TABLE *table= 0; @@ -931,7 +941,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, functions or invokes triggers since they may access to the same table and therefore should not see its inconsistent state created by this optimization. - So we call start_bulk_insert to perform nesessary checks on + So we call start_bulk_insert to perform necessary checks on values_list.elements, and - if nothing else - to initialize the code to make the call of end_bulk_insert() below safe. */ @@ -1159,6 +1169,17 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, } its.rewind(); iteration++; + + /* + Save affected rows and insert id when collecting using results + */ + ulonglong new_affected_rows= info.copied + info.deleted + + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? + info.touched : info.updated); + thd->collect_unit_results( + table->file->insert_id_for_cur_row, + new_affected_rows - last_affected_rows); + last_affected_rows = new_affected_rows; } while (bulk_parameters_iterations(thd)); values_loop_end: @@ -1342,7 +1363,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, Client expects an EOF/OK packet if result set metadata was sent. If LEX::has_returning and the statement returns result set we send EOF which is the indicator of the end of the row stream. - Oherwise we send an OK packet i.e when the statement returns only the + Otherwise we send an OK packet i.e when the statement returns only the status information */ if (returning) diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index 364a125c9b4e6..6261ba46cac52 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -223,7 +223,7 @@ class Prepared_statement: public Statement uchar *packet_arg, uchar *packet_end_arg); bool execute_bulk_loop(String *expanded_query, bool open_cursor, - uchar *packet_arg, uchar *packet_end_arg); + uchar *packet_arg, uchar *packet_end_arg, bool multiple_ok_request); bool execute_server_runnable(Server_runnable *server_runnable); my_bool set_bulk_parameters(bool reset); bool bulk_iterations() { return iterations; }; @@ -3099,7 +3099,8 @@ static void mysql_stmt_execute_common(THD *thd, uchar *packet_end, ulong cursor_flags, bool iteration, - bool types); + bool types, + bool send_all_ok); /** COM_STMT_EXECUTE handler: execute a previously prepared statement. @@ -3137,7 +3138,7 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length) packet+= 9; /* stmt_id + 5 bytes of flags */ mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, flags, FALSE, - FALSE); + FALSE, FALSE); DBUG_VOID_RETURN; } @@ -3164,9 +3165,9 @@ void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length) uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround DBUG_ENTER("mysqld_stmt_execute_bulk"); - const uint packet_header_lenght= 4 + 2; //ID & 2 bytes of flags + const uint packet_header_length= 4 + 2; //ID & 2 bytes of flags - if (packet_length < packet_header_lenght) + if (packet_length < packet_header_length) { my_error(ER_MALFORMED_PACKET, MYF(0)); DBUG_VOID_RETURN; @@ -3185,7 +3186,7 @@ void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length) DBUG_VOID_RETURN; } /* Check for implemented parameters */ - if (flags & (~STMT_BULK_FLAG_CLIENT_SEND_TYPES)) + if (flags & (~(STMT_BULK_FLAG_CLIENT_SEND_TYPES | STMT_BULK_FLAG_SEND_UNIT_RESULTS))) { DBUG_PRINT("error", ("unsupported bulk execute flags %x", flags)); my_error(ER_UNSUPPORTED_PS, MYF(0)); @@ -3193,9 +3194,10 @@ void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length) } /* stmt id and two bytes of flags */ - packet+= packet_header_lenght; + packet+= packet_header_length; mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, 0, TRUE, - (flags & STMT_BULK_FLAG_CLIENT_SEND_TYPES)); + (flags & STMT_BULK_FLAG_CLIENT_SEND_TYPES), + (flags & STMT_BULK_FLAG_SEND_UNIT_RESULTS)); DBUG_VOID_RETURN; } @@ -3283,13 +3285,14 @@ stmt_execute_packet_sanity_check(Prepared_statement *stmt, /** Common part of prepared statement execution - @param thd THD handle - @param stmt_id id of the prepared statement - @param paket packet with parameters to bind - @param packet_end pointer to the byte after parameters end - @param cursor_flags cursor flags - @param bulk_op id it bulk operation - @param read_types flag say that types muast been read + @param thd THD handle + @param stmt_id id of the prepared statement + @param paket packet with parameters to bind + @param packet_end pointer to the byte after parameters end + @param cursor_flags cursor flags + @param bulk_op is it bulk operation + @param read_types flag say that types must been read + @param send_unit_results send a result-set with all insert IDs and affected rows */ static void mysql_stmt_execute_common(THD *thd, @@ -3298,7 +3301,8 @@ static void mysql_stmt_execute_common(THD *thd, uchar *packet_end, ulong cursor_flags, bool bulk_op, - bool read_types) + bool read_types, + bool send_unit_results) { /* Query text for binary, general or slow log, if any of them is open */ String expanded_query; @@ -3367,7 +3371,7 @@ static void mysql_stmt_execute_common(THD *thd, if (!bulk_op) stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end); else - stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end); + stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end, send_unit_results); thd->cur_stmt= save_cur_stmt; thd->protocol= save_protocol; @@ -3482,7 +3486,7 @@ void mysql_sql_stmt_execute(THD *thd) thd->free_items(); // Free items created by execute_loop() /* Now restore the "external" (e.g. "SET STATEMENT") Item list. - It will be freed normaly in THD::cleanup_after_query(). + It will be freed normally in THD::cleanup_after_query(). */ thd->free_list= free_list_backup; @@ -4551,7 +4555,8 @@ bool Prepared_statement::execute_bulk_loop(String *expanded_query, bool open_cursor, uchar *packet_arg, - uchar *packet_end_arg) + uchar *packet_end_arg, + bool send_unit_results) { Reprepare_observer reprepare_observer; unsigned char *readbuff= NULL; @@ -4584,9 +4589,16 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, my_error(ER_UNSUPPORTED_PS, MYF(0)); goto err; } + + if (send_unit_results && thd->init_collecting_unit_results()) + { + DBUG_PRINT("error", ("Error initializing array.")); + return TRUE; + } + /* Here second buffer for not optimized commands, - optimized commands do it inside thier internal loop. + optimized commands do it inside their internal loop. */ if (!(sql_command_flags() & CF_PS_ARRAY_BINDING_OPTIMIZED) && this->lex->has_returning()) @@ -4619,7 +4631,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, { /* Here we set parameters for not optimized commands, - optimized commands do it inside thier internal loop. + optimized commands do it inside their internal loop. */ if (!(sql_command_flags() & CF_PS_ARRAY_BINDING_OPTIMIZED)) { @@ -4662,7 +4674,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, { /* Re-execution success is unlikely after an error from - wsrep_after_statement(), so retrun error immediately. + wsrep_after_statement(), so return error immediately. */ thd->get_stmt_da()->reset_diagnostics_area(); wsrep_override_error(thd, thd->wsrep_cs().current_error(), diff --git a/sql/sql_update.cc b/sql/sql_update.cc index e6181b98627ce..7b51cbe79b90c 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -1250,6 +1250,9 @@ bool Sql_cmd_update::update_single_table(THD *thd) ER_THD(thd, ER_UPDATE_INFO_WITH_SYSTEM_VERSIONING), (ulong) found, (ulong) updated, (ulong) rows_inserted, (ulong) thd->get_stmt_da()->current_statement_warn_count()); + thd->collect_unit_results( + id, + (thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated); my_ok(thd, (thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated, id, buff); DBUG_PRINT("info",("%ld records updated", (long) updated));