本篇文档配套有完整的视频教程和源码,大家感兴趣的可以留言或者私信我。
学习目标:
目录:
01 概述
02 部署RestClient工具
03 发送GET请求
04 发送POST请求
05 发送PUT请求
06 发送DELETE请求
07 发送PATCH请求
08 Docker部署MySQL8
09 创建数据库和表
10 实现新增用户接口
11 实现查询所有用户接口
12 实现根据ID查询用户接口
13 实现根据ID修改用户接口
14 实现根据ID删除用户接口
15 总结
第一步:加载镜像
docker load -i .\data\rest_client_v1.tar
第二步:启动镜像
docker run -d --name rest_client --restart=always -p 10001:80 rest
第一步:安装zdppy_api框架
pip install .\data\zdppy_api-0.1.0.tar.gz
pip install uvicorn
第二步:编写接口
from api import resp, Api, middleware
app = Api(
routes=[resp.json_route("/user", [{"id": 1, "name": "张三"}])],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第三步:运行服务,使用RestClient发送请求
{
"method":"get",
"url": "http://localhost:8000/user"
}
第一步:编写接口
from api import req, resp, Api, middleware
async def post_method(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
# 响应
return resp.success(user)
# 用户相关的路由
user_routes = [
resp.post("/user", post_method)
]
app = Api(
routes=user_routes,
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第二步:使用RestClient进行测试
{
"method":"post",
"url": "http://localhost:8000/user",
"data":{
"name":"张三"
}
}
第一步:编写接口
from api import req, resp, Api, middleware
async def add_user(r):
user = await req.get_json(r)
print("....", user)
return resp.success(user)
async def update_user(r):
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
return resp.success(user)
app = Api(
routes=[
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第二步:使用RestClient进行测试
{
"method":"put",
"url": "http://localhost:8000/user/1",
"data":{
"name":"张三"
}
}
第一步:编写接口
from api import req, resp, Api, middleware
async def add_user(r):
user = await req.get_json(r)
print("....", user)
return resp.success(user)
async def update_user(r):
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
return resp.success(user)
async def delete_user(r):
uid = req.get_path(r, "uid")
return resp.success({"id": uid})
app = Api(
routes=[
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第二步:发送请求
{
"method":"delete",
"url": "http://localhost:8000/user/1"
}
第一步:编写接口
from api import req, resp, Api, middleware
async def add_user(r):
user = await req.get_json(r)
print("....", user)
return resp.success(user)
async def update_user(r):
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
return resp.success(user)
async def delete_user(r):
uid = req.get_path(r, "uid")
return resp.success({"id": uid})
app = Api(
routes=[
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第二步:发送请求
{
"method":"patch",
"url": "http://localhost:8000/user/1",
"data": {
"name": "张三"
}
}
拉取镜像:
docker pull mysql:8.0.25
创建容器:
docker run --name mysql -p 3306:3306 --restart=always -e MYSQL_ROOT_PASSWORD=zhangdapeng520 -d mysql:8.0.25
配置信息如下:
from mysql.db_object import Database
db = Database(password="zhangdapeng520")
# 创建数据库
db.add_database_force("user")
db = Database(password="zhangdapeng520", database="user")
# 创建表
user_table_sql = "create table user(id int primary key auto_increment, name varchar(36))"
db.execute(user_table_sql)
# 查看所有的表
print(db.get_all_table())
from api import req, resp, Api, middleware
from mysql.db_object import Database
# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]
async def add_user(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
db.add(table, columns, [user.get("name")])
# 响应
return resp.success(user)
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
# 获取用户传递过来的用户信息
user = await req.get_json(r)
user["id"] = uid
# 响应
return resp.success(user)
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
# 响应
return resp.success({"uid": uid})
app = Api(
routes=[
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第一步:定义接口方法
async def get_user(r):
"""获取用户"""
users = db.get_all(table)
return resp.success(users)
第二步:注册接口路由
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
],
middleware=[middleware.cors()]
)
第三步:启动服务,并使用RestClient进行测试
{
"method":"get",
"url": "http://localhost:8000/user"
}
完整示例代码:
from api import req, resp, Api, middleware
from mysql.db_object import Database
# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]
async def add_user(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
db.add(table, columns, [user.get("name")])
# 响应
return resp.success(user)
async def get_user(r):
"""获取用户"""
users = db.get_all(table)
return resp.success(users)
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
# 获取用户传递过来的用户信息
user = await req.get_json(r)
user["id"] = uid
# 响应
return resp.success(user)
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
# 响应
return resp.success({"uid": uid})
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第一步:编写根据ID获取用户的接口
async def get_user_by_id(r):
"""根据id查询用户"""
uid = req.get_path(r, "uid")
user = db.get_by_id(table, uid)
return resp.success(user)
第二步:注册接口
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
第三步:使用RestClient测试接口
{
"method":"get",
"url": "http://localhost:8000/user/3"
}
完整示例代码:
from api import req, resp, Api, middleware
from mysql.db_object import Database
# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]
async def add_user(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
db.add(table, columns, [user.get("name")])
# 响应
return resp.success(user)
async def get_user(r):
"""获取用户"""
users = db.get_all(table)
return resp.success(users)
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
# 获取用户传递过来的用户信息
user = await req.get_json(r)
user["id"] = uid
# 响应
return resp.success(user)
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
# 响应
return resp.success({"uid": uid})
async def get_user_by_id(r):
"""根据id查询用户"""
uid = req.get_path(r, "uid")
user = db.get_by_id(table, uid)
return resp.success(user)
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.patch("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第一步:编写修改用户接口
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
db.update(table, uid, columns, [user.get("name")])
return resp.success(user)
第二步:注册接口
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
第三步:使用RestClient测试接口
{
"method":"put",
"url": "http://localhost:8000/user/3",
"data": {
"name": "王五333"
}
}
第四步:校验数据是否修改成功
{
"method":"get",
"url": "http://localhost:8000/user/3"
}
完整示例代码:
from api import req, resp, Api, middleware
from mysql.db_object import Database
# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]
async def add_user(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
db.add(table, columns, [user.get("name")])
# 响应
return resp.success(user)
async def get_user(r):
"""获取用户"""
users = db.get_all(table)
return resp.success(users)
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
db.update(table, uid, columns, [user.get("name")])
return resp.success(user)
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
# 响应
return resp.success({"uid": uid})
async def get_user_by_id(r):
"""根据id查询用户"""
uid = req.get_path(r, "uid")
user = db.get_by_id(table, uid)
return resp.success(user)
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
第一步:编写接口
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
db.delete(table, uid)
return resp.success()
第二步:注册接口
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
第三步:测试接口
{
"method":"delete",
"url": "http://localhost:8000/user/3"
}
第四步:使用查询所有接口,测试删除是否成功
{
"method":"get",
"url": "http://localhost:8000/user"
}
完整示例代码:
from api import req, resp, Api, middleware
from mysql.db_object import Database
# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]
async def add_user(r):
"""新增用户"""
# 获取用户传递过来的用户信息
user = await req.get_json(r)
db.add(table, columns, [user.get("name")])
# 响应
return resp.success(user)
async def get_user(r):
"""获取用户"""
users = db.get_all(table)
return resp.success(users)
async def update_user(r):
"""修改用户"""
uid = req.get_path(r, "uid")
user = await req.get_json(r)
user["id"] = uid
db.update(table, uid, columns, [user.get("name")])
return resp.success(user)
async def delete_user(r):
"""删除用户"""
uid = req.get_path(r, "uid")
db.delete(table, uid)
return resp.success()
async def get_user_by_id(r):
"""根据id查询用户"""
uid = req.get_path(r, "uid")
user = db.get_by_id(table, uid)
return resp.success(user)
app = Api(
routes=[
resp.get("/user", get_user),
resp.post("/user", add_user),
resp.put("/user/{uid}", update_user),
resp.delete("/user/{uid}", delete_user),
resp.get("/user/{uid}", get_user_by_id),
],
middleware=[middleware.cors()]
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app")
1、学会了编写不同请求方法的接口
2、学会了操作数据库,对用户做增删改查
3、学会了开发REST风格的接口,比如用户的增删改查接口
后续建议: