明道云优势在于上传和企业内部流程的系统化,换句话说,是产生数据的部分。
SmartBi优势在于生成报表和即席查询等,是使用沉淀数据的部分。
如果能将这两部分的数据打通,可以大大扩展企业数字化框架的覆盖范围。让这两套平台协同发挥1+1远大于2的作用。
目前总共有两种方法,一种是SmartBi提供的官方接口。一种是我自研的方法。
官方接口建立的连接,严格来说只是批量。制定一个时间段间隔去跑抽取程序,将指定明道云APP的数据抽取到SmartBi的高速缓存。
我的自研方法,是在明道云上传时将数据同步到自己的企业内数据库(我用的MySQL),SmartBi直连MySQL数据库,从而实现联通。
两种方法,前者好在不用自己写服务,弱点在于无法获得实时的最新数据,对数据的预处理可能也不方便。
后者由于已经写了通用服务,如果大家直接用我的源码也是不用重复开发的。弱点在于上传后记得点一下同步按钮,优点在于自主自由,可以在MySQL中以视图、存储过程等方式做前置的预处理,获得实时数据的时点也不受抽取批量执行时间间隔的限制。
我的同步源码是用python flask写的视图函数。这是一套通用源码,任何APP都可以利用此套服务将APP内的表一键同步到自有MySQL数据库中。无须重复开发。
同步源码:
from flask import Flask, request
import requests as rq
import pandas as pd
from sqlalchemy import create_engine,exc
import json
app = Flask(__name__)
@app.route('/Mindao_cloud_common_upload', methods=['POST', 'GET'])
def Mindao_cloud_common_upload():
appKey = request.args.get('appKey')
sign = request.args.get('sign')
result = syncDB(appKey, sign)
return result
def syncDB(appKey, sign):
try:
result_list = []
table_id_list = getAppTables(appKey, sign)
if len(table_id_list) == 0:
return "No table exist in current app!"
else:
for table_id in table_id_list:
result = fetchTableRows(appKey, sign, table_id)
result_list.append(result)
return str(result_list)
except Exception as e:
return e
def getAppTables(appKey, sign):
try:
result = rq.get(url=f"http://明道云服务IP地址/api/v1/open/app/get?appKey={appKey}&sign={sign}")
result_dict = result.json()
table_dict = result_dict['data']['sections'][0]['items']
table_id_list = []
for table_item in table_dict:
if table_item['type'] == 0:
table_id_list.append(table_item['alias'])
return table_id_list
except Exception as e:
return e
def fetchTableRows(appKey, sign, table_id):
try:
ini_index = 0
row_num = 1000
page_size = 1000
total_df = pd.DataFrame()
while row_num == page_size:
ini_index = ini_index + 1
params = {
"appKey": appKey,
"sign": sign,
"worksheetId": table_id,
"viewId": "",
"pageSize": page_size,
"pageIndex": ini_index,
"sortId": "",
"isAsc": ""
}
result = rq.post(url="http://明道云服务IP地址/api/v2/open/worksheet/getFilterRows", json=params)
result_dict = result.json()
if result_dict['data']['total'] > 0:
rows = result_dict['data']['rows']
row_num = len(rows)
rows_list = []
for row in rows:
sorted_items = sorted(row.items())
sorted_keys, sorted_values = zip(*sorted_items)
rows_list.append(sorted_values)
rows_df = pd.DataFrame(rows_list,columns=sorted_keys)
total_df = pd.concat([total_df, rows_df], ignore_index=True)
result = insert_into_table(total_df, table_id)
else:
result = f"{table_id} no records!"
break
except Exception as e:
result = e
return result
def insert_into_table(dataframe, table_name):
try:
database_url = 'mysql://root:x5@127.0.0.1:3306/mingdaocloud'
# 创建数据库连接引擎
engine = create_engine(database_url)
# 写入数据到数据库表
try:
dataframe.to_sql(name=table_name, con=engine, if_exists='replace', index=False)
except exc.IntegrityError as e:
# 处理数据库完整性错误,例如违反主键约束
return (f"数据库完整性错误: {e}")
except exc.OperationalError as e:
# 处理数据库操作错误,例如表不存在
return (f"数据库操作错误: {e}")
# 在这里创建新表
table_name.__table__.create(bind=engine)
return ("已创建新表。")
except Exception as e:
# 处理其他类型的异常
return (f"发生了其他类型的错误: {e}")
finally:
engine.dispose()
return f"{table_name}全量同步已完成"
except Exception as e:
return e
if __name__ == '__main__':
app.run(debug=True, host="0.0.0.0", port=5050)
这个源码具体做的事情,就是先获得当前APP所有表,接着再去拿到每张表的所有数据,这里存在一个分页的问题,也处理了一些错位的可能性。原理上就是把所有数据塞进一个Dataframe,然后再全量更新到MySQL数据库中。
如果有一些自己的需求,可以在此基础上修改。这套代码的通用性等我在自己的应用中都做过测试。