Python 对 MongoDB 进行压力测试

时间:2021-07-01 10:21:17
帮助过:3 人阅读
							                        
                     
                    
                    
                    from pymongo import Connection,MongoClient,MongoReplicaSetClient
import multiprocessing
import time
# Alternative connection modes, kept for reference:
# connection = MongoClient('mongodb://10.120.11.212:27017/')
# connection = Connection(
#     ['10.120.11.122', '10.120.11.221', '10.120.11.212'], 27017)

# The database is deployed with read/write splitting, so the client has to
# connect in the matching (replica-set) mode.
connection = MongoReplicaSetClient(
    '10.120.11.122:27017,10.120.11.221:27017,10.120.11.212:27017',
    replicaSet='rs0',
    # NOTE(review): 3 is presumably ReadPreference.SECONDARY_PREFERRED in
    # PyMongo 2.x -- confirm against the installed driver version.
    read_preference=3
)

# Database handle shared by the benchmark functions below.
db = connection['cms']
db.authenticate('cms', 'cms')
# Timing decorator
def func_time(func):
    """Wrap *func* so that each call prints its wall-clock run time.

    After the wrapped call returns, a line of the form
    ``<function name> run: <seconds>`` is printed, measured with
    ``time.time()``.  If *func* raises, nothing is printed and the
    exception propagates to the caller.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        *func* and returns whatever *func* returns.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped callable
    def _wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        # Single-argument print() behaves the same on Python 2 and Python 3.
        print("%s run: %s" % (func.__name__, time.time() - start))
        return result

    return _wrapper
# Insert benchmark
def insert(num):
    """Insert *num* documents into the ``userinfo`` collection of ``db``.

    Document ``x`` (for x in ``range(num)``) uses ``str(x)`` as both its
    ``_id`` and ``author`` and carries a fixed ``text`` value.

    Args:
        num: Number of documents to insert.
    """
    posts = db.userinfo
    for x in range(num):
        post = {
            "_id": str(x),
            "author": str(x),
            "text": "My first blog post!",
        }
        # NOTE: Collection.insert() is the PyMongo 2.x API this script is
        # written against; PyMongo 3+ replaces it with insert_one().
        posts.insert(post)
# Query benchmark
def query(num):
    """Run ``find_one`` *num* times against the ``device`` collection of ``db``.

    Every iteration looks up the same fixed ``scanid`` value; the query
    results are discarded.

    Args:
        num: Number of lookups to perform.
    """
    devices = db.device
    # range() instead of xrange() so the loop runs on Python 2 and Python 3.
    for _ in range(num):
        devices.find_one({"scanid": "010000138101010000009aaaaa"})
@func_time
def main(process_num, num):
    """Fan ``query`` out over a process pool and wait for completion.

    Submits *num* tasks to the pool; each task calls ``query(num)``, so the
    total number of lookups issued is ``num * num``.

    Args:
        process_num: Number of worker processes in the pool.
        num: Number of tasks to submit, and the number of lookups per task.
    """
    # NOTE(review): the module-level MongoDB client is created before the
    # pool's workers are started; the PyMongo FAQ recommends creating clients
    # after forking -- confirm whether that affects this benchmark.
    pool = multiprocessing.Pool(processes=process_num)
    # Keep the AsyncResult handles: apply_async() otherwise swallows any
    # exception raised inside a worker, which would make a failed run look
    # like a fast, successful one.
    pending = [pool.apply_async(query, (num,)) for _ in range(num)]
    pool.close()
    pool.join()

    failed = sum(
        1 for task in pending if not (task.ready() and task.successful())
    )
    if failed:
        print("%d of %d task(s) did not complete successfully." % (failed, num))
    print("Sub-process(es) done.")
if __name__ == "__main__":
    # Single-process variant, kept for reference (stale: query() takes one
    # argument):
    # query(500, 1)

    # 800 worker processes; 500 tasks, each performing 500 lookups.
    main(800, 500)
原文发表于 http://www.cnblogs.com/reach296/
Python 对 MongoDB 进行压力测试
标签: