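【AI Agent Series】【MetaGPT】7. Subscribe to Custom Information with One Sentence - Advanced Subscription Agent: Building a More General-Purpose Subscription Agent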

Published: January 23, 2024

0. Recommended Prior Reading

1. What This Article Covers

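Following the "MetaGPT Agent Development Primer" tutorial, this article optimizes and extends the subscription agent built last time, and this time it also uses the ActionNode module.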

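For the previous subscription agent, see 【The Future of AI - AI Agent Series】【MetaGPT】3. Building a Subscription Agent: Subscribing to Messages and Delivering Them via WeChat and Email. That article did produce a working subscription agent, but the agent was tied to one specific data source: to subscribe to the analysis of a different source, we would have to hand-write another Role. In other words, it was not general-purpose.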

How can we build a more general subscription agent? The plan is:

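(1) Parse the user's instruction (analyze the requirement; ideally the user gets what they want from a single natural-language sentence)
(2) Crawl the web page
(3) Have the LLM write code that extracts the data the user needs from the HTML
(4) Combine the page crawling with the LLM-written extraction code to obtain an Action that crawls the specified page
(5) Have the LLM process and analyze the extracted data, giving an Action that analyzes the page data
(6) Combine these two Actions to obtain a Watcher Role for that specific page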

Let's implement it step by step.

2. Parsing the User's Instruction (Analyzing the Requirement)

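Before writing crawler code we need to know the URL of the page to crawl and what data should be extracted from it. The simplest approach is to make the user fill these in, but that complicates the input. So we first write code that parses the user's instruction: the user states the requirement in one natural-language sentence, and the code automatically extracts the page URL, the data-extraction requirement, and so on. In short, it turns the user's input into structured data.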
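To make the target concrete, here is a minimal sketch of the structured result we are after, with one field per ActionNode defined below. The dataclass `SubscriptionRequirement` and its `missing_info` method are hypothetical illustrations, not MetaGPT APIs: in the real code the LLM fills these fields through ActionNode, and the follow-up question comes from the `INFORMATION_SUPPLEMENT` node rather than from hand-written rules.

```python
from dataclasses import dataclass, field


# Hypothetical container mirroring the keys of the ActionNodes below.
@dataclass
class SubscriptionRequirement:
    language: str = "en_us"
    cron_expression: str = ""  # 5-field cron, e.g. "30 19 * * *" for 19:30 daily
    crawler_url_list: list[str] = field(default_factory=list)
    page_content_extraction: str = ""
    crawl_post_processing: str = ""

    def missing_info(self) -> str:
        """Mimic the INFORMATION_SUPPLEMENT rule: ask for whatever is still blank."""
        prompts = []
        if not self.cron_expression:
            prompts.append("Please tell me when you want to receive the subscription messages.")
        if not self.crawler_url_list:
            prompts.append("Please provide the URLs you want to crawl.")
        return " ".join(prompts)


req = SubscriptionRequirement(
    cron_expression="30 19 * * *",
    crawler_url_list=["https://pitchhub.36kr.com/financing-flash"],
    page_content_extraction="Get the title, link and time of every startup financing item.",
    crawl_post_processing="Summarize today's financing news.",
)
print(repr(req.missing_info()))  # → ''
```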

2.1 Full Code with Comments

# Load .env into environment variables
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv())

from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio

## Detect the language of the user's requirement
LANGUAGE = ActionNode(
    key="language",
    expected_type=str,
    instruction="Provide the language used in the project, typically matching the user's requirement language.",
    example="en_us",
)

## Extract when the user wants the subscription delivered (as a cron expression)
CRON_EXPRESSION = ActionNode(
    key="Cron Expression",
    expected_type=str,
    instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. "
    "Otherwise, leave it blank.",
    example="",
)

## Extract the URLs the user subscribes to (may be a list)
CRAWLER_URL_LIST = ActionNode(
    key="Crawler URL List",
    expected_type=list[str],
    instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",
    example=["https://example1.com", "https://example2.com"],
)

## Extract the website data the user needs
PAGE_CONTENT_EXTRACTION = ActionNode(
    key="Page Content Extraction",
    expected_type=str,
    instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",
    example="Retrieve the titles and content of articles published today.",
)

## Extract how the user wants the data summarized
CRAWL_POST_PROCESSING = ActionNode(
    key="Crawl Post Processing",
    expected_type=str,
    instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",
    example="Generate a summary of today's news articles.",
)

## Follow-up: if the URL list or the schedule could not be parsed, ask the user to supply it
INFORMATION_SUPPLEMENT = ActionNode(
    key="Information Supplement",
    expected_type=str,
    instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription "
    "messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it "
    "blank if everything is clear",
    example="",
)

NODES = [
    LANGUAGE,
    CRON_EXPRESSION,
    CRAWLER_URL_LIST,
    PAGE_CONTENT_EXTRACTION,
    CRAWL_POST_PROCESSING,
    INFORMATION_SUPPLEMENT,
]

PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)

## Action that parses the user's requirement
PARSE_SUB_REQUIREMENT_TEMPLATE = """
### User Requirement
{requirements}
"""

class ParseSubRequirement(Action):
    async def run(self, requirements):
        requirements = "\n".join(i.content for i in requirements)
        context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)
        node = await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)
        return node
    
if __name__ == "__main__":
    from metagpt.schema import Message
    asyncio.run(ParseSubRequirement().run([Message(
        "Crawl all startup financing news from the 36kr venture platform https://pitchhub.36kr.com/financing-flash, get the title, link and time, summarize today's financing news, and send it to me at 7:30 PM"
    )]))

2.2 Output

[Screenshot: run output]

3. Having the LLM Write the Crawler Code

3.1 Slimming Down the HTML

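A page's HTML usually contains so many elements that sending it to the LLM costs a huge number of tokens; a complete page may not even fit into a single conversation. So we first simplify the page content. Since CSS selectors are usually enough to locate elements, we mainly need to provide each element's class attributes: we convert the HTML into an outline of CSS-style selectors (tag plus classes) paired with their text and give that to the LLM, which cuts token consumption considerably.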

def get_outline(page):
    soup = _get_soup(page.html)
    outline = []

    def process_element(element, depth):
        name = element.name
        if not name:
            return
        if name in ["script", "style"]:
            return

        element_info = {"name": element.name, "depth": depth}

        if name in ["svg"]:
            element_info["text"] = None
            outline.append(element_info)
            return

        element_info["text"] = element.string
        # Check if the element has an "id" attribute
        if "id" in element.attrs:
            element_info["id"] = element["id"]

        if "class" in element.attrs:
            element_info["class"] = element["class"]
        outline.append(element_info)
        for child in element.children:
            process_element(child, depth + 1)

    for element in soup.body.children:
        process_element(element, 1)

    return outline
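The same idea can be sketched with only Python's standard library: `html.parser` stands in for BeautifulSoup, and the result is already formatted the way `_write_code` in 3.2 formats `get_outline`'s output (indentation by depth, then `tag.class: text`). `OutlineParser` and `html_outline` are hypothetical names, and text handling is simplified: text is attached to the innermost open element.

```python
from html.parser import HTMLParser

VOID_TAGS = {"area", "br", "hr", "img", "input", "link", "meta", "source", "wbr"}


class OutlineParser(HTMLParser):
    """Collect [depth, 'tag.class1.class2', text] rows, skipping <script>/<style>."""

    def __init__(self):
        super().__init__()
        self.rows = []         # [depth, selector, text]
        self.stack = []        # indices into self.rows of currently open elements
        self.in_skipped = 0    # > 0 while inside <script> or <style>

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self.in_skipped += 1
            return
        classes = (dict(attrs).get("class") or "").split()
        self.rows.append([len(self.stack), ".".join([tag, *classes]), ""])
        if tag not in VOID_TAGS:  # void elements never get a closing tag
            self.stack.append(len(self.rows) - 1)

    def handle_endtag(self, tag):
        if tag in ("script", "style"):
            self.in_skipped -= 1
        elif tag not in VOID_TAGS and self.stack:
            self.stack.pop()

    def handle_data(self, data):
        text = data.strip()
        if text and not self.in_skipped and self.stack:
            self.rows[self.stack[-1]][2] += text


def html_outline(html: str) -> str:
    parser = OutlineParser()
    parser.feed(html)
    parser.close()
    return "\n".join(f"{' ' * depth}{selector}: {text}" for depth, selector, text in parser.rows)


sample = """
<body>
  <script>var tracking = 1;</script>
  <div class="item">
    <a class="item-title" href="/p/1">Startup A raises $10M</a>
    <span class="time">2 hours ago</span>
  </div>
</body>
"""
print(html_outline(sample))
```

The `<script>` content disappears and each remaining element costs one short line, which is the whole point of the outline.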

3.2 Prompting the LLM to Write the Parse Function

PROMPT_TEMPLATE = """Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.

```python
from bs4 import BeautifulSoup

# only complete the parse function
def parse(soup: BeautifulSoup):
    ...
    # Return the object that the user wants to retrieve, don't use print
```

## User Requirement
{requirement}

## Context

The outline of the html page to scrape is shown below:

```tree
{outline}
```
"""

class WriteCrawlerCode(Action):
    async def run(self, requirement):
        requirement: Message = requirement[-1]  ## the parsed user requirement from the previous step
        data = requirement.instruct_content.dict()
        urls = data["Crawler URL List"]  ## the page URLs the user wants
        query = data["Page Content Extraction"]  ## the page data the user cares about
        codes = {}
        for url in urls:
            codes[url] = await self._write_code(url, query)  ## one crawler (parse function) per URL
        return "\n".join(f"# {url}\n{code}" for url, code in codes.items())

    async def _write_code(self, url, query):
        page = await WebBrowserEngine().run(url)
        outline = get_outline(page)
        outline = "\n".join(
            f"{' '*i['depth']}{'.'.join([i['name'], *i.get('class', [])])}: {i['text'] if i['text'] else ''}"
            for i in outline
        )
        code_rsp = await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))
        code = CodeParser.parse_code(block="", text=code_rsp)
        return code
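`CodeParser.parse_code` pulls the code out of the LLM's Markdown reply; its exact behavior is MetaGPT's own. As a rough, hypothetical stand-in, the core idea is a regex that grabs the body of the first fenced block and falls back to the whole reply:

```python
import re

TICKS = "`" * 3  # built this way so the snippet can itself sit inside a Markdown fence
FENCE_RE = re.compile(TICKS + r"[\w+-]*\n(.*?)" + TICKS, re.DOTALL)


def extract_code(reply: str) -> str:
    """Return the body of the first fenced block in an LLM reply, else the whole reply."""
    match = FENCE_RE.search(reply)
    return (match.group(1) if match else reply).strip()


reply = (
    "Here is the parse function:\n"
    + TICKS + "python\n"
    + "def parse(soup):\n"
    + "    return [a.text for a in soup.select('a.item-title')]\n"
    + TICKS + "\n"
    + "It returns the titles."
)
print(extract_code(reply))
```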

3.3 Supporting Code for Testing This Section

  • Required imports
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
  • Entry point
if __name__ == "__main__":
    from metagpt.actions.action_node import ActionNode
    cls = ActionNode.create_model_class(
        "ActionModel", {
            "Cron Expression": (str, ...),
            "Crawler URL List": (list[str], ...),
            "Page Content Extraction": (str, ...),
            "Crawl Post Processing": (str, ...),
        }
    )

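    data = {
        "Cron Expression": "30 19 * * *",  # 5-field cron: 19:30 every day
        "Crawler URL List": ["https://pitchhub.36kr.com/financing-flash"],
        "Page Content Extraction": "Crawl all startup financing news from the 36kr venture platform and get the title, link and time.",
        "Crawl Post Processing": "Summarize today's financing news.",
    }
    # print the generated "# <url>\n<code>" text
    print(asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))])))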

3.4 Output and Pitfalls

3.4.1 Output

[Screenshot: run output]

3.4.2 Pitfall 1: No module named 'playwright'

[Screenshot: error message "No module named 'playwright'"]

  • Fix:
pip install playwright
playwright install  # also downloads the browser binaries that Playwright drives

4. Defining the Crawler Engineer Role: CrawlerEngineer

# Define the crawler engineer role
from metagpt.roles import Role
class CrawlerEngineer(Role):
    name: str = "Xiao Zhang's Dedicated Crawler Engineer"
    profile: str = "Crawling Engineer"
    goal: str = "Write elegant, readable, extensible, efficient code"
    constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

        self._init_actions([WriteCrawlerCode])
        self._watch([ParseSubRequirement])  ## what triggers this role: a parsed user requirement

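The CrawlerEngineer role is initialized with a single Action, WriteCrawlerCode, which writes the crawler code. So when does this role run? The answer is this line: self._watch([ParseSubRequirement]). Once the user requirement has been parsed, the role is triggered and performs the code-writing action.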

5. Defining the Subscription Assistant Role: SubscriptionAssistant

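from metagpt.actions import UserRequirement
from metagpt.roles import Role
from metagpt.utils.common import any_to_str


class SubscriptionAssistant(Role):
    """Analyze user subscription requirements."""

    name: str = "Xiao Zhang's Subscription Assistant"
    profile: str = "Subscription Assistant"
    goal: str = "analyze user subscription requirements to provide personalized subscription services."
    constraints: str = "utilize the same language as the User Requirement"

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

        self._init_actions([ParseSubRequirement, RunSubscription])  ## 2. First parse the user requirement, then run the subscription
        self._watch([UserRequirement, WriteCrawlerCode])  ## triggered by user input or by finished crawler code

    async def _think(self) -> bool:
        cause_by = self.rc.history[-1].cause_by
        if cause_by == any_to_str(UserRequirement):
            state = 0  # user input -> ParseSubRequirement
        elif cause_by == any_to_str(WriteCrawlerCode):
            state = 1  # crawler code is ready -> RunSubscription
        else:
            return False  # message from some other action: nothing to do

        if self.rc.state == state:  # this step has already run; stop instead of repeating it
            self.rc.todo = None
            return False
        self._set_state(state)
        return True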

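The SubscriptionAssistant is initialized with two Actions: ParseSubRequirement, which parses the user requirement, and RunSubscription, which runs the subscription agent.
Here is how it works:
(1) self._watch([UserRequirement, WriteCrawlerCode])
It watches two kinds of messages: the user's input and the finished crawler code.
(2) The _think function
If the latest message came from user input, it runs ParseSubRequirement; if it came from the WriteCrawlerCode action, it runs RunSubscription.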
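The message flow between the two roles can be replayed with a toy routing table. This is a hypothetical simulation for intuition only; MetaGPT's actual Environment, `_observe` and `_think` machinery is more involved (message buffers, the state guard in `_think`, and so on).

```python
from collections import deque

# Hypothetical routing table distilled from the two roles:
# role name -> {cause_by it watches: action it then runs}
ROUTES = {
    "SubscriptionAssistant": {
        "UserRequirement": "ParseSubRequirement",  # _think -> state 0
        "WriteCrawlerCode": "RunSubscription",     # _think -> state 1
    },
    "CrawlerEngineer": {
        "ParseSubRequirement": "WriteCrawlerCode",  # _watch([ParseSubRequirement])
    },
}


def simulate(first_cause: str = "UserRequirement") -> list[str]:
    """Replay which role reacts to each published message, in order."""
    trace = []
    pending = deque([first_cause])
    while pending:
        cause_by = pending.popleft()
        for role, routes in ROUTES.items():
            action = routes.get(cause_by)
            if action:
                trace.append(f"{role}: {action}")
                pending.append(action)  # the action's output is published as the next message
    return trace


for step in simulate():
    print(step)
```

The order also suggests why RunSubscription reads the crawler code from `msgs[-1]` and the parsed requirement from `msgs[-2]`: in this flow the requirement is produced just before the code.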

6. The Action That Runs the Subscription Agent: RunSubscription

6.1 The Summarization Action

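With the user requirement and the crawler code in hand, the missing piece is turning the crawled data into what the user asked for. That is the job of SubAction below (the counterpart of AnalysisOSSTrending in the earlier subscription agent):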

data is the data crawled from all the URLs, and process is the user's data-processing requirement parsed earlier.

SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.

## context
{data}
"""

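# Excerpt: urls, modules and process are not defined here. In 6.2 this class is created
# inside create_sub_action_cls, which supplies them as closure variables.
class SubAction(Action):
    async def run(self, *args, **kwargs):
        pages = await WebBrowserEngine().run(*urls)  # fetch all pages in one call
        if len(urls) == 1:
            pages = [pages]  # a single URL yields a single page, so wrap it in a list

        data = []
        for url, page in zip(urls, pages):
            data.append(getattr(modules[url], "parse")(page.soup))  # run the LLM-written parse() for this URL
        ## 8. Have the LLM summarize the crawled data according to the user's requirement
        return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))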
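Because `data` is a Python list, `str.format` renders it with its repr. One possible alternative, assuming `parse()` returns JSON-serializable objects, is to serialize it with `json.dumps(..., ensure_ascii=False)` so that Chinese titles stay readable in the prompt. The parse results below are made-up sample data.

```python
import json

SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.

## context
{data}
"""

# Made-up parse() output: one list of items per crawled URL
data = [[{"title": "Startup A raises $10M", "link": "https://example.com/a", "time": "10:00"}]]
process = "Summarize today's financing news."

prompt = SUB_ACTION_TEMPLATE.format(
    process=process,
    data=json.dumps(data, ensure_ascii=False, indent=2),  # keep non-ASCII text as-is
)
print(prompt)
```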

6.2 The RunSubscription Action

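# Action that runs the subscription agent
import sys
from uuid import uuid4

from metagpt.actions.action import Action
from metagpt.tools.web_browser_engine import WebBrowserEngine


class RunSubscription(Action):
    async def run(self, msgs):
        from metagpt.roles.role import Role
        from metagpt.subscription import SubscriptionRunner

        code = msgs[-1].content  ## 4. Get the crawler code
        req = msgs[-2].instruct_content.dict()  ## 5. Get the parsed user requirement
        urls = req["Crawler URL List"]
        process = req["Crawl Post Processing"]
        spec = req["Cron Expression"]
        SubAction = self.create_sub_action_cls(urls, code, process)  ## 6. Build an Action from urls (page links), code (crawler code) and process (the user's data requirement)
        SubRole = type("SubRole", (Role,), {})  ## 7. The Role triggered on schedule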
        role = SubRole()
        role.init_actions([SubAction])
        runner = SubscriptionRunner()

        async def callback(msg):
            print(msg)

        await runner.subscribe(role, CronTrigger(spec), callback)
        await runner.run()

    @staticmethod
    def create_sub_action_cls(urls: list[str], code: str, process: str):
        modules = {}
        for url in urls[::-1]:
            code, current = code.rsplit(f"# {url}", maxsplit=1)
            name = uuid4().hex
            module = type(sys)(name)
            exec(current, module.__dict__)
            modules[url] = module

        class SubAction(Action):
            async def run(self, *args, **kwargs):
                pages = await WebBrowserEngine().run(*urls)
                if len(urls) == 1:
                    pages = [pages]

                data = []
                for url, page in zip(urls, pages):
                    data.append(getattr(modules[url], "parse")(page.soup))
                ## 8. Have the LLM summarize the crawled data according to the user's requirement
                return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))

        return SubAction
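Here is a self-contained sketch of what `create_sub_action_cls` does with the combined code string: split it back into one snippet per URL with `rsplit`, then `exec` each snippet into a fresh module. `types.ModuleType` is the same type as `type(sys)` in the original; the URLs and `parse` bodies are placeholders, and `load_parsers` is a hypothetical name.

```python
import types
from uuid import uuid4

urls = ["https://a.example", "https://b.example"]
# Same "# <url>\n<code>" layout that WriteCrawlerCode returns
combined = (
    "# https://a.example\n"
    "def parse(soup):\n"
    "    return 'from A'\n"
    "# https://b.example\n"
    "def parse(soup):\n"
    "    return 'from B'\n"
)


def load_parsers(urls, code):
    modules = {}
    for url in reversed(urls):  # peel sections off the end of the string
        code, current = code.rsplit(f"# {url}", maxsplit=1)
        module = types.ModuleType(uuid4().hex)  # fresh, uniquely named module
        exec(current, module.__dict__)          # defines parse() inside that module
        modules[url] = module
    return modules


parsers = load_parsers(urls, combined)
print(parsers["https://a.example"].parse(None))  # → from A
```

Iterating in reverse matters: each `rsplit` removes the last `# url` section, so the remaining prefix still holds every earlier section.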

7. Timer and Callback Code

This code is generic, so it can be reused as-is from the original subscription agent.

7.1 Timer

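from typing import Optional

from aiocron import crontab
from metagpt.schema import Message
from pytz import BaseTzInfo


class CronTrigger: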
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None) -> None:
        self.crontab = crontab(spec, tz=tz)

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message()
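CronTrigger delegates the scheduling to aiocron's `crontab`. To make the 5-field spec concrete, here is a hypothetical helper that computes the next fire time, restricted to the daily `minute hour * * *` form: the first field is the minute and the second the hour, so 19:30 every day is `30 19 * * *` and 10:56 AM would be `56 10 * * *`. It uses naive datetimes and ignores DST.

```python
from datetime import datetime, timedelta


def next_daily_run(spec: str, now: datetime) -> datetime:
    """Next fire time for a 5-field cron spec of the form 'M H * * *' only."""
    fields = spec.split()
    if len(fields) != 5 or fields[2:] != ["*", "*", "*"]:
        raise ValueError("this sketch only handles 'minute hour * * *'")
    minute, hour = int(fields[0]), int(fields[1])
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:  # today's slot has passed, so fire tomorrow
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run("30 19 * * *", datetime(2024, 1, 23, 20, 0)))  # → 2024-01-24 19:30:00
```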

7.2 Callback

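import os
from typing import Optional

import aiohttp
from metagpt.schema import Message


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        # 5.1 The token is read from the environment, so set WXPUSHER_TOKEN as an environment variable or in your config file
        self.token = token or os.environ["WXPUSHER_TOKEN"]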

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
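            # 5.2 uids fall back to the WXPUSHER_UIDS environment variable (comma-separated), so set it
            # uids are the WeChat users to push to; a user has a uid only after following your subscription account
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),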
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

# 5.3 WeChat callback wrapper: pushes the message to the configured WeChat users via WxPusherClient
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)
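A possible refinement, not part of the original: build the payload in a pure function so the WXPUSHER_UIDS fallback can be tested without sending anything, and fail with a clear message instead of a KeyError when no recipient is configured. `build_wxpusher_payload`, its `env` parameter and the `UID_...` values are hypothetical; the field names are the ones `send_message` above already uses.

```python
import os
from typing import Mapping, Optional


def build_wxpusher_payload(token: str, content: str, content_type: int = 3,
                           uids: Optional[list] = None,
                           env: Optional[Mapping[str, str]] = None) -> dict:
    """Assemble a WxPusher message body; uids fall back to WXPUSHER_UIDS."""
    env = os.environ if env is None else env
    if not uids:
        raw = env.get("WXPUSHER_UIDS", "")
        # tolerate spaces and trailing commas, e.g. "UID_a, UID_b,"
        uids = [uid.strip() for uid in raw.split(",") if uid.strip()]
    if not uids:
        raise ValueError("no recipients: pass uids or set WXPUSHER_UIDS")
    return {
        "appToken": token,
        "content": content,
        "contentType": content_type,
        "topicIds": [],
        "uids": uids,
    }


payload = build_wxpusher_payload(
    "AT_example_token", "Today's financing summary",
    env={"WXPUSHER_UIDS": "UID_alice, UID_bob,"},
)
print(payload["uids"])  # → ['UID_alice', 'UID_bob']
```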

7.3 Integrating the Callbacks

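# Action that runs the subscription agent
class RunSubscription(Action):
    async def run(self, msgs):
        # ... same as the code above ...

        callbacks = []
        callbacks.append(wxpusher_callback)

        async def callback(msg):
            print(msg)
            await asyncio.gather(*(call(msg) for call in callbacks))  # call every callback to fan the message out

        # ... same as the code above (subscribe the role with this callback, then run) ...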
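The fan-out in `callback` relies on `asyncio.gather`. A stdlib-only sketch with fake channels shows the behavior; passing `return_exceptions=True` (not used in the original) is one way to keep a failing channel from raising out of the callback while the other channels still receive the message. The callback names are hypothetical.

```python
import asyncio

delivered = []


async def console_callback(msg: str) -> None:
    delivered.append(("console", msg))


async def fake_wechat_callback(msg: str) -> None:
    # Hypothetical stand-in for wxpusher_callback: no network call.
    await asyncio.sleep(0.01)
    delivered.append(("wechat", msg))


async def broken_callback(msg: str) -> None:
    raise RuntimeError("channel unavailable")


CALLBACKS = [console_callback, fake_wechat_callback, broken_callback]


async def dispatch(msg: str) -> list:
    """Send msg to every callback concurrently; return failures instead of raising."""
    results = await asyncio.gather(*(cb(msg) for cb in CALLBACKS), return_exceptions=True)
    return [r for r in results if isinstance(r, Exception)]


errors = asyncio.run(dispatch("Today's financing summary"))
print(delivered, errors)
```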

8. Running the Whole Thing

We create a Team that contains the SubscriptionAssistant and CrawlerEngineer roles.

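The user enters a request and we call run_project. SubscriptionAssistant sees the user input first and triggers the ParseSubRequirement action; CrawlerEngineer, which watches ParseSubRequirement, then writes the crawler code with WriteCrawlerCode; finally SubscriptionAssistant, which also watches WriteCrawlerCode, runs RunSubscription.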

if __name__ == "__main__":
    import asyncio
    from metagpt.team import Team

    team = Team()
    team.hire([SubscriptionAssistant(), CrawlerEngineer()])
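    team.run_project("Crawl all startup financing news from the 36kr venture platform https://pitchhub.36kr.com/financing-flash, get the title, link and time, summarize today's financing news, and send it to me at 10:56 AM")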
    asyncio.run(team.run())

Output

[Screenshot: run output]

9. Possible Pitfalls

9.1 After updating the MetaGPT source, running fails with: No module named 'zhipuai.types'

[Screenshot: error message "No module named 'zhipuai.types'"]

  • Fix
pip install -U zhipuai  # -U also upgrades an older zhipuai that is already installed

That's all for this article. Following the tutorial, we got the advanced subscription agent running end to end.

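This seems to be the only multi-agent example in the tutorial, so it touches on how multiple agents interact. I only half understand that part so far and need to dig deeper (_watch, _observe, etc.); I'll cover it in detail in a later article.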

Source: https://blog.csdn.net/Attitude93/article/details/135720544