【python爬虫】设计自己的爬虫 4. 封装模拟浏览器 PyppeteerSimulate

发布时间:2024年01月06日

Pyppeteer是Puppeteer的Python版实现
Pyppeteer的背后实际上有一个类似于Chrome的浏览器–Chromium

class PyppeteerSimulate(BrowserSimulateBase):
    def __init__(self):
        self.browser = None
        self.page = None

    # 启动浏览器
    # is_headless 是否开启无头模式
    # is_cdp 是否使用cdp (Chrome Devtools Protocol)
    async def start_browser(self, is_headless=False, is_dev=False, proxy=None, is_socks5=False, *args, **kwargs):
        """
        异步启动浏览器。

        Args:
            is_headless (bool, optional): 是否开启无头模式。默认为 False。
            is_dev (bool, optional): 是否启用调试模式。默认为 False。
            proxy (str, optional): 代理设置。默认为 None。
            is_socks5 (bool, optional): 是否使用 SOCKS5 代理。默认为 False。
            *args, **kwargs: 其他参数。

        Returns:
            BrowserContext: 已启动的浏览器对象。
        """
        args = ['--disable-infobars', f'--window-size={WINDOW_WIDTH},{WINDOW_HEIGHT}']

        if proxy:
            proxy_protocol = 'socks5://' if is_socks5 else 'http://'
            args.append('--proxy-server=' + proxy_protocol + proxy)

        self.browser = await launch(headless=is_headless, devtools=is_dev, args=args, autoClose=True)

        return self.browser

    async def start_page(self, url: str):
        """
        在已启动的浏览器上创建新页面并访问指定的 URL。

        Args:
            url (str): 要访问的页面的 URL。

        Returns:
            Page: 新创建的页面对象。
        """
        context = await self.browser.createIncognitoBrowserContext()
        self.page = await context.newPage()

        await self.page.setViewport({'width': WINDOW_WIDTH, 'height': WINDOW_HEIGHT})
        await self.page.evaluateOnNewDocument('Object.defineProperty(navigator, "webdriver", {get: () => undefined})')
        await self.page.goto(url)

        return self.page

    # 显式等待
    async def wait_until_element(self, selector_location, timeout=None, selector_type=None):
        """
        等待直到页面中出现指定的元素。

        参数:
        selector_location (str): 要等待的元素选择器。

        返回:
        element (ElementHandle or None): 如果找到元素,返回元素的句柄,否则返回None。
        """

        try:
            element = await self.page.waitForSelector(selector_location)
            return element
        except Exception as e:
            print(f"等待元素时发生错误: {str(e)}")
            return None

    # 等待时间 s
    async def wait_for_time(self, timeout):
        """
        在异步上下文中等待指定的时间(秒)。

        参数:
        timeout (int): 等待的时间(秒)。

        无返回值。
        """
        await self.page.waitFor(timeout * 1000)

    # 查找多个元素
    async def find_elements(self, selector_location, selector_type=None):
        """
        使用指定的选择器查找所有匹配的元素。

        参数:
        selector_location (str): 要查找的元素选择器。
        selector_type (str, optional): 选择器类型(例如 'css', 'xpath' 等)。

        返回:
        elements (List): 匹配的元素列表。
        """

        elements = await self.page.JJ(selector_location)
        return elements

    # 查找元素
    async def find_element(self, selector_location, selector_type=None):
        """
        使用指定的选择器查找第一个匹配的元素。

        参数:
        selector_location (str): 要查找的元素选择器。
        selector_type (str, optional): 选择器类型(例如 'css', 'xpath' 等)。

        返回:
        element (ElementHandle or None): 匹配的元素句柄,如果未找到则返回 None。
        """
        return await self.page.J(selector_location)

    # iframe 查找多个元素
    async def find_iframe_elements(self, selector_location, iframe):
        """
        在指定的 iframe 中查找所有匹配的元素。

        参数:
        selector_location (str): 要查找的元素选择器。
        iframe (Frame): 包含要查找元素的 iframe 对象。

        返回:
        elements (List): 匹配的元素列表。
        """
        return await iframe.JJ(selector_location)

    # iframe 查找元素
    async def find_iframe_element(self, selector_location, iframe):
        """
        在指定的 iframe 中查找第一个匹配的元素。

        参数:
        selector_location (str): 要查找的元素选择器。
        iframe (Frame): 包含要查找元素的 iframe 对象。

        返回:
        element (ElementHandle or None): 匹配的元素句柄,如果未找到则返回 None。
        """
        return await iframe.J(selector_location)

    # 查找并获取元素属性的值
    async def find_element_all_eval(self, selector_location, selector_type=None, script_command=None):
        """
        使用指定的选择器和脚本命令查找所有匹配的元素。

        参数:
        selector_location (str): 要查找的元素选择器。
        script_command (str, optional): 用于评估元素的自定义脚本命令。

        返回:
        elements (List): 匹配的元素列表。
        """
        return await self.page.JJeval(selector_location, script_command)

    # 浏览器回退
    async def go_back(self):
        """
        在浏览器中执行后退操作,返回上一页。

        无返回值。
        """
        await self.page.goBack()

    # 浏览器前进
    async def go_forward(self):
        """
        在浏览器中执行前进操作,前往下一页。

        无返回值。
        """
        await self.page.goForward()

    # 获取cookies
    async def get_cookies(self):
        """
        获取当前页面的所有 Cookies。

        返回:
        cookies (List): 包含所有 Cookies 的列表。
        """
        return await self.page.cookies()

    # 添加cookies
    async def add_cookie(self, cookie):
        """
        向当前页面添加一个 Cookie。

        参数:
        cookie (dict): 要添加的 Cookie 对象,应包含 'name' 和 'value' 属性。

        无返回值。
        """
        await self.page.setCookie(cookie)

    # 删除cookies
    async def del_cookies(self):
        """
        删除当前页面的所有 Cookies。

        无返回值。
        """
        await self.page.deleteCookie()

    # 切换选项卡
    async def switch_tab(self, tab):
        """
        在浏览器窗口中切换到指定的标签页。

        参数:
        tab (int): 要切换到的标签页的索引号。

        无返回值。
        """
        pages = await self.browser.pages()
        await pages[tab].bringToFront()

    # 刷新页面
    async def reload_page(self):
        """
        重新加载当前页面。

        无返回值。
        """
        await self.page.reload()

    # 截图
    async def screen_page(self, file_path=None):
        """
        截取当前页面的屏幕截图。

        参数:
        file_path (str, optional): 截图文件保存的路径和名称。如果未提供路径,将在当前工作目录保存。

        无返回值。
        """
        await self.page.screenshot(path=file_path)

    # 关闭浏览器
    async def close_browser(self):
        """
        关闭浏览器。

        无返回值。
        """
        await self.browser.close()

    # 获取页面内容
    async def get_content(self):
        """
        获取当前页面的内容。

        返回:
        content (str): 当前页面的HTML内容。
        """
        return await self.page.content()

    # 点击
    async def click(self, selector_location, selector_type=None):
        """
        在指定的选择器位置执行点击操作。

        参数:
        selector_location (str): 要点击的元素选择器。
        selector_type (str, optional): 选择器类型(例如 'css', 'xpath' 等)。

        无返回值。
        """
        return await self.page.click(selector_location)

    # 输入内容
    async def send_keys(self, selector_location, input_content, selector_type=None):
        """
        在指定的选择器位置输入文本内容。

        参数:
        selector_location (str): 要输入文本的元素选择器。
        input_content (str): 要输入的文本内容。
        selector_type (str, optional): 选择器类型(例如 'css', 'xpath' 等)。

        无返回值。
        """
        return await self.page.type(selector_location, input_content)

    async def drag_and_drop(self, source, target):
        """
        模拟拖拽操作,将源元素拖拽到目标元素位置。

        参数:
        source (ElementHandle): 要拖拽的源元素句柄。
        target (ElementHandle): 拖拽的目标元素句柄。

        无返回值。
        """
        source_box = await source.boundingBox()
        target_box = await target.boundingBox()

        # 计算源和目标元素的中心点
        source_x = source_box['x'] + source_box['width'] / 2
        source_y = source_box['y'] + source_box['height'] / 2
        target_x = target_box['x'] + target_box['width'] / 2
        target_y = target_box['y'] + target_box['height'] / 2

        # 模拟拖拽操作
        await self.page.mouse.move(source_x, source_y)
        await self.page.mouse.down()
        await self.page.mouse.move(target_x, target_y)
        await self.page.mouse.up()

    # iframe
    async def to_iframe(self, iframe):
        """
        切换到指定名称的 iframe。

        参数:
        iframe_name (str): 要切换到的 iframe 的名称。

        返回:
        target_frame (Frame or None): 匹配的 iframe 对象,如果未找到则返回 None。
        """

        frames = self.page.frames
        # 找到你需要的iframe
        for frame in frames:
            if frame.name == iframe:
                target_frame = frame
        return target_frame

测试代码

# 测试代码
async def test_pyppeteer():
    await pyppeteer_simulate.start_browser()
    # await pyppeteer_simulate.start_page('https://www.baidu.com/')
    # await pyppeteer_simulate.wait_until_element('.s_ipt')
    # await pyppeteer_simulate.wait_for_time(2)
    # await pyppeteer_simulate.screen_page('../../files/pyppeteer_example.png')
    # print(await pyppeteer_simulate.get_content())

    await pyppeteer_simulate.start_page('http://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
    target_frame = await pyppeteer_simulate.to_iframe('iframeResult')

    # 在特定的iframe中查找元素
    source = await pyppeteer_simulate.find_iframe_element('#draggable', target_frame)
    target = await pyppeteer_simulate.find_iframe_element('#droppable', target_frame)
    await pyppeteer_simulate.drag_and_drop(source, target)

    await pyppeteer_simulate.close_browser()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(test_pyppeteer())

文章来源:https://blog.csdn.net/loyd3/article/details/135378203
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。