# 表
class ProductCheckBillMod(Base):
__tablename__ = 'sc_product_check'
id = Column(Integer, primary_key=True)
bill_no = Column(String)
class ProductCheckBillMod(Base):
__tablename__ = 'sc_product_check_detail'
id = Column(Integer, primary_key=True)
# 查询
task_obj = db.session.query(*DBModels).join(*joins).filter(*filters).order_by(*orderbys).slice((page_index - 1) * page_size,page_index * page_size).all()
情况 1 DBModels = [ProductCheckBillMod,ProductCheckBillMod]
情况 2 DBModels = [ProductCheckBillMod.id,ProductCheckBillMod.bill_no,ProductCheckBillMod]
def queryToDict(models,fsModel=None):
if fsModel == None:
fsModel = Model
if isinstance(fsModel,list):
fsModel=fsModel[0]
if (isinstance(models, list)):
if not len(models):
return []
if (isinstance(models[0], fsModel)):
lst = []
for model in models:
gen = model_to_dict(model)
dit = dict((g[0], g[1]) for g in gen)
lst.append(dit)
return lst
else:
res = result_to_dict(models)
return res
else:
if (isinstance(models, fsModel)):
gen = model_to_dict(models)
dit = dict((g[0], g[1]) for g in gen)
return dit
else:
res = dict(zip(models.keys(), models))
find_datetime(res)
return res
# 当结果为 result 对象列表时,result 有 key()方法
def result_to_dict(results):
res = [dict(zip(r.keys(), r)) for r in results]
# 这里 r 为一个字典,对象传递直接改变字典属性
for r in res:
find_datetime(r)
return res
def model_to_dict(model): # 这段来自于参考资源
for col in model.__table__.columns:
if isinstance(col.type, DateTime):
value = convert_datetime(getattr(model, col.name))
elif isinstance(col.type, Numeric):
value = float(getattr(model, col.name))
else:
value = getattr(model, col.name)
yield (col.name, value)
def find_datetime(value):
pass
def convert_datetime(value):
pass
核心来自 https://www.cnblogs.com/eating-gourd/p/9997751.html 重写了 result_to_dict 函数
# 当结果为result对象列表时一般为多表或多列查询结果
def result_to_dict(results):
res=[]
for result in results:
row_d = dict()
for attr in dir(result):
if(not attr.startswith('_') and attr not in ['count','index','keys']):
value = getattr(result, attr)
if "_sa_instance_state" in dir(value):# 通过dir中是否含有_sa_instance_state判定是否为Model类
gen = model_to_dict(value)
dit = dict((g[0], g[1]) for g in gen)
row_d.update(dit) # 更新到行字典中
else:
row_d[attr] = getattr(result, attr) # 通过getattr获取值添加到字典
res.append(d)
for r in res:
find_datetime(r)
return res
1
18870715400 2020-10-29 15:02:59 +08:00
|
2
luxiaoer OP @18870715400
感谢回复,我查询后的 task_obj 是一个 result 对象,尝试使用时提示 AttributeError: 'result' object has no attribute '__table__' |
3
18870715400 2020-10-29 21:46:56 +08:00
@luxiaoer 我 使用的是 sqlalchemy 同样都是转换成 字典, 你可以 dir 一下看一下应该有类似的方法, 或者直接去官方文档看一下应该有
https://www.cnblogs.com/eating-gourd/p/9997751.html 参考 |
4
luxiaoer OP @18870715400 是的,我的核心代码部分就是 copy 的这部分。
已解决 |
5
luxiaoer OP 回车快了,贴下解决代码部分
重写了 result_to_dict 函数,希望可以帮到其他小伙伴 # 当结果为 result 对象列表时一般为多表或多列查询结果 def result_to_dict(results): res=[] for result in results: row_d = dict() for attr in dir(result): if(not attr.startswith('_') and attr not in ['count','index','keys']): value = getattr(result, attr) if "_sa_instance_state" in dir(value):# 通过 dir 中是否含有_sa_instance_state 判定是否为 Model 类 gen = model_to_dict(value) dit = dict((g[0], g[1]) for g in gen) row_d.update(dit) # 更新到行字典中 else: row_d[attr] = getattr(result, attr) # 通过 getattr 获取值添加到字典 res.append(d) for r in res: find_datetime(r) return res |