create database gx character set utf8mb4;
#!/usr/bin/python3
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
print("Database version : %s " % data)
# 关闭数据库连接
db.close()
#!/usr/bin/python3
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor.execute("drop table if exists user")
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
cursor.execute(sql)
# 关闭数据库连接
db.close()
#!/usr/bin/python3
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor.execute("drop table if exists user")
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用预处理语句创建表
sql = "insert into user(name,age) values(%s,%s)"
args = ["张三", 23]
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 使用预处理语句创建表
sql = "insert into user(name,age) values(%s,%s)"
args = ["张三", 23]
execute(db, sql, args)
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 使用预处理语句创建表
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 使用预处理语句创建表
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 查询
sql = "select * from user where id = %s"
args = [1]
print(fetchone(db, sql, args))
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 使用预处理语句创建表
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 更新
sql = "update user set name=%s where id = %s"
args = ["张三333", 1]
execute(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 更新
sql = "update user set age=%s where id > %s"
args = [34, 1]
execute(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 删除
sql = "delete from user where id = %s"
args = [1]
execute(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 删除
sql = "delete from user where id > %s"
args = [1]
execute(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
这种方案,有比较严重的注入的风险。
import mysql as pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchmany(db, sql, args=None):
if type(args) is not list:
return
# select * from user where id in (1,2,3)
in_arg = ",".join([str(i) for i in args])
print("in查询SQL:", sql)
print("in查询参数:", in_arg)
sql = sql % (in_arg,)
print("in查询最终SQL:", sql)
cursor = db.cursor()
cursor.execute(sql)
return cursor.fetchall()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 根据id列表查询
sql = "select * from user where id in (%s)"
args = [1, 2, 4]
print(fetchmany(db, sql, args))
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
核心代码:
def is_safe_id(id):
"""校验是否为安全的ID"""
return re.match(r"^\w+$", str(id)) is not None
def fetchin(db, sql, args=None):
if type(args) is not list:
return
# select * from user where id in (1,2,3)
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
print("in查询SQL:", sql)
print("in查询参数:", in_arg)
sql = sql % (in_arg,)
print("in查询最终SQL:", sql)
cursor = db.cursor()
cursor.execute(sql)
return cursor.fetchall()
完整代码:
import mysql as pymysql
import re
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def is_safe_id(id):
"""校验是否为安全的ID"""
return re.match(r"^\w+$", str(id)) is not None
def fetchin(db, sql, args=None):
if type(args) is not list:
return
# select * from user where id in (1,2,3)
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
print("in查询SQL:", sql)
print("in查询参数:", in_arg)
sql = sql % (in_arg,)
print("in查询最终SQL:", sql)
cursor = db.cursor()
cursor.execute(sql)
return cursor.fetchall()
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 根据id列表查询
sql = "select * from user where id in (%s)"
args = [1, 2, 4]
print(fetchin(db, sql, args))
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
import re
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def is_safe_id(id):
"""校验是否为安全的ID"""
return re.match(r"^\w+$", str(id)) is not None
def fetchin(db, sql, args=None):
if type(args) is not list:
return
# select * from user where id in (1,2,3)
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
print("in查询SQL:", sql)
print("in查询参数:", in_arg)
sql = sql % (in_arg,)
print("in查询最终SQL:", sql)
cursor = db.cursor()
cursor.execute(sql)
return cursor.fetchall()
def deletein(db, sql, args=None):
if type(args) is not list:
return
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
sql = sql % (in_arg,)
cursor = db.cursor()
cursor.execute(sql)
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 根据id列表删除
sql = "delete from user where id in (%s)"
args = [1, 2, 4]
deletein(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()
import mysql as pymysql
import re
# 打开数据库连接
db = pymysql.connect(host='localhost',
port=3306,
user='root',
password='zhangdapeng520',
database='gx')
def execute(db, sql, args=None):
# 使用 cursor() 方法创建一个游标对象 cursor
with db.cursor() as cursor:
try:
# 执行sql语句
cursor.execute(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def fetchone(db, sql, args=None):
with db.cursor() as cursor:
cursor.execute(sql, args)
return cursor.fetchone()
def fetchall(db, sql, args=None):
with db.cursor() as cursor:
cursor = db.cursor()
cursor.execute(sql, args)
return cursor.fetchall()
def executemany(db, sql, args=None):
if type(args) is not list:
return
with db.cursor() as cursor:
try:
# 执行sql语句
cursor.executemany(sql, args)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
def is_safe_id(id):
"""校验是否为安全的ID"""
return re.match(r"^\w+$", str(id)) is not None
def fetchin(db, sql, args=None):
if type(args) is not list:
return
# select * from user where id in (1,2,3)
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
print("in查询SQL:", sql)
print("in查询参数:", in_arg)
sql = sql % (in_arg,)
print("in查询最终SQL:", sql)
cursor = db.cursor()
cursor.execute(sql)
return cursor.fetchall()
def deletein(db, sql, args=None):
if type(args) is not list:
return
in_arg = ",".join([str(i) for i in args if is_safe_id(i)])
sql = sql % (in_arg,)
cursor = db.cursor()
cursor.execute(sql)
# 使用 execute() 方法执行 SQL,如果表存在则删除
sql = "drop table if exists user"
execute(db, sql)
# 使用预处理语句创建表
sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"
execute(db, sql)
# 批量新增
sql = "insert into user(name,age) values(%s,%s)"
args = [
("张三1", 23),
("张三2", 33),
("张三3", 24),
]
executemany(db, sql, args)
# 根据id列表删除
sql = "delete from user where id in (%s)"
args = [1, 2, 4]
deletein(db, sql, args)
# 查询
sql = "select * from user"
print(fetchall(db, sql))
# 关闭数据库连接
db.close()