时间:2021-07-01 10:21:17 帮助过:13人阅读
 初始化数据库:
 def __init__(self, username, password):
    # 初始化数据库
    self.username = username
    self.password = password
创建联合唯一索引
# __table_args__(
#     UniqueConstraint("列名1","列名2","联合唯一索引名"),
#     index("索引名","列名1","列名2"),
# ) #创建联合唯一索引
__table_args__ = (
    UniqueConstraint(‘id‘, ‘username‘, name=‘uix_id_name‘),
    Index(‘ix_id_name‘, ‘username‘, ‘password‘),
)
3.创建映射
# 创建映射
class UserORMHelper(object):
    def __init__(self, database_name):
        DATABASE_PATH = os.path.join(basedir, database_name)
        SQLALCHEMY_DATABASE_URI = ‘sqlite:///‘ + DATABASE_PATH
        self.engine = create_engine(SQLALCHEMY_DATABASE_URI)
创建表
def create_db(self):
    # 创建表
    Base.metadata.create_all(self.engine)  # 创建表
    Session = sessionmaker(bind=self.engine)
    self.session = Session()
删除表
def drop_db(self):
    # 删除表
    Base.metadata.drop_all(self.engine)   #删除表
4.接下来就是简单的针对数据库的增删改查操作了,这些操作我是为了方便测试安装自己的做了些修改,后面我会附上测试的代码
插入数据
def addUser(self, users):
    # 插入数据
    isSucess=False
    usrsList=self.query_all_with_user_name_password(users)
    if(usrsList and len(usrsList)>0):#用户已经注册
       return False
    self.session.add(users)
    self.session.commit()
    isSucess=True
    return isSucess
删除数据
def delete(self,users):
    # 删除数据
    isSucess=False
    user1 = self.session.query(Users).filter_by(username=users.username).first()
    self.session.delete(user1)
    self.session.commit()
    isSucess=True
    return isSucess
修改
(1)匹配username修改password
def update_user_extra_by_user_name(self, users):
    # 匹配并且修改password
    isSucess = False
    self.session.query(Users).filter(Users.username == users.username).update({"password": users.password}, synchronize_session=‘evaluate‘)
    self.session.commit()
    isSucess=True
    return isSucess
(2)匹配password修改username
def update_user_name_by_user_extra(self, users):
    # 匹配并且修改name
    isSucess = False
    self.session.query(Users).filter(Users.password == users.password).update({"username": users.username}, synchronize_session=‘evaluate‘)
    self.session.commit()
    isSucess=True
    return isSucess
查询
安password查询所有
def query_all_with_user_extra(self, users):
    # 查询相同password数据
    userList = self.session.query(Users).filter_by(password=users.password).all()
    return userList
安username查询所有,但是只取其一
def query_one(self,users):
    # 查询相同name数据,取一个
    userList=self.session.query(Users).filter_by(username=users.username).limit(1).all()
    self.session.commit()
    return userList
安username查询但是只取第一条数据
    def query_first_with_user_name(self, users):
        # 查询相同name数据,取第一个
        userList=self.session.query(Users).filter_by(username=users.username).first()
        return userList
安username,password查询所有
def query_all_with_user_name_password(self, users):
    # 查询所有相同于name,password数据
    userList=self.session.query(Users).filter_by(username=users.username,password=users.password).all()
    return userList
安username查询所有
def query_all_with_user_name(self, users):
    # 查询所有相同name数据
    userList=self.session.query(Users).filter_by(username=users.username).all()
    return userList
下面的是数据库的测试代码,写测试我用的是TDD
import unittest
from UserORM import Users,UserORMHelper
TEST_DB = ‘test.db‘
class UserORMHelperTestCase(unittest.TestCase):
    def setUp(self):
        """Set up a blank temp database before each test"""
        self.helper=UserORMHelper("user.db")
        self.helper.drop_db()
        self.helper.create_db()
    def tearDown(self):
        """Destroy blank temp database after each test"""
        self.helper.drop_db()
    def test_add(self):
        # 插入测试
        isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
        self.assertFalse(isSuccess)
    def test_delete(self):
        # 删除测试
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSucess=self.helper.delete(Users("tome", "tom jobn"))
        self.assertTrue(isSucess)
    def test_query_all(self):
        # 查询所有相同name数据测试
         isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
         self.assertTrue(isSuccess)
         isSuccess=self.helper.addUser(Users("tome", "tom jobn2"))
         self.assertTrue(isSuccess)
         isSuccess=self.helper.addUser(Users("tome", "tom jobn3"))
         self.assertTrue(isSuccess)
         userList=self.helper.query_all_with_user_name(Users("tome", "tom jobn"))
         # print userList
         self.assertEqual(len(userList),3)
    def test_query_none(self):
        # 查询相同name数据,取第一个  测试
         userList=self.helper.query_first_with_user_name(Users("tom", "tom job"))
         self.assertEqual(userList,None)
    def test_query_one(self):
        # 查询相同name数据,取一个   测试
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.addUser(Users("tome", "tom jobn2"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.addUser(Users("tome", "tom jobn3"))
        self.assertTrue(isSuccess)
        userList = self.helper.query_one(Users("tome", "tom jobn4"))
        self.assertEqual(len(userList), 1)
    def test_revise_extra(self):
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess=self.helper.update_user_extra_by_user_name(Users("tome", "tomUpdate"))
        self.assertTrue(isSuccess)
        userList = self.helper.query_all_with_user_name(Users("tome", "tom job"))
        self.assertEqual(len(userList), 1)
    def test_recise_name(self):
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.update_user_name_by_user_extra(Users("tomeUpdate", "tom jobn"))
        self.assertTrue(isSuccess)
        userList = self.helper.query_all_with_user_name(Users("tomeUpdate", "tom job"))
        self.assertEqual(len(userList),1)
if __name__ == ‘__main__‘:
    unittest.main()
 Python SqlAlchemy使用方法
标签:prim code 连接 url dir eva www equal pat