Port original pyminifakedns, circa 2006, to Python 3.

This commit completes porting the internals of the MiniFakeDNS server
class to Python 3. This primarily means converting the Python 2 `str`
types to Python 3's `bytes` objects.

In the process, I've also translated the variable names from their
original Spanish into English, and added explanatory comments for how
the DNS header parsing is accomplished to enhance the educational
potential of SET.

Another small change is the addition of a new core helper function,
`detect_public_ip()`, which makes a couple parts of the codebase a
little more DRY by reducing code duplication across the `set.py` and
`setcore.py` files. This change also makes it possible to parameterize
the IP address that MiniFakeDNS server responds to requests with.
This commit is contained in:
Meitar M 2020-03-14 01:53:16 -04:00
parent 3a2ba0a500
commit bee1a38d05
No known key found for this signature in database
GPG key ID: 07EFAA28AB94BC85
4 changed files with 130 additions and 34 deletions

View file

@ -127,7 +127,8 @@ if operating_system == "posix":
dns = core.check_config("DNS_SERVER=")
if dns.lower() == "on":
import src.core.minifakedns
src.core.minifakedns.start_dns_server()
from src.core.setcore import detect_public_ip
src.core.minifakedns.start_dns_server(detect_public_ip())
# remove old files
for root, dirs, files in os.walk(core.userconfigpath):

View file

@ -18,12 +18,15 @@ import threading
# from outside of this module, e.g., during SET startup and cleanup.
dns_server_thread = None
def start_dns_server():
def start_dns_server(reply_ip):
"""
Helper function, intended to be called from other modules.
Args:
reply_ip (string): IPv4 address in dotted quad notation to use in all answers.
"""
global dns_server_thread
dns_server_thread = MiniFakeDNS(kwargs={'port': 53, 'ip': '1.2.3.4'})
dns_server_thread = MiniFakeDNS(kwargs={'port': 53, 'ip': reply_ip})
dns_server_thread.start()
def stop_dns_server():
@ -40,31 +43,117 @@ class DNSQuery:
See original for reference, but note there have been changes:
https://code.activestate.com/recipes/491264-mini-fake-dns-server/
Among the changes are variables names that have been translated
to English from their original Spanish.
"""
def __init__(self, data):
"""
Args:
data (bytes): The binary data of the DNS packet from the wire.
"""
self.data = data
self.dominio = ''
tipo = (ord(data[2]) >> 3) & 15 # Opcode bits
if tipo == 0: # Standard query
ini = 12
lon = ord(data[ini])
while lon != 0:
self.dominio += data[ini + 1:ini + lon + 1] + '.'
ini += lon + 1
lon = ord(data[ini])
# The domain name the client is querying the DNS for.
self.domain = ''
def respuesta(self, ip):
packet = ''
if self.dominio:
packet += self.data[:2] + "\x81\x80"
packet += self.data[4:6] + self.data[4:6] + '\x00\x00\x00\x00' # Questions and Answers Counts
# Parse DNS packet headers.
txn_id = data[:2] # DNS transaction ID, two bytes.
flags = data[2:4] # DNS flags, also two bytes.
# To determine whether or not this DNS packet is a query that
# we should respond to, we need to examine the "QR" field and
# the "opcode" field. Together, these make up five bits, but
# they are the left-most bits (most-significant bits) in the
# first byte of the two-byte Flags field. An ASCII diagram:
#
# X XXXX ...
# ^ ^
# | \- The opcode bits are here.
# |
# The QR bit.
#
# To read them meaningfully, we first discard the three bits
# in the rightmost (least significant) position by performing
# a 3-place bitwise right shift, which in python is the `>>`
# operator. At that point, we have a byte value like this:
#
# 000 X XXXX
# ^ ^
# | \- The opcode bits are here.
# |
# The QR bit.
#
# Now that the most significant bits are all zero'ed out, we
# can test the values of the unknown bits to see if they are
# representing a standard query.
#
# In DNS, a standard query has the opcode field set to zero,
# so all the bits in the opcode field should be 0. Meanwhile,
# the QR field should also be a 0, representing a DNS query
# rather than a DNS reply. So what we are hoping to see is:
#
# 000 0 0000
#
# To test for this reliably, we do a bitwise AND with a value
# of decimal 15, which is 1111 in binary, exactly four bits:
#
# 00000000 (Remember, 0 AND 1 equals 0.)
# AND 00001111
# ------------
# 00000000 = decimal 0
#
# In one line of Python code, we get the following:
kind = (flags[0] >> 3) & 15 # Opcode is in bits 4, 5, 6, and 7 of first byte.
# QR bit is 8th bit, but it should be 0.
# And now, we test to see if the result
if 0 == kind: # was a standard query.
# The header of a DNS packet is exactly twelve bytes long,
# meaning that the very start of the first DNS question
# will always begin at the same offset.
offset = 12 # The first question begins at the 13th byte.
# The DNS protocol encodes domain names as a series of
# labels. Each label is prefixed by a single byte denoting
# that label's length.
length = data[offset]
while 0 != length:
self.domain += data[offset + 1 : offset + length + 1].decode() + '.'
offset += length + 1
length = data[offset]
def response(self, ip):
"""
Construct a DNS reply packet with a given IP address.
TODO: This responds incorrectly to EDNS queries that make use
of the OPT pseudo-record type. Specifically, the pointer
wrong because we do not check the length of the original
query we received. Instead, we should note the length of
the original packet until the end of the first question,
and truncate (i.e., drop, ignore) the remainder.
For now, what this actually means is that testing this
server using a recent version of `dig(1)` will fail
unless you use the `+noedns` query option. For example:
dig @127.0.0.1 example.com +noedns
Simpler or older DNS utilities such as `host(1)` are
probably going to work.
Args:
ip (string): IP address to respond with.
"""
packet = b''
if self.domain:
packet += self.data[:2] + b'\x81\x80'
packet += self.data[4:6] + self.data[4:6] + b'\x00\x00\x00\x00' # Questions and Answers Counts
packet += self.data[12:] # Original Domain Name Question
packet += '\xc0\x0c' # Pointer to domain name
packet += '\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04' # Response type, ttl and resource data length -> 4 bytes
packet += str.join('', [chr(int(x)) for x in ip.split('.')]) # 4bytes of IP
packet += b'\xc0\x0c' # Pointer to domain name
packet += b'\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04' # Response type, ttl and resource data length -> 4 bytes
packet += bytes([int(x) for x in ip.split('.')]) # 4 bytes of IP.
return packet
class MiniFakeDNS(threading.Thread):
@ -110,7 +199,7 @@ class MiniFakeDNS(threading.Thread):
try:
data, addr = udps.recvfrom(1024)
p = DNSQuery(data)
udps.sendto(p.respuesta(self.ip), addr)
udps.sendto(p.response(self.ip), addr)
except BlockingIOError:
pass
print("Exiting the DNS Server..")

View file

@ -345,10 +345,7 @@ this is how networking works.
""")
try:
rhost = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rhost.connect(('google.com', 0))
rhost.settimeout(2)
revipaddr = rhost.getsockname()[0]
revipaddr = detect_public_ip()
ipaddr = raw_input(setprompt(["2"], "IP address for the POST back in Harvester/Tabnabbing [" + revipaddr + "]"))
if ipaddr == "": ipaddr=revipaddr
except Exception:

View file

@ -304,7 +304,19 @@ class create_menu:
return
def detect_public_ip():
"""
Helper function to auto-detect our public IP(v4) address.
"""
rhost = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rhost.connect(('google.com', 0))
rhost.settimeout(2)
return rhost.getsockname()[0]
def validate_ip(address):
"""
Validates that a given string is an IPv4 dotted quad.
"""
try:
if socket.inet_aton(address):
if len(address.split('.')) == 4:
@ -429,10 +441,7 @@ def meta_database():
#
def grab_ipaddress():
try:
rhost = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rhost.connect(('google.com', 0))
rhost.settimeout(2)
revipaddr = rhost.getsockname()[0]
revipaddr = detect_public_ip()
rhost = raw_input(setprompt("0", "IP address or URL (www.ex.com) for the payload listener (LHOST) [" + revipaddr + "]"))
if rhost == "": rhost = revipaddr