时间:2021-07-01 10:21:17 帮助过:12人阅读
不提议: 去skip 经常skip的话,数据会越来越不一致了 :(
遇到错误,应该修正,使其能运行
 
 
 


 
1 #/usr/bin/python 2 3 import sys 4 import os 5 import re 6 import optparse 7 import pymysql 8 9 10 MYSQL_SHOW_SLAVE_STATUS = ‘SHOW SLAVE STATUS;‘ 11 GTID_MODE = "select @@gtid_mode" 12 com_mysqlbinlog = "/usr/local/mysql/bin/mysqlbinlog" 13 14 r1062 = r"Could not execute Write_rows event on table (.*); Duplicate entry ‘(\d+)‘ for key ‘PRIMARY‘, Error_code: 1062; handler error HA_ERR_FOUND_DUPP_KEY; the event‘s master log (.*), end_log_pos (\d+)" 15 u1032 = r"Could not execute (.*)_rows event on table (.*); Can‘t find record in (.*), Error_code: 1032; handler error HA_ERR_KEY_NOT_FOUND; the event‘s master log (.*), end_log_pos (\d+)" 16 17 GET_FROM_LOG="%s -v --base64-output=decode-rows -R --host=‘%s‘ --port=%d --user=‘%s‘ --password=‘%s‘ --start-position=%d --stop-position=%d %s |grep @%s|head -n 1" 18 19 FLAGS = optparse.Values() 20 parser = optparse.OptionParser() 21 22 def DEFINE_string(name, default, description, short_name=None): 23 if default is not None and default != ‘‘: 24 description = "%s (default: %s)" % (description, default) 25 args = [ "--%s" % name ] 26 if short_name is not None: 27 args.insert(0, "-%s" % short_name) 28 29 parser.add_option(type="string", help=description, *args) 30 parser.set_default(name, default) 31 setattr(FLAGS, name, default) 32 33 def DEFINE_integer(name, default, description, short_name=None): 34 if default is not None and default != ‘‘: 35 description = "%s (default: %s)" % (description, default) 36 args = [ "--%s" % name ] 37 if short_name is not None: 38 args.insert(0, "-%s" % short_name) 39 40 parser.add_option(type="int", help=description, *args) 41 parser.set_default(name, default) 42 setattr(FLAGS, name, default) 43 44 DEFINE_integer(‘db_port‘, ‘3306‘, ‘DB port : 3306‘) 45 DEFINE_string(‘db_user‘, ‘wubx‘, ‘DB user ‘) 46 DEFINE_string(‘db_password‘, ‘‘, ‘DB password‘) 47 DEFINE_string(‘db_host‘, ‘127.0.0.1‘, ‘DB Hostname‘) 48 49 50 def ShowUsage(): 51 parser.print_help() 52 53 def ParseArgs(argv): 54 usage = sys.modules["__main__"].__doc__ 55 parser.set_usage(usage) 56 unused_flags, new_argv = parser.parse_args(args=argv, values=FLAGS) 57 return new_argv 58 59 def get_conn(): 60 return pymysql.connect(host=FLAGS.db_host, port=int(FLAGS.db_port), user=FLAGS.db_user,passwd=FLAGS.db_password) 61 62 def get_tb_pk(db_table): 63 db, tb = db_table.split(‘.‘) 64 conn = get_conn() 65 sql = "select column_name,ordinal_position from information_schema.columns where table_schema=‘%s‘ and table_name=‘%s‘ and column_key=‘PRI‘;" % (db, tb) 66 cursor = conn.cursor() 67 cursor.execute(sql) 68 r = cursor.fetchone() 69 cursor.close() 70 conn.close() 71 return r 72 73 74 def get_rpl_mode(conn): 75 cursor = conn.cursor() 76 cursor.execute(GTID_MODE) 77 r = cursor.fetchone() 78 print r 79 80 if (r[0] == "ON"): 81 return 1 82 else: 83 return 0 84 85 def handler_1062(r,rpl): 86 print r[‘Last_SQL_Error‘] 87 p = re.compile(r1062) 88 m = p.search(r[‘Last_SQL_Error‘]) 89 db_table = m.group(1) 90 pk_v = m.group(2) 91 conn = get_conn() 92 pk_col = get_tb_pk(db_table)[0] 93 94 sql = "delete from %s where %s=%s" % (db_table, pk_col, pk_v) 95 cursor = conn.cursor() 96 cursor.execute("set session sql_log_bin=0;") 97 cursor.execute(sql) 98 #cursor.execute("set session sql_log_bin=1") 99 cursor.execute("start slave sql_thread") 100 cursor.close() 101 conn.commit() 102 conn.close() 103 return 0 104 105 106 def handler_1032(r, rpl): 107 print r[‘Last_SQL_Error‘] 108 p = re.compile(u1032) 109 m = p.search(r[‘Last_SQL_Error‘]) 110 db_table = m.group(2) 111 tb_name = m.group(3) 112 log_file_name = m.group(4) 113 log_stop_position = m.group(5) 114 log_start_position = r[‘Exec_Master_Log_Pos‘] 115 pk_seq = get_tb_pk(db_table)[1] 116 do_getlog = GET_FROM_LOG % (com_mysqlbinlog, r[‘Master_Host‘], int(r[‘Master_Port‘]),FLAGS.db_user,FLAGS.db_password, int(log_start_position), int(log_stop_position), log_file_name,pk_seq) 117 pk_value = os.popen(do_getlog).readlines()[0].split("=",2)[1].rstrip() 118 print pk_value 119 sql = mk_tb_replace(db_table, pk_value, pk_seq) 120 121 conn = get_conn() 122 cursor = conn.cursor() 123 cursor.execute("set session sql_log_bin=0;") 124 cursor.execute(sql) 125 #cursor.execute("set session sql_log_bin=1") 126 cursor.execute("start slave sql_thread") 127 conn.commit() 128 cursor.close() 129 conn.close() 130 131 132 def mk_tb_replace(db_table, pk_value, pk_seq): 133 db, tb_name = db_table.split(".") 134 r = "replace into %s.%s "%(db,tb_name) 135 136 sql = "select column_name, ORDINAL_POSITION from information_schema.columns where table_schema=‘%s‘ and table_name=‘%s‘ and IS_NULLABLE=‘NO‘;" % (db, tb_name) 137 #print sql 138 139 col_list=‘‘ 140 value_list=‘‘ 141 142 conn = get_conn() 143 cusror = conn.cursor() 144 cusror.execute(sql) 145 result = cusror.fetchall() 146 for col in result: 147 if (col[1] == pk_seq): 148 col_list = col_list +"%s," % (col[0]) 149 value_list = value_list + "‘%s‘," % (pk_value) 150 else: 151 col_list = col_list +"%s," % (col[0]) 152 value_list = value_list + "‘%s‘," % (‘1‘) 153 print value_list 154 r = r+"(%s) values(%s)" % ( col_list.rstrip(‘,‘), value_list.rstrip(‘,‘)) 155 cusror.close() 156 conn.close() 157 return r.rstrip(‘,‘) 158 159 160 161 def get_slave_status(conn): 162 cursor = conn.cursor(pymysql.cursors.DictCursor) 163 cursor.execute(MYSQL_SHOW_SLAVE_STATUS) 164 result = cursor.fetchone() 165 return result 166 cursor.close() 167 168 if __name__ == ‘__main__‘: 169 new_argv = ParseArgs(sys.argv[1:]) 170 171 print FLAGS 172 conn = get_conn() 173 174 175 r = get_slave_status(conn) 176 177 if (r[‘Slave_IO_Running‘] == "Yes" and r[‘Slave_SQL_Running‘] == "Yes"): 178 print "Rpl Ok" 179 if (r[‘Seconds_Behind_Master‘] > 0): 180 print r[‘Seconds_Behind_Master‘] 181 182 conn.close() 183 exit(0) 184 185 while( 1 ): 186 187 r = get_slave_status(conn) 188 if (r[‘Slave_IO_Running‘] == "Yes" and r[‘Slave_SQL_Running‘] == "No"): 189 rpl_mode = get_rpl_mode(conn) 190 print "rpl_mode %s " % (rpl_mode) 191 print r[‘Last_Errno‘] 192 if ( r[‘Last_Errno‘] == 1062 ): 193 r1062 = handler_1062(r, rpl_mode) 194 195 196 if ( r[‘Last_Errno‘] == 1032 ): 197 r1032 = handler_1032(r, rpl_mode) 198 else: 199 break 200 201 202 conn.close()1032 update错误处理难点,去主库上获取出来主值的值
 


 
 

 
s1: show master status;
s2:start slave;
 

 

从库前增加cache
 
 

 

perf top -p `pidof mysqld`


一个分区两个文件句柄,一个分区是两个文件
fd=open(xxx,‘w‘);

表设计的时候一定要有主键,
 
复制结构的调整?

 

s11 解析执行完的binlog last event
mysql-学习-10-20170605-复制的修复
标签:show cache code could not insert status 跳过 nal star