Plugins¶
Janus plugins provide specific functionality for different use cases. This library includes support for several core Janus plugins.
Plugins are the core components that implement specific WebRTC functionality. Each plugin corresponds to a server-side Janus plugin and provides a Python interface for interacting with it.
For detailed API documentation, see the API Reference.
Overview¶
All plugins inherit from the base JanusPlugin class, which provides common functionality for plugin lifecycle management, message handling, and WebRTC signaling. The library includes support for:
- EchoTest Plugin - Testing WebRTC connectivity
- VideoCall Plugin - One-to-one video calls
- VideoRoom Plugin - Multi-party video conferencing
- TextRoom Plugin - Text-based communication via DataChannels
EchoTest Plugin¶
The EchoTest plugin is useful for testing WebRTC connectivity. It echoes back any media sent to it, making it perfect for testing your setup.
EchoTest Usage Example¶
import asyncio
from janus_client import JanusSession, JanusEchoTestPlugin
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusEchoTestPlugin()
async with session:
await plugin.attach(session)
# Start echo test with media files
await plugin.start(input_file="input.mp4", output_file="echo_output.mp4")
# Let it run for a while
await asyncio.sleep(10)
# Stop echo test
await plugin.stop()
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
VideoCall Plugin¶
The VideoCall plugin enables one-to-one video calls between users. It handles user registration, call initiation, and call management using an event-driven architecture.
VideoCall Features¶
- Event-Driven Architecture: Uses callbacks for handling call events
- User Registration: Register unique usernames for receiving calls
- Call Management: Initiate, accept, and hang up calls
- Media Control: Configure audio/video settings and bitrate
- WebRTC Configuration: Optional custom STUN/TURN server configuration
VideoCall Usage Examples¶
Making Outgoing Calls¶
import asyncio
from janus_client import JanusSession, JanusVideoCallPlugin, VideoCallEventType
from aiortc.contrib.media import MediaPlayer, MediaRecorder
from aiortc import RTCConfiguration, RTCIceServer
async def main():
session = JanusSession(base_url="wss://example.com/janus")
# Optional: Configure WebRTC settings
config = RTCConfiguration(iceServers=[
RTCIceServer(urls='stun:stun.l.google.com:19302')
])
plugin = JanusVideoCallPlugin(pc_config=config)
async with session:
await plugin.attach(session)
# Register with a username
await plugin.register(username="caller")
# Set up event handlers
async def on_accepted(data):
print(f"Call accepted by {data['username']}")
async def on_hangup(data):
print(f"Call ended: {data.get('reason', 'Unknown')}")
plugin.on_event(VideoCallEventType.ACCEPTED, on_accepted)
plugin.on_event(VideoCallEventType.HANGUP, on_hangup)
# Make a call
player = MediaPlayer("input.mp4")
recorder = MediaRecorder("output.mp4")
await plugin.call(username="callee", player=player, recorder=recorder)
# Wait for call duration
await asyncio.sleep(30)
# Hang up
await plugin.hangup()
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
Receiving Incoming Calls¶
import asyncio
from janus_client import JanusSession, JanusVideoCallPlugin, VideoCallEventType
from aiortc.contrib.media import MediaPlayer, MediaRecorder
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusVideoCallPlugin()
async with session:
await plugin.attach(session)
# Register username to receive calls
await plugin.register(username="callee")
# Set up event handler for incoming calls
async def on_incoming_call(data):
print(f"Incoming call from {data['username']}")
# Get JSEP from event data
jsep = data['jsep']
# Set up media
player = MediaPlayer("input.mp4")
recorder = MediaRecorder("output.mp4")
# Accept the call with JSEP
await plugin.accept(jsep, player, recorder)
print("Call accepted")
async def on_hangup(data):
print(f"Call ended: {data.get('reason', 'Unknown')}")
# Register event handlers
plugin.on_event(VideoCallEventType.INCOMINGCALL, on_incoming_call)
plugin.on_event(VideoCallEventType.HANGUP, on_hangup)
# Wait for incoming calls
print("Waiting for calls...")
await asyncio.sleep(300) # Wait 5 minutes
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
Listing Available Users¶
import asyncio
from janus_client import JanusSession, JanusVideoCallPlugin
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusVideoCallPlugin()
async with session:
await plugin.attach(session)
# List all registered users
users = await plugin.list_users()
print(f"Available users: {users}")
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
Media Control During Call¶
import asyncio
from janus_client import JanusSession, JanusVideoCallPlugin
from aiortc.contrib.media import MediaPlayer, MediaRecorder
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusVideoCallPlugin()
async with session:
await plugin.attach(session)
await plugin.register(username="caller")
# Make a call
player = MediaPlayer("input.mp4")
recorder = MediaRecorder("output.mp4")
await plugin.call(username="callee", player=player, recorder=recorder)
# Wait a bit
await asyncio.sleep(10)
# Mute audio
await plugin.set_media(audio=False)
# Wait a bit more
await asyncio.sleep(5)
# Unmute audio and set bitrate limit
await plugin.set_media(audio=True, bitrate=256000)
# Continue call
await asyncio.sleep(15)
await plugin.hangup()
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
VideoCall Event Types¶
The plugin supports the following event types via VideoCallEventType enum:
- REGISTERED: User successfully registered
- CALLING: Call is being initiated
- INCOMINGCALL: Incoming call received (includes JSEP data)
- ACCEPTED: Call was accepted by remote peer
- UPDATE: Call parameters updated (may include JSEP for renegotiation)
- HANGUP: Call ended
- SET: Media settings changed
VideoCall Best Practices¶
Event Handler Registration¶
# Register handlers before making or receiving calls
plugin.on_event(VideoCallEventType.INCOMINGCALL, on_incoming_call)
plugin.on_event(VideoCallEventType.ACCEPTED, on_accepted)
plugin.on_event(VideoCallEventType.HANGUP, on_hangup)
# Then proceed with registration and calls
await plugin.register(username="myuser")
Error Handling¶
from janus_client import VideoCallError
try:
await plugin.register(username="alice")
await plugin.call(username="bob", player=player, recorder=recorder)
except VideoCallError as e:
print(f"Call failed: {e.error_message} (code: {e.error_code})")
Resource Cleanup¶
try:
# Use the plugin
await plugin.register(username="alice")
await plugin.call(username="bob", player=player, recorder=recorder)
await asyncio.sleep(30)
finally:
# Always cleanup
try:
if plugin.in_call:
await plugin.hangup()
except:
pass
await plugin.destroy()
Handling Call State¶
# Check if currently in a call
if plugin.in_call:
print("Currently in a call")
# Get registered username
if plugin.username:
print(f"Registered as: {plugin.username}")
VideoRoom Plugin¶
The VideoRoom plugin enables multi-party video conferencing. It supports room management, publishing, and subscribing to multiple video feeds.
VideoRoom Usage Example¶
import asyncio
from janus_client import JanusSession, JanusVideoRoomPlugin
from aiortc.contrib.media import MediaPlayer, MediaRecorder
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusVideoRoomPlugin()
async with session:
await plugin.attach(session)
# Join a room
await plugin.join(room_id=1234, username="user1")
# Start publishing media
player = MediaPlayer("video.mp4")
await plugin.publish(player=player)
# Subscribe to another participant's feed
recorder = MediaRecorder("output.mp4")
await plugin.subscribe_and_start(feed_id=5678, recorder=recorder)
# Wait and then cleanup
await asyncio.sleep(30)
await plugin.unpublish()
await plugin.unsubscribe()
await plugin.leave()
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
TextRoom Plugin¶
The TextRoom plugin enables text-based communication through WebRTC DataChannels. It supports multiple rooms, public and private messaging, room management, and message history.
TextRoom Usage Examples¶
Basic Room Communication¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin, TextRoomEventType
async def main():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusTextRoomPlugin()
async with session:
await plugin.attach(session)
# Initialize WebRTC connection
await plugin.setup()
# Join a room
participants = await plugin.join_room(
room=1234,
username="alice",
display="Alice"
)
print(f"Joined room with {len(participants)} participants")
# Register message handler
def on_message(data):
print(f"Message from {data['from']}: {data['text']}")
plugin.on_event(TextRoomEventType.MESSAGE, on_message)
# Send a public message
await plugin.send_message(room=1234, text="Hello, everyone!")
# Wait for messages
await asyncio.sleep(10)
# Leave the room
await plugin.leave_room(room=1234)
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(main())
Room Management¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin
async def manage_rooms():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusTextRoomPlugin()
async with session:
await plugin.attach(session)
# List available rooms
rooms = await plugin.list_rooms()
print(f"Available rooms: {len(rooms)}")
# Create a new room
room_id = await plugin.create_room(
description="My Chat Room",
is_private=False,
history=50, # Store last 50 messages
pin="1234" # Require PIN to join
)
print(f"Created room: {room_id}")
# List participants in a room
participants = await plugin.list_participants(room=room_id)
print(f"Participants: {participants}")
# Destroy the room
await plugin.destroy_room(room=room_id)
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(manage_rooms())
Private Messaging¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin, TextRoomEventType
async def private_messaging():
# Create two sessions for two users
session1 = JanusSession(base_url="wss://example.com/janus")
plugin1 = JanusTextRoomPlugin()
session2 = JanusSession(base_url="wss://example.com/janus")
plugin2 = JanusTextRoomPlugin()
async with session1, session2:
# Setup both plugins
await plugin1.attach(session1)
await plugin1.setup()
await plugin2.attach(session2)
await plugin2.setup()
# Create and join a room
room_id = await plugin1.create_room(description="Private Chat")
await plugin1.join_room(room=room_id, username="alice")
await plugin2.join_room(room=room_id, username="bob")
# Register message handler for bob
def on_message(data):
if data.get("whisper"):
print(f"Private message from {data['from']}: {data['text']}")
else:
print(f"Public message from {data['from']}: {data['text']}")
plugin2.on_event(TextRoomEventType.MESSAGE, on_message)
# Alice sends private message to Bob
await plugin1.send_message(
room=room_id,
text="Hi Bob, this is private!",
to="bob"
)
# Alice sends message to multiple users
await plugin1.send_message(
room=room_id,
text="Message to specific users",
tos=["bob", "charlie"]
)
await asyncio.sleep(2)
# Cleanup
await plugin1.leave_room(room=room_id)
await plugin2.leave_room(room=room_id)
await plugin1.destroy_room(room=room_id)
await plugin1.destroy()
await plugin2.destroy()
if __name__ == "__main__":
asyncio.run(private_messaging())
Event Handling¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin, TextRoomEventType
async def handle_events():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusTextRoomPlugin()
async with session:
await plugin.attach(session)
await plugin.setup()
# Register event handlers
def on_join(data):
print(f"User {data['username']} joined the room")
def on_leave(data):
print(f"User {data['username']} left the room")
def on_message(data):
print(f"Message from {data['from']}: {data['text']}")
def on_kicked(data):
print(f"User {data['username']} was kicked")
def on_destroyed(data):
print(f"Room {data['room']} was destroyed")
def on_announcement(data):
print(f"Announcement: {data['text']}")
def on_error(data):
print(f"Error: {data.get('error', 'Unknown error')}")
plugin.on_event(TextRoomEventType.JOIN, on_join)
plugin.on_event(TextRoomEventType.LEAVE, on_leave)
plugin.on_event(TextRoomEventType.MESSAGE, on_message)
plugin.on_event(TextRoomEventType.KICKED, on_kicked)
plugin.on_event(TextRoomEventType.DESTROYED, on_destroyed)
plugin.on_event(TextRoomEventType.ANNOUNCEMENT, on_announcement)
plugin.on_event(TextRoomEventType.ERROR, on_error)
# Create and join room
room_id = await plugin.create_room(description="Event Demo")
await plugin.join_room(room=room_id, username="alice")
# Send a message
await plugin.send_message(room=room_id, text="Hello!")
# Wait for events
await asyncio.sleep(5)
# Cleanup
await plugin.leave_room(room=room_id)
await plugin.destroy_room(room=room_id)
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(handle_events())
Message History¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin, TextRoomEventType
async def message_history():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusTextRoomPlugin()
async with session:
await plugin.attach(session)
await plugin.setup()
# Create room with message history
room_id = await plugin.create_room(
description="History Room",
history=100 # Store last 100 messages
)
# Join and send some messages
await plugin.join_room(room=room_id, username="alice")
for i in range(5):
await plugin.send_message(
room=room_id,
text=f"Message {i+1}"
)
# Leave the room
await plugin.leave_room(room=room_id)
# Track history messages
history_messages = []
def on_message(data):
history_messages.append(data)
print(f"History: {data['text']}")
plugin.on_event(TextRoomEventType.MESSAGE, on_message)
# Rejoin with history enabled
await plugin.join_room(
room=room_id,
username="alice",
history=True
)
# Wait for history to be delivered
await asyncio.sleep(2)
print(f"Received {len(history_messages)} messages from history")
# Cleanup
await plugin.leave_room(room=room_id)
await plugin.destroy_room(room=room_id)
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(message_history())
Room Administration¶
import asyncio
from janus_client import JanusSession, JanusTextRoomPlugin
async def room_administration():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusTextRoomPlugin()
async with session:
await plugin.attach(session)
await plugin.setup()
# Create room with secret for admin operations
room_id = await plugin.create_room(
description="Moderated Room",
secret="admin_secret",
is_private=False
)
await plugin.join_room(room=room_id, username="moderator")
# Send announcement (requires secret)
await plugin.send_announcement(
room=room_id,
text="Welcome to the moderated room!",
secret="admin_secret"
)
# Kick a participant (requires secret)
# await plugin.kick_participant(
# room=room_id,
# username="troublemaker",
# secret="admin_secret"
# )
# Cleanup
await plugin.leave_room(room=room_id)
await plugin.destroy_room(room=room_id, secret="admin_secret")
await plugin.destroy()
if __name__ == "__main__":
asyncio.run(room_administration())
TextRoom Features¶
Room Configuration¶
- Room ID: Auto-generated or manually specified
- Description: Human-readable room description
- PIN Protection: Optional PIN required to join
- Private Rooms: Hidden from room listings
- Message History: Configurable number of messages to store
- HTTP Backend: Forward messages to external HTTP endpoint
- Permanent Rooms: Save room configuration to server config file
Messaging Features¶
- Public Messages: Broadcast to all room participants
- Private Messages: Send to specific user(s)
- Announcements: Admin-only broadcasts
- Message Acknowledgment: Optional confirmation of message delivery
- Message History: Retrieve past messages when joining
Event System¶
The plugin provides event handlers for:
- JOIN: User joins the room
- LEAVE: User leaves the room
- MESSAGE: New message received
- KICKED: User was kicked from room
- DESTROYED: Room was destroyed
- ANNOUNCEMENT: Admin announcement received
- ERROR: Error occurred
TextRoom Best Practices¶
Connection Management¶
# Always setup before joining rooms
await plugin.setup(timeout=30.0)
# Wait for setup to complete before operations
await plugin.join_room(room=1234, username="user")
Error Handling¶
from janus_client import TextRoomError
try:
await plugin.join_room(room=1234, username="alice", pin="1234")
except TextRoomError as e:
print(f"Failed to join room: {e.error_message} (code: {e.error_code})")
Resource Cleanup¶
try:
# Use the plugin
await plugin.join_room(room=1234, username="alice")
await plugin.send_message(room=1234, text="Hello!")
finally:
# Always cleanup
try:
await plugin.leave_room(room=1234)
except:
pass
await plugin.destroy()
Event Handler Registration¶
# Register handlers before joining rooms
plugin.on_event(TextRoomEventType.MESSAGE, on_message)
plugin.on_event(TextRoomEventType.JOIN, on_join)
# Then join the room
await plugin.join_room(room=1234, username="alice")
Plugin Development Guidelines¶
Creating Custom Plugins¶
To create a custom plugin, inherit from JanusPlugin and implement the required methods:
from janus_client import JanusPlugin
from typing import Dict, Any, Optional
class MyCustomPlugin(JanusPlugin):
def __init__(self) -> None:
super().__init__()
self._state = {}
async def on_receive(self, response: Dict[str, Any]) -> None:
"""Handle plugin-specific messages."""
if "plugindata" in response:
data = response["plugindata"]["data"]
# Handle plugin data
pass
async def on_receive_jsep(self, jsep: Dict[str, Any]) -> None:
"""Handle WebRTC signaling."""
if jsep["type"] == "offer":
# Handle offer
pass
elif jsep["type"] == "answer":
# Handle answer
pass
async def custom_action(self, param: str) -> bool:
"""Custom plugin action."""
message = {"request": "custom", "param": param}
response = await self.send(message)
return response.get("success", False)
Plugin Lifecycle¶
All plugins follow a standard lifecycle:
- Creation: Instantiate the plugin class
- Attachment: Attach to a session using
attach(session) - Usage: Call plugin-specific methods
- Cleanup: Destroy the plugin using
destroy()
Message Handling¶
Plugins receive two types of messages:
- Plugin Messages: Handled by
on_receive()method - JSEP Messages: WebRTC signaling handled by
on_receive_jsep()method
WebRTC Integration¶
Plugins that handle media use aiortc for WebRTC functionality:
- MediaPlayer: For input media (files, devices, streams)
- MediaRecorder: For output media recording
- RTCPeerConnection: For WebRTC peer connections
- MediaStreamTrack: For handling individual media tracks
Best Practices¶
Plugin Usage¶
- Always destroy plugins: Use
await plugin.destroy()when done - Handle errors gracefully: Wrap plugin operations in try-except blocks
- Use context managers: Consider implementing
__aenter__and__aexit__for custom plugins - Check plugin state: Verify plugin is attached before calling methods
Media Handling¶
- Resource cleanup: Always stop MediaPlayer and MediaRecorder instances
- Track management: Handle track events properly in
on_trackcallbacks - Codec compatibility: Ensure media formats are supported by Janus
- Performance: Monitor memory usage with long-running media operations
Error Handling¶
async def safe_plugin_usage():
session = JanusSession(base_url="wss://example.com/janus")
plugin = JanusVideoRoomPlugin()
try:
async with session:
await plugin.attach(session)
# Plugin operations
await plugin.join(room_id=1234, username="user1")
except Exception as e:
logger.error(f"Plugin error: {e}")
finally:
# Ensure cleanup
try:
await plugin.destroy()
except:
pass
Concurrent Plugin Usage¶
When using multiple plugins with the same session, be careful about message routing:
async def multiple_plugins():
session = JanusSession(base_url="wss://example.com/janus")
async with session:
# Sequential plugin usage (recommended)
plugin1 = JanusEchoTestPlugin()
await plugin1.attach(session)
await plugin1.start("input1.mp4")
await plugin1.destroy()
plugin2 = JanusVideoCallPlugin()
await plugin2.attach(session)
await plugin2.register("user1")
await plugin2.destroy()