2024-11-28 21:43:16 +01:00

206 lines
7.2 KiB
Python

import pycuda.autoinit
import pycuda.driver as cuda
from pycuda.compiler import SourceModule
import numpy as np
import time
import json
import os
from dotenv import load_dotenv
from libs.cudacode import CUDA_CRYPTO_CODE
# Load environment variables
load_dotenv()
PREFIX = os.getenv("PREFIX", "dEAD000000000000000042069420694206942069").lower()
BATCH_SIZE = 2**16 # 65536 addresses per batch
class EthereumVanityMiner:
def __init__(self):
# Initialize CUDA
cuda.init()
self.device = cuda.Device(0)
self.context = self.device.make_context()
# Compile CUDA module
self.mod = SourceModule(CUDA_CRYPTO_CODE, no_extern_c=True)
self.kernel = self.mod.get_function("generate_and_check")
# Prepare memory buffers
self.gpu_private_keys = cuda.mem_alloc(BATCH_SIZE * 32)
self.gpu_addresses = cuda.mem_alloc(BATCH_SIZE * 20)
self.gpu_match_lengths = cuda.mem_alloc(BATCH_SIZE * 4)
# Prepare target prefix
prefix = PREFIX[2:] if PREFIX.startswith("0x") else PREFIX
# Debug print
print(f"Converting prefix: {prefix}")
# Convert each pair of hex chars to a byte
self.prefix_bytes = bytearray()
for i in range(0, len(prefix), 2):
hex_pair = prefix[i:i+2]
byte_val = int(hex_pair, 16)
self.prefix_bytes.append(byte_val)
# Debug print
print(f"Prefix bytes: {[hex(b) for b in self.prefix_bytes]}")
self.gpu_target = cuda.mem_alloc(len(self.prefix_bytes))
cuda.memcpy_htod(self.gpu_target, self.prefix_bytes)
# Host buffers
self.host_match_lengths = np.zeros(BATCH_SIZE, dtype=np.int32)
self.host_private_key = np.zeros(32, dtype=np.uint8)
self.host_address = np.zeros(20, dtype=np.uint8)
def cleanup(self):
"""Clean up GPU resources"""
try:
self.gpu_private_keys.free()
self.gpu_addresses.free()
self.gpu_match_lengths.free()
self.gpu_target.free()
finally:
self.context.pop()
self.context.detach()
def save_match(
self, address_hex, private_key_hex, match_length, raw_address, raw_private_key
):
"""Save match to JSON file"""
try:
matches = []
if os.path.exists("cuda_matches.json"):
with open("cuda_matches.json", "r") as f:
matches = json.load(f)
match_data = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"address": f"0x{address_hex}",
"private_key": private_key_hex,
"match_length": int(match_length),
"raw_address": raw_address,
"raw_private_key": raw_private_key,
}
matches.append(match_data)
with open("cuda_matches.json", "w") as f:
json.dump(matches, f, indent=2)
except Exception as e:
print(f"\nWarning: Failed to save match to file: {e}")
def check_batch(self, seed):
"""Run one batch of address generation and checking"""
block = (256, 1, 1)
grid = ((BATCH_SIZE + block[0] - 1) // block[0], 1)
self.kernel(
self.gpu_private_keys,
self.gpu_addresses,
self.gpu_match_lengths,
self.gpu_target,
np.int32(len(self.prefix_bytes)),
np.uint32(seed),
block=block,
grid=grid
)
# Get results
cuda.memcpy_dtoh(self.host_match_lengths, self.gpu_match_lengths)
best_idx = np.argmax(self.host_match_lengths)
match_length = self.host_match_lengths[best_idx]
if match_length > 0:
temp_private_key = np.zeros(32, dtype=np.uint8)
temp_address = np.zeros(20, dtype=np.uint8)
cuda.memcpy_dtoh(temp_private_key, self.gpu_private_keys)
cuda.memcpy_dtoh(temp_address, self.gpu_addresses)
address_hex = ''.join(format(x, '02x') for x in temp_address)
private_key_hex = ''.join(format(x, '02x') for x in temp_private_key)
# Verify the match
target = PREFIX[2:] if PREFIX.startswith('0x') else PREFIX
actual_match = 0
for i, (t, a) in enumerate(zip(target, address_hex)):
if t.lower() != a.lower():
break
actual_match += 1
if actual_match > 0:
return {
'address': address_hex,
'private_key': private_key_hex,
'match_length': actual_match,
'raw_address': temp_address.tobytes().hex(),
'raw_private_key': temp_private_key.tobytes().hex()
}
return None
def mine(self, target_score=None):
"""Main mining loop"""
try:
best_match = {"match_length": 0}
start_time = time.time()
last_status_time = start_time
addresses_checked = 0
while True:
seed = np.random.randint(0, 2**32, dtype=np.uint32)
result = self.check_batch(seed)
addresses_checked += BATCH_SIZE
current_time = time.time()
# Print status every 5 seconds
if current_time - last_status_time >= 5:
elapsed_time = current_time - start_time
rate = addresses_checked / elapsed_time
print(
f"\rChecked {addresses_checked:,} addresses ({rate:,.0f}/s) - Best match: {best_match['match_length']} chars",
end="",
)
last_status_time = current_time
if result and result["match_length"] > best_match["match_length"]:
best_match = result
print(f"\nNew best match ({result['match_length']} chars):")
print(f"Address: 0x{result['address']}")
print(f"Private key: {result['private_key']}")
self.save_match(
result["address"],
result["private_key"],
result["match_length"],
result["raw_address"],
result["raw_private_key"],
)
if target_score and result["match_length"] >= target_score:
return result
except KeyboardInterrupt:
print("\nMining interrupted by user")
return best_match
finally:
self.cleanup()
if __name__ == "__main__":
try:
miner = EthereumVanityMiner()
print(f"Mining with {cuda.Device.count()} GPU(s)")
print(f"Target prefix: {PREFIX}")
print(f"Batch size: {BATCH_SIZE}")
result = miner.mine(target_score=len(PREFIX))
if result:
print("\nFinal result:")
print(f"Address: 0x{result['address']}")
print(f"Private key: {result['private_key']}")
print(f"Match length: {result['match_length']}")
except Exception as e:
print(f"Error: {e}")