以下是一个Python脚本,用于定时查询历史告警,找到攻击结果为成功的告警,并根据告警结果查询威胁情报。如果是恶意IP,则调用防火墙进行封禁,并记录封禁动作的日志。
首先,确保已经安装了pymysql
和requests
库。如果没有,请使用以下命令安装:
pip install pymysql requests
接下来,创建一个名为block_malicious_ips.py
的脚本,并将以下代码复制到其中:
# 导入所需库
import pymysql # 用于与MySQL数据库进行交互
import requests # 用于发送HTTP请求
import json # 用于处理JSON数据
import subprocess # 用于执行系统命令
import time # 用于获取当前时间
# 数据库配置
db_config = {
'host': 'localhost', # 数据库主机名
'user': 'root', # 数据库用户名
'password': 'your_password', # 数据库密码
'db': 'your_database', # 数据库名称
'charset': 'utf8mb4', # 数据库字符集
}
# 微步威胁情报API配置
threat_intelligence_api_key = 'your_api_key' # 微步威胁情报API密钥
threat_intelligence_url = 'https://api.example.com/v1/ip/query' # 微步威胁情报API的URL
# 防火墙配置
firewall_cmd = 'iptables' # 防火墙命令,这里使用iptables作为示例
# 日志文件
log_file = 'block_malicious_ips.log' # 日志文件名
# 定义查询成功攻击的函数
def query_successful_attacks():
connection = pymysql.connect(**db_config) # 连接到MySQL数据库
try:
with connection.cursor() as cursor: # 创建一个数据库游标
sql = "SELECT * FROM `alerts` WHERE `attack_result` = 'success'" # 查询成功攻击的SQL语句
cursor.execute(sql) # 执行SQL查询
return cursor.fetchall() # 返回查询结果
finally:
connection.close() # 关闭数据库连接
# 定义查询威胁情报的函数
def query_threat_intelligence(ip):
headers = {
'Authorization': f'Bearer {threat_intelligence_api_key}', # 设置API密钥
}
response = requests.get(threat_intelligence_url, headers=headers, params={'ip': ip}) # 发送GET请求查询威胁情报
return response.json() # 返回JSON格式的响应数据
# 定义封禁IP的函数
def block_ip(ip):
cmd = f'{firewall_cmd} -A INPUT -s {ip} -j DROP' # 构造防火墙命令
subprocess.run(cmd, shell=True, check=True) # 执行防火墙命令
# 定义记录封禁IP日志的函数
def log_blocked_ip(ip):
with open(log_file, 'a') as f: # 以追加模式打开日志文件
f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Blocked IP: {ip}\n") # 写入封禁IP的日志
# 主函数
def main():
successful_attacks = query_successful_attacks() # 查询成功攻击
for attack in successful_attacks: # 遍历成功攻击
ip = attack['source_ip'] # 获取攻击源IP
threat_info = query_threat_intelligence(ip) # 查询威胁情报
if threat_info.get('malicious'): # 判断是否为恶意IP
block_ip(ip) # 封禁恶意IP
log_blocked_ip(ip) # 记录封禁日志
# 当脚本作为主程序运行时,执行主函数
if __name__ == '__main__':
main()
在上述脚本中,需要替换以下几个变量:
db_config
:数据库配置,包括主机名、用户名、密码和数据库名称。threat_intelligence_api_key
:微步威胁情报API密钥。threat_intelligence_url
:微步威胁情报API的URL。firewall_cmd
:防火墙命令,这里使用iptables
作为示例。接下来,设置一个Linux计划任务,定期运行此脚本。首先,打开终端并运行以下命令:
crontab -e
在打开的编辑器中,添加以下行以每小时运行一次脚本:
0 * * * * /usr/bin/python3 /path/to/block_malicious_ips.py
保存并退出编辑器。现在,脚本将每小时运行一次,自动封禁恶意IP并记录封禁日志。
关于Linux计划任务(Cron)的详细解释:
Cron是Linux系统中用于定期执行任务的工具。它允许用户在特定的日期和时间自动执行命令或者脚本。
在这个例子中,我们使用crontab -e命令来编辑当前用户的Cron配置。crontab命令用于管理用户的Cron表,-e选项表示编辑模式。
编辑器中的每一行都代表一个计划任务。每行的格式如下:
* * * * * /path/to/command arg1 arg2
| | | | |
| | | | ----- Day of week (0 - 7) (Sunday = 0 or 7)
| | | ------- Month (1 - 12)
| | --------- Day of month (1 - 31)
| ----------- Hour (0 - 23)
------------- Minute (0 - 59)
在这个例子中,我们将添加以下行以每小时运行一次脚本:
0 * * * * /usr/bin/python3 /path/to/block_malicious_ips.py
这里的0 * * * *
表示每小时的第0分钟执行任务,也就是每小时运行一次。/usr/bin/python3
是Python 3解释器的路径,/path/to/block_malicious_ips.py
是脚本的路径。请确保将其替换为实际的路径。
保存并退出编辑器后,Cron将自动加载新的配置,并按照计划执行任务。这样,脚本将每小时运行一次,自动封禁恶意IP并记录封禁日志。
这是一个Python脚本,使用了schedule
库来实现定时任务,requests
库来发送HTTP请求。请确保已安装这两个库:
pip install schedule requests
import schedule
import time
import requests
# 定义查询历史告警的函数
def get_historical_alerts():
# 这里替换为实际的查询历史告警的API地址
url = "https://your-alert-api.example.com/alerts"
response = requests.get(url)
alerts = response.json()
return alerts
# 定义根据告警结果查询威胁情报的函数
def get_threat_intelligence(alert):
# 这里替换为实际的查询威胁情报的API地址
url = "https://your-threat-intelligence-api.example.com/intelligence"
response = requests.get(url, params={"ip": alert["source_ip"]})
threat_info = response.json()
return threat_info
# 定义调用防火墙进行封禁的函数
def block_ip(ip):
# 这里替换为实际的调用防火墙封禁IP的API地址
url = "https://your-firewall-api.example.com/block"
response = requests.post(url, json={"ip": ip})
return response.ok
# 定义定时任务
def check_alerts():
alerts = get_historical_alerts()
for alert in alerts:
if alert["attack_result"] == "success":
threat_info = get_threat_intelligence(alert)
if threat_info["malicious"]:
blocked = block_ip(alert["source_ip"])
if blocked:
print(f"IP {alert['source_ip']} 已被封禁")
else:
print(f"无法封禁 IP {alert['source_ip']}")
# 每小时执行一次定时任务
schedule.every(1).hours.do(check_alerts)
# 无限循环执行定时任务
while True:
schedule.run_pending()
time.sleep(1)
请将上述代码中的API地址替换为实际的查询历史告警、查询威胁情报和调用防火墙封禁IP的API地址。运行此脚本后,它将每小时执行一次定时任务,检查历史告警中攻击结果为成功的告警,并根据告警结果查询威胁情报。如果是恶意IP,则调用防火墙进行封禁。
这是一个使用 Python 编写的示例脚本,它定时查询历史告警,找到攻击结果为成功的告警,根据告警结果查询威胁情报,如果是恶意IP则调用防火墙进行封禁。
请注意,这个示例脚本仅用于说明目的,并没有实际的 API 调用。你需要根据你的实际环境和 API 进行修改。
import time
import requests
# 配置参数
INTERVAL = 60 # 查询间隔(秒)
ALARM_API_URL = "http://example.com/api/alarm" # 历史告警 API URL
THREAT_INTEL_API_URL = "http://example.com/api/threat-intel" # 威胁情报 API URL
FIREWALL_API_URL = "http://example.com/api/firewall" # 防火墙 API URL
def get_successful_alarms():
response = requests.get(ALARM_API_URL)
alarms = response.json()
successful_alarms = [alarm for alarm in alarms if alarm["attack_result"] == "success"]
return successful_alarms
def get_threat_intel(ip):
response = requests.get(THREAT_INTEL_API_URL, params={"ip": ip})
threat_intel = response.json()
return threat_intel
def block_ip(ip):
response = requests.post(FIREWALL_API_URL, json={"action": "block", "ip": ip})
return response.status_code == 200
def main():
while True:
print("查询历史告警...")
successful_alarms = get_successful_alarms()
for alarm in successful_alarms:
ip = alarm["source_ip"]
print(f"查询威胁情报: {ip}")
threat_intel = get_threat_intel(ip)
if threat_intel["malicious"]:
print(f"封禁恶意 IP: {ip}")
if block_ip(ip):
print(f"封禁成功: {ip}")
else:
print(f"封禁失败: {ip}")
time.sleep(INTERVAL)
if __name__ == "__main__":
main()
这个脚本首先定义了一些配置参数,如查询间隔、API URL 等。接下来,定义了三个函数:get_successful_alarms
用于查询历史告警并筛选出攻击成功的告警;get_threat_intel
用于查询指定 IP 的威胁情报;block_ip
用于调用防火墙 API 封禁指定 IP。
main
函数是脚本的主要逻辑,它会循环执行以下操作:查询历史告警,找到攻击成功的告警;对于每个成功的告警,查询其源 IP 的威胁情报;如果威胁情报显示该 IP 是恶意的,则调用防火墙 API 进行封禁。
为了运行此脚本,你需要安装 requests
库。你可以使用以下命令安装:
pip install requests
请注意,这个脚本仅作为示例,你需要根据你的实际环境和 API 进行修改。在实际使用中,你还需要考虑异常处理、日志记录等方面的问题。
以下是一个更详细的 Python 脚本示例,使用 pymysql 库查询数据库中的历史告警,并使用微步在线的威胁情报 API 查询威胁情报。该脚本包含详细的注释以解释每个步骤。
请注意,此示例脚本仅用于说明目的,你需要根据你的实际环境和数据库配置进行修改。
import time
import pymysql
import requests
# 配置参数
INTERVAL = 60 # 查询间隔(秒)
DB_CONFIG = { # 数据库配置
'host': 'localhost',
'user': 'username',
'password': 'password',
'db': 'database_name',
'charset': 'utf8mb4'
}
THREAT_INTEL_API_URL = "https://api.example.com/v4/ip" # 微步威胁情报 API URL
THREAT_INTEL_API_KEY = "your_api_key" # 微步 API 密钥
FIREWALL_API_URL = "http://example.com/api/firewall" # 防火墙 API URL
# 连接数据库
def connect_db():
connection = pymysql.connect(**DB_CONFIG)
return connection
# 查询成功攻击的告警
def get_successful_alarms(connection):
with connection.cursor() as cursor:
sql = "SELECT * FROM alarms WHERE attack_result = 'success'"
cursor.execute(sql)
successful_alarms = cursor.fetchall()
return successful_alarms
# 查询微步威胁情报
def get_threat_intel(ip):
headers = {"Authorization": f"ApiKey {THREAT_INTEL_API_KEY}"}
response = requests.get(f"{THREAT_INTEL_API_URL}/{ip}", headers=headers)
threat_intel = response.json()
return threat_intel
# 调用防火墙 API 进行封禁
def block_ip(ip):
response = requests.post(FIREWALL_API_URL, json={"action": "block", "ip": ip})
return response.status_code == 200
# 主函数
def main():
connection = connect_db()
while True:
print("查询成功攻击的告警...")
successful_alarms = get_successful_alarms(connection)
for alarm in successful_alarms:
ip = alarm["source_ip"]
print(f"查询微步威胁情报: {ip}")
threat_intel = get_threat_intel(ip)
if threat_intel["malicious"]:
print(f"封禁恶意 IP: {ip}")
if block_ip(ip):
print(f"封禁成功: {ip}")
else:
print(f"封禁失败: {ip}")
time.sleep(INTERVAL)
if __name__ == "__main__":
main()
在这个脚本中,我们首先定义了一些配置参数,如查询间隔、数据库配置、API URL 等。接下来,定义了四个函数:
connect_db
:连接到数据库并返回连接对象。get_successful_alarms
:查询数据库中成功攻击的告警。get_threat_intel
:查询指定 IP 的微步威胁情报。block_ip
:调用防火墙 API 封禁指定 IP。main
函数是脚本的主要逻辑,它会循环执行以下操作:连接到数据库,查询成功攻击的告警;对于每个成功的告警,查询其源 IP 的微步威胁情报;如果威胁情报显示该 IP 是恶意的,则调用防火墙 API 进行封禁。
为了运行此脚本,你需要安装 pymysql
和 requests
库。你可以使用以下命令安装:
pip install pymysql requests
请注意,这个脚本仅作为示例,你需要根据你的实际环境和数据库配置进行修改。在实际使用中,你还需要考虑异常处理、日志记录等方面的问题。