Berikut versi yang dioptimasi menggunakan ThreadPoolExecutor dan beberapa peningkatan performa:
import paramiko
import argparse
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
# Lock that serializes writes to the shared output file across worker threads
file_lock = threading.Lock()
def check_ssh_connection(ip, username, password, output_file, timeout=5):
    """Attempt a password-based SSH login and record it when it succeeds.

    A successful login is printed and appended to ``output_file`` as
    ``[SUCCESS] user@ip:password``. Failures are only printed. No exception
    escapes this function: every error is reported on stdout and mapped to
    ``False``.

    Args:
        ip: Host to connect to.
        username: Login name.
        password: Password to try.
        output_file: Open, writable text file shared between worker threads;
            writes to it are serialized with the module-level ``file_lock``.
        timeout: Seconds used for the connect, authentication and banner
            timeouts alike.

    Returns:
        True if the login succeeded and was recorded, False otherwise.
    """
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically (no verification).
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(
            ip,
            username=username,
            password=password,
            timeout=timeout,
            auth_timeout=timeout,
            banner_timeout=timeout,
            # Password-only check: skip key files and the SSH agent.
            look_for_keys=False,
            allow_agent=False,
        )
        result = f"[SUCCESS] {username}@{ip}:{password}\n"
        print(result.strip())
        with file_lock:
            output_file.write(result)
            # Flush right away so hits survive an interrupted run.
            output_file.flush()
        return True
    except paramiko.AuthenticationException:
        result = f"[FAILED] {username}@{ip} - Authentication failed\n"
        print(result.strip())
        return False
    except paramiko.SSHException as e:
        result = f"[ERROR] {username}@{ip} - SSH Error: {e}\n"
        print(result.strip())
        return False
    except Exception as e:
        result = f"[ERROR] {username}@{ip} - {e}\n"
        print(result.strip())
        return False
    finally:
        # Close on every path; previously only the success path closed the
        # client, so each failed attempt left it unclosed.
        if ssh is not None:
            ssh.close()
def load_credentials(filename):
    """Load ``ip|username|password`` records from a text file.

    Blank lines are ignored. A line is rejected, reported with a ``[SKIP]``
    message and counted as invalid, when it does not have exactly three
    ``|``-separated fields or when any field is empty after surrounding
    whitespace is removed.

    Args:
        filename: Path of the credentials file.

    Returns:
        A list of ``(ip, username, password)`` tuples with whitespace
        stripped from each field. An empty list is returned when the file
        does not exist.
    """
    credentials = []
    invalid_count = 0
    try:
        with open(filename, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                # Strip before validating so whitespace-only fields are
                # treated as empty instead of slipping through as ''.
                parts = [part.strip() for part in line.split('|')]
                if len(parts) != 3 or not all(parts):
                    print(f"[SKIP] Line {line_num}: Format tidak valid -> {line}")
                    invalid_count += 1
                    continue
                credentials.append(tuple(parts))
    except FileNotFoundError:
        print(f"[ERROR] File '{filename}' tidak ditemukan!")
        return []
    print(f"[INFO] Total kredensial valid: {len(credentials)}")
    if invalid_count:
        print(f"[INFO] Total kredensial invalid: {invalid_count}")
    return credentials
def main():
    """Command-line entry point: check every credential concurrently.

    Options:
        -o/--output   File that successful logins are appended to
                      (default ``sshlives.txt``).
        -i/--input    File with ``ip|username|password`` lines
                      (default ``live.txt``).
        -t/--threads  Number of worker threads, must be >= 1 (default 50).
        --timeout     Per-connection timeout in seconds (default 5).

    Prints per-target results, a running progress line and a final summary.
    Returns early, without opening the output file, when no valid credential
    could be loaded.
    """
    parser = argparse.ArgumentParser(
        description="Fast SSH Connection Checker",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-o', '--output',
        default='sshlives.txt',
        help='Output file (default: sshlives.txt)'
    )
    parser.add_argument(
        '-i', '--input',
        default='live.txt',
        help='Input file (default: live.txt)'
    )
    parser.add_argument(
        '-t', '--threads',
        type=int,
        default=50,
        help='Jumlah thread concurrent (default: 50)'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=5,
        help='Timeout koneksi dalam detik (default: 5)'
    )
    args = parser.parse_args()

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; reject it
    # here with a proper usage error instead of a traceback later on.
    if args.threads < 1:
        parser.error("--threads harus lebih besar dari 0")

    credentials = load_credentials(args.input)
    if not credentials:
        print("[ERROR] Tidak ada kredensial yang valid!")
        return

    total = len(credentials)
    # Both counters are only updated by the thread running the
    # as_completed loop below, so no lock is needed.
    success_count = 0
    failed_count = 0

    print(f"\n[INFO] Memulai pengecekan {total} target dengan {args.threads} threads...\n")

    # Append mode: results of earlier runs are kept.
    with open(args.output, 'a') as output_file:
        with ThreadPoolExecutor(max_workers=args.threads) as executor:
            # Map each future back to its target for error reporting.
            futures = {
                executor.submit(
                    check_ssh_connection,
                    ip, username, password,
                    output_file,
                    args.timeout
                ): (ip, username)
                for ip, username, password in credentials
            }
            # Handle results in completion order, not submission order.
            for i, future in enumerate(as_completed(futures), 1):
                ip, username = futures[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        failed_count += 1
                except Exception as e:
                    print(f"[ERROR] Unexpected error untuk {username}@{ip}: {e}")
                    failed_count += 1
                # '\r' keeps the progress indicator on a single line.
                print(f"[PROGRESS] {i}/{total} | ✓ {success_count} | ✗ {failed_count}",
                      end='\r')

    # Summary
    print(f"\n\n{'='*50}")
    print(f"[SELESAI] Total : {total}")
    print(f"[SELESAI] Berhasil : {success_count}")
    print(f"[SELESAI] Gagal : {failed_count}")
    print(f"[SELESAI] Output : {args.output}")
    print(f"{'='*50}")
if __name__ == "__main__":
    main()

Cara Penggunaan
# Default (50 threads, timeout 5 detik)
python script.py
# Custom threads dan timeout
python script.py -t 100 --timeout 3
# Custom input/output file
python script.py -i targets.txt -o hasil.txt
# Kombinasi semua opsi
python script.py -i targets.txt -o hasil.txt -t 100 --timeout 3

⚠️ Catatan: Semakin banyak thread tidak selalu lebih baik. Sesuaikan dengan spesifikasi komputer dan kecepatan jaringan Anda. Mulai dari 50 thread dan naikkan bertahap jika diperlukan.