Berikut versi yang telah dioptimalkan dengan ThreadPoolExecutor serta beberapa peningkatan performa lainnya:

import paramiko
import argparse
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue

# Lock that serializes writes to the shared output file across worker threads.
file_lock = threading.Lock()

def check_ssh_connection(ip, username, password, output_file, timeout=5):
    """Attempt a password-based SSH login and record a successful one.

    Args:
        ip: Host to connect to.
        username: Account name used for the login.
        password: Password used for the login.
        output_file: Open, writable text file shared between worker threads;
            a line is appended to it only when the login succeeds.
        timeout: Seconds applied to the TCP connect, the banner exchange and
            the authentication step.

    Returns:
        True when the login succeeded, False otherwise. Every ``Exception``
        raised during the attempt is reported on stdout and turned into False.
    """
    try:
        # Using the client as a context manager guarantees close() on every
        # path; previously failed attempts left the client (and its transport)
        # open because close() was only reached after a successful login.
        with paramiko.SSHClient() as ssh:
            # NOTE(security): AutoAddPolicy accepts any unknown host key, so
            # the remote host's identity is not verified.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            ssh.connect(
                ip,
                username=username,
                password=password,
                timeout=timeout,          # TCP connect timeout
                auth_timeout=timeout,     # authentication response timeout
                banner_timeout=timeout,   # SSH banner timeout
                look_for_keys=False,      # password only: skip key discovery
                allow_agent=False         # password only: skip the SSH agent
            )

            result = f"[SUCCESS] {username}@{ip}:{password}\n"
            print(result.strip())

            # The file object is shared by all workers, so serialize writes.
            with file_lock:
                output_file.write(result)
                output_file.flush()  # persist immediately

        return True

    except paramiko.AuthenticationException:
        print(f"[FAILED] {username}@{ip} - Authentication failed")
        return False

    except paramiko.SSHException as e:
        print(f"[ERROR] {username}@{ip} - SSH Error: {e}")
        return False

    except Exception as e:
        # Boundary of a worker task: report anything else (socket errors,
        # timeouts, write failures) instead of propagating it to the pool.
        print(f"[ERROR] {username}@{ip} - {e}")
        return False

def load_credentials(filename):
    """Read ``ip|username|password`` records from ``filename``.

    Blank lines are ignored. A line is skipped and counted as invalid when it
    does not consist of exactly three ``|``-separated fields, or when any
    field is empty once surrounding whitespace has been removed.

    Args:
        filename: Path of the credentials file to read.

    Returns:
        A list of ``(ip, username, password)`` tuples with whitespace
        stripped from every field, or an empty list when the file cannot be
        read.
    """
    credentials = []
    invalid_count = 0

    try:
        with open(filename, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                # Strip each field *before* validating it, so a
                # whitespace-only field is rejected instead of being stored
                # as an empty string.
                parts = [part.strip() for part in line.split('|')]
                if len(parts) != 3 or not all(parts):
                    print(f"[SKIP] Line {line_num}: Format tidak valid -> {line}")
                    invalid_count += 1
                    continue

                credentials.append(tuple(parts))

    except FileNotFoundError:
        print(f"[ERROR] File '{filename}' tidak ditemukan!")
        return []
    except OSError as e:
        # Covers unreadable paths such as directories or permission errors.
        print(f"[ERROR] Gagal membaca file '{filename}': {e}")
        return []

    print(f"[INFO] Total kredensial valid: {len(credentials)}")
    if invalid_count:
        print(f"[INFO] Total kredensial invalid: {invalid_count}")

    return credentials

def main():
    """Parse the command line, check all credentials concurrently, print a summary.

    Options: ``-i/--input`` (default ``live.txt``), ``-o/--output`` (default
    ``sshlives.txt``, opened in append mode), ``-t/--threads`` (default 50)
    and ``--timeout`` in seconds (default 5). Invalid ``--threads`` or
    ``--timeout`` values end the program through ``parser.error``.
    """
    parser = argparse.ArgumentParser(
        description="Fast SSH Connection Checker",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-o', '--output',
        default='sshlives.txt',
        help='Output file (default: sshlives.txt)'
    )
    parser.add_argument(
        '-i', '--input',
        default='live.txt',
        help='Input file (default: live.txt)'
    )
    parser.add_argument(
        '-t', '--threads',
        type=int,
        default=50,  # starting point only; more threads is not always faster
        help='Jumlah thread concurrent (default: 50)'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=5,
        help='Timeout koneksi dalam detik (default: 5)'
    )
    args = parser.parse_args()

    # Fail early with a usage message: ThreadPoolExecutor raises ValueError
    # for max_workers <= 0, and a non-positive timeout cannot succeed.
    if args.threads < 1:
        parser.error("--threads harus minimal 1")
    if args.timeout < 1:
        parser.error("--timeout harus minimal 1 detik")

    credentials = load_credentials(args.input)
    if not credentials:
        print("[ERROR] Tidak ada kredensial yang valid!")
        return

    total = len(credentials)
    # Both counters are only updated in this (main) thread while draining
    # as_completed(), so they need no lock.
    success_count = 0
    failed_count = 0

    print(f"\n[INFO] Memulai pengecekan {total} target dengan {args.threads} threads...\n")

    with open(args.output, 'a') as output_file:
        with ThreadPoolExecutor(max_workers=args.threads) as executor:

            # Map each future back to its target for error reporting.
            futures = {
                executor.submit(
                    check_ssh_connection,
                    ip, username, password,
                    output_file,
                    args.timeout
                ): (ip, username)
                for ip, username, password in credentials
            }

            # Handle results in completion order rather than submission order.
            for i, future in enumerate(as_completed(futures), 1):
                ip, username = futures[future]

                try:
                    if future.result():
                        success_count += 1
                    else:
                        failed_count += 1
                except Exception as e:
                    print(f"[ERROR] Unexpected error untuk {username}@{ip}: {e}")
                    failed_count += 1

                # Progress line is rewritten in place via the carriage return.
                print(f"[PROGRESS] {i}/{total} | ✓ {success_count} | ✗ {failed_count}",
                      end='\r')

    # Summary
    print(f"\n\n{'='*50}")
    print(f"[SELESAI] Total    : {total}")
    print(f"[SELESAI] Berhasil : {success_count}")
    print(f"[SELESAI] Gagal    : {failed_count}")
    print(f"[SELESAI] Output   : {args.output}")
    print(f"{'='*50}")

# Run the checker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Cara Penggunaan

# Default (50 threads, timeout 5 detik)
python script.py

# Custom threads dan timeout
python script.py -t 100 --timeout 3

# Custom input/output file
python script.py -i targets.txt -o hasil.txt

# Kombinasi semua opsi
python script.py -i targets.txt -o hasil.txt -t 100 --timeout 3

⚠️ Catatan: Semakin banyak thread tidak selalu lebih baik. Sesuaikan dengan spesifikasi komputer dan kecepatan jaringan Anda. Mulai dari 50 thread dan naikkan bertahap jika diperlukan.