| | import gradio as gr |
| | import json |
| | import requests |
| | import os |
| | import urllib.request |
| | import ssl |
| | import base64 |
| | import soundfile as sf |
| | from io import BytesIO |
| | import tempfile |
| | from datetime import datetime |
| | import logging |
| |
|
| | |
# Configure root logging once at import time: INFO level, each record
# rendered as "<time> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
| |
|
class AzureSpeechTranslatorApp:
    """Gradio speech translator backed by a remote model endpoint.

    Audio is sent to the endpoint for transcription, the transcript is sent
    back for translation, and every successful translation is appended to a
    per-language JSON history file under ``self.translations_dir``.
    """

    def __init__(self):
        """Read endpoint settings from the environment and load saved history."""
        # Endpoint URL and key come from the environment so that no secret
        # lives in the source file.
        self.url = os.getenv("AZURE_ENDPOINT")
        self.api_key = os.getenv("AZURE_API_KEY")

        # Display name -> language code and the language's native name.
        self.languages = {
            "English": {
                "code": "en",
                "native": "English"
            },
            "Chinese": {
                "code": "zh",
                "native": "中文"
            },
            "German": {
                "code": "de",
                "native": "Deutsch"
            },
            "French": {
                "code": "fr",
                "native": "Français"
            },
            "Italian": {
                "code": "it",
                "native": "Italiano"
            },
            "Japanese": {
                "code": "ja",
                "native": "日本語"
            },
            "Spanish": {
                "code": "es",
                "native": "Español"
            },
            "Portuguese": {
                "code": "pt",
                "native": "Português"
            }
        }

        # History lives in one JSON file per language code.
        self.translations_dir = "translations"
        os.makedirs(self.translations_dir, exist_ok=True)
        self.translations = self.load_translations()

    def get_translation_file_path(self, lang_code):
        """Get path for language-specific translation file"""
        return os.path.join(self.translations_dir, f"translations_{lang_code}.json")

    def load_translations(self):
        """Load translations for all languages.

        Returns:
            A dict mapping each configured language code to the content of
            its history file, or to an empty list when the file is missing.
        """
        translations = {}
        for lang_info in self.languages.values():
            file_path = self.get_translation_file_path(lang_info["code"])
            if os.path.exists(file_path):
                with open(file_path, 'r', encoding='utf-8') as f:
                    translations[lang_info["code"]] = json.load(f)
            else:
                translations[lang_info["code"]] = []
        return translations

    def save_translation(self, lang_code, translation):
        """Write the full history ``translation`` for ``lang_code`` to disk.

        The file is rewritten from scratch on every call.
        """
        file_path = self.get_translation_file_path(lang_code)
        with open(file_path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-Latin text readable in the file.
            json.dump(translation, f, ensure_ascii=False, indent=2)

    def call_azure_endpoint(self, payload):
        """Call Azure ML endpoint with the given payload.

        Args:
            payload: Request dict containing an ``"input_data"`` mapping.
                When that mapping has no ``"parameters"`` entry, the default
                ``{"temperature": 0.7}`` is added to it in place.

        Returns:
            The decoded JSON response, or ``{"error": <response body>}`` when
            the endpoint answers with an HTTP error status.

        Raises:
            Exception: If no API key is configured.
            ValueError: If no endpoint URL is configured.
        """
        # Check the configuration first so a missing setting fails fast with
        # a clear message instead of after the request has been assembled.
        if not self.api_key:
            raise Exception("A key should be provided to invoke the endpoint")
        if not self.url:
            raise ValueError("An endpoint URL should be provided to invoke the endpoint")

        def allow_self_signed_https(allowed):
            if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
                ssl._create_default_https_context = ssl._create_unverified_context

        # NOTE(review): this turns off TLS certificate verification for every
        # default HTTPS context in the process unless PYTHONHTTPSVERIFY is
        # set. Kept for compatibility with self-signed endpoints; confirm it
        # is still required before shipping.
        allow_self_signed_https(True)

        parameters = {"temperature": 0.7}
        if "parameters" not in payload["input_data"]:
            payload["input_data"]["parameters"] = parameters

        body = str.encode(json.dumps(payload))
        headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + self.api_key)}
        req = urllib.request.Request(self.url, body, headers)

        try:
            logger.info("Sending request to %s", self.url)
            # The context manager closes the connection once the body is read.
            with urllib.request.urlopen(req) as response:
                result = response.read().decode('utf-8')
            logger.info("Received response successfully")
            return json.loads(result)
        except urllib.error.HTTPError as error:
            logger.error("Request failed with status code: %s", error.code)
            logger.error("Headers: %s", error.info())
            error_message = error.read().decode("utf8", 'ignore')
            logger.error("Error message: %s", error_message)
            return {"error": error_message}

    def encode_audio_base64(self, audio_path):
        """Encode audio file to base64 and determine MIME type.

        Returns:
            A ``(base64_text, mime_type)`` tuple. Unknown extensions fall
            back to ``"audio/wav"``.
        """
        mime_types = {
            '.flac': "audio/flac",
            '.wav': "audio/wav",
            '.mp3': "audio/mpeg",
            '.m4a': "audio/aac",
            '.aac': "audio/aac",
            '.ogg': "audio/ogg",
        }
        file_extension = os.path.splitext(audio_path)[1].lower()
        mime_type = mime_types.get(file_extension, "audio/wav")

        with open(audio_path, "rb") as file:
            encoded_string = base64.b64encode(file.read()).decode('utf-8')

        return encoded_string, mime_type

    @staticmethod
    def _extract_result(response):
        """Pull the model text out of an endpoint response.

        Returns:
            An ``(ok, text)`` tuple. ``ok`` is False when the response is an
            error report or a dict in an unrecognised shape; ``text`` is then
            the message to show. Values that are not strings are converted
            with ``str`` so stripping never fails.
        """
        def as_text(value):
            return "" if value is None else str(value).strip()

        if not isinstance(response, dict):
            return True, as_text(response)
        if "result" in response:
            return True, as_text(response["result"])
        if "output" in response:
            output = response["output"]
            if isinstance(output, list) and len(output) > 0:
                return True, as_text(output[0])
            return True, as_text(output)
        if "error" in response:
            return False, f"Error: {response['error']}".strip()
        return False, f"Unexpected response format: {json.dumps(response)}".strip()

    def _query_model(self, conversation_state):
        """Send one conversation to the endpoint and return ``(ok, text)``."""
        payload = {
            "input_data": {
                "input_string": conversation_state
            }
        }
        return self._extract_result(self.call_azure_endpoint(payload))

    def _transcribe(self, audio_input, source_lang):
        """Transcribe ``audio_input``; return ``(ok, text_or_error_message)``."""
        try:
            base64_audio, mime_type = self.encode_audio_base64(audio_input)

            # The audio travels inline as a data URL next to the instruction.
            content_items = [
                {
                    "type": "text",
                    "text": f"Transcribe this {source_lang} audio to text."
                },
                {
                    "type": "audio_url",
                    "audio_url": {
                        "url": f"data:{mime_type};base64,{base64_audio}"
                    }
                }
            ]
            conversation_state = [
                {
                    "role": "user",
                    "content": content_items
                }
            ]
            return self._query_model(conversation_state)
        except Exception as e:
            # UI boundary: report the failure as text instead of raising.
            logger.error("Error in transcription: %s", e)
            return False, f"Transcription failed: {str(e)}"

    def _translate(self, text, source_lang, target_lang):
        """Translate ``text``; return ``(ok, text_or_error_message)``."""
        if not text:
            return False, "No text to translate"

        try:
            content_items = [
                {
                    "type": "text",
                    "text": f"Translate the following {source_lang} text to {target_lang}. Provide only the translation without any additional text or explanation:\n\n{text}"
                }
            ]
            conversation_state = [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": "You are a professional translator."}]
                },
                {
                    "role": "user",
                    "content": content_items
                }
            ]
            return self._query_model(conversation_state)
        except Exception as e:
            # UI boundary: report the failure as text instead of raising.
            logger.error("Error in translation: %s", e)
            return False, f"Translation failed: {str(e)}"

    def transcribe_audio(self, audio_input, source_lang="English"):
        """Transcribe audio to text using Azure endpoint.

        Args:
            audio_input: Path of the audio file to send.
            source_lang: Language name inserted into the instruction text.

        Returns:
            The transcript, or a human-readable error message on failure.
            This method does not raise.
        """
        return self._transcribe(audio_input, source_lang)[1]

    def translate_text(self, text, source_lang, target_lang):
        """Translate text between languages using Azure endpoint.

        Returns:
            The translation, ``"No text to translate"`` for empty input, or a
            human-readable error message on failure. This method does not
            raise.
        """
        return self._translate(text, source_lang, target_lang)[1]

    def _record_translation(self, entry, source_lang, target_lang):
        """Append ``entry`` to the history of both languages and persist it."""
        codes = [self.languages[source_lang]["code"]]
        target_code = self.languages[target_lang]["code"]
        # Same source and target language: store the entry only once.
        if target_code not in codes:
            codes.append(target_code)

        for code in codes:
            self.translations.setdefault(code, []).append(entry)
        for code in codes:
            self.save_translation(code, self.translations[code])

    def process_translation(self, audio, source_lang, target_lang):
        """Process audio input and generate translation.

        Args:
            audio: Path of the recorded or uploaded audio file.
            source_lang: Key of ``self.languages`` for the spoken language.
            target_lang: Key of ``self.languages`` to translate into.

        Returns:
            The formatted result for display, or an error message. Only
            successful translations are written to the history files.
        """
        if not audio:
            return "Please provide an audio file to translate."

        # Reject unknown languages before spending any endpoint calls.
        for lang in (source_lang, target_lang):
            if lang not in self.languages:
                return f"Unsupported language: {lang}"

        transcribed, source_text = self._transcribe(audio, source_lang)
        if not transcribed:
            # Do not try to translate an error message.
            return source_text

        translated, translation = self._translate(source_text, source_lang, target_lang)

        translation_entry = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "source_language": source_lang,
            "target_language": target_lang,
            "source_text": source_text,
            "translated_text": translation
        }

        # A failed translation is still shown (with the transcript) but is
        # kept out of the stored history.
        if translated:
            self._record_translation(translation_entry, source_lang, target_lang)

        return self.format_translation_display(translation_entry)

    def format_translation_display(self, entry):
        """Format one history entry as multi-line text for display."""
        output = f"Timestamp: {entry['timestamp']}\n\n"
        output += f"Source Language ({entry['source_language']}):\n{entry['source_text']}\n\n"
        output += f"Target Language ({entry['target_language']}):\n{entry['translated_text']}\n"
        return output

    def list_translations(self, lang_code):
        """List translations for specific language.

        Returns:
            All entries stored for ``lang_code`` joined with ``---``
            separators, or ``"No translations found"`` when there are none.
        """
        entries = self.translations.get(lang_code)
        if not entries:
            return "No translations found"

        return "\n\n---\n\n".join(
            self.format_translation_display(entry) for entry in entries
        )

    def _history_for_language(self, language_name):
        """Return the history text for a language chosen by display name."""
        lang_info = self.languages.get(language_name)
        if lang_info is None:
            # Covers a cleared dropdown, where the selection is not a known name.
            return "No translations found"
        return self.list_translations(lang_info["code"])

    def create_interface(self):
        """Create Gradio interface.

        Returns:
            The ``gr.Blocks`` object with the translate button and the
            history dropdown wired to this instance.
        """
        with gr.Blocks(theme=gr.themes.Soft()) as interface:
            gr.Markdown("# Phine Speech Translator with Phi-4-Multimodal")
            gr.Markdown("Record speech or upload audio file for translation between multiple languages using [Phi-4-Multimodal](https://aka.ms/phi-4-multimodal/azure). Other demos include [Phi-4-Mini playground](https://huggingface.co/spaces/microsoft/phi-4-mini), [Stories Come Alive](https://huggingface.co/spaces/microsoft/StoriesComeAlive), [Thoughts Organizer](https://huggingface.co/spaces/microsoft/ThoughtsOrganizer)")

            with gr.Row():
                source_lang = gr.Dropdown(
                    choices=list(self.languages.keys()),
                    value="English",
                    label="Source Language"
                )
                target_lang = gr.Dropdown(
                    choices=list(self.languages.keys()),
                    value="Chinese",
                    label="Target Language"
                )

            with gr.Row():
                # type="filepath" hands process_translation a path on disk.
                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="filepath",
                    label="Record or Upload Audio"
                )

            with gr.Row():
                translate_btn = gr.Button("Translate")

            with gr.Row():
                output = gr.Textbox(
                    label="Translation Results",
                    lines=10
                )

            with gr.Accordion("Translation History", open=False):
                lang_select = gr.Dropdown(
                    choices=list(self.languages.keys()),
                    value="English",
                    label="Select Language"
                )
                history_output = gr.Textbox(
                    label="Translation History",
                    lines=20
                )

            translate_btn.click(
                fn=self.process_translation,
                inputs=[audio_input, source_lang, target_lang],
                outputs=output
            )

            # The history box is refreshed when the selected language changes.
            lang_select.change(
                fn=self._history_for_language,
                inputs=[lang_select],
                outputs=history_output
            )

        return interface
| |
|
def run_app():
    """Build the translator app and serve its Gradio interface."""
    translator = AzureSpeechTranslatorApp()

    # Listen on all network interfaces and request a public share link.
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
    }
    translator.create_interface().launch(**launch_options)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_app()