DNS日志访问排查自动化脚本
import logging
from scapy.all import DNS, DNSQR, DNSRR, IP, sniff, UDP
# 配置日志记录
logging.basicConfig(filename='dns_capture.log', level=logging.INFO, format='%(asctime)s %(message)s')
def process_packet(packet):
# 将此替换为您感兴趣的域名列表
target_domains = ["www.wangtory.com",]
# 检查是否为 DNS 响应包
if packet.haslayer(DNSRR):
# 获取响应的域名和数据
rrname = packet[DNSRR].rrname
rdata = packet[DNSRR].rdata
# 将字节类型的 rrname 和 rdata 转换为字符串
if isinstance(rrname, bytes):
rrname = rrname.decode('utf-8')
if isinstance(rdata, bytes):
rdata = rdata.decode('utf-8')
# 检查响应的域名是否在目标域名列表中
if any(domain in rrname for domain in target_domains):
# 记录日志并打印相关信息
log_message = f"[DNS Response] {rrname} -> {rdata}"
print(log_message)
logging.info(log_message)
# 检查是否为 DNS 查询包
elif packet.haslayer(DNSQR):
# 获取查询的域名和源 IP
qname = packet[DNSQR].qname
src_ip = packet[IP].src
# 将字节类型的 qname 转换为字符串
if isinstance(qname, bytes):
qname = qname.decode('utf-8')
# 检查查询的域名是否在目标域名列表中
if any(domain in qname for domain in target_domains):
# 记录日志并打印相关信息
log_message = f"[DNS Query] {src_ip} -> {qname}"
print(log_message)
logging.info(log_message)
def main():
# 替换为实际的OpenVPN接口名称,例如:tun0
interface = "tun0"
# 打印开始捕获网络流量的信息
print(f"开始捕获DNS流量(接口:{interface})...")
# 开始捕获网络流量
sniff(iface=interface, filter="udp and port 53", prn=process_packet, store=0)
# 如果这个脚本被直接运行,而不是作为模块导入,那么就执行 main 函数
if __name__ == "__main__":
main()