Qwen3-ASR-Flash-Realtime
Copied!
Real-time Speech Recognition
Overview
Real-time Speech Recognition
The real-time version of Qwen3-ASR-Flash is a highly accurate, intelligent, and robust multilingual speech recognition model based on a large language model. Leveraging a powerful foundational model, massive amounts of text and multimodal data, and tens of millions of hours of audio data, Qwen3-ASR-Flash achieves highly accurate speech recognition, automatically determining the language and accurately identifying speech in 11 languages, while ensuring precise transcription even in complex audio environments.
Input
Audio
Output
Text
Features
Prefix Completion
Function Calling
Cache
Structured Outputs
Batches
Web Search
Pricing
- Audio Duration: $0.00009 per second
Rate Limits
- RPM (Requests Per Minute): 1.20K
API Reference
Get API Key
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
# example requires websocket-client library:
# pip install websocket-client
import os
import time
import json
import threading
import base64
import websocket
import logging
import logging.handlers
from datetime import datetime
# Module-level logger; handlers are attached later by init_logger().
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# The API key is read from the environment. If the environment variable is not
# configured, replace the line below with: API_KEY = "sk-xxx"
API_KEY = os.environ.get("DASHSCOPE_API_KEY")
if not API_KEY:
    # Previously an unset variable made the header concatenation below fail
    # with an opaque TypeError; say what is actually wrong instead.
    logger.warning(
        "DASHSCOPE_API_KEY is not set; the Authorization header will carry "
        "an empty token and authentication is expected to fail."
    )

QWEN_MODEL = "qwen3-asr-flash-realtime"
baseUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
url = f"{baseUrl}?model={QWEN_MODEL}"
print(f"Connecting to server: {url}")

# Selects which session configuration on_open sends (server-side VAD or none)
# and whether send_audio commits the audio buffer itself at end of file.
# When VAD mode is off, it is recommended that the cumulative duration of
# continuously sent audio does not exceed 60 seconds.
enableServerVad = True

# Handshake headers passed to the WebSocket client.
headers = [
    f"Authorization: Bearer {API_KEY or ''}",
    "OpenAI-Beta: realtime=v1",
]
def send_event(ws, event):
    """Log an event's id and type, then send the event over ``ws`` as JSON.

    ``event`` is indexed by the ``event_id`` and ``type`` keys and must be
    JSON-serializable.
    """
    event_id, event_type = event['event_id'], event['type']
    logger.info(f" Send event: {event_id}, type={event_type}")
    payload = json.dumps(event)
    ws.send(payload)
def init_logger():
    """Attach a rotating file handler and a console handler to ``logger``.

    Both handlers emit DEBUG and above, formatted as ``time|level|message``.
    The file handler writes to ``omni_tester.log``, rotating at 100 MiB and
    keeping three backups.
    """
    formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s')

    # The encoding is pinned to UTF-8 because this script logs non-ASCII text
    # (Chinese status messages, server events dumped with ensure_ascii=False)
    # that the platform default encoding may be unable to represent.
    file_handler = logging.handlers.RotatingFileHandler(
        "omni_tester.log",
        maxBytes=100 * 1024 * 1024,
        backupCount=3,
        encoding="utf-8",
    )
    console_handler = logging.StreamHandler()

    for handler in (file_handler, console_handler):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
def on_open(ws):
    """Send the initial ``session.update`` event once the socket is open.

    The session asks for text output from 16 kHz PCM input. The only part
    that depends on ``enableServerVad`` is ``turn_detection``: a
    ``server_vad`` configuration when it is truthy, ``None`` otherwise.
    """
    logger.info("Connected to server.")

    if enableServerVad:
        turn_detection = {
            "type": "server_vad",
            "threshold": 0.2,
            "silence_duration_ms": 800,
        }
    else:
        turn_detection = None

    session_update = {
        "event_id": "event_123",
        "type": "session.update",
        "session": {
            "modalities": ["text"],
            "input_audio_format": "pcm",
            "sample_rate": 16000,
            "input_audio_transcription": {
                # The language is optional; set it when the spoken language
                # is known in advance.
                "language": "zh",
                # A corpus is optional too; supplying one is recommended to
                # improve recognition:
                # "corpus": {
                #     "text": ""
                # }
            },
            "turn_detection": turn_detection,
        },
    }

    logger.info(f"Sending event: {json.dumps(session_update, indent=2)}")
    ws.send(json.dumps(session_update))
def on_message(ws, message):
    """Log a server message, pretty-printed when it parses as JSON.

    A message that is not valid JSON is logged verbatim at ERROR level.
    """
    try:
        parsed = json.loads(message)
    except json.JSONDecodeError:
        logger.error(f"Failed to parse message: {message}")
    else:
        # ensure_ascii=False keeps non-ASCII characters readable in the log.
        logger.info(f"Received event: {json.dumps(parsed, ensure_ascii=False, indent=2)}")
def on_error(ws, error):
    """Log an error reported for the connection at ERROR level."""
    description = f"Error: {error}"
    logger.error(description)
def on_close(ws, close_status_code, close_msg):
    """Log the close status code and close message of the connection."""
    summary = f"Connection closed: {close_status_code} - {close_msg}"
    logger.info(summary)
def send_audio(ws, local_audio_path, *, chunk_size=3200, interval=0.1,
               startup_delay=5):
    """Stream an audio file over ``ws`` as base64 ``input_audio_buffer.append`` events.

    Args:
        ws: Object whose ``send`` method accepts the JSON-encoded events.
        local_audio_path: Path of the audio file; it is read in binary mode.
        chunk_size: Bytes read and sent per event. The default of 3200 is
            presumably about 100 ms of 16 kHz 16-bit mono PCM; confirm
            against the actual audio format.
        interval: Seconds to sleep after each chunk, simulating real-time
            audio acquisition.
        startup_delay: Seconds to wait before reading the file. The sender
            thread is started before the connection is established, so this
            gives the handshake time to complete.

    After the last chunk, an ``input_audio_buffer.commit`` event is sent when
    server-side VAD is disabled (``enableServerVad`` is falsy).
    """
    time.sleep(startup_delay)
    with open(local_audio_path, 'rb') as audio_file:
        logger.info(f"文件读取开始: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        # read() returns b"" at end of file, which ends the iteration.
        for chunk in iter(lambda: audio_file.read(chunk_size), b""):
            append_event = {
                "event_id": "event_" + str(int(time.time() * 1000)),
                "type": "input_audio_buffer.append",
                "audio": base64.b64encode(chunk).decode('utf-8'),
            }
            ws.send(json.dumps(append_event))
            logger.info(f"Sending audio event: {append_event['event_id']}")
            # Simulate real-time audio acquisition
            time.sleep(interval)
        logger.info(f"文件读取完毕: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        # Use the same truthiness test as on_open, so that any falsy setting
        # that selected the non-VAD session also triggers the manual commit
        # (the previous `is False` check only matched the literal False).
        if not enableServerVad:
            commit_event = {
                "event_id": "event_789",
                "type": "input_audio_buffer.commit",
            }
            ws.send(json.dumps(commit_event))
# Build the WebSocket client and register the lifecycle callbacks.
ws = websocket.WebSocketApp(
    url,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
init_logger()
# The endpoint in `url` is a remote service, so the message no longer calls
# it a "local" server.
logger.info(f"Connecting to WebSocket server at {url}...")
# Replace with the path of the audio file to be recognized
local_audio_path = "your_audio_file"
# The sender runs on its own thread because run_forever() below blocks the
# main thread while it services the connection.
thread = threading.Thread(target=send_audio, args=(ws, local_audio_path))
thread.start()
ws.run_forever()