Skip to content

Commit

Permalink
[feature] Issue #37: Support MTS distributing in table level
Browse files Browse the repository at this point in the history
MySQL 5.6 introduced Multi-Threaded Slaves (MTS), which significantly improved
the replication capability of slaves. However, the original MTS only supports
distributing events at the schema level, which limits the parallelism of worker threads.

For example, if all updates on the master are under one schema, there will be
only one worker applying events, and replication degenerates to single-threaded mode.

To solve this problem, we add a new distribution level: table mode.
In the new mode, events can be distributed to different workers as long as
different tables are involved.

A new global option "slave_pr_mode" is introduced, which can be configured as:
1. schema: distribute in schema mode, same as the original behavior;
2. table: distribute in table mode.

Note: if the option is changed dynamically, remember to restart the SQL thread,
i.e. "stop slave sql_thread; start slave sql_thread", to make the change
take effect.
  • Loading branch information
AliSQL authored and AliSQL committed Dec 22, 2016
1 parent c39ebbc commit 945e59d
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 7 deletions.
3 changes: 3 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ The following options may be given as the first argument:
Max size of Slave Worker queues holding yet not applied
events.The least possible value must be not less than the
master side max_allowed_packet.
--slave-pr-mode=name
Parallel-replication based on SCHEMA or TABLE
--slave-rows-search-algorithms=name
Set of searching algorithms that the slave will use while
searching for records from the storage engine to either
Expand Down Expand Up @@ -1456,6 +1458,7 @@ slave-max-allowed-packet 1073741824
slave-net-timeout 3600
slave-parallel-workers 0
slave-pending-jobs-size-max 16777216
slave-pr-mode TABLE
slave-rows-search-algorithms TABLE_SCAN,INDEX_SCAN
slave-skip-errors (No default value)
slave-sql-verify-checksum TRUE
Expand Down
36 changes: 36 additions & 0 deletions mysql-test/suite/rds/r/rpl_mts_table.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
Warnings:
Note 1753 slave_transaction_retries is not supported in multi-threaded slave mode. In the event of a transient failure, the slave will not retry the transaction and will stop.
[connection master]
reset master;
create table t1(c int) engine=innodb;
create table t2(c int) engine=innodb;
create table t3(c int) engine=innodb;
insert into t1 values(1);
insert into t2 values(2);
insert into t3 values(3);
insert into t2 values(2);
stop slave;
change master to master_auto_position=1;
start slave;
Warnings:
Note 1753 slave_transaction_retries is not supported in multi-threaded slave mode. In the event of a transient failure, the slave will not retry the transaction and will stop.
stop slave;
set global relay_log_info_repository='FILE';
start slave;
Warnings:
Note 1753 slave_transaction_retries is not supported in multi-threaded slave mode. In the event of a transient failure, the slave will not retry the transaction and will stop.
insert into t3 values(3);
drop table t3;
drop table t2;
drop table t1;
stop slave;
set global relay_log_info_repository='TABLE';
change master to master_auto_position=0;
start slave;
Warnings:
Note 1753 slave_transaction_retries is not supported in multi-threaded slave mode. In the event of a transient failure, the slave will not retry the transaction and will stop.
include/rpl_end.inc
1 change: 1 addition & 0 deletions mysql-test/suite/rds/t/rpl_mts_table-master.opt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--log-bin --gtid-mode=on --enforce-gtid-consistency --log-slave-updates --binlog_format=row
1 change: 1 addition & 0 deletions mysql-test/suite/rds/t/rpl_mts_table-slave.opt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--slave_pr_mode=TABLE --log-bin --gtid-mode=on --enforce-gtid-consistency --log-slave-updates --binlog_format=row --slave_parallel_workers=8 --master_info_repository=TABLE --relay_log_info_repository=TABLE
35 changes: 35 additions & 0 deletions mysql-test/suite/rds/t/rpl_mts_table.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Replication smoke test for the table-level MTS distribution mode.
# The slave option file for this test starts the slave with
# --slave_pr_mode=TABLE and --slave_parallel_workers=8, with GTIDs enabled
# and row-based binary logging on both servers.
#
# Steps:
#   1. Create three tables on the master and insert into each of them.
#   2. Switch the slave to GTID auto-positioning and let it catch up.
#   3. Switch relay_log_info_repository to FILE, replicate more changes
#      (an insert and the table drops), and let the slave catch up again.
#   4. Restore relay_log_info_repository and auto-positioning, then finish.
--source include/have_log_bin.inc
--source include/master-slave.inc
connection master;
reset master;
# Three separate tables in the same schema: presumably so that, in TABLE
# mode, the row events can be handed to different workers.
create table t1(c int) engine=innodb;
create table t2(c int) engine=innodb;
create table t3(c int) engine=innodb;
insert into t1 values(1);
insert into t2 values(2);
insert into t3 values(3);
insert into t2 values(2);

# Reconfigure the slave to use GTID auto-positioning.
connection slave;
stop slave;
change master to master_auto_position=1;
start slave;

connection master;
# sync_slave_with_master leaves the slave as the current connection, so the
# statements that follow it run on the slave.
--sync_slave_with_master

stop slave;
set global relay_log_info_repository='FILE';

start slave;
# Second round of changes, applied while the relay log info is file-based.
connection master;
insert into t3 values(3);
drop table t3;
drop table t2;
drop table t1;
--sync_slave_with_master
# Back on the slave connection: restore the settings changed by this test.
stop slave;
set global relay_log_info_repository='TABLE';
change master to master_auto_position=0;
start slave;
--source include/rpl_end.inc
2 changes: 1 addition & 1 deletion mysql-test/suite/rpl/t/rpl_mts_debug-slave.opt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
--slave-transaction-retries=0 --relay-log-info-repository=TABLE --master-info-repository=TABLE
--slave-transaction-retries=0 --relay-log-info-repository=TABLE --master-info-repository=TABLE --slave_pr_mode=schema
2 changes: 2 additions & 0 deletions mysql-test/suite/sys_vars/r/all_vars.result
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ RDS_THREADS_RUNNING_CTL_MODE
RDS_THREADS_RUNNING_CTL_MODE
RDS_THREADS_RUNNING_HIGH_WATERMARK
RDS_THREADS_RUNNING_HIGH_WATERMARK
SLAVE_PR_MODE
SLAVE_PR_MODE
TOKUDB_ALTER_PRINT_ERROR
TOKUDB_ALTER_PRINT_ERROR
TOKUDB_ANALYZE_DELETE_FRACTION
Expand Down
37 changes: 33 additions & 4 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,8 @@ Log_event::continue_group(Relay_log_info *rli)
/**
@param end_group_sets_max_dbs when true the group terminal event
can carry partition info, see a note below.
@param rli relay log info

@return true in cases the current event
carries partition data,
false otherwise
Expand All @@ -2627,9 +2629,11 @@ Log_event::continue_group(Relay_log_info *rli)
assigning OVER_MAX_DBS_IN_EVENT_MTS to mts_accessed_dbs
of COMMIT query event.
*/
bool Log_event::contains_partition_info(bool end_group_sets_max_dbs)
bool Log_event::contains_partition_info(bool end_group_sets_max_dbs,
Relay_log_info *rli)
{
bool res;
bool table_mode = (rli->pr_mode == SLAVE_PR_MODE_TABLE);

switch (get_type_code()) {
case TABLE_MAP_EVENT:
Expand All @@ -2645,6 +2649,16 @@ bool Log_event::contains_partition_info(bool end_group_sets_max_dbs)
static_cast<Query_log_event*>(this)->mts_accessed_dbs=
OVER_MAX_DBS_IN_EVENT_MTS;
}
else if (table_mode && (!starts_group() && !ends_group()))
{
/*
In table mode MTS replication, all query event is executed
serially by worker 0.
*/
res= true;
static_cast<Query_log_event*>(this)->mts_accessed_dbs=
OVER_MAX_DBS_IN_EVENT_MTS;
}
else
res= (!ends_group() && !starts_group()) ? true : false;

Expand Down Expand Up @@ -2776,7 +2790,7 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)

// mini-group representative

if (contains_partition_info(rli->mts_end_group_sets_max_dbs))
if (contains_partition_info(rli->mts_end_group_sets_max_dbs, rli))
{
int i= 0;
Mts_db_names mts_dbs;
Expand Down Expand Up @@ -2830,9 +2844,24 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
to satisfy hashcmp() implementation.
*/
const char all_db[NAME_LEN]= {0};

char map_key[NAME_LEN * 2 + 3] = {0};

if (mts_dbs.num != OVER_MAX_DBS_IN_EVENT_MTS)
{
if (get_type_code() == TABLE_MAP_EVENT &&
rli->pr_mode == SLAVE_PR_MODE_TABLE)
{
Table_map_log_event *ev= (Table_map_log_event *) this;
ev->set_full_name(map_key);
}
else
strcpy(map_key, mts_dbs.name[i]);
}

if (!(ret_worker=
map_db_to_worker(mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS ?
all_db : mts_dbs.name[i], rli,
all_db : map_key, rli,
&mts_assigned_partitions[i],
/*
todo: optimize it. Although pure
Expand All @@ -2851,7 +2880,7 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
DBUG_ASSERT(mts_dbs.num != OVER_MAX_DBS_IN_EVENT_MTS || !thd->temporary_tables);
DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db,
mts_dbs.num != OVER_MAX_DBS_IN_EVENT_MTS ?
mts_dbs.name[i] : all_db));
map_key : all_db));
DBUG_ASSERT(ret_worker == mts_assigned_partitions[i]->worker);
DBUG_ASSERT(mts_assigned_partitions[i]->usage >= 0);
}
Expand Down
6 changes: 5 additions & 1 deletion sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ class Log_event
/**
@return TRUE if events carries partitioning data (database names).
*/
bool contains_partition_info(bool);
bool contains_partition_info(bool, Relay_log_info*);

/*
@return the number of updated by the event databases.
Expand Down Expand Up @@ -3902,6 +3902,10 @@ class Table_map_log_event : public Log_event
const Table_id& get_table_id() const { return m_table_id; }
const char *get_table_name() const { return m_tblnam; }
const char *get_db_name() const { return m_dbnam; }
void set_full_name(char *buf)
{
sprintf(buf, "%s\1%s", m_dbnam, m_tblnam);
}

virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; }
virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ }
Expand Down
1 change: 1 addition & 0 deletions sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ bool thread_cache_size_specified= false;
bool host_cache_size_specified= false;
bool table_definition_cache_specified= false;
my_bool opt_rds_allow_unsafe_stmt_with_gtid= FALSE;
/* Storage for the global slave_pr_mode system variable (see enum_slave_pr_mode). */
ulong slave_pr_mode_options;

my_bool ic_reduce_hint_enable= 0;
HASH ic_gather_hash;
Expand Down
1 change: 1 addition & 0 deletions sql/mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ extern ulong log_warnings;
extern uint host_cache_size;
void init_sql_statement_names();
extern my_bool opt_rds_allow_unsafe_stmt_with_gtid;
/* Global slave_pr_mode setting; holds an enum_slave_pr_mode value. */
extern ulong slave_pr_mode_options;


extern HASH ic_gather_hash;
Expand Down
3 changes: 3 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ class Relay_log_info : public Rpl_info
*/
bool error_on_rli_init_info;

/* MTS distributing mode, see enum_slave_pr_mode */
ulong pr_mode;

/*
Let's call a group (of events) :
- a transaction
Expand Down
2 changes: 1 addition & 1 deletion sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2041,7 +2041,7 @@ int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli)
else if (!is_gtid_event(ev))
{
if ((part_event=
ev->contains_partition_info(worker->end_group_sets_max_dbs)))
ev->contains_partition_info(worker->end_group_sets_max_dbs, rli)))
{
uint num_dbs= ev->mts_number_dbs();
DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts;
Expand Down
2 changes: 2 additions & 0 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ enum enum_slave_type_conversions { SLAVE_TYPE_CONVERSIONS_ALL_LOSSY,
enum enum_slave_rows_search_algorithms { SLAVE_ROWS_TABLE_SCAN = (1U << 0),
SLAVE_ROWS_INDEX_SCAN = (1U << 1),
SLAVE_ROWS_HASH_SCAN = (1U << 2)};
/*
  MTS (multi-threaded slave) distribution level selected by the
  slave_pr_mode system variable: per schema or per table.
  The enumerator order has to stay in step with the "SCHEMA", "TABLE"
  label array used to register that variable.
*/
enum enum_slave_pr_mode { SLAVE_PR_MODE_SCHEMA,
SLAVE_PR_MODE_TABLE };

enum enum_mark_columns
{ MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE};
Expand Down
7 changes: 7 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4954,3 +4954,10 @@ static Sys_var_mybool Sys_rds_allow_unsafe_stmt_with_gtid(
GLOBAL_VAR(opt_rds_allow_unsafe_stmt_with_gtid),
CMD_LINE(OPT_ARG), DEFAULT(FALSE),
&PLock_sys_rds_allow_unsafe_stmt_with_gtid, NOT_IN_BINLOG);

/*
  slave_pr_mode: global enum option choosing how MTS distributes events to
  workers. The labels below are listed in enum_slave_pr_mode order
  (SCHEMA, TABLE); the default is TABLE. The value is stored in
  slave_pr_mode_options and a command-line argument is required.
*/
static const char *slave_pr_mode_names[]= {"SCHEMA", "TABLE", 0};
static Sys_var_enum Slave_pr_mode(
"slave_pr_mode",
"Parallel-replication based on SCHEMA or TABLE",
GLOBAL_VAR(slave_pr_mode_options),CMD_LINE(REQUIRED_ARG),
slave_pr_mode_names, DEFAULT(SLAVE_PR_MODE_TABLE));

0 comments on commit 945e59d

Please sign in to comment.