在上一篇文章中,简单对Python利用pymysql操作MySQL进行了封装,实现了多种功能的复用。接下来,在参阅pymysql的API后,对Python进行了更加详细的封装,实现了Python操作MySQL的工具类MySQLDBHelper,其代码主要如下:
MySQLDBHelper工具类
import pymysql
import re
class MySQLDBHelper(object):
"""
定义构造方法,用于初始化数据库连接
"""
def __init__(self, config):
self.host = config['host']
self.user = config['user']
self.password = config['password']
self.port = config['port']
self.connection = None
self.cursor = None
try:
self.connection = pymysql.connect(**config)
self.connection.autocommit(1)
# 调用cursor()方法创建一个用于操作的游标
self.cursor = self.connection.cursor()
except:
print("数据库连接失败,请检查数据库配置项!")
# 定义关闭数据库连接的方法
def close(self):
if not self.connection:
self.connection.close()
else:
print("数据库连接已经关闭!")
# 创建数据库
def createDatabase(self, DB_NAME):
"""
创建数据库
"""
self.cursor.execute(
"CREATE DATABASE IF NOT EXISTS %s DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci" % DB_NAME)
self.connection.select_db(DB_NAME)
print(f"数据库{DB_NAME}创建成功!")
# 选择数据库
def selectDatabase(self, DB_NAME):
self.connection.select_db(DB_NAME)
# 获取数据库的版本号
def getDatabaseVersion(self):
self.cursor.execute('SELECT VERSION()')
return self.queryOne()
# 获取一行查询结果
def queryOne(self):
return self.cursor.fetchone()
# 创建数据库表
def createTable(self, table_name, attribute_dict, constraint):
"""
创建数据库表
:param table_name: 表的名称
:param attribute_dict: 属性键值对
:param constraint: 主外键约束
"""
# 判断要创建的表是否存在
if self.isExistTable(table_name):
print(f"{table_name}已经存在!")
return
sql = ''
sql_mid = '`id` bigint(11) auto_increment,'
for attr, value in attribute_dict.items():
sql_mid = sql_mid + '`' + attr + '`' + ' ' + value + ','
sql = sql + 'CREATE TABLE IF NOT EXISTS %s (' % table_name
sql = sql + sql_mid
sql = sql + constraint
sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
print(f'创建表的语句是:{sql}')
self.executeCommit(sql)
def executeSql(self, sql=''):
"""
执行sql语句,针对读操作返回结果集
"""
try:
self.cursor.execute(sql)
records = self.cursor.fetchall()
return records
except pymysql.Error as e:
print(f'SQL语句执行失败!ERROR({e.args[0]}): {e.args[1]}')
def executeCommit(self, sql=''):
"""
执行SQL语句,针对更新、删除等事务操作,失败时回滚
"""
try:
self.cursor.execute(sql)
self.connection.commit()
except pymysql.Error as e:
self.connection.rollback()
error = 'SQL语句执行失败!ERROR (%s): %s' % (e.args[0], e.args[1])
print(error)
return error
def insert(self, table_name, params):
"""
"""
key = []
value = []
for k, v in params.items():
key.append(k)
if isinstance(v, str):
value.append("\'" + v + "\'")
else:
value.append(v)
attrs_sql = '(' + ','.join(key) + ')'
values_sql = ' values(' + ','.join(value) + ')'
sql = 'insert into %s' % table_name
sql = sql + attrs_sql + values_sql
print(f'插入SQL语句: {sql}')
self.executeCommit(sql)
def select(self, table_name, cond_dict='', order='', fields='*'):
"""
查询数据
args:
table_name: 表的名字
cond_dict: 查询条件
order: 排序条件
示例:
db.select(table)
db.select(table, fields=['user_name])
db.select(table, fields=['user_name','income'])
"""
cond_sql = ' '
if cond_dict != '':
for k, v in cond_dict.items():
cond_sql = cond_sql + '`' + k + '`' + '=' + '"' + v + '"' + ' and'
cond_sql = cond_sql + ' 1=1 '
if fields == '*':
sql = 'select * from %s where ' % table_name
else:
if isinstance(fields, list):
fields = ",".join(fields)
sql = 'select %s from %s where ' % (fields, table_name)
else:
print('传入的字段不正确,请以列表的形式传入字段信息!')
sql = sql + cond_sql + order
print(f'查询语句: {sql}')
return self.executeSql(sql)
def insertMany(self, table_name, attrs, values):
"""
插入多条数据
args:
table_name: 表的名字
attrs: 属性键
values: 属性值
示例:
table_name ='mytable'
key = ["id","name","age"]
value = [[1,'张三',20],[2,'李四',25]]
db.insertMany(table_name, key, value)
"""
values_sql = ['%s' for attr in attrs]
attrs_sql = '(' + ','.join(attrs) + ')'
values_sql = ' values(' + ','.join(values_sql) + ')'
sql = 'insert into %s' % table_name
sql = sql + attrs_sql + values_sql
print(f'插入多条语句:{sql}')
try:
for i in range(0, len(values), 20000):
self.cursor.executemany(sql, values[i:i + 20000])
self.connection.commit()
except pymysql.Error as e:
self.connection.rollback()
print(f'插入多条数据遇到错误! ERROR({e.args[0]}):{e.args[1]}')
def delete(self, table_name, cond_dict):
"""
删除数据
args:
table_name: 表的名字
cond_dict: 删除条件字典
示例:
params = {'name': '风清扬', 'age':38}
db.delete(table_name, params)
"""
cond_sql = ' '
if cond_sql != '':
for k, v in cond_dict.items():
if isinstance(v, str):
v = "\'" + v + "\'"
cond_sql = cond_sql + table_name + "." + k + '=' + v + ' and '
cond_sql = cond_sql + ' 1=1 '
sql = 'delete from %s where %s ' % (table_name, cond_sql)
print(f'生成的sql语句:{sql}')
return self.executeCommit(sql)
def update(self, table_name, attrs_dict, cond_dict):
"""
args:
table_name: 表的名字
attrs_dict: 更新属性键值对字典
cond_dict: 更新条件字典
example:
params = {'name': '风清扬', 'age': 40}
"""
attrs_list = []
cond_sql = ' '
for k, v in attrs_dict.items():
attrs_list.append("`" + k + "`" + "=" + "\'" + v + "\'")
attrs_sql = ",".join(attrs_list)
print(f"attrs_sql: {attrs_sql}")
if cond_dict != '':
for k, v in cond_dict.items():
if isinstance(v, str):
v = "\'" + v + "\'"
cond_sql = cond_sql + "`" + table_name + "`." + "`" + k + "`" + '=' + v + ' and '
cond_sql = cond_sql + ' 1=1 '
sql = 'update %s set %s where %s' % (table_name, attrs_sql, cond_sql)
print(sql)
return self.executeCommit(sql)
def dropTable(self, table_name):
"""
删除数据库表
args:
table_name: 表名字
"""
sql = 'DROP TABLE %s' % table_name
self.executeCommit(sql)
def deleteTable(self, table_name):
"""
清空表数据
"""
sql = 'DELETE FROM %s' % table_name
print(sql)
self.executeCommit(sql)
def truncateTable(self, table_name):
"""
清空表数据,不写日志
"""
sql = 'TRUNCATE TABLE %s' % table_name
print(sql)
self.executeCommit(sql)
def isExistTable(self, table_name):
"""
判断表是否存在
"""
sql = 'select * from %s' % table_name
ret = self.executeCommit(sql)
if ret is None:
return True
else:
if re.search("doesn't exist", ret):
return False
else:
return True
MySQLDBHelper工具类的测试
# 测试
if __name__ == "__main__":
# 定义数据库访问参数
config = {
'host': 'node05',
'user': 'root',
'password': 'Love88me',
'port': 3306,
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化打开数据库连接
db = MySQLDBHelper(config)
# 打印数据库版本
print(db.getDatabaseVersion())
# 测试创建数据库
print("================测试创建数据库=========================")
DB_NAME = input('输入要创建的数据库名称:')
db.createDatabase(DB_NAME)
# 测试选择数据库
print("================测试选择数据库=========================")
db.selectDatabase(DB_NAME)
# 测试创建表
print("================测试创建数据表=========================")
table_name = input('请输入要创建的数据表名:')
'''
例如
create table `location` (
`id` bigint(11) not null auto_increment,
`name` varchar(30) not null,
`province` varchar(20),
`city` varchar(50)
) engine=InnoDB default charset=utf8;
'''
attrs_dict = {
'name': 'varchar(30) not null',
'province': 'varchar(30) not null',
'city': 'varchar(50)'
}
constraint = 'primary key(`id`)'
db.createTable(table_name, attrs_dict, constraint)
# 测试单条数据插入功能
params = {'name': '北京市文化局', 'province': '北京', 'city': '丰台区'}
db.insert(table_name, params)
# 测试批量插入数据
insert_values = [['江苏省无锡市文化局','江苏省','无锡市'],
['安徽省合肥市教育局','安徽省','合肥市'],
['浙江省杭州市旅游局','浙江省','杭州市']]
insert_attrs = ['name','province','city']
db.insertMany(table_name, insert_attrs, insert_values)
# 测试数据查询
print(db.select(table_name, fields=['id','name']))
print(db.select(table_name,cond_dict={'province':'江苏省'}, fields=['name','province','city']))
print(db.select(table_name, fields=['id','name', 'province', 'city'], order='order by id desc'))
# 测试更新数据
update_params = {'name':'江苏省无锡市苏南国际鸡厂'}
update_cond_dict = {'province':'江苏省'}
db.update(table_name, update_params, update_cond_dict)
# 测试删除数据
delete_params = {'province':'安徽省'}
db.delete(table_name, delete_params)
# 测试删除表数据
db.deleteTable(table_name)
# 测试删除表
db.dropTable(table_name)