Why did the first script feel slow?
Usually it is a combination of these:
- Sequential: domains are checked one at a time, so if one domain takes 3–8 seconds, 100 domains can take a very long time
- Many URL variants tried: https://domain, https://www.domain, http://domain, http://www.domain; when every variant is fully tested for every domain, the request count balloons
- DNS checks too complete: A, AAAA, CNAME, MX, NS are all queried; thorough, but slower
- Too much of the response read: large page bodies and large JavaScript payloads make it slow and heavy
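A back-of-envelope calculation makes the cost concrete. The 3–8 s per-domain figure comes from above; the worker count of 25 is an illustrative assumption, not a measurement:

```python
# Illustrative worst-case estimate: sequential vs parallel checking.
domains = 100
per_domain_s = 5  # middle of the 3-8 s range quoted above
workers = 25      # assumed worker count

sequential_s = domains * per_domain_s            # every domain waits its turn
parallel_s = (domains / workers) * per_domain_s  # ideal case, ignoring overhead

print(f"sequential: ~{sequential_s} s")    # ~500 s, over 8 minutes
print(f"parallel  : ~{parallel_s:.0f} s")  # ~20 s with 25 workers
```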
The best solution
If your requirements are:
- fast, like HeadMasterSEO
- still able to classify domains
- still safe if the script stops
- results written immediately
- input from a file
- output split by category
then the best solution is:
CLI + multi-thread + safe write
Meaning:
- read domains from a file
- check in parallel, for example with 20–40 workers
- as each domain finishes → write it to file immediately
- on Ctrl+C, finished results stay safe, and you can resume
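The pattern above can be sketched in a few lines. Here check_domain is a placeholder probe and results.csv an assumed file name; the full script below is the fleshed-out version:

```python
# Minimal sketch: parallel checks, each result appended and fsync'd
# the moment it finishes, so an interrupted run loses nothing.
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def check_domain(domain):
    # placeholder for the real DNS/HTTP probe
    return f"{domain},LIVE_OK"

domains = ["example.com", "example.org"]
with open("results.csv", "a", encoding="utf-8") as out:
    with ThreadPoolExecutor(max_workers=20) as pool:
        futures = [pool.submit(check_domain, d) for d in domains]
        for fut in as_completed(futures):
            out.write(fut.result() + "\n")
            out.flush()             # Python buffer -> OS
            os.fsync(out.fileno())  # OS buffer -> disk
```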
I have built a faster version
This version focuses on:
- more speed
- still safe
- results written immediately
- a split into alive / warning / offline
- detection of suspended / expired / parked / default page / 503 / forbidden / redirect / invalid SSL
- resume
Install dependencies
pip install requests dnspython beautifulsoup4
The fast CLI script
Save it as:
domain_status_fast.py
import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
import time
import warnings
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urljoin, urlparse
import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry
warnings.simplefilter("ignore", InsecureRequestWarning)
# =========================
# GLOBALS
# =========================
STOP_EVENT = threading.Event()
THREAD_LOCAL = threading.local()
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_READ_TIMEOUT = 7
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_MAX_REDIRECTS = 8
DEFAULT_MAX_BYTES = 98304 # 96 KB
DEFAULT_RETRIES = 1
DEFAULT_WORKERS = 25
FOR_SALE_KEYWORDS = [
"domain is for sale",
"buy this domain",
"this domain may be for sale",
"purchase this domain",
"afternic",
"sedo",
"dan.com",
"undeveloped",
]
PARKED_KEYWORDS = [
"domain parked",
"parked free",
"parkingcrew",
"bodis",
"cashparking",
"sedo parking",
"parked domain",
"this domain is parked",
]
DEFAULT_HOSTING_KEYWORDS = [
"apache2 ubuntu default page",
"apache2 debian default page",
"welcome to nginx",
"nginx test page",
"test page for the nginx",
"default web site page",
"iis windows server",
]
COMING_SOON_KEYWORDS = [
"coming soon",
"under construction",
"launching soon",
"website coming soon",
"site is coming soon",
]
SUSPENDED_KEYWORDS = [
"this account has been suspended",
"account suspended",
"website suspended",
"site suspended",
"hosting account has been suspended",
"suspended due to non-payment",
"please contact billing",
"contact your hosting provider",
"billing issue",
]
EXPIRED_KEYWORDS = [
"this domain has expired",
"domain expired",
"expired domain",
"renew this domain",
"renewal required",
"domain renewal",
"renew now",
"expiration notice",
"registrant verification failed",
"has expired and may be available",
]
FIELDNAMES = [
"checked_at",
"domain",
"bucket",
"page_type",
"dns_ok",
"dns_error",
"status_code",
"ssl_status",
"content_type",
"elapsed_ms",
"best_start_url",
"best_final_url",
"title",
"notes",
"error",
"redirect_chain",
"A",
"AAAA",
"CNAME",
"all_attempts",
]
# ANSI color
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"
# =========================
# UTIL
# =========================
def now_utc():
return datetime.now(timezone.utc).isoformat()
def clean_text(s):
if not s:
return ""
return re.sub(r"\s+", " ", s).strip()
def normalize_domain(raw):
s = raw.strip()
if not s or s.startswith("#"):
return ""
s = s.split("#", 1)[0].strip()
if not s:
return ""
if "://" not in s:
s = "http://" + s
try:
p = urlparse(s)
host = p.netloc or p.path
host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
if host.startswith("www."):
host = host[4:]
return host
except Exception:
return ""
def load_domains(input_file):
domains = []
seen = set()
with open(input_file, "r", encoding="utf-8") as f:
for line in f:
d = normalize_domain(line)
if d and d not in seen:
seen.add(d)
domains.append(d)
return domains
def load_processed_domains(all_results_csv):
processed = set()
if not os.path.exists(all_results_csv) or os.path.getsize(all_results_csv) == 0:
return processed
try:
with open(all_results_csv, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
d = (row.get("domain") or "").strip().lower()
if d:
processed.add(d)
except Exception:
pass
return processed
def host_of(url):
try:
return (urlparse(url).hostname or "").lower()
except Exception:
return ""
def is_html_like(content_type):
ct = (content_type or "").lower()
return any(x in ct for x in [
"text/html",
"application/xhtml+xml",
"text/plain",
"application/xml",
"text/xml",
])
def extract_title(html):
if not html:
return ""
try:
soup = BeautifulSoup(html, "html.parser")
if soup.title and soup.title.string:
return clean_text(soup.title.string)
except Exception:
pass
m = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
if m:
return clean_text(m.group(1))
return ""
def colorize_bucket(bucket):
if bucket == "LIVE_OK":
return f"{C_GREEN}{bucket}{C_RESET}"
if bucket == "WARNING":
return f"{C_YELLOW}{bucket}{C_RESET}"
return f"{C_RED}{bucket}{C_RESET}"
# =========================
# SAFE FILE WRITERS
# =========================
def sync_file(f):
f.flush()
os.fsync(f.fileno())
def atomic_write_text(path, text):
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=os.path.dirname(path))
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(text)
sync_file(f)
os.replace(tmp, path)
finally:
try:
if os.path.exists(tmp):
os.remove(tmp)
except Exception:
pass
class SafeCsvWriter:
def __init__(self, path, fieldnames):
self.path = path
os.makedirs(os.path.dirname(path), exist_ok=True)
file_exists = os.path.exists(path) and os.path.getsize(path) > 0
self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
if not file_exists:
self.writer.writeheader()
sync_file(self.f)
def writerow(self, row):
self.writer.writerow(row)
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
class SafeLineWriter:
def __init__(self, path):
self.path = path
os.makedirs(os.path.dirname(path), exist_ok=True)
self.f = open(path, "a", encoding="utf-8", buffering=1)
def write_line(self, text):
self.f.write(text.rstrip("\n") + "\n")
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
# =========================
# SIGNAL HANDLER
# =========================
def signal_handler(signum, frame):
if not STOP_EVENT.is_set():
STOP_EVENT.set()
        print(
            f"\n{C_YELLOW}Signal received. No new jobs will be submitted... "
            f"results that already finished are kept.{C_RESET}"
        )
else:
raise KeyboardInterrupt
# =========================
# SESSION PER THREAD
# =========================
def get_session(retries=1, pool_size=100):
session = getattr(THREAD_LOCAL, "session", None)
if session is None:
session = requests.Session()
retry_cfg = Retry(
total=retries,
connect=retries,
read=0,
redirect=0,
status=0,
backoff_factor=0.2,
allowed_methods=frozenset(["GET"]),
raise_on_status=False,
)
adapter = HTTPAdapter(
max_retries=retry_cfg,
pool_connections=pool_size,
pool_maxsize=pool_size
)
session.mount("http://", adapter)
session.mount("https://", adapter)
THREAD_LOCAL.session = session
return session
# =========================
# DNS
# =========================
def get_dns_info(domain, dns_timeout=2, full_dns=False):
result = {
"dns_ok": False,
"A": [],
"AAAA": [],
"CNAME": [],
"dns_error": "",
}
resolver = dns.resolver.Resolver()
resolver.timeout = dns_timeout
resolver.lifetime = dns_timeout
    record_types = ["A", "AAAA", "CNAME"]
    if full_dns:
        record_types = ["A", "AAAA", "CNAME", "MX", "NS"]
for rtype in record_types:
try:
answers = resolver.resolve(domain, rtype)
vals = []
for r in answers:
if hasattr(r, "target"):
vals.append(str(r.target).rstrip("."))
else:
vals.append(str(r).rstrip("."))
result[rtype] = vals
except dns.resolver.NXDOMAIN:
result["dns_error"] = "NXDOMAIN"
return result
except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
pass
except Exception as e:
if not result["dns_error"]:
result["dns_error"] = type(e).__name__
if any(result[k] for k in ["A", "AAAA", "CNAME"]):
result["dns_ok"] = True
elif not result["dns_error"]:
result["dns_error"] = "NO_RECORDS"
return result
# =========================
# HTTP / PROBE
# =========================
def read_limited_text(resp, max_bytes):
chunks = []
total = 0
try:
for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
if not chunk:
continue
remain = max_bytes - total
if remain <= 0:
break
if len(chunk) > remain:
chunk = chunk[:remain]
chunks.append(chunk)
total += len(chunk)
if total >= max_bytes:
break
except Exception:
pass
raw = b"".join(chunks)
enc = resp.encoding or "utf-8"
try:
return raw.decode(enc, errors="replace")
except Exception:
return raw.decode("utf-8", errors="replace")
def classify_page(status_code, title, body, content_type):
blob = ((title or "") + "\n" + (body or "")[:7000]).lower()
if any(k in blob for k in EXPIRED_KEYWORDS):
return "EXPIRED / RENEWAL ISSUE"
if any(k in blob for k in SUSPENDED_KEYWORDS):
return "SUSPENDED"
if any(k in blob for k in FOR_SALE_KEYWORDS):
return "FOR SALE"
if any(k in blob for k in PARKED_KEYWORDS):
return "PARKED"
if any(k in blob for k in DEFAULT_HOSTING_KEYWORDS):
return "DEFAULT HOSTING PAGE"
if any(k in blob for k in COMING_SOON_KEYWORDS):
return "COMING SOON"
if 200 <= status_code <= 299:
if content_type and not is_html_like(content_type):
return "LIVE NON-HTML"
return "LIVE 200"
if status_code in (301, 302, 303, 307, 308):
return "REDIRECT"
if status_code == 401:
return "UNAUTHORIZED"
if status_code == 403:
return "FORBIDDEN"
if status_code == 404:
return "NOT FOUND"
if status_code == 410:
return "GONE"
if status_code == 429:
return "RATE LIMITED"
if 500 <= status_code <= 599:
return "SERVER ERROR"
return f"HTTP {status_code}"
def score_result(r):
if not r["ok"]:
return 0
pt = r["page_type"]
sc = r["status_code"]
if pt == "LIVE 200":
return 100
if pt == "LIVE NON-HTML":
return 98
if pt in [
"EXPIRED / RENEWAL ISSUE",
"SUSPENDED",
"FOR SALE",
"PARKED",
"DEFAULT HOSTING PAGE",
"COMING SOON",
]:
return 95
if sc == 403:
return 85
if sc == 401:
return 84
if 500 <= sc <= 599:
return 82
if sc in (404, 410):
return 80
if 300 <= sc < 400:
return 75
return 10
def probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
session = get_session(retries=retries, pool_size=pool_size)
current_url = url
chain = []
ssl_status = "N/A"
for _ in range(max_redirects):
resp = None
try:
resp = session.get(
current_url,
headers=HEADERS,
timeout=(connect_timeout, read_timeout),
allow_redirects=False,
verify=True,
stream=True,
)
if current_url.startswith("https://"):
ssl_status = "VALID"
except requests.exceptions.SSLError:
ssl_status = "INVALID"
try:
resp = session.get(
current_url,
headers=HEADERS,
timeout=(connect_timeout, read_timeout),
allow_redirects=False,
verify=False,
stream=True,
)
except requests.exceptions.Timeout:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
}
except requests.exceptions.ConnectionError:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
}
except Exception as e:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "SSL ERROR", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
}
except requests.exceptions.Timeout:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
}
except requests.exceptions.ConnectionError:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
}
except Exception as e:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "REQUEST ERROR", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
}
try:
elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
except Exception:
elapsed_ms = ""
chain.append(f"{resp.status_code} {current_url}")
if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
next_url = urljoin(current_url, resp.headers.get("Location"))
try:
resp.close()
except Exception:
pass
current_url = next_url
continue
content_type = resp.headers.get("Content-Type", "")
body = ""
if resp.status_code not in (204, 304):
body = read_limited_text(resp, max_bytes)
title = extract_title(body)
page_type = classify_page(resp.status_code, title, body, content_type)
final_url = resp.url
try:
resp.close()
except Exception:
pass
return {
"ok": True,
"start_url": url,
"final_url": final_url,
"status_code": resp.status_code,
"page_type": page_type,
"title": title,
"chain": " | ".join(chain),
"ssl_status": ssl_status,
"content_type": content_type,
"elapsed_ms": elapsed_ms,
"error": "",
}
return {
"ok": False,
"start_url": url,
"final_url": current_url,
"status_code": "",
"page_type": "TOO MANY REDIRECTS",
"title": "",
"chain": " | ".join(chain),
"ssl_status": ssl_status,
"content_type": "",
"elapsed_ms": "",
"error": "TooManyRedirects",
}
def is_good_enough(result):
if not result["ok"]:
return False
if result["page_type"] in [
"LIVE 200", "LIVE NON-HTML",
"PARKED", "FOR SALE", "DEFAULT HOSTING PAGE",
"COMING SOON", "SUSPENDED", "EXPIRED / RENEWAL ISSUE"
]:
return True
if result["status_code"] in (401, 403, 404, 410, 500, 501, 502, 503, 504):
return True
return False
def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
"""
Lebih cepat dari versi lama:
- utamakan https://domain
- lalu http://domain
- hanya coba www jika hasil awal masih jelek
"""
candidates_primary = [
f"https://{domain}",
f"http://{domain}",
]
candidates_www = [
f"https://www.{domain}",
f"http://www.{domain}",
]
results = []
best = None
for url in candidates_primary:
r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
results.append(r)
if best is None or score_result(r) > score_result(best):
best = r
if is_good_enough(best):
return results, best
for url in candidates_www:
r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
results.append(r)
if best is None or score_result(r) > score_result(best):
best = r
if is_good_enough(best):
return results, best
return results, best
# =========================
# CLASSIFICATION
# =========================
def classify_bucket(best):
if best["page_type"] in ["LIVE 200", "LIVE NON-HTML"]:
return "LIVE_OK"
if best["status_code"] != "":
return "WARNING"
if best["page_type"] == "TOO MANY REDIRECTS" and best["chain"]:
return "WARNING"
return "OFFLINE"
def build_notes(domain, dns_info, best):
notes = []
if not dns_info["dns_ok"]:
notes.append("DNS problem")
if best["ssl_status"] == "INVALID":
notes.append("SSL invalid")
final_url = best["final_url"] or ""
start_url = best["start_url"] or ""
final_host = host_of(final_url)
if start_url and final_url and start_url != final_url:
notes.append("Redirected")
if final_url.startswith("http://"):
notes.append("HTTP only")
if final_host and final_host not in {domain, f"www.{domain}"}:
notes.append(f"Redirect to other host: {final_host}")
pt = best["page_type"]
if pt == "DEFAULT HOSTING PAGE":
notes.append("Server default page")
elif pt == "PARKED":
notes.append("Parked domain")
elif pt == "FOR SALE":
notes.append("Domain for sale")
elif pt == "COMING SOON":
notes.append("Coming soon page")
elif pt == "SUSPENDED":
notes.append("Suspended; check billing/hosting")
elif pt == "EXPIRED / RENEWAL ISSUE":
notes.append("Expired or renewal issue")
elif pt == "SERVER ERROR":
notes.append("Website reachable but server error")
elif pt == "NOT FOUND":
notes.append("Host reachable but page not found")
elif pt == "FORBIDDEN":
notes.append("Host reachable but forbidden")
elif pt == "CONNECTION FAILED":
notes.append("Cannot connect to web server")
elif pt == "TIMEOUT":
notes.append("Request timeout")
ct = (best["content_type"] or "").strip()
if ct and not is_html_like(ct):
notes.append(f"Non-HTML content: {ct}")
return "; ".join(notes)
def summarize_domain(domain, args):
dns_info = get_dns_info(domain, dns_timeout=args.dns_timeout, full_dns=False)
attempts, best = best_probe(
domain=domain,
connect_timeout=args.connect_timeout,
read_timeout=args.read_timeout,
max_redirects=args.max_redirects,
max_bytes=args.max_bytes,
retries=args.retries,
pool_size=max(args.workers, 20),
)
bucket = classify_bucket(best)
notes = build_notes(domain, dns_info, best)
row = {
"checked_at": now_utc(),
"domain": domain,
"bucket": bucket,
"page_type": best["page_type"],
"dns_ok": dns_info["dns_ok"],
"dns_error": dns_info["dns_error"],
"status_code": best["status_code"],
"ssl_status": best["ssl_status"],
"content_type": best["content_type"],
"elapsed_ms": best["elapsed_ms"],
"best_start_url": best["start_url"],
"best_final_url": best["final_url"],
"title": best["title"],
"notes": notes,
"error": best["error"],
"redirect_chain": best["chain"],
"A": ", ".join(dns_info["A"]),
"AAAA": ", ".join(dns_info["AAAA"]),
"CNAME": ", ".join(dns_info["CNAME"]),
"all_attempts": " || ".join(
f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
for r in attempts
),
}
return row
def fallback_error_row(domain, err_msg):
return {
"checked_at": now_utc(),
"domain": domain,
"bucket": "OFFLINE",
"page_type": "SCRIPT ERROR",
"dns_ok": "",
"dns_error": "",
"status_code": "",
"ssl_status": "",
"content_type": "",
"elapsed_ms": "",
"best_start_url": "",
"best_final_url": "",
"title": "",
"notes": "Internal script error",
"error": err_msg,
"redirect_chain": "",
"A": "",
"AAAA": "",
"CNAME": "",
"all_attempts": "",
}
# =========================
# SUMMARY
# =========================
def update_summary(path, total_input, skipped_resume, processed_now, counts_bucket, counts_type):
lines = []
lines.append("DOMAIN STATUS FAST CHECKER")
lines.append("=" * 40)
lines.append(f"generated_at : {now_utc()}")
lines.append(f"total_input : {total_input}")
lines.append(f"skipped_resume : {skipped_resume}")
lines.append(f"processed_now : {processed_now}")
lines.append(f"remaining_est : {max(total_input - skipped_resume - processed_now, 0)}")
lines.append("")
lines.append("BUCKET COUNTS")
lines.append("-" * 40)
for k in ["LIVE_OK", "WARNING", "OFFLINE"]:
lines.append(f"{k:12}: {counts_bucket.get(k, 0)}")
lines.append("")
lines.append("PAGE TYPE COUNTS")
lines.append("-" * 40)
for k, v in counts_type.most_common():
lines.append(f"{k:28}: {v}")
atomic_write_text(path, "\n".join(lines) + "\n")
# =========================
# WRITER THREAD
# =========================
def writer_loop(result_queue, args, total_input, skipped_resume):
out = args.output
os.makedirs(out, exist_ok=True)
all_csv = os.path.join(out, "all_results.csv")
hidup_csv = os.path.join(out, "hidup_results.csv")
live_ok_csv = os.path.join(out, "live_ok_results.csv")
warning_csv = os.path.join(out, "warning_results.csv")
offline_csv = os.path.join(out, "offline_results.csv")
hidup_txt = os.path.join(out, "hidup_domains.txt")
live_ok_txt = os.path.join(out, "live_ok_domains.txt")
warning_txt = os.path.join(out, "warning_domains.txt")
offline_txt = os.path.join(out, "offline_domains.txt")
summary_txt = os.path.join(out, "summary.txt")
progress_log = os.path.join(out, "progress.log")
all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)
hidup_list_writer = SafeLineWriter(hidup_txt)
live_ok_list_writer = SafeLineWriter(live_ok_txt)
warning_list_writer = SafeLineWriter(warning_txt)
offline_list_writer = SafeLineWriter(offline_txt)
log_writer = SafeLineWriter(progress_log)
counts_bucket = Counter()
counts_type = Counter()
processed_now = 0
update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
while True:
item = result_queue.get()
if item is None:
break
row = item
domain = row["domain"]
all_writer.writerow(row)
if row["bucket"] in ("LIVE_OK", "WARNING"):
hidup_writer.writerow(row)
hidup_list_writer.write_line(domain)
if row["bucket"] == "LIVE_OK":
live_ok_writer.writerow(row)
live_ok_list_writer.write_line(domain)
elif row["bucket"] == "WARNING":
warning_writer.writerow(row)
warning_list_writer.write_line(domain)
else:
offline_writer.writerow(row)
offline_list_writer.write_line(domain)
counts_bucket[row["bucket"]] += 1
counts_type[row["page_type"]] += 1
processed_now += 1
update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
log_writer.write_line(
f"DONE {now_utc()} {domain} bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
)
code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
print(
f"[{processed_now}] "
f"{domain:30} -> {colorize_bucket(row['bucket']):18} | "
f"{row['page_type'][:28]:28} | "
f"{code_text:4} | "
f"{str(row['elapsed_ms'])[:6]:6} ms | "
f"{row['best_final_url'][:45]}"
)
log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")
all_writer.close()
hidup_writer.close()
live_ok_writer.close()
warning_writer.close()
offline_writer.close()
hidup_list_writer.close()
live_ok_list_writer.close()
warning_list_writer.close()
offline_list_writer.close()
log_writer.close()
# =========================
# WORKER
# =========================
def worker(domain, args, result_queue):
if STOP_EVENT.is_set():
return
try:
row = summarize_domain(domain, args)
except Exception as e:
row = fallback_error_row(domain, f"{type(e).__name__}: {e}")
result_queue.put(row)
if args.delay > 0:
time.sleep(args.delay)
# =========================
# FILE CLEANUP
# =========================
def remove_if_exists(path):
try:
if os.path.exists(path):
os.remove(path)
except Exception:
pass
def cleanup_output(outdir):
files = [
"all_results.csv",
"hidup_results.csv",
"live_ok_results.csv",
"warning_results.csv",
"offline_results.csv",
"hidup_domains.txt",
"live_ok_domains.txt",
"warning_domains.txt",
"offline_domains.txt",
"summary.txt",
"progress.log",
]
for f in files:
remove_if_exists(os.path.join(outdir, f))
# =========================
# CLI
# =========================
def parse_args():
parser = argparse.ArgumentParser(description="Fast CLI Domain Status Checker")
parser.add_argument("-i", "--input", required=True, help="File domain input (.txt)")
parser.add_argument("-o", "--output", default="results", help="Folder output")
parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker paralel")
parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout")
parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout")
parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Maks redirect")
parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Max body dibaca")
parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Retry koneksi ringan")
parser.add_argument("--delay", type=float, default=0.0, help="Delay per domain")
parser.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
parser.add_argument("--no-resume", action="store_true", help="Jangan skip hasil lama")
return parser.parse_args()
def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
args = parse_args()
os.makedirs(args.output, exist_ok=True)
if args.fresh:
cleanup_output(args.output)
all_csv = os.path.join(args.output, "all_results.csv")
domains = load_domains(args.input)
if not domains:
print("Tidak ada domain valid di file input.")
sys.exit(1)
processed = set()
if not args.no_resume:
processed = load_processed_domains(all_csv)
queue_domains = [d for d in domains if d not in processed]
print(f"{C_CYAN}Total input : {len(domains)}{C_RESET}")
print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
print(f"{C_CYAN}Akan diproses : {len(queue_domains)}{C_RESET}")
print(f"{C_CYAN}Workers : {args.workers}{C_RESET}")
print(f"{C_CYAN}Output folder : {args.output}{C_RESET}")
print("-" * 110)
result_queue = Queue()
writer_thread = threading.Thread(
target=writer_loop,
args=(result_queue, args, len(domains), len(processed)),
daemon=True
)
writer_thread.start()
executor = ThreadPoolExecutor(max_workers=args.workers)
try:
futures = []
for domain in queue_domains:
if STOP_EVENT.is_set():
break
futures.append(executor.submit(worker, domain, args, result_queue))
for f in futures:
if STOP_EVENT.is_set():
break
try:
f.result()
except Exception:
pass
except KeyboardInterrupt:
STOP_EVENT.set()
print(f"\n{C_RED}Dihentikan paksa oleh user.{C_RESET}")
finally:
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
        # short grace period so workers that already finished can push onto the queue
time.sleep(0.5)
result_queue.put(None)
writer_thread.join(timeout=10)
print("-" * 110)
print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
print(f"All CSV : {os.path.join(args.output, 'all_results.csv')}")
print(f"Hidup CSV : {os.path.join(args.output, 'hidup_results.csv')}")
print(f"Live OK CSV : {os.path.join(args.output, 'live_ok_results.csv')}")
print(f"Warning CSV : {os.path.join(args.output, 'warning_results.csv')}")
print(f"Offline CSV : {os.path.join(args.output, 'offline_results.csv')}")
print(f"Summary : {os.path.join(args.output, 'summary.txt')}")
if __name__ == "__main__":
main()
How to use it
1. Create an input file
For example, domains.txt:
trazam.com
deudex.com
4bx-cd.com
j3kram.com
edzzup.com
usaiptv.tv
2. Run it
python3 domain_status_fast.py -i domains.txt -o hasil
3. For 100 domains, my suggestions
If your VPS/server is reasonably strong:
python3 domain_status_fast.py -i domains.txt -o hasil -w 25
If you want it faster:
python3 domain_status_fast.py -i domains.txt -o hasil -w 40
On a small server:
python3 domain_status_fast.py -i domains.txt -o hasil -w 15
4. Start over from scratch
python3 domain_status_fast.py -i domains.txt -o hasil --fresh
5. Automatic resume
If a previous run stopped partway:
python3 domain_status_fast.py -i domains.txt -o hasil
It reads all_results.csv and skips every domain that has already been checked.
Output files
CSV
- all_results.csv → all results
- hidup_results.csv → every domain that is still reachable
- live_ok_results.csv → websites that are normal/live
- warning_results.csv → reachable but problematic
- offline_results.csv → not reachable / failed
TXT
- hidup_domains.txt
- live_ok_domains.txt
- warning_domains.txt
- offline_domains.txt
Summary
- summary.txt
- progress.log
What the result buckets mean
LIVE_OK
The domain/website is up and normal:
- LIVE 200
- LIVE NON-HTML
WARNING
The domain is still alive/reachable, but something is wrong:
- DEFAULT HOSTING PAGE
- PARKED
- FOR SALE
- COMING SOON
- SUSPENDED
- EXPIRED / RENEWAL ISSUE
- 403
- 404
- 503
- SSL invalid
- odd redirects
OFFLINE
Genuinely failed:
- no DNS records
- connection failed
- timeout
- request error
Why is this version faster?
1. Multi-threading
Instead of finishing one domain before moving on to the next, many domains are checked at the same time. That is the main reason it gets much closer to HeadMasterSEO's speed.
2. Optimized URL candidates
The script does not always force-check every variant. It tries https://domain and http://domain first; once it has a result that is "clear enough", it stops early instead of always continuing to the www variants.
3. Limited body reads
Only part of each page is read, just enough to extract:
- the title
- parked keywords
- suspended keywords
- expired keywords
- default hosting page markers
So it is faster and lighter.
4. Lightweight DNS
By default the DNS check is light: A, AAAA, CNAME.
Not an exhaustive DNS scan.
5. Results written immediately
As each domain finishes, the script:
- writes to file immediately
- flushes
- fsyncs
So if the process stops, the results stay safe.
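In isolation, the durable-append idea looks like this (append_durable is just an illustrative name; the script's SafeLineWriter does the same thing):

```python
import os

def append_durable(path, line):
    # After this returns, the line is on disk even if the
    # process is killed immediately afterwards.
    with open(path, "a", encoding="utf-8") as f:
        f.write(line.rstrip("\n") + "\n")
        f.flush()             # flush Python's buffer to the OS
        os.fsync(f.fileno())  # force the OS page cache to disk

append_durable("progress.log", "DONE example.com LIVE_OK")
```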
Recommended settings for 100 domains
Safe and reasonably fast
python3 domain_status_fast.py -i domains.txt -o hasil -w 20 --connect-timeout 5 --read-timeout 7 --dns-timeout 2
Faster
python3 domain_status_fast.py -i domains.txt -o hasil -w 35 --connect-timeout 4 --read-timeout 6 --dns-timeout 2
If you keep hitting hangs / slow hosts
python3 domain_status_fast.py -i domains.txt -o hasil -w 15 --connect-timeout 4 --read-timeout 5 --dns-timeout 2
Important: do not set the worker count too high
If you use:
-w 100
it will not necessarily be faster.
It can instead:
- overload the VPS
- saturate your connection
- overwhelm the DNS resolver
- produce many false timeouts
For 100 domains, the sweet spot is usually:
- 20 to 40 workers
Comparing it with HeadMasterSEO
HeadMasterSEO may still be somewhat faster, because:
- it is a native application
- its network stack is more heavily optimized
- it sometimes reads less of the response body
- it may not check DNS as thoroughly as this Python script
Even so, the script above should be far faster than the sequential, non-CLI version.
My suggestion
Try this first:
python3 domain_status_fast.py -i domains.txt -o hasil -w 25 --fresh
Once you have results, send 5–10 sample output lines or CSV rows here.
I can then help check whether:
- the classification is accurate
- any categories need to be added
- it should behave more like HeadMasterSEO
- an ultra-fast mode is needed
If you like, the next step could be two modes:
1. --mode fast
Very fast, basic classification, good for large lists.
2. --mode full
More detailed, more accurate, slightly slower.
If you want, I can go ahead and build the dual-mode (fast/full) version so that:
- for 1000 domains you can use fast
- for important domains you can use full.