【明道云】【企业数字化】如何将明道云的数据直连SmartBi使用

发布时间:2024年01月23日

【背景】

明道云优势在于上传和企业内部流程的系统化,换句话说,是产生数据的部分。
SmartBi优势在于生成报表和即席查询等,是使用沉淀数据的部分。
如果能将这两部分的数据打通,可以大大扩展企业数字化框架的覆盖范围。让这两套平台协同发挥1+1远大于2的作用。

【分析】

目前总共有两种方法,一种是SmartBi提供的官方接口。一种是我自研的方法。

官方接口建立的连接,严格来说只是批量。制定一个时间段间隔去跑抽取程序,将指定明道云APP的数据抽取到SmartBi的高速缓存。
在这里插入图片描述
我的自研方法,是在明道云上传时将数据同步到自己的企业内数据库(我用的MySQL),SmartBi直连MySQL数据库,从而实现联通。

两种方法,前者好在不用自己写服务,弱点在于无法获得实时的最新数据,对数据的预处理可能也不方便。

后者由于已经写了通用服务,如果大家直接用我的源码也是不用重复开发的。弱点在于上传后记得点一下同步按钮,优点在于自主自由,可以在MySQL中以视图、存储过程等方式做前置的预处理,获得实时数据的时点也不受抽取批量执行时间间隔的限制。

【同步功能源码】

我的同步源码是用python flask写的视图函数。这是一套通用源码,任何APP都可以利用此套服务将APP内的表一键同步到自有MySQL数据库中。无须重复开发。
同步源码:

from flask import Flask, request
import requests as rq
import pandas as pd
from sqlalchemy import create_engine,exc
import json

app = Flask(__name__)


@app.route('/Mindao_cloud_common_upload', methods=['POST', 'GET'])
def Mindao_cloud_common_upload():
    appKey = request.args.get('appKey')
    sign = request.args.get('sign')
    result = syncDB(appKey, sign)
    return result


def syncDB(appKey, sign):
    try:
        result_list = []
        table_id_list = getAppTables(appKey, sign)
        if len(table_id_list) == 0:
            return "No table exist in current app!"
        else:
            for table_id in table_id_list:
                result = fetchTableRows(appKey, sign, table_id)
                result_list.append(result)
            return str(result_list)
    except Exception as e:
        return e


def getAppTables(appKey, sign):
    try:
        result = rq.get(url=f"http://明道云服务IP地址/api/v1/open/app/get?appKey={appKey}&sign={sign}")
        result_dict = result.json()
        table_dict = result_dict['data']['sections'][0]['items']
        table_id_list = []
        for table_item in table_dict:
            if table_item['type'] == 0:
                table_id_list.append(table_item['alias'])
        return table_id_list
    except Exception as e:
        return e


def fetchTableRows(appKey, sign, table_id):
    try:
        ini_index = 0
        row_num = 1000
        page_size = 1000
        total_df = pd.DataFrame()

        while row_num == page_size:
            ini_index = ini_index + 1
            params = {
                "appKey": appKey,
                "sign": sign,
                "worksheetId": table_id,
                "viewId": "",
                "pageSize": page_size,
                "pageIndex": ini_index,
                "sortId": "",
                "isAsc": ""
            }
            result = rq.post(url="http://明道云服务IP地址/api/v2/open/worksheet/getFilterRows", json=params)
            result_dict = result.json()
            if result_dict['data']['total'] > 0:
                rows = result_dict['data']['rows']
                row_num = len(rows)
                rows_list = []
                for row in rows:
                    sorted_items = sorted(row.items())
                    sorted_keys, sorted_values = zip(*sorted_items)
                    rows_list.append(sorted_values)
                rows_df = pd.DataFrame(rows_list,columns=sorted_keys)
                total_df = pd.concat([total_df, rows_df], ignore_index=True)
                result = insert_into_table(total_df, table_id)
            else:
                result = f"{table_id} no records!"
                break

    except Exception as e:
        result = e

    return result


def insert_into_table(dataframe, table_name):
    try:
        database_url = 'mysql://root:x5@127.0.0.1:3306/mingdaocloud'
        # 创建数据库连接引擎
        engine = create_engine(database_url)

        # 写入数据到数据库表
        try:
            dataframe.to_sql(name=table_name, con=engine, if_exists='replace', index=False)
        except exc.IntegrityError as e:
            # 处理数据库完整性错误,例如违反主键约束
            return (f"数据库完整性错误: {e}")

        except exc.OperationalError as e:
            # 处理数据库操作错误,例如表不存在
            return (f"数据库操作错误: {e}")
            # 在这里创建新表
            table_name.__table__.create(bind=engine)
            return ("已创建新表。")

        except Exception as e:
            # 处理其他类型的异常
            return (f"发生了其他类型的错误: {e}")

        finally:
            engine.dispose()

        return f"{table_name}全量同步已完成"
    except Exception as e:
        return e



if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=5050)

【源码用法】

  1. 搭建Flask服务,配合tornado运行上面的源码。
  2. 明道云方面在自定义界面新建一个按钮,操作选择打开链接,配置链接和目标传参。参数这里只要两个,就是本App的sign和appKey。链接就是你服务所在服务器地址+视图函数,这里是http://服务器IP:5050/Mindao_cloud_common_upload。
    在这里插入图片描述
  3. 在MySQL数据库后台要先创建好一个名为mingdaocloud的数据库。
  4. 为了让自动创建的表和字段的名字有意义,强烈建议第一次同步前,先去APP内部API设置部分,将表的别名和字段别名都设置好。不然创建的表和字段名称都会是明道云内部给的表编号和字段编号,不易于后续使用。
    在这里插入图片描述

【源码说明】

这个源码具体做的事情,就是先获得当前APP所有表,接着再去拿到每张表的所有数据,这里存在一个分页的问题,也处理了一些错位的可能性。原理上就是把所有数据塞进一个Dataframe,然后再全量更新到MySQL数据库中。
如果有一些自己的需求,可以在此基础上修改。这套代码的通用性等我在自己的应用中都做过测试。

文章来源:https://blog.csdn.net/weixin_41697242/article/details/135769158
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。