A CLI script dedicated to domain metadata
Save it as:

nano domain_meta_check.py

import argparse
import csv
import os
import signal
import socket
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse
import requests
try:
import whois
except ImportError:
whois = None
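# Shared state: STOP_EVENT signals shutdown; THREAD_LOCAL holds one requests.Session per worker thread.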
STOP_EVENT = threading.Event()
THREAD_LOCAL = threading.local()
FIELDNAMES = [
"checked_at",
"domain",
"source_used",
"registrar",
"registrant_org",
"company_guess",
"created_at",
"expires_at",
"age_days",
"days_to_expire",
"status",
"nameservers",
"ns_count",
"rdap_error",
"whois_error",
"notes",
]
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
def now_utc():
return datetime.now(timezone.utc).isoformat()
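# Reduce a raw input line (URL, host:port, "# comment", www. prefix) to a bare lowercase hostname.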
def normalize_domain(raw):
s = raw.strip()
if not s or s.startswith("#"):
return ""
s = s.split("#", 1)[0].strip()
if not s:
return ""
if "://" not in s:
s = "http://" + s
try:
p = urlparse(s)
host = p.netloc or p.path
host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
if host.startswith("www."):
host = host[4:]
return host
except Exception:
return ""
def load_domains(input_file):
domains = []
seen = set()
with open(input_file, "r", encoding="utf-8") as f:
for line in f:
d = normalize_domain(line)
if d and d not in seen:
seen.add(d)
domains.append(d)
return domains
def load_processed_domains(csv_path):
processed = set()
if not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0:
return processed
try:
with open(csv_path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
d = (row.get("domain") or "").strip().lower()
if d:
processed.add(d)
except Exception:
pass
return processed
def remove_if_exists(path):
try:
if os.path.exists(path):
os.remove(path)
except Exception:
pass
def sync_file(f):
f.flush()
os.fsync(f.fileno())
def atomic_write_text(path, text):
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=os.path.dirname(path))
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(text)
sync_file(f)
os.replace(tmp, path)
finally:
try:
if os.path.exists(tmp):
os.remove(tmp)
except Exception:
pass
class SafeCsvWriter:
def __init__(self, path, fieldnames):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
if not (os.path.exists(path) and os.path.getsize(path) > 0):
self.writer.writeheader()
sync_file(self.f)
def writerow(self, row):
self.writer.writerow(row)
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
class SafeLineWriter:
def __init__(self, path):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.f = open(path, "a", encoding="utf-8", buffering=1)
def write_line(self, text):
self.f.write(text.rstrip("\n") + "\n")
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
def signal_handler(signum, frame):
if not STOP_EVENT.is_set():
STOP_EVENT.set()
print(f"\n{C_YELLOW}Signal diterima. Stop submit job baru...{C_RESET}")
else:
raise KeyboardInterrupt
def get_session():
session = getattr(THREAD_LOCAL, "session", None)
if session is None:
session = requests.Session()
THREAD_LOCAL.session = session
return session
def uniq(items):
out = []
seen = set()
for x in items:
x = str(x).strip()
if not x:
continue
xl = x.lower()
if xl not in seen:
seen.add(xl)
out.append(x)
return out
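# Parse timestamps from RDAP/WHOIS: lists, datetime objects, ISO-8601 strings, then common WHOIS formats.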
def parse_dt(value):
if not value:
return None
if isinstance(value, list):
for v in value:
dt = parse_dt(v)
if dt:
return dt
return None
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value
if not isinstance(value, str):
return None
s = value.strip()
if not s:
return None
s = s.replace("Z", "+00:00")
patterns = [
None,
"%Y-%m-%d",
"%Y-%m-%d %H:%M:%S",
"%Y.%m.%d %H:%M:%S",
"%d-%b-%Y",
"%Y/%m/%d",
]
for p in patterns:
try:
if p is None:
dt = datetime.fromisoformat(s)
else:
dt = datetime.strptime(s, p)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
return None
def dt_to_str(dt):
if not dt:
return ""
return dt.isoformat()
def days_between(a, b):
if not a or not b:
return ""
return (b.date() - a.date()).days
def pick_event_date(events, wanted_actions):
if not isinstance(events, list):
return None
for action in wanted_actions:
for e in events:
if str(e.get("eventAction", "")).lower() == action.lower():
dt = parse_dt(e.get("eventDate"))
if dt:
return dt
return None
def parse_vcard_value(entity, wanted_fields):
try:
vcard = entity.get("vcardArray")
if not vcard or len(vcard) < 2:
return ""
rows = vcard[1]
for row in rows:
if len(row) >= 4 and str(row[0]).lower() in wanted_fields:
val = row[3]
if isinstance(val, list):
val = " ".join(str(x) for x in val if x)
return str(val).strip()
except Exception:
pass
return ""
def extract_rdap_entities(data):
registrar = ""
registrant_org = ""
entities = data.get("entities", [])
for ent in entities:
roles = [str(x).lower() for x in ent.get("roles", [])]
if not registrar and "registrar" in roles:
registrar = (
parse_vcard_value(ent, {"fn", "org"})
or registrar
)
if not registrant_org and "registrant" in roles:
registrant_org = (
parse_vcard_value(ent, {"org", "fn"})
or registrant_org
)
return registrar, registrant_org
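# RDAP lookup via the rdap.org aggregator, which redirects to the authoritative RDAP server for the TLD.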
def rdap_lookup(domain, timeout=10):
result = {
"source": "",
"registrar": "",
"registrant_org": "",
"created_at": None,
"expires_at": None,
"status": [],
"nameservers": [],
"error": "",
}
url = f"https://rdap.org/domain/{domain}"
session = get_session()
try:
r = session.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
if r.status_code != 200:
result["error"] = f"HTTP {r.status_code}"
return result
data = r.json()
result["source"] = "RDAP"
result["status"] = uniq(data.get("status", []))
ns = []
for item in data.get("nameservers", []):
name = item.get("ldhName") or item.get("unicodeName") or ""
if name:
ns.append(str(name).rstrip("."))
result["nameservers"] = uniq(ns)
created_at = pick_event_date(data.get("events", []), [
"registration", "registered", "creation"
])
expires_at = pick_event_date(data.get("events", []), [
"expiration", "expiry", "expired"
])
result["created_at"] = created_at
result["expires_at"] = expires_at
registrar, registrant_org = extract_rdap_entities(data)
result["registrar"] = registrar
result["registrant_org"] = registrant_org
except Exception as e:
result["error"] = f"{type(e).__name__}: {e}"
return result
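# WHOIS fallback via python-whois, used when RDAP data is incomplete or --force-whois is set.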
def whois_lookup(domain):
result = {
"source": "",
"registrar": "",
"registrant_org": "",
"created_at": None,
"expires_at": None,
"status": [],
"nameservers": [],
"error": "",
}
if whois is None:
result["error"] = "python-whois not installed"
return result
try:
w = whois.whois(domain)
result["source"] = "WHOIS"
registrar = getattr(w, "registrar", "") or ""
registrant_org = getattr(w, "org", "") or ""
creation_date = getattr(w, "creation_date", None)
expiration_date = getattr(w, "expiration_date", None)
status = getattr(w, "status", None)
name_servers = getattr(w, "name_servers", None)
result["registrar"] = str(registrar).strip() if registrar else ""
result["registrant_org"] = str(registrant_org).strip() if registrant_org else ""
result["created_at"] = parse_dt(creation_date)
result["expires_at"] = parse_dt(expiration_date)
if isinstance(status, (list, tuple, set)):
result["status"] = uniq(status)
elif status:
result["status"] = uniq([status])
if isinstance(name_servers, (list, tuple, set)):
result["nameservers"] = uniq([str(x).strip().rstrip(".") for x in name_servers if x])
elif name_servers:
result["nameservers"] = uniq([str(name_servers).strip().rstrip(".")])
except Exception as e:
result["error"] = f"{type(e).__name__}: {e}"
return result
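# Merge both sources into one CSV row; RDAP values take priority, WHOIS fills the gaps.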
def merge_meta(domain, rdap_data, whois_data):
registrar = rdap_data["registrar"] or whois_data["registrar"]
registrant_org = rdap_data["registrant_org"] or whois_data["registrant_org"]
created_at = rdap_data["created_at"] or whois_data["created_at"]
expires_at = rdap_data["expires_at"] or whois_data["expires_at"]
status = uniq(rdap_data["status"] + whois_data["status"])
nameservers = uniq(rdap_data["nameservers"] + whois_data["nameservers"])
source_parts = []
if rdap_data["source"]:
source_parts.append("RDAP")
if whois_data["source"]:
source_parts.append("WHOIS")
source_used = "+".join(source_parts) if source_parts else ""
company_guess = registrant_org or registrar
now = datetime.now(timezone.utc)
age_days = days_between(created_at, now) if created_at else ""
days_to_expire = days_between(now, expires_at) if expires_at else ""
notes = []
if registrant_org:
notes.append("registrant_org available")
else:
notes.append("registrant_org empty/private")
if registrar:
notes.append("registrar found")
if nameservers:
notes.append(f"{len(nameservers)} NS found")
else:
notes.append("NS empty")
row = {
"checked_at": now_utc(),
"domain": domain,
"source_used": source_used,
"registrar": registrar,
"registrant_org": registrant_org,
"company_guess": company_guess,
"created_at": dt_to_str(created_at),
"expires_at": dt_to_str(expires_at),
"age_days": age_days,
"days_to_expire": days_to_expire,
"status": " | ".join(status),
"nameservers": " | ".join(nameservers),
"ns_count": len(nameservers),
"rdap_error": rdap_data["error"],
"whois_error": whois_data["error"],
"notes": "; ".join(notes),
}
return row
def process_domain(domain, args):
rdap_data = rdap_lookup(domain, timeout=args.rdap_timeout)
need_whois = False
if not rdap_data["created_at"]:
need_whois = True
if not rdap_data["expires_at"]:
need_whois = True
if not rdap_data["nameservers"]:
need_whois = True
if not rdap_data["registrar"]:
need_whois = True
if args.force_whois:
need_whois = True
whois_data = {
"source": "",
"registrar": "",
"registrant_org": "",
"created_at": None,
"expires_at": None,
"status": [],
"nameservers": [],
"error": "",
}
if need_whois:
whois_data = whois_lookup(domain)
return merge_meta(domain, rdap_data, whois_data)
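# Single writer thread: drains the result queue and owns all file writes, so workers never touch the output files.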
def writer_loop(result_queue, output_dir, total_input, skipped_resume):
os.makedirs(output_dir, exist_ok=True)
all_csv = os.path.join(output_dir, "domain_meta_results.csv")
ok_csv = os.path.join(output_dir, "domain_meta_ok.csv")
problem_csv = os.path.join(output_dir, "domain_meta_problem.csv")
summary_txt = os.path.join(output_dir, "summary.txt")
progress_log = os.path.join(output_dir, "progress.log")
all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
ok_writer = SafeCsvWriter(ok_csv, FIELDNAMES)
problem_writer = SafeCsvWriter(problem_csv, FIELDNAMES)
log_writer = SafeLineWriter(progress_log)
counts = Counter()
processed_now = 0
def update_summary():
lines = []
lines.append("DOMAIN META CHECK SUMMARY")
lines.append("=" * 40)
lines.append(f"generated_at : {now_utc()}")
lines.append(f"total_input : {total_input}")
lines.append(f"skipped_resume : {skipped_resume}")
lines.append(f"processed_now : {processed_now}")
lines.append(f"remaining_est : {max(total_input - skipped_resume - processed_now, 0)}")
lines.append("")
lines.append("COUNTS")
lines.append("-" * 40)
for k, v in counts.most_common():
lines.append(f"{k:20}: {v}")
atomic_write_text(summary_txt, "\n".join(lines) + "\n")
log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
update_summary()
while True:
item = result_queue.get()
if item is None:
break
row = item
all_writer.writerow(row)
good = bool(row["created_at"] or row["expires_at"] or row["nameservers"] or row["registrar"])
if good:
ok_writer.writerow(row)
counts["OK"] += 1
else:
problem_writer.writerow(row)
counts["PROBLEM"] += 1
processed_now += 1
update_summary()
log_writer.write_line(
f"DONE {now_utc()} {row['domain']} source={row['source_used']} "
f"expires={row['expires_at']} registrar={row['registrar']}"
)
color = C_GREEN if good else C_RED
print(
f"[{processed_now}] "
f"{row['domain']:30} -> {color}{(row['source_used'] or 'NO DATA')[:12]:12}{C_RESET} | "
f"expire={row['expires_at'][:19]:19} | "
f"age={str(row['age_days'])[:6]:6} | "
f"NS={str(row['ns_count'])[:3]:3} | "
f"registrar={row['registrar'][:25]}"
)
update_summary()
log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")
all_writer.close()
ok_writer.close()
problem_writer.close()
log_writer.close()
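# Worker task: one domain per call; unexpected exceptions become a "problem" row instead of killing the pool.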
def worker(domain, args, result_queue):
if STOP_EVENT.is_set():
return
try:
row = process_domain(domain, args)
except Exception as e:
row = {
"checked_at": now_utc(),
"domain": domain,
"source_used": "",
"registrar": "",
"registrant_org": "",
"company_guess": "",
"created_at": "",
"expires_at": "",
"age_days": "",
"days_to_expire": "",
"status": "",
"nameservers": "",
"ns_count": "",
"rdap_error": "",
"whois_error": f"{type(e).__name__}: {e}",
"notes": "internal script error",
}
result_queue.put(row)
def parse_args():
    parser = argparse.ArgumentParser(description="Check expiry, domain age, NS, registrar/company")
    parser.add_argument("-i", "--input", required=True, help="Input file of domains")
    parser.add_argument("-o", "--output", default="meta_results", help="Output folder")
    parser.add_argument("-w", "--workers", type=int, default=10, help="Number of workers")
    parser.add_argument("--rdap-timeout", type=int, default=10, help="RDAP request timeout (seconds)")
    parser.add_argument("--whois-timeout", type=int, default=10, help="WHOIS socket timeout (seconds)")
    parser.add_argument("--force-whois", action="store_true", help="Call WHOIS even when RDAP is sufficient")
    parser.add_argument("--fresh", action="store_true", help="Delete previous results")
    parser.add_argument("--no-resume", action="store_true", help="Do not resume from previous results")
return parser.parse_args()
def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
args = parse_args()
socket.setdefaulttimeout(args.whois_timeout)
os.makedirs(args.output, exist_ok=True)
all_csv = os.path.join(args.output, "domain_meta_results.csv")
if args.fresh:
for f in [
"domain_meta_results.csv",
"domain_meta_ok.csv",
"domain_meta_problem.csv",
"summary.txt",
"progress.log",
]:
remove_if_exists(os.path.join(args.output, f))
domains = load_domains(args.input)
if not domains:
print("Tidak ada domain valid di file input.")
sys.exit(1)
processed = set()
if not args.no_resume:
processed = load_processed_domains(all_csv)
queue_domains = [d for d in domains if d not in processed]
print(f"{C_CYAN}Total input : {len(domains)}{C_RESET}")
print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
print(f"{C_CYAN}Akan diproses : {len(queue_domains)}{C_RESET}")
print(f"{C_CYAN}Workers : {args.workers}{C_RESET}")
print(f"{C_CYAN}Output folder : {args.output}{C_RESET}")
print("-" * 110)
result_queue = Queue()
writer_thread = threading.Thread(
target=writer_loop,
args=(result_queue, args.output, len(domains), len(processed)),
daemon=True
)
writer_thread.start()
executor = ThreadPoolExecutor(max_workers=args.workers)
try:
futures = []
for domain in queue_domains:
if STOP_EVENT.is_set():
break
futures.append(executor.submit(worker, domain, args, result_queue))
for f in futures:
if STOP_EVENT.is_set():
break
try:
f.result()
except Exception:
pass
except KeyboardInterrupt:
STOP_EVENT.set()
print(f"\n{C_RED}Dihentikan user.{C_RESET}")
finally:
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
result_queue.put(None)
writer_thread.join(timeout=15)
print("-" * 110)
print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
print(f"All CSV : {os.path.join(args.output, 'domain_meta_results.csv')}")
print(f"OK CSV : {os.path.join(args.output, 'domain_meta_ok.csv')}")
print(f"Problem CSV : {os.path.join(args.output, 'domain_meta_problem.csv')}")
print(f"Summary : {os.path.join(args.output, 'summary.txt')}")
if __name__ == "__main__":
    main()

Run it with:

python3 domain_meta_check.py -i domains.txt -o meta_out
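
The input file is plain text, one domain per line. Comments and full URLs are fine: normalize_domain strips schemes, paths, ports, inline # comments, and a leading www. A hypothetical domains.txt:

# main targets
example.com
https://www.example.org/some/path
shop.example.net:8080  # staging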

If you also want to pull WHOIS more aggressively:

python3 domain_meta_check.py -i domains.txt -o meta_out --force-whois

This can be slower, though.
If you want to start over from scratch:

python3 domain_meta_check.py -i domains.txt -o meta_out --fresh

Generated output
domain_meta_results.csv
All results. Key columns:

- registrar
- registrant_org
- company_guess
- created_at
- expires_at
- age_days
- days_to_expire
- nameservers
- ns_count
domain_meta_ok.csv
Rows where at least some data was retrieved.
domain_meta_problem.csv
Rows where the lookup failed or returned almost nothing.
summary.txt
A summary of the run.
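
As a quick post-processing example, here is a minimal sketch (assuming the run above with -o meta_out; adjust the path otherwise) that lists domains expiring within 30 days:

import csv

# Scan the combined results and keep rows whose days_to_expire is 30 or less.
with open("meta_out/domain_meta_results.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        days = row["days_to_expire"]
        if days and int(days) <= 30:
            print(row["domain"], row["expires_at"], row["registrar"])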
Column explanations
registrar
The company acting as the domain's registrar.
registrant_org
The company that owns/registered the domain, when it is not hidden by privacy protection.
company_guess
Priority:

- registrant_org
- if that is empty → registrar

So this is effectively a "most likely company" column.
created_at
The domain's registration date.
expires_at
The domain's expiration date.
age_days
Domain age in days since registration.
days_to_expire
Days remaining until expiration.
nameservers
NS names merged from RDAP/WHOIS.
Why this is a better fit for your needs
It is dedicated to domain metadata, not website status.
That keeps it separate from the earlier script, which focused on:
- website up/down
- redirect
- SSL
- server error
This new script focuses on:
- domain age
- expiry
- NS
- registrar/company
Limitations you still need to know about
Even with RDAP + python-whois, there are limits:
- Not every TLD behaves consistently
- registrant_org is often empty because of privacy protection
- Some lookups can get rate limited
- Some WHOIS servers are slow or return oddly formatted data
So if some domains come back empty, that is not necessarily a script bug; often the data is simply not published cleanly.
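
If rate limiting bites, one option is a small retry wrapper around the RDAP call. A minimal sketch; retry_rdap, attempts, and base_delay are hypothetical additions, not part of the script above:

import time

# Hypothetical extension: retry rdap_lookup with exponential backoff on HTTP 429.
def retry_rdap(domain, attempts=3, base_delay=2.0):
    result = rdap_lookup(domain)
    for attempt in range(attempts - 1):
        if result["error"] != "HTTP 429":  # rdap_lookup reports non-200 responses as "HTTP <code>"
            break
        time.sleep(base_delay * (2 ** attempt))  # wait 2s, 4s, 8s, ...
        result = rdap_lookup(domain)
    return result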
Worker recommendation
For this metadata script, do not set the worker count too high.
I suggest:

- safe: -w 8
- still fast: -w 10
- fairly aggressive: -w 15

For WHOIS/RDAP, more workers is not necessarily better.
If you want, as a next step I can help build a combined version of the two scripts:
- website status checker
- domain metadata checker
with the final output merged into one big CSV containing:
- domain
- live/offline
- page_type
- registrar
- company
- created_at
- expires_at
- age_days
- nameservers
If you want, I can go ahead and build that merger/final-report script.