# i2p-docker-proxy # Copyright (C) 2019 LoveIsGrief # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import logging import os import re import signal import socketserver import threading from datetime import datetime from ipaddress import ip_address, IPv4Address from pathlib import Path from time import sleep from dnslib import dns from dnslib.server import DNSServer, BaseResolver from faker import Faker SERIAL_NO = int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()) handler = logging.StreamHandler() handler.setLevel(logging.INFO) handler.setFormatter(logging.Formatter('%(asctime)s: %(message)s', datefmt='%H:%M:%S')) logger = logging.getLogger(__name__) logger.addHandler(handler) logger.setLevel(logging.INFO) def validate_fqdn(domain_name): """ Taken from https://stackoverflow.com/questions/2532053/validate-a-hostname-string/33715765#33715765 :param domain_name: :type domain_name: basestring :rtype: bool """ if domain_name.endswith('.'): domain_name = domain_name[:-1] if len(domain_name) < 1 or len(domain_name) > 253: return False ldh_re = re.compile('^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.IGNORECASE) return all(ldh_re.match(x) for x in domain_name.split('.')) class UnixSocketServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): """ A simple, threaded server that follows the DNSServer signatures """ def __init__(self, socket_path, request_handler_class, bind_and_activate=True): super().__init__(socket_path, request_handler_class, bind_and_activate) self.thread = threading.Thread(target=self.serve_forever) self.thread.daemon = True def start_thread(self): self.thread.start() def stop(self): self.shutdown() def isAlive(self): return self.thread.isAlive() class UnixSocketHandler(socketserver.BaseRequestHandler): def handle(self): host = "" try: ip = ip_address(self.request.recv(1024).strip()) host = RandomResolver.CACHE_IP[ip] except: pass self.request.sendall(host) class RandomResolver(BaseResolver): """ Resolves a random, public IP for every unique DNS request. """ CACHE_IP = {} CACHE_FQDN = {} _FAKER = Faker(providers=["faker.providers.internet"]) def __init__(self, resolve_dir): self.resolve_dir = Path(resolve_dir) @classmethod def load_dir(cls, resolve_dir): # TODO: implement this raise NotImplementedError() @staticmethod def ip2path(address, resolve_dir): """ Constructs the path to get stored FQDN of the given IP address :param address: IPv4 address :type address: basestring :param resolve_dir: The root dir where the IPs and FQDNs will be stored :type resolve_dir: Path :rtype: Path | None """ try: if not isinstance(ip_address(address), IPv4Address): return except ValueError: return a, b, c, d = address.split(".") return resolve_dir / a / b / c / d @classmethod def reload_ip(cls, ip, resolve_dir): """ Retrieves the stored FQDN from the given IP :param ip: Should be a valid IPv4 address :type ip: basestring :param resolve_dir: :type resolve_dir: :return: :rtype: """ ip_path = cls.ip2path(ip, resolve_dir) if not (ip_path and ip_path.is_file()): return fqdn = ip_path.read_text().strip() if not validate_fqdn(fqdn): return # Make sure to remove old entries for _ip, _fqdn in cls.CACHE_IP.items(): if _fqdn == fqdn: del cls.CACHE_FQDN[_fqdn] break cls.CACHE_IP[ip] = fqdn cls.CACHE_FQDN[fqdn] = ip @classmethod def create_fake_entry(cls, fqdn): ip = cls._FAKER.ipv4_public(network=False, address_class=None) cls.CACHE_FQDN[fqdn] = ip cls.CACHE_IP[ip] = fqdn return ip @classmethod def get_ip(cls, fqdn): ip = cls.CACHE_FQDN.get(fqdn) if not ip: ip = cls.create_fake_entry(fqdn) return ip def resolve(self, request, handler): """ Resolves a random, public IP for each unique DNS request :type request: dnslib.dns.DNSRecord :type handler: dnslib.server.DNSHandler :rtype: dnslib.dns.DNSRecord """ reply = request.reply() fqdn = str(request.q.get_qname()) reply.add_answer(dns.RR( fqdn, rdata=dns.A(self.get_ip(fqdn)) )) return reply def handle_sig(signum, frame): logger.info('pid=%d, got signal: %s, stopping...', os.getpid(), signal.Signals(signum).name) exit(0) def main(args): """ :param args: :type args: argparse.Namespace """ signal.signal(signal.SIGTERM, handle_sig) port = args.port resolve_dir = Path(args.resolve_dir) resolver = RandomResolver(resolve_dir) udp_server = DNSServer(resolver, port=port) tcp_server = DNSServer(resolver, port=port, tcp=True) servers = [udp_server, tcp_server] if args.socket_path: socket_path = Path(args.socket_path) servers.append(UnixSocketServer(socket_path, UnixSocketHandler)) logger.info('starting DNS server on port %d', port) for server in servers: server.start_thread() try: while udp_server.isAlive(): sleep(1) except KeyboardInterrupt: pass for server in servers: server.stop() if __name__ == '__main__': parser = argparse.ArgumentParser( description="A DNS server that returns fake IPs for requests" ) parser.add_argument("-p", "--port", help="Which port to use to accept DNS requests", type=int, default=53) parser.add_argument("-r", "--resolve-dir", help="Where the resolves will stored", default="/etc/resolve/") parser.add_argument("-s", "--socket-path", help="A path to create a socket where" " reverse requests can be mad") main(parser.parse_args())