Skip to content

Commit

Permalink
MDEV-30366 Permit bulk implementation to return ALL individual results
Browse files Browse the repository at this point in the history
COM_STMT_BULK_STMT new flag to server to returns all unitary results
  • Loading branch information
vuvova committed Apr 22, 2024
1 parent 73ed0a2 commit ea6975b
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 38 deletions.
9 changes: 6 additions & 3 deletions include/mysql_com.h
Expand Up @@ -124,8 +124,7 @@ enum enum_indicator_type
bulk PS flags
*/
#define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128
#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64

#define STMT_BULK_FLAG_SEND_UNIT_RESULTS 64

/* sql type stored in .frm files for virtual fields */
#define MYSQL_TYPE_VIRTUAL 245
Expand Down Expand Up @@ -288,6 +287,9 @@ enum enum_indicator_type
/* Do not resend metadata for prepared statements, since 10.6*/
#define MARIADB_CLIENT_CACHE_METADATA (1ULL << 36)

/* permit sending unit result-set for BULK commands */
#define MARIADB_CLIENT_BULK_UNIT_RESULTS (1ULL << 37)

#ifdef HAVE_COMPRESS
#define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
#else
Expand Down Expand Up @@ -328,7 +330,8 @@ enum enum_indicator_type
MARIADB_CLIENT_STMT_BULK_OPERATIONS |\
MARIADB_CLIENT_EXTENDED_METADATA|\
MARIADB_CLIENT_CACHE_METADATA |\
CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS)
CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS |\
MARIADB_CLIENT_BULK_UNIT_RESULTS)
/*
Switch off the flags that are optional and depending on build flags
If any of the optional flags is supported by the build it will be switched
Expand Down
2 changes: 1 addition & 1 deletion libmariadb
30 changes: 28 additions & 2 deletions sql/protocol.cc
Expand Up @@ -593,19 +593,44 @@ void Protocol::end_statement()

switch (thd->get_stmt_da()->status()) {
case Diagnostics_area::DA_ERROR:
thd->stop_collecting_unit_results();
/* The query failed, send error to log and abort bootstrap. */
error= send_error(thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message(),
thd->get_stmt_da()->get_sqlstate());
break;
case Diagnostics_area::DA_EOF:
case Diagnostics_area::DA_EOF_BULK:
error= send_eof(thd->server_status,
if (thd->need_report_unit_results()) {
// bulk returning result-set, like INSERT ... RETURNING
// result is already send, needs an EOF with MORE_RESULT_EXISTS
// before sending unit result-set
error= send_eof(thd->server_status | SERVER_MORE_RESULTS_EXISTS,
thd->get_stmt_da()->statement_warn_count());
if (thd->report_collected_unit_results() && thd->is_error())
error= send_error(thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message(),
thd->get_stmt_da()->get_sqlstate());
else
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
}
else
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
break;
case Diagnostics_area::DA_OK:
case Diagnostics_area::DA_OK_BULK:
error= send_ok(thd->server_status,
if (thd->report_collected_unit_results())
if (thd->is_error())
error= send_error(thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message(),
thd->get_stmt_da()->get_sqlstate());
else
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
else
error= send_ok(thd->server_status,
thd->get_stmt_da()->statement_warn_count(),
thd->get_stmt_da()->affected_rows(),
thd->get_stmt_da()->last_insert_id(),
Expand All @@ -615,6 +640,7 @@ void Protocol::end_statement()
break;
case Diagnostics_area::DA_EMPTY:
default:
thd->stop_collecting_unit_results();
DBUG_ASSERT(0);
error= send_ok(thd->server_status, 0, 0, 0, NULL);
break;
Expand Down
6 changes: 6 additions & 0 deletions sql/protocol.h
Expand Up @@ -34,6 +34,12 @@ struct TABLE_LIST;
typedef struct st_mysql_field MYSQL_FIELD;
typedef struct st_mysql_rows MYSQL_ROWS;

struct unit_results_desc
{
ulonglong generated_id;
ulonglong affected_rows;
};

class Protocol
{
protected:
Expand Down
112 changes: 112 additions & 0 deletions sql/sql_class.cc
Expand Up @@ -1375,6 +1375,7 @@ void THD::init()

apc_target.init(&LOCK_thd_kill);
gap_tracker_data.init();
unit_results= NULL;
DBUG_VOID_RETURN;
}

Expand Down Expand Up @@ -8325,6 +8326,117 @@ bool Discrete_intervals_list::append(Discrete_interval *new_interval)
DBUG_RETURN(0);
}

/*
indicate that unit result has to be reported
*/
bool THD::need_report_unit_results()
{
return unit_results;
}

/*
Initialize unit result array
*/
bool THD::init_collecting_unit_results()
{
if (!unit_results)
{
void *buff;

if (!(my_multi_malloc(PSI_NOT_INSTRUMENTED, MYF(MY_WME), &unit_results, sizeof(DYNAMIC_ARRAY),
&buff, sizeof(unit_results_desc) * 10,
NullS)) ||
my_init_dynamic_array2(PSI_INSTRUMENT_ME, unit_results, sizeof(unit_results_desc),
buff, 10, 100, MYF(MY_WME)))
{
if (unit_results)
my_free(unit_results);
unit_results= NULL;
return TRUE;
}
}
return FALSE;
}

/*
remove unit result array
*/
void THD::stop_collecting_unit_results()
{
if (unit_results)
{
delete_dynamic(unit_results);
my_free(unit_results);
unit_results= NULL;
}
}


/*
Add a unitary result to collection
*/
bool THD::collect_unit_results(ulonglong id, ulonglong affected_rows)
{
if (unit_results)
{
unit_results_desc el;
el.generated_id= id;
el.affected_rows= affected_rows;
if (insert_dynamic(unit_results, &el))
{
return TRUE;
}
}
return FALSE;
}

/*
Write unitary result result-set WITHOUT ending EOF/OK_Packet to socket.
*/
bool THD::report_collected_unit_results()
{
if (unit_results)
{
List<Item> field_list;
MEM_ROOT tmp_mem_root;
Query_arena arena(&tmp_mem_root, Query_arena::STMT_INITIALIZED), backup;

init_alloc_root(PSI_NOT_INSTRUMENTED, arena.mem_root, 2048, 4096, MYF(0));
set_n_backup_active_arena(&arena, &backup);
DBUG_ASSERT(mem_root == &tmp_mem_root);

field_list.push_back(new (mem_root)
Item_int(this, "Id", 0, MY_INT64_NUM_DECIMAL_DIGITS),
mem_root);
field_list.push_back(new (mem_root)
Item_int(this, "Affected_rows", 0, MY_INT64_NUM_DECIMAL_DIGITS),
mem_root);

if (protocol_binary.send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
goto error;

for (ulonglong i= 0; i < unit_results->elements; i++)
{
unit_results_desc *last=
(unit_results_desc *)dynamic_array_ptr(unit_results, i);
protocol_binary.prepare_for_resend();
protocol_binary.store_longlong(last->generated_id, TRUE);
protocol_binary.store_longlong(last->affected_rows, TRUE);
if (protocol_binary.write())
goto error;
}
error:
restore_active_arena(&arena, &backup);
DBUG_ASSERT(arena.mem_root == &tmp_mem_root);
// no need free Items because they was only constants
free_root(arena.mem_root, MYF(0));
stop_collecting_unit_results();
return TRUE;
}
return FALSE;

}

void AUTHID::copy(MEM_ROOT *mem_root, const LEX_CSTRING *user_name,
const LEX_CSTRING *host_name)
Expand Down
8 changes: 8 additions & 0 deletions sql/sql_class.h
Expand Up @@ -5953,6 +5953,14 @@ class THD: public THD_count, /* this must be first */
return (lex->sphead != 0 &&
!(in_sub_stmt & (SUB_STMT_FUNCTION | SUB_STMT_TRIGGER)));
}

/* Data and methods for bulk multiple unit result reporting */
DYNAMIC_ARRAY *unit_results;
void stop_collecting_unit_results();
bool collect_unit_results(ulonglong id, ulonglong affected_rows);
bool need_report_unit_results();
bool report_collected_unit_results();
bool init_collecting_unit_results();
};


Expand Down
18 changes: 10 additions & 8 deletions sql/sql_delete.cc
Expand Up @@ -841,23 +841,23 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd)

if (likely(!error))
{
deleted++;
deleted++;
if (!delete_history && table->triggers &&
table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
TRG_ACTION_AFTER, FALSE))
{
error= 1;
break;
}
if (!--limit && using_limit)
{
error= -1;
break;
}
if (!--limit && using_limit)
{
error= -1;
break;
}
}
else
{
table->file->print_error(error,
table->file->print_error(error,
MYF(thd->lex->ignore ? ME_WARNING : 0));
if (thd->is_error())
{
Expand Down Expand Up @@ -947,7 +947,7 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd)

if (log_result > 0)
{
error=1;
error=1;
}
}
}
Expand All @@ -959,6 +959,8 @@ bool Sql_cmd_delete::delete_from_single_table(THD *thd)
if (thd->lex->analyze_stmt)
goto send_nothing_and_leave;

thd->collect_unit_results(0, deleted);

if (returning)
result->send_eof();
else
Expand Down
25 changes: 23 additions & 2 deletions sql/sql_insert.cc
Expand Up @@ -470,6 +470,7 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
the statement indirectly via a stored function or trigger:
if it is used, that will lead to a deadlock between the
client connection and the delayed thread.
- client explicitly ask to retrieve unitary changes
*/
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
thd->variables.max_insert_delayed_threads == 0 ||
Expand All @@ -480,6 +481,14 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
*lock_type= TL_WRITE;
return;
}

/* client explicitly asked to retrieved each affected rows and insert ids */
if (thd->need_report_unit_results())
{
*lock_type= TL_WRITE;
return;
}

if (thd->slave_thread)
{
/* Try concurrent insert */
Expand Down Expand Up @@ -717,6 +726,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
uint value_count;
/* counter of iteration in bulk PS operation*/
ulonglong iteration= 0;
ulonglong last_affected_rows= 0;
ulonglong id;
COPY_INFO info;
TABLE *table= 0;
Expand Down Expand Up @@ -931,7 +941,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
functions or invokes triggers since they may access
to the same table and therefore should not see its
inconsistent state created by this optimization.
So we call start_bulk_insert to perform nesessary checks on
So we call start_bulk_insert to perform necessary checks on
values_list.elements, and - if nothing else - to initialize
the code to make the call of end_bulk_insert() below safe.
*/
Expand Down Expand Up @@ -1159,6 +1169,17 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
}
its.rewind();
iteration++;

/*
Save affected rows and insert id when collecting using results
*/
ulonglong new_affected_rows= info.copied + info.deleted +
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
info.touched : info.updated);
thd->collect_unit_results(
table->file->insert_id_for_cur_row,
new_affected_rows - last_affected_rows);
last_affected_rows = new_affected_rows;
} while (bulk_parameters_iterations(thd));

values_loop_end:
Expand Down Expand Up @@ -1342,7 +1363,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
Client expects an EOF/OK packet if result set metadata was sent. If
LEX::has_returning and the statement returns result set
we send EOF which is the indicator of the end of the row stream.
Oherwise we send an OK packet i.e when the statement returns only the
Otherwise we send an OK packet i.e when the statement returns only the
status information
*/
if (returning)
Expand Down

0 comments on commit ea6975b

Please sign in to comment.