深入了解 Scrapy 中的 Pipelines 和 Item

发布时间:2023年12月17日

item

Scrapy中的Item对象是用来保存爬取到的数据的容器。它类似于字典,但提供了更多的便利性和结构化,可以定义数据模型,帮助开发者明确和组织所需抓取的数据结构。

1. Item对象的作用

Item对象的主要作用是定义所需抓取数据的结构,为爬虫提供一个清晰的数据模型。通过Item对象,可以指定所需数据的字段和字段类型,确保数据的一致性和完整性。

2. 定义Item类

在Scrapy中定义Item类很简单,通常在项目中的items.py文件中创建。以下是一个示例:


import scrapy

class ProductItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    description = scrapy.Field()
    images = scrapy.Field()
    # 添加其他字段...

3. 字段及字段类型

在Item类中,可以定义各种字段以及它们的类型。在示例中,我们定义了几个常见的字段:

  • title: 产品标题,可以是字符串类型。
  • price: 产品价格,可以是浮点数或者字符串类型。
  • description: 产品描述,可以是字符串类型。
  • images: 产品图片链接列表,可以是一个列表类型。
字段类型说明:
  • Field(): Scrapy提供的用于定义字段的方法,可以存储各种类型的数据,如字符串、整数、浮点数、列表等。在Item中使用Field()定义字段,不需要指定字段类型,Scrapy会根据实际存储的数据自动确定类型。

总之,Item对象在Scrapy中扮演着重要的角色,它定义了数据的结构和类型,为爬虫提供了清晰的数据模型。通过定义Item类,可以有效组织和管理爬取到的数据,确保数据的准确性和一致性。

pip

Scrapy的Pipeline作用和功能

Pipeline是Scrapy中用于处理爬取到的数据的组件,它提供了一个灵活的机制来对爬取到的Item数据进行处理、清洗、验证或存储等操作。主要功能包括数据处理、过滤、持久化存储和验证。

创建一个简单的Pipeline示例

下面是一个简单的示例,展示如何创建一个Pipeline来处理爬取到的Item数据:


class MyPipeline(object):
    def process_item(self, item, spider):
        # 在这里对爬取到的Item数据进行处理
        # 这个示例中仅打印数据,实际应用中可以进行各种处理操作
        print("Processing Item:")
        print(f"Title: {item['title']}")
        print(f"Price: {item['price']}")
        # 添加其他处理逻辑...

        return item  # 返回Item,传递给下一个Pipeline或者保存数据

Pipeline中的数据处理、清洗、验证或存储操作

在上面的示例中,process_item方法是Pipeline中的核心方法,它接收爬取到的Item数据作为输入,并对数据进行处理。你可以在这个方法中编写任何你需要的数据处理逻辑,例如:

  • 清洗数据:移除不需要的字段、修复数据格式等。
  • 验证数据:检查数据是否符合预期的格式、范围或规则。
  • 存储数据:将数据存储到数据库、文件或其他数据存储介质中。

在settings.py中配置和启用Pipeline

要启用你编写的Pipeline,需要在Scrapy项目的settings.py文件中进行配置。假设上面定义的Pipeline是myproject.pipelines.MyPipeline,你可以像下面这样配置:


ITEM_PIPELINES = {
    'myproject.pipelines.MyPipeline': 300,  # 设置Pipeline的优先级,数字越小优先级越高
    # 可以添加其他的Pipeline...
}

Scrapy的Pipeline提供了一个处理爬取数据的灵活机制,允许你在爬虫运行过程中对Item数据进行各种操作。通过创建自定义的Pipeline并在settings.py中进行配置,可以方便地对爬取到的数据进行处理、清洗、验证或存储等操作,以满足特定需求或业务逻辑。

实际应用示例

样例1

爬取特定网站数据并处理存储:

假设我们要爬取一个电子产品销售网站的商品信息,并将其存储到数据库中。

Item定义


import scrapy

class ProductItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    description = scrapy.Field()
    # 其他字段...

自定义Pipeline来存储到数据库


import pymongo

class MongoDBPipeline(object):
    collection_name = 'products'

    def open_spider(self, spider):
        self.client = pymongo.MongoClient('localhost', 27017)
        self.db = self.client['scrapy_db']

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item

配置settings.py启用Pipeline


ITEM_PIPELINES = {
    'myproject.pipelines.MongoDBPipeline': 300,
}

爬虫示例


import scrapy
from myproject.items import ProductItem

class ProductsSpider(scrapy.Spider):
    name = 'products'
    start_urls = ['http://example.com/products']

    def parse(self, response):
        # 解析网页内容并提取数据
        for product in response.xpath('//div[@class="product"]'):
            item = ProductItem()
            item['title'] = product.xpath('h2/a/text()').get()
            item['price'] = product.xpath('span[@class="price"]/text()').get()
            item['description'] = product.xpath('p/text()').get()
            yield item

样例2 MySQL持久化批量存储

item

class QvnaItem(scrapy.Item):
    Commentlist = scrapy.Field()
    Price = scrapy.Field()
    Title = scrapy.Field()
    Id = scrapy.Field()

pip

class MySQLPipeline:
    def __init__(self, mysql_host, mysql_port, mysql_database, mysql_user, mysql_password):
        self.mysql_host = mysql_host
        self.mysql_port = mysql_port
        self.mysql_database = mysql_database
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.data = []
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mysql_host=crawler.settings.get('MYSQL_HOST'),
            mysql_port=crawler.settings.get('MYSQL_PORT'),
            mysql_database=crawler.settings.get('MYSQL_DATABASE'),
            mysql_user=crawler.settings.get('MYSQL_USER'),
            mysql_password=crawler.settings.get('MYSQL_PASSWORD'),
        )

    def open_spider(self, spider):
        self.conn = mysql.connector.connect(
            host=self.mysql_host,
            port=self.mysql_port,
            database=self.mysql_database,
            user=self.mysql_user,
            password=self.mysql_password,
        )
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        self.conn.close()
        if  self.data:
            sql = "INSERT INTO xiecheng_data (id,title, commentlist,averagescore,opentime,number) VALUES (%s,%s, %s,%s, %s,%s)"
            self.write_data(sql=sql)

    def process_item(self, item, spider):
        if isinstance(item, XiechengItem):
            # 在这里执行将item数据存入MySQL的操作

            print("执行成功")
            # print(item)
            sql = "INSERT INTO xiecheng_data (id,title, commentlist,averagescore,opentime,number) VALUES (%s,%s, %s,%s, %s,%s)"
            values = (
            item['Id'], item['Title'], item['Commentlist'], item['AverageScore'], item['OpenTime'], item['Number'])
            self.cursor.execute(sql, values)
            self.conn.commit()

        elif isinstance(item, QvnaItem):

            sql = "INSERT INTO qvna_data (id,Title, Commentlist,Price) VALUES (%s,%s,%s,%s)"
            values = (
                item['Id'], item['Title'], item['Commentlist'], item['Price'])
            self.data.append(values)
            print(len(self.data))
            if len(self.data) == 5:
                self.write_data(sql)

        elif isinstance(item, ZhihuItem):

            sql = "INSERT INTO zhihu_data (Id,Title, Commentlist) VALUES (%s,%s,%s)"
            values = (
                item['Id'], item['Title'], item['Commentlist'])
            self.data.append(values)
            print(len(self.data))
            if len(self.data) == 5:
                self.write_data(sql)
        return item

    def write_data(self, sql):
        self.cursor.executemany(sql, self.data)
        self.conn.commit()
        self.data.clear()
        print("提交完成")

方法说明:

  • init() :初始化方法,接收MySQL数据库连接的相关信息,并创建一个空的数据列表 self.data 用于临时存储待插入的数据。
  • from_crawler() :类方法,用于从Scrapy的配置中获取MySQL数据库连接相关的配置信息。
  • open_spider() :在爬虫启动时执行,用于建立与MySQL数据库的连接。
  • close_spider() :在爬虫关闭时执行,关闭与MySQL数据库的连接,并检查是否有未写入数据库的数据。
  • process_item() :处理每一个爬取到的Item数据,根据Item的类型执行不同的处理逻辑,将数据存入MySQL数据库中。

数据处理和存储逻辑:

  • 通过 process_item() 方法根据不同的Item类型,将数据存储到不同的MySQL数据库表中。

  • 根据不同的Item类型,执行不同的SQL插入语句来将数据插入到数据库中。当数据累积到一定数量时(这里是5条数据),会调用 write_data() 方法执行批量写入操作。

  • 数据库连接管理:在 open_spider() 中建立连接,在 close_spider() 中关闭连接,确保连接的及时释放,避免资源泄漏。

  • 数据批量处理:利用批量写入操作(executemany)提高数据写入的效率,减少数据库交互次数。

  • 异常处理:在数据库操作过程中添加适当的异常处理机制,确保数据写入的稳定性和可靠性。

实践和注意事项

实践建议:
  1. 字段定义清晰明确:确保Item对象中字段的定义清晰,以准确地反映爬取数据的结构和内容。
  2. Pipeline职责单一:每个Pipeline应专注于特定的任务,保持职责单一性,避免将太多的逻辑集中在一个Pipeline中。
  3. 异常处理:在Pipeline中添加异常处理机制,确保在处理数据时能够捕获和处理异常,保证爬虫的稳定性。
  4. 数据清洗和验证:在Pipeline中进行数据清洗和验证,确保数据的完整性和准确性,避免脏数据影响后续处理或存储。
优化Pipeline和Item:
  1. 优化爬虫速度:尽可能地使用异步操作,避免在Pipeline中进行耗时的同步操作,以提高爬虫的速度。
  2. 避免内存泄漏:在处理大量数据时,及时释放资源,避免内存泄漏,确保爬虫的稳定性和可靠性。
  3. 使用数据缓存:对频繁使用的数据进行缓存,减少重复请求和提高数据处理效率。
文章来源:https://blog.csdn.net/kilig_CSM/article/details/134965918
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。