For the specific goal of finding domains that are truly "dead because they have no web DNS records", a dedicated script can indeed be much faster than a full checker.

If your target is only this pattern:

bucket    = OFFLINE
page_type = CONNECTION FAILED
dns_ok    = False
dns_error = NO_RECORDS

then we do not need to:

  • check https://
  • check http://
  • check redirects
  • check SSL
  • read the page title
  • read the page body
  • check for parked/default-hosting/suspended pages

We only need to focus on:

  • is there an A record?
  • is there an AAAA record?
  • is there a CNAME?
  • if none of these exist → dns_error = NO_RECORDS
  • optional: fetch NS for nameserver info

If there is no A/AAAA/CNAME record, the site effectively has no destination address, so the status can be inferred directly as:

  • bucket = OFFLINE
  • page_type = CONNECTION FAILED

That is much faster.
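The inference rule above can be sketched as a tiny pure-Python function. The record lists here are placeholders; in the full script they come from dnspython lookups:

```python
# Sketch of the inference rule: no A/AAAA/CNAME -> OFFLINE / CONNECTION FAILED.
# The record lists are plain Python lists here, standing in for DNS results.

def infer_status(a, aaaa, cname):
    """Return (bucket, page_type, dns_error) from the three record lists."""
    if not (a or aaaa or cname):
        return ("OFFLINE", "CONNECTION FAILED", "NO_RECORDS")
    return ("HAS_DNS", "HAS_DNS_RECORD", "")

print(infer_status([], [], []))                # ('OFFLINE', 'CONNECTION FAILED', 'NO_RECORDS')
print(infer_status(["203.0.113.10"], [], []))  # ('HAS_DNS', 'HAS_DNS_RECORD', '')
```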

Dedicated script to find "NO_RECORDS / OFFLINE / CONNECTION FAILED" domains

Save it as:

find_dead_norecords.py

Script

import argparse
import csv
import os
import signal
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse

import dns.resolver

STOP_EVENT = threading.Event()

DEFAULT_DNS_TIMEOUT = 2
DEFAULT_WORKERS = 50

FIELDNAMES = [
    "checked_at",
    "domain",
    "match",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "A",
    "AAAA",
    "CNAME",
    "NS",
    "notes",
]

C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"


def now_utc():
    return datetime.now(timezone.utc).isoformat()


def normalize_domain(raw):
    s = raw.strip()
    if not s or s.startswith("#"):
        return ""

    s = s.split("#", 1)[0].strip()
    if not s:
        return ""

    if "://" not in s:
        s = "http://" + s

    try:
        p = urlparse(s)
        host = p.netloc or p.path
        host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
        if host.startswith("www."):
            host = host[4:]
        return host
    except Exception:
        return ""


def load_domains(input_file):
    domains = []
    seen = set()

    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            d = normalize_domain(line)
            if d and d not in seen:
                seen.add(d)
                domains.append(d)
    return domains


def load_processed_domains(csv_path):
    processed = set()
    if not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0:
        return processed

    try:
        with open(csv_path, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                d = (row.get("domain") or "").strip().lower()
                if d:
                    processed.add(d)
    except Exception:
        pass

    return processed


def remove_if_exists(path):
    try:
        if os.path.exists(path):
            os.remove(path)
    except Exception:
        pass


def atomic_write_text(path, text):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    finally:
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    def __init__(self, path, fieldnames):
        self.path = path
        os.makedirs(os.path.dirname(path), exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)

        if not file_exists:
            self.writer.writeheader()
            self.f.flush()
            os.fsync(self.f.fileno())

    def writerow(self, row):
        self.writer.writerow(row)
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    def __init__(self, path):
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        self.f.write(text.rstrip("\n") + "\n")
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


def signal_handler(signum, frame):
    if not STOP_EVENT.is_set():
        STOP_EVENT.set()
        print(f"\n{C_YELLOW}Signal received. Stopping submission of new jobs...{C_RESET}")
    else:
        raise KeyboardInterrupt


def get_dns_info(domain, dns_timeout=2):
    result = {
        "dns_ok": False,
        "dns_error": "",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "NS": [],
    }

    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout

    # Main focus: A, AAAA, CNAME
    for rtype in ["A", "AAAA", "CNAME"]:
        try:
            answers = resolver.resolve(domain, rtype)
            vals = []
            for r in answers:
                if hasattr(r, "target"):
                    vals.append(str(r.target).rstrip("."))
                else:
                    vals.append(str(r).rstrip("."))
            result[rtype] = vals
        except dns.resolver.NXDOMAIN:
            result["dns_error"] = "NXDOMAIN"
            # on NXDOMAIN, NS records are usually irrelevant
            return result
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            pass
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__

    if any(result[k] for k in ["A", "AAAA", "CNAME"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"

    # Optional NS info: still queried so we know the nameservers if any exist
    try:
        answers = resolver.resolve(domain, "NS")
        result["NS"] = [str(x).rstrip(".") for x in answers]
    except Exception:
        pass

    return result


def classify_row(domain, dns_info):
    # Exact target:
    # bucket = OFFLINE
    # page_type = CONNECTION FAILED
    # dns_ok = False
    # dns_error = NO_RECORDS

    if dns_info["dns_ok"] is False and dns_info["dns_error"] == "NO_RECORDS":
        notes = []
        if dns_info["NS"]:
            notes.append("NS records exist, but no A/AAAA/CNAME")
        else:
            notes.append("No A/AAAA/CNAME records")
        return {
            "checked_at": now_utc(),
            "domain": domain,
            "match": "YES",
            "bucket": "OFFLINE",
            "page_type": "CONNECTION FAILED",
            "dns_ok": False,
            "dns_error": "NO_RECORDS",
            "A": ", ".join(dns_info["A"]),
            "AAAA": ", ".join(dns_info["AAAA"]),
            "CNAME": ", ".join(dns_info["CNAME"]),
            "NS": ", ".join(dns_info["NS"]),
            "notes": "; ".join(notes),
        }

    # anything else = does not match the exact filter
    if dns_info["dns_error"] == "NXDOMAIN":
        bucket = "OFFLINE"
        page_type = "NXDOMAIN"
        notes = "Domain does not exist / not registered / does not resolve"
    elif dns_info["dns_ok"]:
        bucket = "HAS_DNS"
        page_type = "HAS_DNS_RECORD"
        notes = "Has web DNS records"
    else:
        bucket = "OTHER"
        page_type = "DNS_NOT_MATCH"
        notes = "Not an exact NO_RECORDS match"

    return {
        "checked_at": now_utc(),
        "domain": domain,
        "match": "NO",
        "bucket": bucket,
        "page_type": page_type,
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "NS": ", ".join(dns_info["NS"]),
        "notes": notes,
    }


def check_domain(domain, dns_timeout):
    dns_info = get_dns_info(domain, dns_timeout=dns_timeout)
    return classify_row(domain, dns_info)


def writer_loop(result_queue, output_dir, total_input, skipped_resume):
    os.makedirs(output_dir, exist_ok=True)

    all_csv = os.path.join(output_dir, "all_checked.csv")
    matched_csv = os.path.join(output_dir, "matched_dead_norecords.csv")
    others_csv = os.path.join(output_dir, "others.csv")
    matched_txt = os.path.join(output_dir, "matched_dead_norecords.txt")
    progress_log = os.path.join(output_dir, "progress.log")
    summary_txt = os.path.join(output_dir, "summary.txt")

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    matched_writer = SafeCsvWriter(matched_csv, FIELDNAMES)
    others_writer = SafeCsvWriter(others_csv, FIELDNAMES)
    matched_txt_writer = SafeLineWriter(matched_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_match = Counter()
    counts_type = Counter()
    processed_now = 0

    def write_summary():
        lines = []
        lines.append("FIND DEAD NO_RECORDS SUMMARY")
        lines.append("=" * 40)
        lines.append(f"generated_at   : {now_utc()}")
        lines.append(f"total_input    : {total_input}")
        lines.append(f"skipped_resume : {skipped_resume}")
        lines.append(f"processed_now  : {processed_now}")
        lines.append(f"remaining_est  : {max(total_input - skipped_resume - processed_now, 0)}")
        lines.append("")
        lines.append("MATCH COUNTS")
        lines.append("-" * 40)
        lines.append(f"MATCH YES : {counts_match.get('YES', 0)}")
        lines.append(f"MATCH NO  : {counts_match.get('NO', 0)}")
        lines.append("")
        lines.append("PAGE TYPE COUNTS")
        lines.append("-" * 40)
        for k, v in counts_type.most_common():
            lines.append(f"{k:25}: {v}")
        atomic_write_text(summary_txt, "\n".join(lines) + "\n")

    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
    write_summary()

    while True:
        item = result_queue.get()
        if item is None:
            break

        row = item
        domain = row["domain"]

        all_writer.writerow(row)

        if row["match"] == "YES":
            matched_writer.writerow(row)
            matched_txt_writer.write_line(domain)
        else:
            others_writer.writerow(row)

        counts_match[row["match"]] += 1
        counts_type[row["page_type"]] += 1
        processed_now += 1

        write_summary()
        log_writer.write_line(
            f"DONE {now_utc()} {domain} match={row['match']} dns_error={row['dns_error']} ns={row['NS']}"
        )

        color = C_GREEN if row["match"] == "YES" else C_RED
        print(
            f"[{processed_now}] "
            f"{domain:30} -> {color}{row['match']}{C_RESET} | "
            f"{row['page_type'][:22]:22} | "
            f"dns_ok={str(row['dns_ok']):5} | "
            f"dns_error={row['dns_error'][:15]:15} | "
            f"NS={row['NS'][:35]}"
        )

    write_summary()
    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")

    all_writer.close()
    matched_writer.close()
    others_writer.close()
    matched_txt_writer.close()
    log_writer.close()


def worker(domain, dns_timeout, result_queue):
    if STOP_EVENT.is_set():
        return
    try:
        row = check_domain(domain, dns_timeout)
    except Exception as e:
        row = {
            "checked_at": now_utc(),
            "domain": domain,
            "match": "NO",
            "bucket": "OTHER",
            "page_type": "SCRIPT ERROR",
            "dns_ok": "",
            "dns_error": type(e).__name__,
            "A": "",
            "AAAA": "",
            "CNAME": "",
            "NS": "",
            "notes": str(e),
        }
    result_queue.put(row)


def parse_args():
    parser = argparse.ArgumentParser(
        description="Find domains that exactly match: OFFLINE + CONNECTION FAILED + dns_ok=False + dns_error=NO_RECORDS"
    )
    parser.add_argument("-i", "--input", required=True, help="Input file of domains")
    parser.add_argument("-o", "--output", default="dead_results", help="Output folder")
    parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Number of workers")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout in seconds")
    parser.add_argument("--fresh", action="store_true", help="Delete old results first")
    parser.add_argument("--no-resume", action="store_true", help="Do not resume from previous results")
    return parser.parse_args()


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()
    os.makedirs(args.output, exist_ok=True)

    all_csv = os.path.join(args.output, "all_checked.csv")

    if args.fresh:
        for f in [
            "all_checked.csv",
            "matched_dead_norecords.csv",
            "others.csv",
            "matched_dead_norecords.txt",
            "progress.log",
            "summary.txt",
        ]:
            remove_if_exists(os.path.join(args.output, f))

    domains = load_domains(args.input)
    if not domains:
        print("No valid domains in the input file.")
        sys.exit(1)

    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue_domains = [d for d in domains if d not in processed]

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Already done   : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}To process     : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers        : {args.workers}{C_RESET}")
    print(f"{C_CYAN}DNS timeout    : {args.dns_timeout}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args.output, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()

    executor = ThreadPoolExecutor(max_workers=args.workers)

    try:
        futures = []
        for domain in queue_domains:
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args.dns_timeout, result_queue))

        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                pass

    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Stopped by user.{C_RESET}")

    finally:
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)

        result_queue.put(None)
        writer_thread.join(timeout=10)

        print("-" * 110)
        print(f"{C_GREEN}Finished / stopped safely.{C_RESET}")
        print(f"All checked : {os.path.join(args.output, 'all_checked.csv')}")
        print(f"Matched CSV : {os.path.join(args.output, 'matched_dead_norecords.csv')}")
        print(f"Matched TXT : {os.path.join(args.output, 'matched_dead_norecords.txt')}")
        print(f"Others CSV  : {os.path.join(args.output, 'others.csv')}")
        print(f"Summary     : {os.path.join(args.output, 'summary.txt')}")


if __name__ == "__main__":
    main()

How to use

1. Input file

For example, domains.txt:

trazam.com
deudex.com
4bx-cd.com
j3kram.com
edzzup.com
usaiptv.tv

2. Run it

python3 find_dead_norecords.py -i domains.txt -o deadscan

3. Even faster

For 100–1000 domains, you can raise the worker count:

python3 find_dead_norecords.py -i domains.txt -o deadscan -w 80 --dns-timeout 2

If your resolver/VPS can handle it, try:

python3 find_dead_norecords.py -i domains.txt -o deadscan -w 100 --dns-timeout 1

Why is this faster than the full checker?

Because this script is DNS-only:

  • no web page requests
  • no SSL checks
  • no HTML body parsing
  • no redirect following
  • no checking of 4 URL variants

So for the specific NO_RECORDS target, it really is much faster.


Output files

matched_dead_norecords.csv

Only the domains that exactly match your target:

  • bucket = OFFLINE
  • page_type = CONNECTION FAILED
  • dns_ok = False
  • dns_error = NO_RECORDS

matched_dead_norecords.txt

Just the domain list, one domain per line.

others.csv

Everything that does not match the exact filter.

all_checked.csv

All results.

summary.txt

The summary.

progress.log

The process log.


About the NS column

Yes, I added the column:

NS

So if a domain:

  • has no A/AAAA/CNAME records
  • but still has NS records

you can still see its nameservers.

Example scenario:

  • the domain has not been pointed at a web server yet
  • but the registrar/hosting nameservers are still active

So a result can look like:

  • dns_error = NO_RECORDS
  • NS = ns1.something.com, ns2.something.com

That is often useful for analysis.


Important: NO_RECORDS ≠ NXDOMAIN

This script distinguishes between:

NO_RECORDS

The domain still exists, but has none of the web records we are looking for.

NXDOMAIN

The domain does not exist / does not resolve at all.

If you only want exact NO_RECORDS matches, this script already separates the two.
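A small self-contained illustration of that split, using made-up result dicts shaped like the ones get_dns_info() returns (the values are hypothetical; no network involved):

```python
# Two hypothetical results shaped like get_dns_info() output.
registered_but_unpointed = {"dns_ok": False, "dns_error": "NO_RECORDS",
                            "NS": ["ns1.example-registrar.com"]}
not_registered = {"dns_ok": False, "dns_error": "NXDOMAIN", "NS": []}

def is_exact_target(info):
    # Matches only the NO_RECORDS case; NXDOMAIN is deliberately excluded.
    return not info["dns_ok"] and info["dns_error"] == "NO_RECORDS"

print(is_exact_target(registered_but_unpointed))  # True
print(is_exact_target(not_registered))            # False
```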


If you want it even faster

Some practical tips:

For the exact NO_RECORDS target

Use:

  • a high worker count
  • a small DNS timeout

Example:

python3 find_dead_norecords.py -i domains.txt -o deadscan -w 100 --dns-timeout 1

But if your resolver is often slow, use:

python3 find_dead_norecords.py -i domains.txt -o deadscan -w 50 --dns-timeout 2

That is usually more stable.


If you already have all_results.csv

If you have already run a full scan and only want to pull out the exact matches, the fastest option is to filter the CSV locally, with no rescanning at all.

If you want, I can also build a mini filter script like this:

  • input: all_results.csv
  • output: matched_dead_norecords.csv
  • very fast
  • no network at all
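A minimal sketch of such a filter, using only the standard library and assuming the input CSV carries the same bucket/page_type/dns_ok/dns_error columns this script writes (note that csv stores the boolean as the text "False"):

```python
import csv

def filter_no_records(in_path, out_path):
    """Copy only the exact-target rows; returns how many rows were kept."""
    with open(in_path, newline="", encoding="utf-8") as fin, \
         open(out_path, "w", newline="", encoding="utf-8") as fout:
        reader = csv.DictReader(fin)
        writer = csv.DictWriter(fout, fieldnames=reader.fieldnames)
        writer.writeheader()
        kept = 0
        for row in reader:
            if (row.get("bucket") == "OFFLINE"
                    and row.get("page_type") == "CONNECTION FAILED"
                    and row.get("dns_ok") == "False"      # csv stores bools as text
                    and row.get("dns_error") == "NO_RECORDS"):
                writer.writerow(row)
                kept += 1
        return kept
```

With that column layout, filter_no_records("all_results.csv", "matched_dead_norecords.csv") would produce the matched list without a single DNS query.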

My recommendation

Given your use case, ideally you keep 2 scripts:

1. domain_status_fast.py

For general scanning.

2. find_dead_norecords.py

For hunting domains that are truly "dead / no records".


If you want, I can follow up with that CSV filter version too, so the workflow becomes:

  • run the big scan with domain_status_fast.py
  • then sort exact statuses with a super-fast local filter