时间:2021-07-01 10:21:17 帮助过:3人阅读
MySQLStorageEngine的初始化过程涉及以下六个方面:
(1)begin、end用于统计一次批量写入的耗时;
(2)接收用户定制的参数;
host:数据库主机名;
user:数据库登录用户名;
passwd:数据库登录密码;
db:数据库实例
port:数据库端口
charset:数据库字符编码
sql:写入数据时使用的SQL语句,如“insert into students (c_number, c_name) values (%s, %s)”
threads:写入线程数目;
bufferSize:写入线程内部的缓存区大小,亦即每一次“batch”的大小;
mincached:数据库连接池内部最小缓存连接数;
maxcached:数据库连接池内部最大缓存连接数;
maxconnections:数据库连接池所允许同时建立的最大连接数:目前与写入线程数目相同。
(3)构建共享队列queue;
(4)saveNum、storeNum用于统计用户写入总数及实际(成功)写入总数,考虑到多线程使用环境,分别构建相应的锁对象saveLock、storeLock;
(5)构建数据库连接池,这里使用的是DBUtils PooledDB;
(6)构建写入线程(多个)并启动。
至此,MySQLStorageEngine实例创建完毕,并且启动内部多个写入线程用于消费队列queue中数据。用户可通过实例方法save写入数据:
可见,save支持两种类型的数据,一种是Tuple,另一种是Tuple数组,它们都被保存至队列queue中,由写入线程负责处理。
用户写入完成之后,可以通过方法close关闭“存储引擎”,
需要注意的是,用户写入的数据实际是保存在队列queue中的,“用户写入完成”并不代表队列queue中的数据已全部被写入线程消费且完成入库,因此必须首先通知写入线程用户数据已全部写入完毕(requestStop),然后等待写入线程运行完毕(join),最后关闭数据库连接池。
写入线程由MySQLSaver实现,它的初始化过程特别简单:
(1)接收“存储引擎”实例engine;
(2)定义实例变量stop,初始值为False,用于表示用户尚有数据写入;True表示用户写入结束。
MySQLSaver的工作流程如下:
(1)初始化缓存区buffer,用于保存从队列queue消费而来的数据;
(2)循环从队列queue获取数据,如果没有获取到数据,则执行(3);如果获取到数据,则执行(4);
(3)如果用户写入结束(stop值为True)而且队列中已经没有剩余数据,将缓冲区buffer中的数据一次性写入(__save),结束线程(break);
(4)将(2)中获取到的数据存入缓存区,如果缓存区的大小达到数目限制,则将缓冲区buffer中的数据一次性写入(__save),继续(2)。
__save的工作流程如下:
(1)从连接池pool中获取数据库连接db并构建实例cursor;
(2)写入buffer中的数据(executemany)并提交(commit);
(3)清空buffer、关闭实例cursor、将数据库连接db“归还”给连接池(close)。
使用示例如下:

MySQL通用批量写入工具(Python)
标签: