计算机毕设项目(一)基于flask+mongo+angular实现爬取加密货币信息并使用LSTM模型预测价格的论坛可视化平台

发布时间:2024年01月13日

加密货币平台项目介绍

这个项目是一个基于 Flask 和 MongoDB 的深度学习应用程序,通过爬虫爬取加密货币行情和介绍信息,并根据新的数据使用LSTM去预测行情价格。展示涵盖了用户管理、新闻获取、加密货币数据处理、对话获取和处理、帖子管理等多个功能。
在这里插入图片描述

技术栈

  • 后端: Flask 提供了一个轻量级的网页服务器和后端API。
  • 前端: 使用angular+echart进行界面构建和图表绘制。
  • 数据库: 使用 MongoDB 作为数据库,用于存储用户信息、新闻、帖子和其他数据。
  • 数据处理: 对于加密货币价格预测,使用 LSTM 模型处理历史市场数据。
  • 代理支持: 在获取外部数据时支持通过代理访问。

1. 用户管理

  • 注册(/signin): 允许新用户创建账户。系统会检查用户名是否已存在,并对密码进行加密保存。
  • 登录(/login): 用户可以登录到系统。该接口验证用户名和密码的正确性。
  • 密码修改(/modify): 用户可以修改他们的密码。
  • 删除用户(/dele): 提供用户删除自己账户的功能。

2. 新闻和帖子管理

  • 获取卡片详情(/card-details/<card_id>): 根据卡片ID获取加密货币相关新闻的详细信息。
  • 上传帖子(/uploadpost): 允许用户上传新的帖子,包括标题、内容、日期等信息。
  • 获取所有帖子(/getposts): 可以获取平台上所有用户上传的帖子。
  • 更新帖子喜欢数(/updatelikes)更新评论数(/updatecomments): 这两个接口允许用户更新帖子的喜欢数和评论数。
    在这里插入图片描述

3. 加密货币数据

  • 获取市场数据(/market): 从外部API获取实时的加密货币市场数据。
  • 获取特定加密货币数据(/cryptos): 允许用户根据特定加密货币获取历史市场数据。
  • 价格预测(/predict): 使用 LSTM 模型预测特定加密货币的未来价格。
    在这里插入图片描述
    在这里插入图片描述

4. 对话获取

  • 获取对话(/dialog): 提供一个接口来获取存储的对话数据。

5. 数据获取

  • 获取数据(/fetch_data 和 /fetch_data2): 这两个接口用于从指定的URL获取数据,支持代理设置。

服务端代码

flask的服务端代码如下

import json
import bcrypt
from bson import json_util
from flask import Flask, request, jsonify, session
from pymongo import MongoClient
from flask_cors import CORS
import requests
from predict_price import lstm_predictor

# 连接到 MongoDB 服务器
mongo_client = MongoClient("mongodb://localhost:27017/")

# 选择数据库和集合
db_name = "crypto_db"
collection_name = "users"
collection_name2 = "news"
collection_name3 = "crypto"
collection_name4 = "dialog"
collection_name5 = "post"
db = mongo_client[db_name]
collection = db[collection_name]
collection2 = db[collection_name2]
collection3 = db[collection_name3]
collection4 = db[collection_name4]
collection5 = db[collection_name5]
# 如果集合为空,则插入初始化记录
if collection.estimated_document_count() == 0:
  initial_record = {"usr": "admin", "pwd": "admin"}
  collection.insert_one(initial_record)

# 初始化 Flask 应用
app = Flask(__name__)
CORS(app)


def need_proxy():
  url = "https://www.google.com"
  try:
    response = requests.get(url, timeout=1)
    # 检查响应状态是否正常(状态码 200)
    if response.status_code == 200:
      return 1
    else:
      return 0
  except Exception as e:
    # print(f"Error accessing Google: {e}")
    return 0


# 路由 - 注册
@app.route('/signin', methods=['POST'])
def signin():
  user_data = request.get_json()
  username = user_data.get('usr')
  password = user_data.get('pwd')
  print(username, password)

  if username and password:
    # 检查用户名是否已存在
    existing_user = collection.find_one({"usr": username})
    if existing_user:
      return jsonify({"status": "fail", "message": "Username already exists."})

    # 哈希密码并插入新用户
    password = password.encode('utf-8')
    hashed_password = bcrypt.hashpw(password, bcrypt.gensalt())
    new_user = {"usr": username, "pwd": hashed_password}
    collection.insert_one(new_user)
    return jsonify({"status": "success", "message": "User registered successfully."})
  else:
    return jsonify({"status": "fail", "message": "Username or password missing."})


# 路由 - 登录
@app.route('/login', methods=['POST'])
def login():
  user_data = request.get_json()
  username = user_data.get('usr')
  password = user_data.get('pwd')

  if username and password:
    # 查找用户
    existing_user = collection.find_one({"usr": username})
    if existing_user:
      # 验证密码
      password = password.encode('utf-8')
      if bcrypt.checkpw(password, existing_user['pwd']):
        return jsonify({"status": "success", "message": "Login successful."})
      else:
        return jsonify({"status": "fail", "message": "Incorrect password."})
    else:
      return jsonify({"status": "fail", "message": "User not found."})
  else:
    return jsonify({"status": "fail", "message": "Username or password missing."})


@app.route('/card-details/<card_id>', methods=['GET'])
def get_card_details(card_id):
  card = collection2.find_one({'_id': int(card_id)})
  # print(card)
  if card:
    card_dict = json_util.loads(json_util.dumps(card))  # 将结果转换为 dict 类型
    card_dict.pop('_id')  # 删除 '_id' 键,因为它无法被 JSON 序列化
    return jsonify(card_dict)
  else:
    return jsonify({'error': 'Card not found'}), 404


@app.route('/dialog', methods=['GET'])
def get_dialogue():
  dialogue_data = collection4.find_one()
  if dialogue_data:
    dialogue_data.pop('_id')
    return dialogue_data
  else:
    return jsonify({"error": "Data not found"}), 404


def fetch_market_data():
  url = "https://www.okx.com/priapi/v5/market/mult-cup-tickers?t=1682245351541&ccys=BTC,ETH,USDT,BNB,USDC,XRP,ADA,OKB,DOGE,MATIC,SOL,DOT,LTC,SHIB,TRX,AVAX,DAI,WBTC,UNI,LINK,TON,LEO,ATOM,XMR,ETC,XLM,ICP,BCH,FIL,TUSD"
  # 设置代理
  proxies = {
    "http": "http://127.0.0.1:10809",
    "https": "http://127.0.0.1:10809",
  }
  res = need_proxy()
  try:
    if res:
      response = requests.get(url)
    else:
      response = requests.get(url, proxies=proxies)
    if response.status_code == 200:
      return response.json()
    else:
      print(f"Error fetching market data: {response.status_code}")
      return 0
  except Exception as e:
    print(f"Error fetching market data: {e}")
    return 0


@app.route('/market', methods=['GET'])
def get_market_data():
  market_data = fetch_market_data()
  # print(market_data)
  return jsonify(market_data['data'][:9])


def fetch_market_data2(coin):
  if coin == 'USDT':
    url = 'https://www.okx.com/priapi/v5/market/index-candles?t=1682316274275&instId=USDT-USD&limit=1000&bar=1D'
  else:
    url = "https://www.okx.com/priapi/v5/market/candles?t=1682307645213&instId=" + coin + "-USDT&limit=1000&bar=1D"

  # 设置代理
  proxies = {
    "http": "http://127.0.0.1:10809",
    "https": "http://127.0.0.1:10809",
  }
  res = need_proxy()
  try:
    if res:
      response = requests.get(url)
    else:
      response = requests.get(url, proxies=proxies)
    if response.status_code == 200:
      return response.json()
    else:
      print(f"Error fetching market data: {response.status_code}")
      return 0
  except Exception as e:
    print(f"Error fetching market data: {e}")
    return 0


@app.route('/cryptos', methods=['POST'])
def get_cryptos():
  data = request.get_json()
  coin = data.get('coin')
  market_data = fetch_market_data2(coin)
  #  save the data to local,filename is coin+timestap
  filename = "datasets/" + coin + ".csv"
  with open(filename, 'w') as f:
    # 保存每个元素的第一个和第六个元素,即时间和收盘价存为csv文件
    f.write('time,price\n')
    for i in market_data['data']:
      f.write(str(i[0]) + ',' + str(i[5]) + '\n')

  return jsonify(market_data['data'][::-1])


@app.route('/modify', methods=['POST'])
def modify():
  user_data = request.get_json()
  username = user_data.get('usr')
  password = user_data.get('pwd')
  if username and password:
    # 查找用户
    existing_user = collection.find_one({"usr": username})
    if existing_user:
      # 哈希新密码并更新用户密码
      new_password = password.encode('utf-8')
      hashed_new_password = bcrypt.hashpw(new_password, bcrypt.gensalt())
      collection.update_one({"usr": username}, {"$set": {"pwd": hashed_new_password}})
      return jsonify({"status": "success", "message": "Password updated successfully."})
    else:
      return jsonify({"status": "fail", "message": "Username not found."})
  else:
    return jsonify({"status": "fail", "message": "Username, or new password missing."})


@app.route('/dele', methods=['POST'])
def dele():
  user_data = request.get_json()
  username = user_data.get('usr')
  if username:
    # 查找用户
    existing_user = collection.find_one({"usr": username})
    if existing_user:
      collection.delete_one({"usr": username})
      return jsonify({"status": "success", "message": "User deleted successfully."})
    else:
      return jsonify({"status": "fail", "message": "Username not found."})
  else:
    return jsonify({"status": "fail", "message": "Username  missing."})


# post
@app.route('/uploadpost', methods=['POST'])
def upload_post():
  post_data = request.get_json()
  title = post_data.get('title')
  content = post_data.get('content')
  date = post_data.get('date')
  summary = post_data.get('summary')
  likes = post_data.get('likes')
  comments = post_data.get('comments')
  if title and content and date and summary:
    new_post = {
      "title": title,
      "content": content,
      "date": date,
      "summary": summary,
      "likes": likes,
      "comments": comments
    }
    collection5.insert_one(new_post)
    print(new_post)
    return jsonify({"status": "success", "message": "Post saved successfully."})
  else:
    return jsonify({"status": "fail", "message": "Post data missing or incomplete."})


@app.route('/getposts', methods=['GET'])
def get_all_posts():
  try:
    posts = list(collection5.find({}))
    for post in posts:
      post['_id'] = str(post['_id'])
    return jsonify({"status": "success", "data": posts})
  except Exception as e:
    print(f"Error retrieving posts: {e}")
    return jsonify({"status": "fail", "message": "Error retrieving posts."})


@app.route('/updatelikes', methods=['POST'])
def updatelikes():
  post_data = request.get_json()
  posttitle = post_data.get('title')
  new_likes = post_data.get('likes')

  if posttitle and new_likes is not None:
    result = collection5.update_one({"title": posttitle}, {"$set": {"likes": new_likes}})
    if result.modified_count > 0:
      return jsonify({"status": "success", "message": "Post likes updated successfully."})
    else:
      return jsonify({"status": "fail", "message": "Post not found or likes not updated."})
  else:
    return jsonify({"status": "fail", "message": "Post title or new likes missing."})


@app.route('/updatecomments', methods=['POST'])
def updatecomments():
  post_data = request.get_json()
  posttitle = post_data.get('title')
  new_comments = post_data.get('comments')

  if posttitle and new_comments is not None:
    result = collection5.update_one({"title": posttitle}, {"$set": {"comments": new_comments}})
    if result.modified_count > 0:
      return jsonify({"status": "success", "message": "Post comments updated successfully."})
    else:
      return jsonify({"status": "fail", "message": "Post not found or comments not updated."})
  else:
    return jsonify({"status": "fail", "message": "Post title or new comments missing."})


@app.route('/fetch_data', methods=['POST'])
def fetch_data():
  data = request.json
  url = data['url']
  print(url)
  proxies = {
    "http": "http://127.0.0.1:10809",
    "https": "http://127.0.0.1:10809",
  }
  response = requests.get(url, proxies=proxies)
  xml_data = json.loads(response.text)
  return xml_data["data"]


@app.route('/fetch_data2', methods=['POST'])
def fetch_data2():
  data = request.json
  url = data['url']
  print(url)
  proxies = {
    "http": "http://127.0.0.1:10809",
    "https": "http://127.0.0.1:10809",
  }
  response = requests.get(url, proxies=proxies)
  xml_data = json.loads(response.text)

  return jsonify({"data": xml_data["data"]["introduce"]})

@app.route('/predict', methods=['POST'])
def predict():
  data = request.json
  coin = data['coin']
  return jsonify({"data": lstm_predictor(coin)})

if __name__ == '__main__':
  app.run(debug=True)

完整代码

整个项目为加密货币爱好者提供了一个综合平台,不仅支持基本的用户管理和社交功能,还整合了市场数据获取和分析,以及数据预测功能,是一个多功能的加密货币社区应用。
需要获取源码的可以关注公众号"一颗程序树",点击菜单栏的免费源码-源码集合的页面中输入关键词加密货币即可

文章来源:https://blog.csdn.net/Demonslzh/article/details/135570409
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。