Script CLI khusus metadata domain

Simpan sebagai:

nano domain_meta_check.py
import argparse
import csv
import os
import re
import signal
import socket
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse

import requests

try:
    import whois
except ImportError:
    whois = None

# Set once a termination signal arrives; used to stop submitting new jobs.
STOP_EVENT = threading.Event()
# Thread-local storage holding one requests.Session per worker thread.
THREAD_LOCAL = threading.local()

# Column order shared by every CSV file this script writes.
FIELDNAMES = [
    "checked_at",
    "domain",
    "source_used",
    "registrar",
    "registrant_org",
    "company_guess",
    "created_at",
    "expires_at",
    "age_days",
    "days_to_expire",
    "status",
    "nameservers",
    "ns_count",
    "rdap_error",
    "whois_error",
    "notes",
]

# ANSI escape codes for colored console output.
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"


def now_utc():
    """Return the current UTC time as an ISO-8601 timestamp string."""
    current = datetime.now(timezone.utc)
    return current.isoformat()


def normalize_domain(raw):
    """Normalize one input line to a bare lowercase hostname.

    Blank lines and '#'-comment lines yield "". Inline '#' comments,
    URL schemes, paths, ports, surrounding dots and a leading "www."
    prefix are all stripped.
    """
    text = raw.strip()
    if not text or text.startswith("#"):
        return ""

    text = text.split("#", 1)[0].strip()
    if not text:
        return ""

    # urlparse needs a scheme to populate netloc correctly.
    if "://" not in text:
        text = "http://" + text

    try:
        parsed = urlparse(text)
        host = (parsed.netloc or parsed.path).split("/")[0]
        host = host.split(":")[0].strip().lower().strip(".")
        return host[4:] if host.startswith("www.") else host
    except Exception:
        return ""


def load_domains(input_file):
    """Read *input_file* and return unique normalized domains in input order."""
    seen = set()
    result = []
    with open(input_file, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            domain = normalize_domain(raw_line)
            if domain and domain not in seen:
                seen.add(domain)
                result.append(domain)
    return result


def load_processed_domains(csv_path):
    """Return the set of domains already recorded in a results CSV.

    Missing, empty or unreadable files yield an empty set so a fresh
    run can proceed (best-effort resume support).
    """
    done = set()
    if not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0:
        return done

    try:
        with open(csv_path, "r", encoding="utf-8", newline="") as handle:
            for record in csv.DictReader(handle):
                domain = (record.get("domain") or "").strip().lower()
                if domain:
                    done.add(domain)
    except Exception:
        # Deliberate best-effort: a corrupt CSV just means no resume data.
        pass

    return done


def remove_if_exists(path):
    """Best-effort delete: remove *path* when present, swallow any error."""
    try:
        if not os.path.exists(path):
            return
        os.remove(path)
    except Exception:
        pass


def sync_file(f):
    """Flush Python buffers and force the OS to persist *f* to disk."""
    f.flush()
    descriptor = f.fileno()
    os.fsync(descriptor)


def atomic_write_text(path, text):
    """Atomically replace *path* with *text*.

    Writes to a temp file in the target's directory, fsyncs it, then
    os.replace()s it over the destination so a reader never observes a
    partially written file.

    Bug fix: os.path.dirname(path) is "" for a bare filename, which made
    os.makedirs("") and tempfile.mkstemp(dir="") raise; fall back to the
    current directory in that case.
    """
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    # Temp file must live on the same filesystem for os.replace to be atomic.
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    finally:
        # If os.replace succeeded, tmp is gone; otherwise clean it up.
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    """Append-mode CSV writer that fsyncs after every row (crash-safe).

    Bug fix: os.path.dirname(path) is "" for a bare filename, which made
    os.makedirs("") raise; fall back to the current directory.
    """

    def __init__(self, path, fieldnames):
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        # Decide about the header before opening: open(..., "a") creates
        # the file but never changes its size, so this is equivalent to
        # the post-open check and reads more clearly.
        needs_header = not (os.path.exists(path) and os.path.getsize(path) > 0)
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if needs_header:
            self.writer.writeheader()
            self._sync()

    def _sync(self):
        # Flush Python buffers and force the OS to persist to disk.
        self.f.flush()
        os.fsync(self.f.fileno())

    def writerow(self, row):
        """Write one dict row and immediately persist it to disk."""
        self.writer.writerow(row)
        self._sync()

    def close(self):
        """Close the underlying file; ignore errors (e.g. double close)."""
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    """Append-mode text writer that fsyncs after every line (crash-safe log).

    Bug fix: os.path.dirname(path) is "" for a bare filename, which made
    os.makedirs("") raise; fall back to the current directory.
    """

    def __init__(self, path):
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        """Append *text* as exactly one line (trailing newline normalized)."""
        self.f.write(text.rstrip("\n") + "\n")
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        """Close the underlying file; ignore errors (e.g. double close)."""
        try:
            self.f.close()
        except Exception:
            pass


def signal_handler(signum, frame):
    """First signal requests a graceful stop; a second one forces abort."""
    if STOP_EVENT.is_set():
        # Already stopping: escalate so the user can break out immediately.
        raise KeyboardInterrupt
    STOP_EVENT.set()
    print(f"\n{C_YELLOW}Signal diterima. Stop submit job baru...{C_RESET}")


def get_session():
    """Return this thread's cached requests.Session, creating it on first use."""
    if getattr(THREAD_LOCAL, "session", None) is None:
        THREAD_LOCAL.session = requests.Session()
    return THREAD_LOCAL.session


def uniq(items):
    """Deduplicate case-insensitively, keeping first spelling and order.

    Items are stringified and stripped; blanks are dropped.
    """
    seen = set()
    result = []
    for item in items:
        text = str(item).strip()
        key = text.lower()
        if text and key not in seen:
            seen.add(key)
            result.append(text)
    return result


def parse_dt(value):
    """Best-effort parse of WHOIS/RDAP date values into an aware UTC datetime.

    Accepts datetime objects (naive ones are assumed UTC), lists (first
    parseable entry wins) and strings in several common formats.
    Returns None when nothing parses.
    """
    if not value:
        return None

    if isinstance(value, list):
        # WHOIS libraries often return a list of dates; use the first usable one.
        for v in value:
            dt = parse_dt(v)
            if dt:
                return dt
        return None

    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value

    if not isinstance(value, str):
        return None

    s = value.strip()
    if not s:
        return None

    # Bug fix: only map a *trailing* "Z" (Zulu/UTC) to "+00:00"; the old
    # blanket replace() corrupted any "Z" appearing elsewhere in the string.
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"

    patterns = [
        None,  # None means: try datetime.fromisoformat first
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S",
        "%Y.%m.%d %H:%M:%S",
        "%d-%b-%Y",
        "%Y/%m/%d",
    ]

    for p in patterns:
        try:
            if p is None:
                dt = datetime.fromisoformat(s)
            else:
                dt = datetime.strptime(s, p)
            # Naive results are assumed to be UTC.
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except Exception:
            pass
    return None


def dt_to_str(dt):
    """ISO-format a datetime, or return "" for a falsy value."""
    return dt.isoformat() if dt else ""


def days_between(a, b):
    """Whole days from a's date to b's date, or "" when either is missing."""
    if a and b:
        return (b.date() - a.date()).days
    return ""


def pick_event_date(events, wanted_actions):
    """Return the first parseable RDAP eventDate whose eventAction matches.

    *wanted_actions* is a priority list: all events are scanned for the
    first action before falling through to the next one.
    """
    if not isinstance(events, list):
        return None

    for wanted in wanted_actions:
        target = wanted.lower()
        for event in events:
            if str(event.get("eventAction", "")).lower() != target:
                continue
            parsed = parse_dt(event.get("eventDate"))
            if parsed:
                return parsed
    return None


def parse_vcard_value(entity, wanted_fields):
    """Extract the first matching field value from an RDAP entity's vcardArray.

    Returns "" when the entity has no vCard, no wanted field, or on any
    malformed structure.
    """
    try:
        vcard = entity.get("vcardArray")
        if not vcard or len(vcard) < 2:
            return ""
        for entry in vcard[1]:
            if len(entry) < 4:
                continue
            if str(entry[0]).lower() not in wanted_fields:
                continue
            value = entry[3]
            if isinstance(value, list):
                value = " ".join(str(part) for part in value if part)
            return str(value).strip()
    except Exception:
        pass
    return ""


def extract_rdap_entities(data):
    """Pull (registrar, registrant_org) names out of an RDAP response.

    Only the first non-empty match for each role is kept.
    """
    registrar = ""
    registrant_org = ""

    for entity in data.get("entities", []):
        roles = {str(role).lower() for role in entity.get("roles", [])}

        if "registrar" in roles and not registrar:
            registrar = parse_vcard_value(entity, {"fn", "org"}) or registrar

        if "registrant" in roles and not registrant_org:
            registrant_org = parse_vcard_value(entity, {"org", "fn"}) or registrant_org

    return registrar, registrant_org


def rdap_lookup(domain, timeout=10):
    """Query rdap.org for *domain* metadata.

    Always returns a dict with the standard result keys; on any failure
    only "error" is populated and the rest stays empty.
    """
    result = {
        "source": "",
        "registrar": "",
        "registrant_org": "",
        "created_at": None,
        "expires_at": None,
        "status": [],
        "nameservers": [],
        "error": "",
    }

    try:
        response = get_session().get(
            f"https://rdap.org/domain/{domain}",
            timeout=timeout,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        if response.status_code != 200:
            result["error"] = f"HTTP {response.status_code}"
            return result

        data = response.json()
        result["source"] = "RDAP"
        result["status"] = uniq(data.get("status", []))

        names = []
        for server in data.get("nameservers", []):
            label = server.get("ldhName") or server.get("unicodeName") or ""
            if label:
                names.append(str(label).rstrip("."))
        result["nameservers"] = uniq(names)

        events = data.get("events", [])
        result["created_at"] = pick_event_date(
            events, ["registration", "registered", "creation"]
        )
        result["expires_at"] = pick_event_date(
            events, ["expiration", "expiry", "expired"]
        )

        result["registrar"], result["registrant_org"] = extract_rdap_entities(data)

    except Exception as exc:
        result["error"] = f"{type(exc).__name__}: {exc}"

    return result


def whois_lookup(domain):
    """Query WHOIS for *domain* via python-whois.

    Returns the standard result dict; "error" is filled when the library
    is missing or the lookup fails.
    """
    result = {
        "source": "",
        "registrar": "",
        "registrant_org": "",
        "created_at": None,
        "expires_at": None,
        "status": [],
        "nameservers": [],
        "error": "",
    }

    if whois is None:
        result["error"] = "python-whois not installed"
        return result

    try:
        record = whois.whois(domain)
        result["source"] = "WHOIS"

        registrar = getattr(record, "registrar", "") or ""
        if registrar:
            result["registrar"] = str(registrar).strip()

        org = getattr(record, "org", "") or ""
        if org:
            result["registrant_org"] = str(org).strip()

        result["created_at"] = parse_dt(getattr(record, "creation_date", None))
        result["expires_at"] = parse_dt(getattr(record, "expiration_date", None))

        status = getattr(record, "status", None)
        if isinstance(status, (list, tuple, set)):
            result["status"] = uniq(status)
        elif status:
            result["status"] = uniq([status])

        servers = getattr(record, "name_servers", None)
        if isinstance(servers, (list, tuple, set)):
            result["nameservers"] = uniq(
                [str(s).strip().rstrip(".") for s in servers if s]
            )
        elif servers:
            result["nameservers"] = uniq([str(servers).strip().rstrip(".")])

    except Exception as exc:
        result["error"] = f"{type(exc).__name__}: {exc}"

    return result


def merge_meta(domain, rdap_data, whois_data):
    """Combine RDAP and WHOIS results into one flat CSV row (RDAP wins)."""

    def prefer(key):
        # RDAP value takes priority; WHOIS fills the gaps.
        return rdap_data[key] or whois_data[key]

    registrar = prefer("registrar")
    registrant_org = prefer("registrant_org")
    created_at = prefer("created_at")
    expires_at = prefer("expires_at")
    status = uniq(rdap_data["status"] + whois_data["status"])
    nameservers = uniq(rdap_data["nameservers"] + whois_data["nameservers"])

    sources = []
    if rdap_data["source"]:
        sources.append("RDAP")
    if whois_data["source"]:
        sources.append("WHOIS")

    now = datetime.now(timezone.utc)

    notes = []
    notes.append(
        "registrant_org available" if registrant_org else "registrant_org empty/private"
    )
    if registrar:
        notes.append("registrar found")
    notes.append(f"{len(nameservers)} NS found" if nameservers else "NS empty")

    return {
        "checked_at": now_utc(),
        "domain": domain,
        "source_used": "+".join(sources) if sources else "",
        "registrar": registrar,
        "registrant_org": registrant_org,
        "company_guess": registrant_org or registrar,
        "created_at": dt_to_str(created_at),
        "expires_at": dt_to_str(expires_at),
        "age_days": days_between(created_at, now) if created_at else "",
        "days_to_expire": days_between(now, expires_at) if expires_at else "",
        "status": " | ".join(status),
        "nameservers": " | ".join(nameservers),
        "ns_count": len(nameservers),
        "rdap_error": rdap_data["error"],
        "whois_error": whois_data["error"],
        "notes": "; ".join(notes),
    }


def process_domain(domain, args):
    """Run RDAP first, then fall back to WHOIS for fields RDAP left empty."""
    rdap_data = rdap_lookup(domain, timeout=args.rdap_timeout)

    # WHOIS is only worth the extra round-trip when RDAP was incomplete
    # (or the user explicitly forced it).
    need_whois = args.force_whois or not all(
        (
            rdap_data["created_at"],
            rdap_data["expires_at"],
            rdap_data["nameservers"],
            rdap_data["registrar"],
        )
    )

    if need_whois:
        whois_data = whois_lookup(domain)
    else:
        whois_data = {
            "source": "",
            "registrar": "",
            "registrant_org": "",
            "created_at": None,
            "expires_at": None,
            "status": [],
            "nameservers": [],
            "error": "",
        }

    return merge_meta(domain, rdap_data, whois_data)


def writer_loop(result_queue, output_dir, total_input, skipped_resume):
    """Single consumer thread: drain result_queue and persist every row.

    Each row goes to the combined CSV plus an OK/problem split; summary.txt
    and progress.log are refreshed after every row, and one colorized line
    is printed per domain. A None item on the queue terminates the loop.
    """
    os.makedirs(output_dir, exist_ok=True)

    all_csv = os.path.join(output_dir, "domain_meta_results.csv")
    ok_csv = os.path.join(output_dir, "domain_meta_ok.csv")
    problem_csv = os.path.join(output_dir, "domain_meta_problem.csv")
    summary_txt = os.path.join(output_dir, "summary.txt")
    progress_log = os.path.join(output_dir, "progress.log")

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    ok_writer = SafeCsvWriter(ok_csv, FIELDNAMES)
    problem_writer = SafeCsvWriter(problem_csv, FIELDNAMES)

    log_writer = SafeLineWriter(progress_log)

    counts = Counter()
    processed_now = 0

    def update_summary():
        # Rewrite summary.txt atomically so a crash never leaves it half-written.
        lines = []
        lines.append("DOMAIN META CHECK SUMMARY")
        lines.append("=" * 40)
        lines.append(f"generated_at   : {now_utc()}")
        lines.append(f"total_input    : {total_input}")
        lines.append(f"skipped_resume : {skipped_resume}")
        lines.append(f"processed_now  : {processed_now}")
        lines.append(f"remaining_est  : {max(total_input - skipped_resume - processed_now, 0)}")
        lines.append("")
        lines.append("COUNTS")
        lines.append("-" * 40)
        for k, v in counts.most_common():
            lines.append(f"{k:20}: {v}")
        atomic_write_text(summary_txt, "\n".join(lines) + "\n")

    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
    update_summary()

    while True:
        item = result_queue.get()
        if item is None:
            # Sentinel from main(): no more results are coming.
            break

        row = item
        all_writer.writerow(row)

        # "Good" means at least one substantive metadata field was found.
        good = bool(row["created_at"] or row["expires_at"] or row["nameservers"] or row["registrar"])
        if good:
            ok_writer.writerow(row)
            counts["OK"] += 1
        else:
            problem_writer.writerow(row)
            counts["PROBLEM"] += 1

        processed_now += 1
        update_summary()

        log_writer.write_line(
            f"DONE {now_utc()} {row['domain']} source={row['source_used']} "
            f"expires={row['expires_at']} registrar={row['registrar']}"
        )

        # One padded, colorized console line per domain.
        color = C_GREEN if good else C_RED
        print(
            f"[{processed_now}] "
            f"{row['domain']:30} -> {color}{(row['source_used'] or 'NO DATA')[:12]:12}{C_RESET} | "
            f"expire={row['expires_at'][:19]:19} | "
            f"age={str(row['age_days'])[:6]:6} | "
            f"NS={str(row['ns_count'])[:3]:3} | "
            f"registrar={row['registrar'][:25]}"
        )

    # Final summary/log flush, then release all file handles.
    update_summary()
    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")

    all_writer.close()
    ok_writer.close()
    problem_writer.close()
    log_writer.close()


def worker(domain, args, result_queue):
    """Process one domain and enqueue its row; failures become an error row."""
    if STOP_EVENT.is_set():
        return
    try:
        row = process_domain(domain, args)
    except Exception as exc:
        # Never crash the pool: record the failure as a mostly-empty row.
        row = dict.fromkeys(FIELDNAMES, "")
        row.update(
            checked_at=now_utc(),
            domain=domain,
            whois_error=f"{type(exc).__name__}: {exc}",
            notes="internal script error",
        )
    result_queue.put(row)


def parse_args():
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Check expire, domain age, NS, registrar/company"
    )
    parser.add_argument(
        "-i", "--input", required=True, help="File input domain"
    )
    parser.add_argument(
        "-o", "--output", default="meta_results", help="Folder output"
    )
    parser.add_argument(
        "-w", "--workers", type=int, default=10, help="Jumlah worker"
    )
    parser.add_argument(
        "--rdap-timeout", type=int, default=10, help="Timeout RDAP"
    )
    parser.add_argument(
        "--whois-timeout", type=int, default=10, help="Socket timeout WHOIS"
    )
    parser.add_argument(
        "--force-whois",
        action="store_true",
        help="Tetap panggil WHOIS walau RDAP sudah cukup",
    )
    parser.add_argument(
        "--fresh", action="store_true", help="Hapus hasil lama"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Jangan resume"
    )
    return parser.parse_args()


def main():
    """Entry point: wire up signals, resume state, worker pool and writer thread."""
    # First Ctrl+C/SIGTERM sets STOP_EVENT (graceful); a second one aborts.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()

    # whois_lookup has no explicit timeout; bound it via the socket default.
    socket.setdefaulttimeout(args.whois_timeout)

    os.makedirs(args.output, exist_ok=True)

    all_csv = os.path.join(args.output, "domain_meta_results.csv")

    if args.fresh:
        # --fresh: discard all artifacts from previous runs.
        for f in [
            "domain_meta_results.csv",
            "domain_meta_ok.csv",
            "domain_meta_problem.csv",
            "summary.txt",
            "progress.log",
        ]:
            remove_if_exists(os.path.join(args.output, f))

    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)

    # Resume support: skip domains already present in the combined CSV.
    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue_domains = [d for d in domains if d not in processed]

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses  : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers        : {args.workers}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    # A single writer thread serializes all file output from the workers.
    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args.output, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()

    executor = ThreadPoolExecutor(max_workers=args.workers)

    try:
        futures = []
        for domain in queue_domains:
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args, result_queue))

        # Wait for completion; individual worker errors were already
        # converted to error rows inside worker().
        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                pass

    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Dihentikan user.{C_RESET}")

    finally:
        # cancel_futures exists only on Python >= 3.9; fall back otherwise.
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)

        # None is the sentinel telling writer_loop to finish and flush.
        result_queue.put(None)
        writer_thread.join(timeout=15)

        print("-" * 110)
        print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
        print(f"All CSV     : {os.path.join(args.output, 'domain_meta_results.csv')}")
        print(f"OK CSV      : {os.path.join(args.output, 'domain_meta_ok.csv')}")
        print(f"Problem CSV : {os.path.join(args.output, 'domain_meta_problem.csv')}")
        print(f"Summary     : {os.path.join(args.output, 'summary.txt')}")


# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()

Jalankan

python3 domain_meta_check.py -i domains.txt -o meta_out

Kalau mau lebih agresif ambil WHOIS juga

python3 domain_meta_check.py -i domains.txt -o meta_out --force-whois

Tapi ini bisa lebih lambat.


Kalau mau mulai ulang dari nol

python3 domain_meta_check.py -i domains.txt -o meta_out --fresh

Output yang dihasilkan

domain_meta_results.csv

Semua hasil.

Kolom penting:

  • registrar
  • registrant_org
  • company_guess
  • created_at
  • expires_at
  • age_days
  • days_to_expire
  • nameservers
  • ns_count

domain_meta_ok.csv

Domain yang berhasil mendapatkan minimal sebagian data metadata.

domain_meta_problem.csv

Domain yang datanya gagal diambil atau sangat minim.

summary.txt

Ringkasan.


Penjelasan kolom

registrar

Perusahaan registrar domain.

registrant_org

Company pemilik/pendaftar domain, kalau tidak diprivacy.

company_guess

Prioritas:

  • registrant_org
  • kalau kosong → registrar

Jadi ini semacam kolom “company paling mungkin”.

created_at

Tanggal registrasi domain.

expires_at

Tanggal expire domain.

age_days

Umur domain dalam hari sejak register.

days_to_expire

Sisa hari menuju expire.

nameservers

Nama NS gabungan dari RDAP/WHOIS.


Kenapa ini lebih cocok untuk kebutuhan Anda

Karena ini khusus metadata domain, bukan status web.

Jadi terpisah dari script sebelumnya yang fokus ke:

  • hidup/mati website
  • redirect
  • SSL
  • server error

Script baru ini fokus ke:

  • umur domain
  • expire
  • NS
  • registrar/company

Keterbatasan yang tetap perlu Anda tahu

Walau pakai RDAP + python-whois, tetap ada limitasi:

  1. Tidak semua TLD konsisten
  2. Registrant org sering kosong karena privacy
  3. Beberapa domain bisa rate limit
  4. Beberapa server whois lambat / aneh formatnya

Jadi kalau ada beberapa domain kosong, itu belum tentu script salah — sering memang datanya tidak dibuka publik dengan rapi.


Rekomendasi worker

Untuk script metadata ini, jangan terlalu tinggi.

Saya sarankan:

aman

-w 8

masih cepat

-w 10

agak agresif

-w 15

Untuk WHOIS/RDAP, lebih banyak worker belum tentu lebih bagus.


Kalau Anda mau, langkah berikutnya saya bisa bantu buatkan versi gabungan 2 script:

  1. status website checker
  2. domain metadata checker

lalu output akhirnya bisa digabung jadi satu CSV besar berisi:

  • domain
  • live/offline
  • page_type
  • registrar
  • company
  • created_at
  • expires_at
  • age_days
  • nameservers

Kalau mau, saya bisa lanjut buatkan script merger/final report itu.