Why did the first script feel slow?

Usually it comes down to this combination:

  1. Sequential
    • domains are checked one at a time (see the small sketch after this list)
    • if one domain takes 3–8 seconds, 100 domains can take a very long time
  2. Many URL variants are tried
    • https://domain
    • https://www.domain
    • http://domain
    • http://www.domain
    • if all of them are fully tested for every domain, the request count adds up fast
  3. DNS lookups are too thorough
    • A, AAAA, CNAME, MX, NS, all of them
    • thorough, but slower
  4. Too much of the response is read
    • large page bodies
    • large JavaScript
    • makes the check slow and heavy

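A minimal sketch of point 1, with check_one standing in for the real probe: run sequentially, the total time is the sum of every per-domain time; run in parallel, it is roughly the time of the slowest batch.

from concurrent.futures import ThreadPoolExecutor

domains = ["example.com", "example.org"]  # stand-in list

def check_one(domain):
    # placeholder for the real probe (DNS + HTTP)
    return domain, "LIVE_OK"

# Sequential: total time is the SUM of every per-domain time.
# results = [check_one(d) for d in domains]

# Parallel: ~25 checked at once, so wall-clock time drops sharply.
with ThreadPoolExecutor(max_workers=25) as ex:
    results = list(ex.map(check_one, domains))
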
The best solution

If your goals are:

  • as fast as HeadMasterSEO
  • still able to classify each domain
  • still safe if the script is stopped
  • results written immediately
  • input from a file
  • output split into separate files

then the best approach is:

CLI + multi-thread + safe write

Which means:

  • read the domains from a file
  • check them in parallel, for example 20–40 workers
  • each finished domain → written to the output files immediately
  • on Ctrl+C, results that are already done stay safe
  • resumable

Here is a faster version

This version focuses on:

  • speed
  • staying safe
  • writing results immediately
  • splitting into live (hidup) / warning / offline
  • detecting suspended / expired / parked / default page / 503 / forbidden / redirect / invalid SSL
  • resume

Install the dependencies

pip install requests dnspython beautifulsoup4


Fast CLI script

Save it as:

domain_status_fast.py

Python code

import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
import time
import warnings
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urljoin, urlparse

import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry

warnings.simplefilter("ignore", InsecureRequestWarning)

# =========================
# GLOBALS
# =========================

STOP_EVENT = threading.Event()
THREAD_LOCAL = threading.local()

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}

DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_READ_TIMEOUT = 7
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_MAX_REDIRECTS = 8
DEFAULT_MAX_BYTES = 98304   # 96 KB
DEFAULT_RETRIES = 1
DEFAULT_WORKERS = 25

FOR_SALE_KEYWORDS = [
    "domain is for sale",
    "buy this domain",
    "this domain may be for sale",
    "purchase this domain",
    "afternic",
    "sedo",
    "dan.com",
    "undeveloped",
]

PARKED_KEYWORDS = [
    "domain parked",
    "parked free",
    "parkingcrew",
    "bodis",
    "cashparking",
    "sedo parking",
    "parked domain",
    "this domain is parked",
]

DEFAULT_HOSTING_KEYWORDS = [
    "apache2 ubuntu default page",
    "apache2 debian default page",
    "welcome to nginx",
    "nginx test page",
    "test page for the nginx",
    "default web site page",
    "iis windows server",
]

COMING_SOON_KEYWORDS = [
    "coming soon",
    "under construction",
    "launching soon",
    "website coming soon",
    "site is coming soon",
]

SUSPENDED_KEYWORDS = [
    "this account has been suspended",
    "account suspended",
    "website suspended",
    "site suspended",
    "hosting account has been suspended",
    "suspended due to non-payment",
    "please contact billing",
    "contact your hosting provider",
    "billing issue",
]

EXPIRED_KEYWORDS = [
    "this domain has expired",
    "domain expired",
    "expired domain",
    "renew this domain",
    "renewal required",
    "domain renewal",
    "renew now",
    "expiration notice",
    "registrant verification failed",
    "has expired and may be available",
]

FIELDNAMES = [
    "checked_at",
    "domain",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "status_code",
    "ssl_status",
    "content_type",
    "elapsed_ms",
    "best_start_url",
    "best_final_url",
    "title",
    "notes",
    "error",
    "redirect_chain",
    "A",
    "AAAA",
    "CNAME",
    "all_attempts",
]

# ANSI color
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"


# =========================
# UTIL
# =========================

def now_utc():
    return datetime.now(timezone.utc).isoformat()


def clean_text(s):
    if not s:
        return ""
    return re.sub(r"\s+", " ", s).strip()


def normalize_domain(raw):
    s = raw.strip()
    if not s or s.startswith("#"):
        return ""

    s = s.split("#", 1)[0].strip()
    if not s:
        return ""

    if "://" not in s:
        s = "http://" + s

    try:
        p = urlparse(s)
        host = p.netloc or p.path
        host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
        if host.startswith("www."):
            host = host[4:]
        return host
    except Exception:
        return ""


def load_domains(input_file):
    domains = []
    seen = set()

    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            d = normalize_domain(line)
            if d and d not in seen:
                seen.add(d)
                domains.append(d)

    return domains


def load_processed_domains(all_results_csv):
    processed = set()
    if not os.path.exists(all_results_csv) or os.path.getsize(all_results_csv) == 0:
        return processed

    try:
        with open(all_results_csv, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                d = (row.get("domain") or "").strip().lower()
                if d:
                    processed.add(d)
    except Exception:
        pass

    return processed


def host_of(url):
    try:
        return (urlparse(url).hostname or "").lower()
    except Exception:
        return ""


def is_html_like(content_type):
    ct = (content_type or "").lower()
    return any(x in ct for x in [
        "text/html",
        "application/xhtml+xml",
        "text/plain",
        "application/xml",
        "text/xml",
    ])


def extract_title(html):
    if not html:
        return ""
    try:
        soup = BeautifulSoup(html, "html.parser")
        if soup.title and soup.title.string:
            return clean_text(soup.title.string)
    except Exception:
        pass

    m = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
    if m:
        return clean_text(m.group(1))
    return ""


def colorize_bucket(bucket):
    if bucket == "LIVE_OK":
        return f"{C_GREEN}{bucket}{C_RESET}"
    if bucket == "WARNING":
        return f"{C_YELLOW}{bucket}{C_RESET}"
    return f"{C_RED}{bucket}{C_RESET}"


# =========================
# SAFE FILE WRITERS
# =========================

def sync_file(f):
    f.flush()
    os.fsync(f.fileno())


def atomic_write_text(path, text):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            sync_file(f)
        os.replace(tmp, path)
    finally:
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    def __init__(self, path, fieldnames):
        self.path = path
        os.makedirs(os.path.dirname(path), exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if not file_exists:
            self.writer.writeheader()
            sync_file(self.f)

    def writerow(self, row):
        self.writer.writerow(row)
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    def __init__(self, path):
        self.path = path
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        self.f.write(text.rstrip("\n") + "\n")
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


# =========================
# SIGNAL HANDLER
# =========================

def signal_handler(signum, frame):
    if not STOP_EVENT.is_set():
        STOP_EVENT.set()
        print(
            f"\n{C_YELLOW}Signal diterima. Menghentikan submit job baru... "
            f"hasil yang sudah selesai tetap disimpan.{C_RESET}"
        )
    else:
        raise KeyboardInterrupt


# =========================
# SESSION PER THREAD
# =========================

def get_session(retries=1, pool_size=100):
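    """Return this thread's requests.Session, created once with pooled connections and light connect retries."""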
    session = getattr(THREAD_LOCAL, "session", None)
    if session is None:
        session = requests.Session()
        retry_cfg = Retry(
            total=retries,
            connect=retries,
            read=0,
            redirect=0,
            status=0,
            backoff_factor=0.2,
            allowed_methods=frozenset(["GET"]),
            raise_on_status=False,
        )
        adapter = HTTPAdapter(
            max_retries=retry_cfg,
            pool_connections=pool_size,
            pool_maxsize=pool_size
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        THREAD_LOCAL.session = session
    return session


# =========================
# DNS
# =========================

def get_dns_info(domain, dns_timeout=2, full_dns=False):
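    """Resolve A/AAAA/CNAME (plus MX/NS when full_dns) with a short timeout; dns_ok is True if any A/AAAA/CNAME record is found."""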
    result = {
        "dns_ok": False,
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "dns_error": "",
    }

    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout

    record_types = ["A", "AAAA", "CNAME"]
    if full_dns:
        record_types = ["A", "AAAA", "CNAME"]

    for rtype in record_types:
        try:
            answers = resolver.resolve(domain, rtype)
            vals = []
            for r in answers:
                if hasattr(r, "target"):
                    vals.append(str(r.target).rstrip("."))
                else:
                    vals.append(str(r).rstrip("."))
            result[rtype] = vals
        except dns.resolver.NXDOMAIN:
            result["dns_error"] = "NXDOMAIN"
            return result
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            pass
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__

    if any(result[k] for k in ["A", "AAAA", "CNAME"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"

    return result


# =========================
# HTTP / PROBE
# =========================

def read_limited_text(resp, max_bytes):
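    """Read at most max_bytes of the streamed response body and decode it, so huge pages stay cheap."""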
    chunks = []
    total = 0
    try:
        for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
            if not chunk:
                continue
            remain = max_bytes - total
            if remain <= 0:
                break
            if len(chunk) > remain:
                chunk = chunk[:remain]
            chunks.append(chunk)
            total += len(chunk)
            if total >= max_bytes:
                break
    except Exception:
        pass

    raw = b"".join(chunks)
    enc = resp.encoding or "utf-8"
    try:
        return raw.decode(enc, errors="replace")
    except Exception:
        return raw.decode("utf-8", errors="replace")


def classify_page(status_code, title, body, content_type):
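    """Classify the page: keyword checks (expired, suspended, for sale, parked, default page, coming soon) first, then the HTTP status code."""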
    blob = ((title or "") + "\n" + (body or "")[:7000]).lower()

    if any(k in blob for k in EXPIRED_KEYWORDS):
        return "EXPIRED / RENEWAL ISSUE"
    if any(k in blob for k in SUSPENDED_KEYWORDS):
        return "SUSPENDED"
    if any(k in blob for k in FOR_SALE_KEYWORDS):
        return "FOR SALE"
    if any(k in blob for k in PARKED_KEYWORDS):
        return "PARKED"
    if any(k in blob for k in DEFAULT_HOSTING_KEYWORDS):
        return "DEFAULT HOSTING PAGE"
    if any(k in blob for k in COMING_SOON_KEYWORDS):
        return "COMING SOON"

    if 200 <= status_code <= 299:
        if content_type and not is_html_like(content_type):
            return "LIVE NON-HTML"
        return "LIVE 200"

    if status_code in (301, 302, 303, 307, 308):
        return "REDIRECT"
    if status_code == 401:
        return "UNAUTHORIZED"
    if status_code == 403:
        return "FORBIDDEN"
    if status_code == 404:
        return "NOT FOUND"
    if status_code == 410:
        return "GONE"
    if status_code == 429:
        return "RATE LIMITED"
    if 500 <= status_code <= 599:
        return "SERVER ERROR"

    return f"HTTP {status_code}"


def score_result(r):
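    """Score one attempt; best_probe keeps the highest-scoring (most informative) result."""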
    if not r["ok"]:
        return 0

    pt = r["page_type"]
    sc = r["status_code"]

    if pt == "LIVE 200":
        return 100
    if pt == "LIVE NON-HTML":
        return 98
    if pt in [
        "EXPIRED / RENEWAL ISSUE",
        "SUSPENDED",
        "FOR SALE",
        "PARKED",
        "DEFAULT HOSTING PAGE",
        "COMING SOON",
    ]:
        return 95
    if sc == 403:
        return 85
    if sc == 401:
        return 84
    if 500 <= sc <= 599:
        return 82
    if sc in (404, 410):
        return 80
    if 300 <= sc < 400:
        return 75
    return 10


def probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
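    """GET one URL, following redirects manually up to max_redirects; on SSL errors retry once without verification and mark ssl_status INVALID."""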
    session = get_session(retries=retries, pool_size=pool_size)
    current_url = url
    chain = []
    ssl_status = "N/A"

    for _ in range(max_redirects):
        resp = None
        try:
            resp = session.get(
                current_url,
                headers=HEADERS,
                timeout=(connect_timeout, read_timeout),
                allow_redirects=False,
                verify=True,
                stream=True,
            )
            if current_url.startswith("https://"):
                ssl_status = "VALID"

        except requests.exceptions.SSLError:
            ssl_status = "INVALID"
            try:
                resp = session.get(
                    current_url,
                    headers=HEADERS,
                    timeout=(connect_timeout, read_timeout),
                    allow_redirects=False,
                    verify=False,
                    stream=True,
                )
            except requests.exceptions.Timeout:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
                }
            except requests.exceptions.ConnectionError:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
                }
            except Exception as e:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "SSL ERROR", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
                }

        except requests.exceptions.Timeout:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
            }

        except requests.exceptions.ConnectionError:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
            }

        except Exception as e:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "REQUEST ERROR", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
            }

        try:
            elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
        except Exception:
            elapsed_ms = ""

        chain.append(f"{resp.status_code} {current_url}")

        if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
            next_url = urljoin(current_url, resp.headers.get("Location"))
            try:
                resp.close()
            except Exception:
                pass
            current_url = next_url
            continue

        content_type = resp.headers.get("Content-Type", "")
        body = ""
        if resp.status_code not in (204, 304):
            body = read_limited_text(resp, max_bytes)

        title = extract_title(body)
        page_type = classify_page(resp.status_code, title, body, content_type)

        final_url = resp.url
        try:
            resp.close()
        except Exception:
            pass

        return {
            "ok": True,
            "start_url": url,
            "final_url": final_url,
            "status_code": resp.status_code,
            "page_type": page_type,
            "title": title,
            "chain": " | ".join(chain),
            "ssl_status": ssl_status,
            "content_type": content_type,
            "elapsed_ms": elapsed_ms,
            "error": "",
        }

    return {
        "ok": False,
        "start_url": url,
        "final_url": current_url,
        "status_code": "",
        "page_type": "TOO MANY REDIRECTS",
        "title": "",
        "chain": " | ".join(chain),
        "ssl_status": ssl_status,
        "content_type": "",
        "elapsed_ms": "",
        "error": "TooManyRedirects",
    }


def is_good_enough(result):
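    """True when this attempt is conclusive enough that the remaining URL variants can be skipped."""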
    if not result["ok"]:
        return False

    if result["page_type"] in [
        "LIVE 200", "LIVE NON-HTML",
        "PARKED", "FOR SALE", "DEFAULT HOSTING PAGE",
        "COMING SOON", "SUSPENDED", "EXPIRED / RENEWAL ISSUE"
    ]:
        return True

    if result["status_code"] in (401, 403, 404, 410, 500, 501, 502, 503, 504):
        return True

    return False


def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
    """
    Faster than the old version:
    - try https://domain first
    - then http://domain
    - only try the www variants if the early results are still poor
    """
    candidates_primary = [
        f"https://{domain}",
        f"http://{domain}",
    ]
    candidates_www = [
        f"https://www.{domain}",
        f"http://www.{domain}",
    ]

    results = []
    best = None

    for url in candidates_primary:
        r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
        results.append(r)
        if best is None or score_result(r) > score_result(best):
            best = r
        if is_good_enough(best):
            return results, best

    for url in candidates_www:
        r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
        results.append(r)
        if best is None or score_result(r) > score_result(best):
            best = r
        if is_good_enough(best):
            return results, best

    return results, best


# =========================
# CLASSIFICATION
# =========================

def classify_bucket(best):
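    """Map the best attempt to a bucket: LIVE_OK, WARNING (reachable but problematic) or OFFLINE."""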
    if best["page_type"] in ["LIVE 200", "LIVE NON-HTML"]:
        return "LIVE_OK"

    if best["status_code"] != "":
        return "WARNING"

    if best["page_type"] == "TOO MANY REDIRECTS" and best["chain"]:
        return "WARNING"

    return "OFFLINE"


def build_notes(domain, dns_info, best):
    notes = []

    if not dns_info["dns_ok"]:
        notes.append("DNS problem")

    if best["ssl_status"] == "INVALID":
        notes.append("SSL invalid")

    final_url = best["final_url"] or ""
    start_url = best["start_url"] or ""
    final_host = host_of(final_url)

    if start_url and final_url and start_url != final_url:
        notes.append("Redirected")

    if final_url.startswith("http://"):
        notes.append("HTTP only")

    if final_host and final_host not in {domain, f"www.{domain}"}:
        notes.append(f"Redirect to other host: {final_host}")

    pt = best["page_type"]
    if pt == "DEFAULT HOSTING PAGE":
        notes.append("Server default page")
    elif pt == "PARKED":
        notes.append("Parked domain")
    elif pt == "FOR SALE":
        notes.append("Domain for sale")
    elif pt == "COMING SOON":
        notes.append("Coming soon page")
    elif pt == "SUSPENDED":
        notes.append("Suspended; check billing/hosting")
    elif pt == "EXPIRED / RENEWAL ISSUE":
        notes.append("Expired or renewal issue")
    elif pt == "SERVER ERROR":
        notes.append("Website reachable but server error")
    elif pt == "NOT FOUND":
        notes.append("Host reachable but page not found")
    elif pt == "FORBIDDEN":
        notes.append("Host reachable but forbidden")
    elif pt == "CONNECTION FAILED":
        notes.append("Cannot connect to web server")
    elif pt == "TIMEOUT":
        notes.append("Request timeout")

    ct = (best["content_type"] or "").strip()
    if ct and not is_html_like(ct):
        notes.append(f"Non-HTML content: {ct}")

    return "; ".join(notes)


def summarize_domain(domain, args):
    dns_info = get_dns_info(domain, dns_timeout=args.dns_timeout, full_dns=False)

    attempts, best = best_probe(
        domain=domain,
        connect_timeout=args.connect_timeout,
        read_timeout=args.read_timeout,
        max_redirects=args.max_redirects,
        max_bytes=args.max_bytes,
        retries=args.retries,
        pool_size=max(args.workers, 20),
    )

    bucket = classify_bucket(best)
    notes = build_notes(domain, dns_info, best)

    row = {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": bucket,
        "page_type": best["page_type"],
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        "status_code": best["status_code"],
        "ssl_status": best["ssl_status"],
        "content_type": best["content_type"],
        "elapsed_ms": best["elapsed_ms"],
        "best_start_url": best["start_url"],
        "best_final_url": best["final_url"],
        "title": best["title"],
        "notes": notes,
        "error": best["error"],
        "redirect_chain": best["chain"],
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "all_attempts": " || ".join(
            f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
            for r in attempts
        ),
    }
    return row


def fallback_error_row(domain, err_msg):
    return {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": "OFFLINE",
        "page_type": "SCRIPT ERROR",
        "dns_ok": "",
        "dns_error": "",
        "status_code": "",
        "ssl_status": "",
        "content_type": "",
        "elapsed_ms": "",
        "best_start_url": "",
        "best_final_url": "",
        "title": "",
        "notes": "Internal script error",
        "error": err_msg,
        "redirect_chain": "",
        "A": "",
        "AAAA": "",
        "CNAME": "",
        "all_attempts": "",
    }


# =========================
# SUMMARY
# =========================

def update_summary(path, total_input, skipped_resume, processed_now, counts_bucket, counts_type):
    lines = []
    lines.append("DOMAIN STATUS FAST CHECKER")
    lines.append("=" * 40)
    lines.append(f"generated_at   : {now_utc()}")
    lines.append(f"total_input    : {total_input}")
    lines.append(f"skipped_resume : {skipped_resume}")
    lines.append(f"processed_now  : {processed_now}")
    lines.append(f"remaining_est  : {max(total_input - skipped_resume - processed_now, 0)}")
    lines.append("")
    lines.append("BUCKET COUNTS")
    lines.append("-" * 40)
    for k in ["LIVE_OK", "WARNING", "OFFLINE"]:
        lines.append(f"{k:12}: {counts_bucket.get(k, 0)}")

    lines.append("")
    lines.append("PAGE TYPE COUNTS")
    lines.append("-" * 40)
    for k, v in counts_type.most_common():
        lines.append(f"{k:28}: {v}")

    atomic_write_text(path, "\n".join(lines) + "\n")


# =========================
# WRITER THREAD
# =========================

def writer_loop(result_queue, args, total_input, skipped_resume):
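    """Single writer thread: consume finished rows from the queue and write every output file immediately (flush + fsync)."""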
    out = args.output
    os.makedirs(out, exist_ok=True)

    all_csv = os.path.join(out, "all_results.csv")
    hidup_csv = os.path.join(out, "hidup_results.csv")
    live_ok_csv = os.path.join(out, "live_ok_results.csv")
    warning_csv = os.path.join(out, "warning_results.csv")
    offline_csv = os.path.join(out, "offline_results.csv")

    hidup_txt = os.path.join(out, "hidup_domains.txt")
    live_ok_txt = os.path.join(out, "live_ok_domains.txt")
    warning_txt = os.path.join(out, "warning_domains.txt")
    offline_txt = os.path.join(out, "offline_domains.txt")

    summary_txt = os.path.join(out, "summary.txt")
    progress_log = os.path.join(out, "progress.log")

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
    live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
    warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
    offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)

    hidup_list_writer = SafeLineWriter(hidup_txt)
    live_ok_list_writer = SafeLineWriter(live_ok_txt)
    warning_list_writer = SafeLineWriter(warning_txt)
    offline_list_writer = SafeLineWriter(offline_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_bucket = Counter()
    counts_type = Counter()
    processed_now = 0

    update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")

    while True:
        item = result_queue.get()
        if item is None:
            break

        row = item
        domain = row["domain"]

        all_writer.writerow(row)

        if row["bucket"] in ("LIVE_OK", "WARNING"):
            hidup_writer.writerow(row)
            hidup_list_writer.write_line(domain)

        if row["bucket"] == "LIVE_OK":
            live_ok_writer.writerow(row)
            live_ok_list_writer.write_line(domain)
        elif row["bucket"] == "WARNING":
            warning_writer.writerow(row)
            warning_list_writer.write_line(domain)
        else:
            offline_writer.writerow(row)
            offline_list_writer.write_line(domain)

        counts_bucket[row["bucket"]] += 1
        counts_type[row["page_type"]] += 1
        processed_now += 1

        update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
        log_writer.write_line(
            f"DONE {now_utc()} {domain} bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
        )

        code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
        print(
            f"[{processed_now}] "
            f"{domain:30} -> {colorize_bucket(row['bucket']):18} | "
            f"{row['page_type'][:28]:28} | "
            f"{code_text:4} | "
            f"{str(row['elapsed_ms'])[:6]:6} ms | "
            f"{row['best_final_url'][:45]}"
        )

    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")

    all_writer.close()
    hidup_writer.close()
    live_ok_writer.close()
    warning_writer.close()
    offline_writer.close()

    hidup_list_writer.close()
    live_ok_list_writer.close()
    warning_list_writer.close()
    offline_list_writer.close()
    log_writer.close()


# =========================
# WORKER
# =========================

def worker(domain, args, result_queue):
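    """Check one domain, turn unexpected exceptions into a fallback row, and push the result onto the writer queue."""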
    if STOP_EVENT.is_set():
        return

    try:
        row = summarize_domain(domain, args)
    except Exception as e:
        row = fallback_error_row(domain, f"{type(e).__name__}: {e}")

    result_queue.put(row)

    if args.delay > 0:
        time.sleep(args.delay)


# =========================
# FILE CLEANUP
# =========================

def remove_if_exists(path):
    try:
        if os.path.exists(path):
            os.remove(path)
    except Exception:
        pass


def cleanup_output(outdir):
    files = [
        "all_results.csv",
        "hidup_results.csv",
        "live_ok_results.csv",
        "warning_results.csv",
        "offline_results.csv",
        "hidup_domains.txt",
        "live_ok_domains.txt",
        "warning_domains.txt",
        "offline_domains.txt",
        "summary.txt",
        "progress.log",
    ]
    for f in files:
        remove_if_exists(os.path.join(outdir, f))


# =========================
# CLI
# =========================

def parse_args():
    parser = argparse.ArgumentParser(description="Fast CLI Domain Status Checker")
    parser.add_argument("-i", "--input", required=True, help="File domain input (.txt)")
    parser.add_argument("-o", "--output", default="results", help="Folder output")
    parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker paralel")
    parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout")
    parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
    parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Maks redirect")
    parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Max body dibaca")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Retry koneksi ringan")
    parser.add_argument("--delay", type=float, default=0.0, help="Delay per domain")
    parser.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
    parser.add_argument("--no-resume", action="store_true", help="Jangan skip hasil lama")
    return parser.parse_args()


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()
    os.makedirs(args.output, exist_ok=True)

    if args.fresh:
        cleanup_output(args.output)

    all_csv = os.path.join(args.output, "all_results.csv")

    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)

    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue_domains = [d for d in domains if d not in processed]

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses  : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers        : {args.workers}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()

    executor = ThreadPoolExecutor(max_workers=args.workers)

    try:
        futures = []
        for domain in queue_domains:
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args, result_queue))

        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                pass

    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Dihentikan paksa oleh user.{C_RESET}")

    finally:
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)

        # give workers that already finished a brief moment to push their rows onto the queue
        time.sleep(0.5)
        result_queue.put(None)
        writer_thread.join(timeout=10)

        print("-" * 110)
        print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
        print(f"All CSV      : {os.path.join(args.output, 'all_results.csv')}")
        print(f"Hidup CSV    : {os.path.join(args.output, 'hidup_results.csv')}")
        print(f"Live OK CSV  : {os.path.join(args.output, 'live_ok_results.csv')}")
        print(f"Warning CSV  : {os.path.join(args.output, 'warning_results.csv')}")
        print(f"Offline CSV  : {os.path.join(args.output, 'offline_results.csv')}")
        print(f"Summary      : {os.path.join(args.output, 'summary.txt')}")


if __name__ == "__main__":
    main()



How to use it

1. Create the input file

For example, domains.txt

trazam.com
deudex.com
4bx-cd.com
j3kram.com
edzzup.com
usaiptv.tv


2. Run it

python3 domain_status_fast.py -i domains.txt -o hasil


3. For 100 domains, here is what I suggest

If the VPS/server is reasonably powerful:

python3 domain_status_fast.py -i domains.txt -o hasil -w 25

If you want it faster:

python3 domain_status_fast.py -i domains.txt -o hasil -w 40

If the server is small:

python3 domain_status_fast.py -i domains.txt -o hasil -w 15


4. Start over from scratch

python3 domain_status_fast.py -i domains.txt -o hasil --fresh


5. Automatic resume

If a previous run stopped partway through:

python3 domain_status_fast.py -i domains.txt -o hasil

It will read all_results.csv and skip every domain that has already been checked.


Output files

CSV

  • all_results.csv → every result
  • hidup_results.csv → every domain that is still reachable
  • live_ok_results.csv → websites that are live and normal
  • warning_results.csv → reachable but with problems
  • offline_results.csv → not reachable / failed

TXT

  • hidup_domains.txt
  • live_ok_domains.txt
  • warning_domains.txt
  • offline_domains.txt

Summary

  • summary.txt
  • progress.log

What each result bucket means

LIVE_OK

The domain/website is up and working normally:

  • LIVE 200
  • LIVE NON-HTML

WARNING

The domain is still alive / reachable, but something is wrong:

  • DEFAULT HOSTING PAGE
  • PARKED
  • FOR SALE
  • COMING SOON
  • SUSPENDED
  • EXPIRED / RENEWAL ISSUE
  • 403
  • 404
  • 503
  • SSL invalid
  • odd redirects

OFFLINE

Genuinely failed:

  • no DNS records
  • connection failed
  • timeout
  • request error

Why is this version faster?

1. Multi-threaded

Instead of finishing one domain before moving on to the next,
many domains are checked at the same time.

That is the main reason it gets much closer to HeadMasterSEO's speed.


2. Smarter URL candidates

The script does not always force-check every variant. It tries:

  • https://domain
  • http://domain

first.

Once it gets a result that is “clear enough”, it can stop early
without always continuing to the www variants.


3. The body is limited

Only part of each page is read,
just enough to extract:

  • the title
  • parked keywords
  • suspended keywords
  • expired keywords
  • default hosting page markers

So it is faster and lighter.


4. DNS is kept light

By default, the DNS checks stay light:

  • A
  • AAAA
  • CNAME

Not an overly exhaustive DNS scan.


5. Results are written immediately

As soon as a domain finishes:

  • the row is written to file right away
  • flush
  • fsync

So if the process stops, the results already on disk stay safe.
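
A minimal, self-contained sketch of that write pattern (the file name here is just an example): flush pushes Python's buffer down to the OS, and os.fsync asks the OS to commit the bytes to disk.

import os

with open("results.csv", "a", encoding="utf-8") as f:
    f.write("example.com,LIVE_OK\n")
    f.flush()              # push Python's buffer down to the OS
    os.fsync(f.fileno())   # ask the OS to commit the bytes to disk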


Recommended settings for 100 domains

Safe and reasonably fast

python3 domain_status_fast.py -i domains.txt -o hasil -w 20 --connect-timeout 5 --read-timeout 7 --dns-timeout 2

Faster

python3 domain_status_fast.py -i domains.txt -o hasil -w 35 --connect-timeout 4 --read-timeout 6 --dns-timeout 2

If you often hit hangs / slow hosts

python3 domain_status_fast.py -i domains.txt -o hasil -w 15 --connect-timeout 4 --read-timeout 5 --dns-timeout 2

Important: do not make the worker count too large

If you use:

-w 100

it is not necessarily faster.

It can instead:

  • overload the VPS
  • saturate the connection
  • overwhelm the DNS resolver
  • cause many false timeouts

For 100 domains, the sweet spot is usually:

  • 20 to 40 workers

If you compare it with HeadMasterSEO

HeadMasterSEO may still be faster, because:

  • it is a native application
  • its network stack is more heavily optimized
  • it sometimes reads very little of the body
  • it may not check DNS in as much detail as a Python script

Still, the script above should be far faster than the sequential/non-CLI version.


My suggestion for you

Try this right away:

python3 domain_status_fast.py -i domains.txt -o hasil -w 25 --fresh

Once results come out, send 5–10 sample output lines or part of the CSV here.

Then I can help check whether:

  • the classification is accurate
  • any categories need to be added
  • it should behave even more like HeadMasterSEO
  • an ultra-fast mode is needed

If you want, as a next step I can build two modes:

1. --mode fast

Very fast, basic classification, good for large lists.

2. --mode full

More detailed, more accurate, slightly slower.

If you like, I can go ahead and build the dual-mode (fast/full) version so that:

  • for 1000 domains you can use fast
  • for important domains you can use full.