技术使用点:
1.Python的requests请求,代理声明
requests库介绍:
requests是一个常用的HTTP请求库,用于发送HTTP请求并处理响应。您可以使用它轻松地进行网页爬取。如果您尚未安装该库,可以使用以下命令进行安装
pip install requests
以下是一个简单的使用requests库发送GET请求的示例:
import requests
url = 'https://example.com'
response = requests.get(url)
# 打印响应内容
print(response.text)
使用代理:
如果您需要使用代理来进行爬取,可以在requests.get方法中传递proxies参数。代理通常是一个字典,包含协议和代理地址。以下是一个使用代理的示例:
import requests
url = 'https://example.com'
proxy = {'http': 'http://your_proxy', 'https': 'https://your_proxy'}
response = requests.get(url, proxies=proxy)
# 打印响应内容
print(response.text)
请将 ‘http://your_proxy’ 和 ‘https://your_proxy’ 替换为实际的代理地址。确保代理地址格式正确。
2.BeautifulSoup
BeautifulSoup
是一个用于从HTML和XML文档中提取数据的Python库。它提供了一种方便的方式来浏览文档、搜索特定标签、提取数据等。下面是关于BeautifulSoup库的简介以及一些代码示例。
pip install beautifulsoup4
from bs4 import BeautifulSoup
# HTML文档示例
html_doc = """
<html>
<head>
<title>BeautifulSoup Example</title>
</head>
<body>
<h1>Hello, BeautifulSoup!</h1>
<p>This is a paragraph.</p>
<ul>
<li>Item 1</li>
<li>Item 2</li>
<li>Item 3</li>
</ul>
</body>
</html>
"""
# 创建BeautifulSoup对象
soup = BeautifulSoup(html_doc, 'html.parser')
# 提取标题
title = soup.title.text
print(f'Title: {title}')
# 提取第一个段落
paragraph = soup.p.text
print(f'Paragraph: {paragraph}')
# 提取列表项
list_items = soup.find_all('li')
for item in list_items:
print(f'List Item: {item.text}')
3. 使用BeautifulSoup进行页面解析:
通常,BeautifulSoup用于解析从网页上抓取的HTML内容。以下是一个使用requests和BeautifulSoup结合的简单示例:
import requests
from bs4 import BeautifulSoup
url = 'https://example.com'
response = requests.get(url)
# 使用BeautifulSoup解析HTML内容
soup = BeautifulSoup(response.text, 'html.parser')
# 提取特定标签的内容
title = soup.title.text
print(f'Title: {title}')
# 提取所有链接
links = soup.find_all('a')
for link in links:
print(f'Link: {link.get("href")}')
BeautifulSoup提供了强大的工具,可以根据标签、类、id等来搜索和提取数据,使得网页数据的提取变得相对简单。
3.日志记录,日志库logging
这里做了个简单的日志Helper
import logging
import os
from logging import handlers
class LogHelper(object):
level_relations = {
'debug':logging.DEBUG,
'info':logging.INFO,
'warning':logging.WARNING,
'error':logging.ERROR,
'crit':logging.CRITICAL
}#日志级别关系映射
def __init__(self,filename,level='info',when='D',backCount=3,fmt='[%(asctime)s] [%(process)d] [%(levelname)s] - %(module)s.%(funcName)s (%(filename)s:%(lineno)d) - %(message)s'):
mexist=os.path.exists("log")
if mexist == False:
os.makedirs("log")
self.logger = logging.getLogger(filename)
format_str = logging.Formatter(fmt)#设置日志格式
self.logger.setLevel(self.level_relations.get(level))#设置日志级别
sh = logging.StreamHandler()#往屏幕上输出
sh.setFormatter(format_str) #设置屏幕上显示的格式
th = handlers.TimedRotatingFileHandler(filename="log/"+filename,when=when,backupCount=backCount,encoding='utf-8')#往文件里写入#指定间隔时间自动生成文件的处理器
#实例化TimedRotatingFileHandler
#interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
th.setFormatter(format_str)#设置文件里写入的格式
self.logger.addHandler(sh) #把对象加到logger里
self.logger.addHandler(th)
if __name__ == '__main__':
log = Logger('Log/all.log',level='debug')
log.logger.debug('debug')
log.logger.info('info')
log.logger.warning('警告')
log.logger.error('报错')
log.logger.critical('严重')
Logger('error.log', level='error').logger.error('error')
4.FastAPI接口
FastAPI 是一个基于 Python 3.7+ 标准类型提示的现代、快速(高性能)、Web
框架。它被设计成易于使用和高效,支持异步编程,使得处理请求时能够更加高效地利用系统资源。 FastAPI
是高性能的框架,具有比许多传统框架更好的性能。其使用了 Starlette 和 Pydantic 等现代工具,充分利用了 Python
3.7+ 的异步特性。
使用FastAPI可以快速自动生成 API 文档:
FastAPI 自动生成交互式的 API 文档,使用者可以通过浏览器直接与 API
进行交互,查看请求和响应的模型,测试不同的参数等。这是通过集成 Swagger UI 和 ReDoc 实现的。
这里列出爬取的代码:
def geturl(url,listName):
global nowlist,daylist,weeklist,logger
headers={"User-Agent":Agent.get_user_agent_pc()}
response=requests.get(url=url,headers=headers)
html=response.content
html_doc=str(html,response.apparent_encoding)
soup = BeautifulSoup(html_doc,'lxml')
list = soup.select('.keyword')
gethotValue(soup,listName)
getnewslink(soup,listName)
if listName==0:
nowlist=[]
elif listName==1:
daylist=[]
else :
weeklist=[]
for item in list:
logger.logger.info("爬取一条新闻===》")
for child in item.contents:
if child =='\n' or child ==' ':
continue
else:
if child.text=='search' or child.text=='' :
continue
else:
#print("=======>"+str(index)+"."+child.text)
if listName==0:
nowlist.append(child.text)
elif listName==1:
daylist.append(child.text)
else :
weeklist.append(child.text)
#查询热点数据
def gethotValue(soup,listName):
global nowhotlist,dayhotlist,weekhotlist
list = soup.select('span[class^="icon-"]')
if listName==0:
nowhotlist=[]
elif listName==1:
dayhotlist=[]
else :
weekhotlist=[]
for item in list:
status=str(item.attrs.get('class'))
state="平"
if status=='[\'icon-fall\']':
state="下降"
elif status=='[\'icon-rise\']':
state="上升"
else:
state="平"
if listName==0:
nowhotlist.append(item.text+","+state)
elif listName==1:
dayhotlist.append(item.text+","+state)
else :
weekhotlist.append(item.text+","+state)
def getnewslink(soup,listName):
global nowlinklist,daylinklist,weeklinklist,nowtemplist,daytemplist,weektemplist
mpool = ThreadPoolExecutor(5)
list = soup.select('.tc a')
if listName==0:
nowlinklist=[]
nowtemplist=[]
elif listName==1:
daylinklist=[]
daytemplist=[]
else :
weeklinklist=[]
weektemplist=[]
index =1;
for item in list:
if item.text=="新闻":
href = item.get('href')
mpool.submit(getNewsImgAndDesc,href,listName,index)
index+=1
mpool.shutdown(wait=True)
if listName==0:
templist=sorted(nowtemplist, key=lambda s: s[0])
nowlinklist.extend(templist)
elif listName==1:
templist=sorted(daytemplist, key=lambda s: s[0])
daylinklist.extend(templist)
else :
templist=sorted(weektemplist, key=lambda s: s[0])
weeklinklist.extend(templist)
#获取新闻图片和描述
def getNewsImgAndDesc(url,listName,index):
global nowtemplist,daytemplist,weektemplist
try:
headers={"User-Agent":Agent.get_user_agent_pc()}
response=requests.get(url=url,headers=headers)
html=response.content
html_doc=str(html,response.apparent_encoding)
soup = BeautifulSoup(html_doc,'lxml')
descSpan=soup.find('span', attrs={'class': 'c-font-normal c-color-text'})
descText = descSpan.text
imgDiv = soup.find('div', attrs={'class': 'c-img c-img3 c-img-radius-large'})
imgSrc=None
if len(imgDiv.contents) > 0:
imgSrc = imgDiv.contents[1].attrs.get('src')
data = [index,url,imgSrc,descText]
if listName==0:
nowtemplist.append(data)
elif listName==1:
daytemplist.append(data)
else:
weektemplist.append(data)
except Exception as e:
logger.logger.error('result Error : '+ str(e))
data = [index,url,None,None]
if listName==0:
nowtemplist.append(data)
elif listName==1:
daytemplist.append(data)
else:
weektemplist.append(data)
#订阅新闻
def subNews():
global logger
t0 = time.time()
logger.logger.info("开始订阅实时新闻===》")
geturl("http://top.baidu.com/buzz?b=1&fr=topindex",0)
logger.logger.info("开始订阅今日新闻===》")
geturl("http://top.baidu.com/buzz?b=341&c=513&fr=topbuzz_b1",1)
logger.logger.info("开始订阅7日新闻===》")
geturl("http://top.baidu.com/buzz?b=42&c=513&fr=topbuzz_b341_c513",2)
#pool.submit(geturl,"http://top.baidu.com/buzz?b=1&fr=topindex",0)
#pool.submit(geturl,"http://top.baidu.com/buzz?b=341&c=513&fr=topbuzz_b1",1)
#pool.submit(geturl,"http://top.baidu.com/buzz?b=42&c=513&fr=topbuzz_b341_c513",2)
#pool.shutdown(wait=True)
dlist=[]
#print("=======实时热点=======")
mindex = 1
for i in range(len(nowlist)):
arr=nowhotlist[i].split(',')
news = DataEntity.DataEntity(mindex,
0,
nowlist[i],
nowlinklist[i][3],
nowlinklist[i][1],
arr[0],
arr[1],
nowlinklist[i][2])
dlist.append(news.__dict__)
mindex+=1
#print("=======今日热点=======")
for i in range(len(daylist)):
arr=dayhotlist[i].split(',')
news = DataEntity.DataEntity(mindex,
1,
daylist[i],
daylinklist[i][3],
daylinklist[i][1],
arr[0],
arr[1],
daylinklist[i][2])
dlist.append(news.__dict__)
mindex+=1
#print("=======七日热点=======")
for i in range(len(weeklist)):
arr=weekhotlist[i].split(',')
news = DataEntity.DataEntity(mindex,
2,
weeklist[i],
weeklinklist[i][3],
weeklinklist[i][1],
arr[0],
arr[1],
weeklinklist[i][2])
dlist.append(news.__dict__)
mindex+=1
print("list==>"+str(len(dlist)))
try:
res=json.dumps(dlist,ensure_ascii=False)
resStr= str(res)
print(resStr)
filename="news.json"
with open(filename, 'w',encoding='utf-8') as file_object:
file_object.write(resStr)
logger.logger.info("保存完成===》")
except Exception as e:
logger.logger.error('result Error : '+ str(e))
logger.logger.info("本次耗时计算===》")
logger.logger.info(time.time() - t0)