import requests
import winsound
import speech_recognition as sr
import pyttsx3
import os
import json
import vlc
import time
import pyaudio
from pydub import AudioSegment
import random
import urllib.parse

from vosk import Model, KaldiRecognizer
import noisereduce as nr
from numpy import frombuffer, int16
import numpy as np


class Speak:
    def __init__(self):
        self.url = "http://127.0.0.1:7851/api/tts-generate"
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', 150)

        # Vosk recognizer kept under its own name so it does not clobber the
        # speech_recognition recognizer that listen() relies on
        self.model_path = os.path.join(os.path.dirname(__file__), "../models/vosk-model-en-us-0.42-gigaspeech")
        self.model = Model(self.model_path)
        self.vosk_recognizer = KaldiRecognizer(self.model, 16000)

    #! listen with google
    def listen(self):
        with self.microphone as source:
            # Adjust for ambient noise
            self.recognizer.adjust_for_ambient_noise(source, duration=1)
            print("Listening...")
            try:
                # Listen with a 10-second timeout
                audio = self.recognizer.listen(source, timeout=10)
                try:
                    text = self.recognizer.recognize_google(audio)
                    print("You said: ", text)
                    return text
                except sr.UnknownValueError:
                    print("Sorry, I didn't get that.")
                    return None
                except sr.RequestError as e:
                    print("Sorry, I couldn't request results; {0}".format(e))
                    return None
            except sr.WaitTimeoutError:
                print("Timeout. No speech detected.")
                return None

    #! listen with vosk
    def listen2(self, noise_threshold=500):
        p = pyaudio.PyAudio()
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=8000)
        stream.start_stream()
        print("Listening...")
        count = 0
        try:
            # Give up after 10 consecutive noisy chunks
            while count < 10:
                data = stream.read(8000, exception_on_overflow=False)
                filtered_data = nr.reduce_noise(y=frombuffer(data, dtype=int16), sr=16000).astype(int16).tobytes()

                # Calculate RMS to detect ambient noise levels
                # (cast to float first so squaring doesn't overflow int16)
                samples = np.frombuffer(filtered_data, dtype=int16).astype(np.float64)
                rms_value = np.sqrt(np.mean(np.square(samples)))

                if rms_value < noise_threshold:
                    if self.vosk_recognizer.AcceptWaveform(filtered_data):
                        result = json.loads(self.vosk_recognizer.Result())
                        if result["text"]:
                            print(f"Recognized: {result['text']}")
                            return result['text']
                else:
                    print(f"Ambient noise detected: RMS {rms_value} exceeds threshold {noise_threshold}")
                    count += 1
        except KeyboardInterrupt:
            print("Stopping...")
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()

    def stream_output(self, text):
        # Example parameters
        voice = "maxheadroom_00000045.wav"
        language = "en"
        output_file = "stream_output.wav"

        # Encode the text for URL
        encoded_text = urllib.parse.quote(text)

        # Create the streaming URL
        streaming_url = f"http://localhost:7851/api/tts-generate-streaming?text={encoded_text}&voice={voice}&language={language}&output_file={output_file}"

        # Create and play the audio stream using VLC
        player = vlc.MediaPlayer(streaming_url)

        def on_end_reached(event):
            print("End of stream reached.")
            player.stop()

        # Attach event to detect when the stream ends
        event_manager = player.event_manager()
        event_manager.event_attach(vlc.EventType.MediaPlayerEndReached, on_end_reached)

        # Start playing the stream
        player.play()

        # Keep the script running to allow the stream to play
        while True:
            state = player.get_state()
            if state in [vlc.State.Ended, vlc.State.Stopped, vlc.State.Error]:
                break
            time.sleep(1)

    def glitch_stream_output(self, text):
        def change_pitch(sound, octaves):
            # 1-in-8 chance of applying the pitch shift to this chunk
            val = random.randint(0, 7)
            if val == 1:
                new_sample_rate = int(sound.frame_rate * (2.0 ** octaves))
                return sound._spawn(sound.raw_data, overrides={'frame_rate': new_sample_rate}).set_frame_rate(sound.frame_rate)
            else:
                return sound

        def convert_audio_format(sound, target_sample_rate=16000):
            # Ensure the audio is in PCM16 format
            sound = sound.set_sample_width(2)  # PCM16 = 2 bytes per sample
            # Resample the audio to the target sample rate
            sound = sound.set_frame_rate(target_sample_rate)
            return sound

        # Example parameters
        voice = "maxheadroom_00000045.wav"
        language = "en"
        output_file = "stream_output.wav"

        # Encode the text for URL
        encoded_text = urllib.parse.quote(text)

        # Create the streaming URL
        streaming_url = f"http://localhost:7851/api/tts-generate-streaming?text={encoded_text}&voice={voice}&language={language}&output_file={output_file}"

        try:
            # Stream the audio data
            response = requests.get(streaming_url, stream=True)

            # Initialize PyAudio
            p = pyaudio.PyAudio()
            stream = None

            # Process the audio stream in chunks
            chunk_size = 1024 * 7  # Adjust chunk size if needed
            audio_buffer = b''

            for chunk in response.iter_content(chunk_size=chunk_size):
                audio_buffer += chunk

                if len(audio_buffer) < chunk_size:
                    continue

                audio_segment = AudioSegment(
                    data=audio_buffer,
                    sample_width=2,    # 2 bytes for 16-bit audio
                    frame_rate=24000,  # Assumed frame rate, adjust as necessary
                    channels=1         # Assuming mono audio
                )

                # Randomly adjust pitch
                octaves = random.uniform(-0.5, 1.5)
                modified_chunk = change_pitch(audio_segment, octaves)

                # 1% chance to trigger stutter; guarded so the first chunk
                # can't write to a stream that hasn't been opened yet
                if stream is not None and random.random() < 0.01:
                    repeat_times = random.randint(2, 5)  # Repeat 2 to 5 times
                    for _ in range(repeat_times):
                        stream.write(modified_chunk.raw_data)

                # Convert to PCM16 and 16kHz sample rate after the stutter effect
                modified_chunk = convert_audio_format(modified_chunk, target_sample_rate=16000)

                if stream is None:
                    # Define stream parameters
                    stream = p.open(format=pyaudio.paInt16,
                                    channels=1,
                                    rate=modified_chunk.frame_rate,
                                    output=True)

                # Play the modified chunk
                stream.write(modified_chunk.raw_data)

                # Reset buffer
                audio_buffer = b''

            # Final cleanup
            if stream:
                stream.stop_stream()
                stream.close()
            p.terminate()
        except Exception:
            # Fall back to local pyttsx3 speech if streaming fails
            self.engine.say(text)
            self.engine.runAndWait()

# sp = Speak()
# sp.glitch_stream_output("this is a test of pitch and stutter. test 1 2 3. I just need a long enough sentence to see the frequency of sound changes.")
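
# Minimal usage sketch (not part of the original module): runs one
# listen/speak round trip. It assumes a working microphone, the Vosk model
# folder referenced in __init__, and an AllTalk-style TTS server on
# localhost:7851; if the server is unreachable, glitch_stream_output falls
# back to local pyttsx3 speech.
if __name__ == "__main__":
    sp = Speak()
    heard = sp.listen()       # online: Google speech recognition
    if heard is None:
        heard = sp.listen2()  # offline: Vosk with RMS noise gating
    if heard:
        sp.glitch_stream_output(f"you said {heard}")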