当系统数据越来越多的时候,查询变得缓慢,即使加了索引,由于表数据的增加,索引的维护也会成为数据库性能的限制问题,所以此时可以通过分表,将数据通过某种准则分别存储到不同的表中,以实现缓解单表的压力。
分表的方法大部分都是通过主键id取数据库分表个数的余。最简单的方法就是定义多个Model对象,然后通过一个map的映射,当我们获取具体的Model的时候,直接就通过id与表个数取余,然后通过map的映射我们可以获取到具体的表model对象。
class User(models.Model):
user_id = models.IntegerField(primary_key=True)
user_name = models.CharField(max_length=256)
password = models.CharField(max_length=256)
class User0(User):
class Meta:
db_table = 'user_0'
class User1(User):
class Meta:
db_table = 'user_1'
class User2(User):
class Meta:
db_table = 'user_2'
user_model_map = {
0: User0,
1: User1,
2: User2,
}
table_num = 3
# 获取表的model
def get_user_db_model(user_id):
key = user_id % table_num
return user_model_map[key]
但如果表数量躲到几百甚至上千,这种方案就很不使用。
所以我们使用第二种方案,首先我们需要思考为什么在我们使用User.objects.first()的时候,Django可以找到具体的User表,这是因为设置了Meta.db_table具体名称,django可以找到具体的表进行数据的查询。
如果是多张表呢,我们可以想到,可以动态的创建多个表的具体名称,这样的话,在我们程序运行过程中,只要我们传入一个id,此时可以动态的获取到具体的表的名称,就可以找到具体的表,实现具体的分表作用。
要想实现动态数据变更,就需要使用到反射,Python原生支持反射的,我们可以使用type完成新类的创建。实际上主要的功能就是获取表model的时候动态修改内部的Meta.db_table部分的信息即可。
SHARD_TABLE_NUM = 3
class User(models.Model):
@classmethod
def get_student_db_model(cls, user_id=None):
suffix = user_id % SHARD_TABLE_NUM
table_name = 'user_%s' % suffix
# construct a private field _user_model_dict to store the model,
# it can help to accelerate the speed of get model
if table_name in cls._user_model_dict:
return cls._user_model_dict[table_name]
class Meta:
db_table = table_name
attrs = {
'__module__': cls.__module__,
'Meta': Meta
}
# the first param is obejct name
# second: class
# the class field info
user_db_model = type(str('User_%s') % suffix, (cls,), attrs)
cls._user_model_dict[table_name] = user_db_model
return user_db_model
_user_model_dict = {} # help to reuse the model object
id = models.AutoField(primary_key=True)
user_name = models.CharField(max_length=256)
password = models.CharField(max_length=256)
class Meta:
abstract = True
上面的代码通过编写一个类方法,先获取到表的名称,具体的获取方法就是采用id取余的方法来实现的。在获取到表名之后,通过class Meta对象,最后通过attrs的属性装配,最后由type实现新类的构建。
def __init__(cls, what, bases=None, dict=None):
# known special case of type.__init__
"""
type(object) -> the object's type
type(name, bases, dict) -> a new type # 构建新的type即类对象
# (copied from class doc)
"""
pass
最后就可以实现具体的分表操作,但是需要注意,当我们插入的时候,因为需要提前知道id,这个时候最好采用一个辅助的Model类来协助id的生成,便于我们后续具体分表的实施,具体表的设计如下:
# the sharding table of Django
class UserIndex(models.Model):
id = models.AutoField(primary_key=True)
class Meta:
db_table = 'user_index_tab'
通过某一张表的一个字段password_code
进行动态的创建models类加入app的model应用中心去
#动态创建模型
def create_task_detail_model(password_code):
if password_code:
classname = f'TaskDetail_{password_code}'
class Meta:
db_table = classname
model_class = type(classname, (models.Model,), {
'task': models.ForeignKey(Task, on_delete=models.CASCADE, related_name='details', verbose_name='任务'),
'user_id': models.IntegerField(verbose_name='用户ID'),
'created_time': models.DateTimeField(auto_now_add=True, verbose_name='创建时间'),
'is_complet': models.BooleanField(default=False, verbose_name='是否完成'),
'__str__': lambda self: f"{self.task.password_code} - {self.user_id}",
'Meta': Meta,
'__module__': __name__,
})
# Register the model with the app's registry
apps.all_models['your_app_name'][classname] = model_class
return model_class
else:
classname = f'TaskDetail'
class Meta:
db_table = classname
model_class = type(classname, (models.Model,), {
'task': models.ForeignKey(Task, on_delete=models.CASCADE, related_name='details', verbose_name='任务'),
'user_id': models.IntegerField(verbose_name='用户ID'),
'created_time': models.DateTimeField(auto_now_add=True, verbose_name='创建时间'),
'is_complet': models.BooleanField(default=False, verbose_name='是否完成'),
'__str__': lambda self: f"{self.task.password_code} - {self.user_id}",
'Meta': Meta,
'__module__': __name__,
})
# Register the model with the app's registry
apps.all_models['your_app_name'][classname] = model_class
return model_class
动态的使用模型类
model_class = apps.get_model('your_app_name', class_name)
开启服务就加入到apps中的model中就使用信号机制
在signalstask.py中配置自动装配逻辑(前提是在对应的表存在的情况下装配的)
@receiver(startup_complete)
def create_dynamic_models(sender, **kwargs):
Task = apps.get_model('your_app_name', 'Task') # Replace with the actual name of your app
# Iterate over existing tasks and create corresponding models
for task in Task.objects.all():
create_task_detail_model(task.password_code)
apps.py中配置
from django.apps import AppConfig
class TaskmanageConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'your_app_name'
def ready(self):
import your_app_name.signalstask # 导入信号文件以进行连接
post_migrate.connect(self.on_startup_complete, sender=self)
def on_startup_complete(self, sender, **kwargs):
from your_app_name.signalstask import startup_complete
# 在这里发送服务启动完成的信号
startup_complete.send(sender=self)
分库的方案有两种,
采取路由的方式设置具体的app_label,按照app_label的不同分配到不同的库
采用自己编写方法,在具体的Model中配置好相应的库从而实现分库
DB路由方式,设置一个DB路由的规则,根据规则的不同,对应的数据库的读写会映射到不同的库上面,以User为例,假设sharding项目的数据存储到sharding库中,sharding_one项目的数据存储到sharding_one数据库中,此时可以编写一个DbRouter,如下
class DbRouter(object):
def db_for_read(self, model, **hints):
if model._meta.app_label == 'sharding_one': # 判断app_label标签,这个可以在model中设置
return 'sharding_one'
return None
def db_for_write(self, model, **hints):
if model._meta.app_label == 'sharding_one':
return 'sharding_one'
return None
为什么不为sharding_one就返回None呢,因为这里仅有两个项目,一个对应着sharding库,一个对应着sharding_one库,而django程序有个默认库default,返回None的话表示直接对应sharding库。
具体的Model中的设置:
# sharding_one
class Meta:
abstract = True
app_label = 'sharding_one'
这样最后只需要在不同的项目中执行数据读取或者写入操作,库的切换就通过DB路由切换到了设置的库,从而实现了分库的操作。
采用Config配置方式是一种比较方便的形式,即在Model中配置一个内部类Config,而Config中的属性则对映射具体的写数据和读数据的库操作。
这种方式的话需要写一个获取DB节点的操作,可以使用objects.using(‘db_name’)指定连接具体的数据库。
首先在model中添加具体的Config配置
class Config:
db_for_read = 'sharding_one'
db_for_write = 'sharding_one'
写一个获取db节点的方法
这个方法先从Config中获取到具体的数据库节点名称,在指定自己使用当前的库
def get_db_obj(objects):
db_name = objects.model.Config.db_for_write
return objects.using(db_name)
最后写入和查询的代码分别如下:
def search_user_service(user_id):
# get_student_db_model()就是前面的分表方案中的设计,获取到具体的指定表的model信息,然后从中获取到objects
# 然后通过这个表对象,指定具体的库即可,最后从库中查询出具体的数据
obj = get_db_obj(User.get_student_db_model(user_id).objects)
user = obj.first()
return user
def insert_user_service(user_name, password):
index = UserIndex() # 这里是因为分表之后,需要插入数据的话,怎么提前获得到主键呢,采用user_index表获取一个自增的主键,或者按照自己设计的方式生成的主键
index.save()
user_id = index.id
# 这里与之前一样,但是由于是插入,所以对象构造成功之后需要取出内部的model进行数据的填充操作
model = get_db_obj(User.get_student_db_model(user_id=user_id).objects).model(user_id, user_name, password)
try:
model.save() # 最后数据保存即可
except Exception as e:
print "[insert_user] exception error %s" % e
通过上面的方式我们可以实现具体的分库分表操作,这种方式比较有效,对于分库较多的操作,比如写主库,读从库的操作可以通过这个Config配置直接可以实现,而获取db的时候,需要多补写几个方法,就是指定使用db库的方法,这样看起来会更加的直观!