Back to API Reference
Code Examples
Complete working examples for integrating MailCatcher NG with your application in Python, JavaScript, and Ruby.
Python
Setup
Install the required HTTP client library:
Install dependencies
pip install requests List All Messages
python
import requests
# Get all messages
response = requests.get('http://127.0.0.1:1080/messages')
messages = response.json()
print(f"Total messages: {len(messages)}")
# Print subject of each message
for msg in messages:
print(f"[{msg['id']}] {msg['subject']}")
print(f" From: {msg['from']}")
print(f" To: {msg['to']}") Get Message Details
python
import requests
import json
# Get specific message metadata
response = requests.get('http://127.0.0.1:1080/messages/1.json')
message = response.json()
print(json.dumps(message, indent=2))
# Get HTML content
html_response = requests.get('http://127.0.0.1:1080/messages/1.html')
html_content = html_response.text
# Get plain text
plain_response = requests.get('http://127.0.0.1:1080/messages/1.plain')
plain_content = plain_response.text
print("HTML Content:")
print(html_content[:500] + "...") Search Messages
python
import requests
from datetime import datetime, timedelta
# Search for specific emails
search_params = {
'q': 'verification',
'has_attachments': 'false'
}
response = requests.get(
'http://127.0.0.1:1080/messages/search',
params=search_params
)
results = response.json()
print(f"Found {len(results)} messages matching 'verification'")
for msg in results:
print(f"- {msg['subject']}")
# Search with date range
yesterday = (datetime.now() - timedelta(days=1)).isoformat()
params_with_date = {
'q': 'reset',
'from': yesterday
}
response = requests.get(
'http://127.0.0.1:1080/messages/search',
params=params_with_date
)
recent_messages = response.json()
print(f"\nFound {len(recent_messages)} recent 'reset' messages") Extract Verification Codes & Tokens
python
import requests
# Extract OTP code
otp_response = requests.get(
'http://127.0.0.1:1080/messages/1/extract?type=otp'
)
otp_codes = otp_response.json()
if otp_codes:
print(f"OTP Code: {otp_codes[0]['value']}")
# Extract verification link
link_response = requests.get(
'http://127.0.0.1:1080/messages/1/extract?type=link'
)
links = link_response.json()
if links:
for link in links:
print(f"Link: {link['value']}")
print(f"Context: {link['context']}")
# Get all parsed data (recommended approach)
parsed_response = requests.get(
'http://127.0.0.1:1080/messages/1/parsed.json'
)
parsed = parsed_response.json()
print(f"OTP: {parsed.get('otp_code')}")
print(f"Verification URL: {parsed.get('verification_url')}")
print(f"Reset Token: {parsed.get('reset_token')}")
print(f"Unsubscribe: {parsed.get('unsubscribe_link')}") Automated Testing Example
python
import requests
import time
class MailCatcherClient:
def __init__(self, base_url='http://127.0.0.1:1080'):
self.base_url = base_url
def get_latest_messages(self, count=5):
"""Get latest N messages"""
response = requests.get(f'{self.base_url}/messages')
messages = response.json()
return messages[-count:] if len(messages) > 0 else []
def wait_for_message(self, recipient, timeout=10):
"""Wait for email to specific recipient"""
start = time.time()
while time.time() - start < timeout:
response = requests.get(f'{self.base_url}/messages')
messages = response.json()
for msg in messages:
if recipient in msg['to']:
return msg
time.sleep(0.5)
raise TimeoutError(f"No email received for {recipient} within {timeout}s")
def extract_verification_link(self, message_id):
"""Extract verification link from message"""
response = requests.get(
f'{self.base_url}/messages/{message_id}/parsed.json'
)
data = response.json()
return data.get('verification_url')
def clear_messages(self):
"""Clear all messages"""
requests.delete(f'{self.base_url}/messages')
# Usage in tests
client = MailCatcherClient()
# Wait for signup email
signup_email = client.wait_for_message('user@example.com', timeout=5)
print(f"Signup email received: {signup_email['subject']}")
# Extract verification link
verify_link = client.extract_verification_link(signup_email['id'])
print(f"Verification link: {verify_link}")
# Clean up
client.clear_messages() Real-Time Monitoring with WebSocket
Monitor incoming messages in real-time:
Install WebSocket client
pip install websocket-client python
import websocket
import json
import threading
def monitor_emails():
def on_message(ws, message):
data = json.loads(message)
if data['type'] == 'add':
msg = data['message']
print(f"New message received:")
print(f" From: {msg['from']}")
print(f" To: {msg['to']}")
print(f" Subject: {msg['subject']}")
elif data['type'] == 'remove':
print(f"Message {data['id']} deleted")
elif data['type'] == 'clear':
print("All messages cleared")
def on_error(ws, error):
print(f"WebSocket error: {error}")
def on_close(ws, close_status_code, close_msg):
print("WebSocket connection closed")
def on_open(ws):
print("Connected to MailCatcher WebSocket")
ws = websocket.WebSocketApp(
"ws://127.0.0.1:1080/messages",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.run_forever()
# Run in background thread
monitor_thread = threading.Thread(target=monitor_emails, daemon=True)
monitor_thread.start()
# Your application code here
print("Monitoring emails...")
try:
monitor_thread.join()
except KeyboardInterrupt:
print("Stopped monitoring") JavaScript
Setup
No installation needed - use native Fetch API (modern browsers) or install axios:
Install axios (optional)
npm install axios List All Messages
javascript
// Using Fetch API
async function getMessages() {
try {
const response = await fetch('http://127.0.0.1:1080/messages');
const messages = await response.json();
console.log(`Total messages: ${messages.length}`);
messages.forEach(msg => {
console.log(`[${msg.id}] ${msg.subject}`);
console.log(` From: ${msg.from}`);
console.log(` To: ${msg.to}`);
});
} catch (error) {
console.error('Error fetching messages:', error);
}
}
getMessages(); Get Message Details
javascript
// Get message metadata
const messageId = 1;
const metadata = await fetch(
`http://127.0.0.1:1080/messages/${messageId}.json`
).then(r => r.json());
console.log('Subject:', metadata.subject);
console.log('From:', metadata.from);
// Get HTML content
const htmlContent = await fetch(
`http://127.0.0.1:1080/messages/${messageId}.html`
).then(r => r.text());
// Display in iframe
const iframe = document.createElement('iframe');
iframe.srcDoc = htmlContent;
document.body.appendChild(iframe);
// Get plain text
const plainText = await fetch(
`http://127.0.0.1:1080/messages/${messageId}.plain`
).then(r => r.text());
console.log(plainText); Search Messages
javascript
// Search for messages
async function searchMessages(query, options = {}) {
const params = new URLSearchParams({
q: query,
...options
});
const response = await fetch(
`http://127.0.0.1:1080/messages/search?${params}`
);
return response.json();
}
// Search for verification emails
const results = await searchMessages('verification', {
has_attachments: 'false'
});
console.log(`Found ${results.length} matching messages`);
// Search with date range
const today = new Date().toISOString().split('T')[0];
const recent = await searchMessages('reset', {
from: today
});
console.log(`Recent reset messages: ${recent.length}`); Extract Verification Codes & Links
javascript
// Extract OTP code
async function getOTPCode(messageId) {
const response = await fetch(
`http://127.0.0.1:1080/messages/${messageId}/extract?type=otp`
);
const codes = await response.json();
return codes[0]?.value;
}
// Extract verification link
async function getVerificationLink(messageId) {
const response = await fetch(
`http://127.0.0.1:1080/messages/${messageId}/extract?type=link`
);
const links = await response.json();
return links[0]?.value;
}
// Get all parsed data (recommended)
async function getParsedData(messageId) {
const response = await fetch(
`http://127.0.0.1:1080/messages/${messageId}/parsed.json`
);
return response.json();
}
// Usage
const parsed = await getParsedData(1);
console.log('OTP:', parsed.otp_code);
console.log('Verify URL:', parsed.verification_url);
console.log('Reset Token:', parsed.reset_token);
console.log('All Links:', parsed.all_links); Testing Helper Class
javascript
class MailCatcherHelper {
constructor(baseUrl = 'http://127.0.0.1:1080') {
this.baseUrl = baseUrl;
}
async getMessages() {
const response = await fetch(`${this.baseUrl}/messages`);
return response.json();
}
async getMessage(id) {
const response = await fetch(
`${this.baseUrl}/messages/${id}.json`
);
return response.json();
}
async waitForMessage(recipient, timeout = 10000) {
const start = Date.now();
while (Date.now() - start < timeout) {
const messages = await this.getMessages();
const found = messages.find(m => m.to.includes(recipient));
if (found) return found;
await new Promise(r => setTimeout(r, 500));
}
throw new Error(`No email for ${recipient} within ${timeout}ms`);
}
async extractVerificationLink(messageId) {
const parsed = await fetch(
`${this.baseUrl}/messages/${messageId}/parsed.json`
).then(r => r.json());
return parsed.verification_url;
}
async clearMessages() {
return fetch(`${this.baseUrl}/messages`, { method: 'DELETE' });
}
}
// Usage in tests
const mail = new MailCatcherHelper();
// Wait for signup email
const signupEmail = await mail.waitForMessage('user@test.com');
console.log('Email received:', signupEmail.subject);
// Extract link
const verifyLink = await mail.extractVerificationLink(signupEmail.id);
console.log('Verify link:', verifyLink);
// Clean up
await mail.clearMessages(); Real-Time Monitoring with WebSocket
javascript
class MailMonitor {
constructor(baseUrl = 'http://127.0.0.1:1080') {
this.ws = new WebSocket(baseUrl.replace('http', 'ws') + '/messages');
this.setupListeners();
}
setupListeners() {
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'add') {
console.log('📧 New message:', data.message.subject);
this.onNewMessage?.(data.message);
} else if (data.type === 'remove') {
console.log('🗑️ Message deleted:', data.id);
} else if (data.type === 'clear') {
console.log('🧹 All messages cleared');
}
};
}
onNewMessage(callback) {
this.onNewMessage = callback;
return this;
}
close() {
this.ws.close();
}
}
// Usage
const monitor = new MailMonitor();
monitor.onNewMessage((msg) => {
console.log('Received:', msg.subject);
});
// Stop monitoring
// monitor.close(); Ruby
Setup
Install the HTTP client gem:
Gemfile
gem 'httparty' Or install directly
bundle install List All Messages
ruby
require 'httparty'
class MailCatcher
include HTTParty
base_uri 'http://127.0.0.1:1080'
end
# Get all messages
messages = MailCatcher.get('/messages')
puts "Total messages: #{messages.length}"
messages.each do |msg|
puts "[#{msg['id']}] #{msg['subject']}"
puts " From: #{msg['from'].join(', ')}"
puts " To: #{msg['to'].join(', ')}"
end Get Message Details
ruby
require 'httparty'
require 'json'
class MailCatcher
include HTTParty
base_uri 'http://127.0.0.1:1080'
end
# Get message metadata
message = MailCatcher.get('/messages/1.json')
puts "Subject: #{message['subject']}"
puts "From: #{message['from'].first}"
puts "To: #{message['to'].first}"
# Get HTML content
html = MailCatcher.get('/messages/1.html')
# Get plain text
plain = MailCatcher.get('/messages/1.plain')
# Save to file
File.write('email.html', html.body) Search Messages
ruby
require 'httparty'
class MailCatcher
include HTTParty
base_uri 'http://127.0.0.1:1080'
end
# Search for messages
results = MailCatcher.get('/messages/search', query: {
q: 'verification',
has_attachments: false
})
puts "Found #{results.length} matching messages"
# Search with date range
yesterday = (Date.today - 1).to_s
results = MailCatcher.get('/messages/search', query: {
q: 'reset',
from: yesterday
})
results.each { |msg| puts msg['subject'] } Extract Verification Codes & Links
ruby
require 'httparty'
class MailCatcher
include HTTParty
base_uri 'http://127.0.0.1:1080'
end
message_id = 1
# Extract OTP
otp_data = MailCatcher.get("/messages/#{message_id}/extract", query: { type: 'otp' })
otp = otp_data.first['value'] if otp_data.any?
puts "OTP: #{otp}"
# Extract verification link
links = MailCatcher.get("/messages/#{message_id}/extract", query: { type: 'link' })
verify_link = links.first['value'] if links.any?
puts "Verify Link: #{verify_link}"
# Get all parsed data (recommended)
parsed = MailCatcher.get("/messages/#{message_id}/parsed.json")
puts "Verification URL: #{parsed['verification_url']}"
puts "OTP Code: #{parsed['otp_code']}"
puts "Reset Token: #{parsed['reset_token']}"
puts "Unsubscribe: #{parsed['unsubscribe_link']}"
# Iterate all links
parsed['all_links'].each do |link|
puts "#{link['text']}: #{link['href']}"
end Testing Helper Class
ruby
require 'httparty'
class MailCatcherClient
include HTTParty
base_uri 'http://127.0.0.1:1080'
def get_latest_messages(count = 5)
get('/messages').last(count)
end
def wait_for_message(recipient, timeout: 10)
start_time = Time.now
loop do
messages = get('/messages')
message = messages.find { |m| m['to'].include?(recipient) }
return message if message
raise "No email for #{recipient} after #{timeout}s" if Time.now - start_time > timeout
sleep(0.5)
end
end
def get_verification_link(message_id)
parsed = get("/messages/#{message_id}/parsed.json")
parsed['verification_url']
end
def get_otp_code(message_id)
parsed = get("/messages/#{message_id}/parsed.json")
parsed['otp_code']
end
def clear_messages
delete('/messages')
end
private
def get(path)
self.class.get(path)
end
def delete(path)
self.class.delete(path)
end
end
# Usage in tests
client = MailCatcherClient.new
# Wait for email
email = client.wait_for_message('user@example.com')
puts "Email received: #{email['subject']}"
# Extract verification link
verify_link = client.get_verification_link(email['id'])
puts "Verification link: #{verify_link}"
# Get OTP
otp = client.get_otp_code(email['id'])
puts "OTP: #{otp}"
# Clean up
client.clear_messages RSpec Integration
ruby
# spec/support/mail_catcher.rb
require 'httparty'
class MailCatcherHelper
include HTTParty
base_uri 'http://127.0.0.1:1080'
def latest_email_to(recipient)
messages = get('/messages')
messages.reverse_each do |msg|
return msg if msg['to'].include?(recipient)
end
nil
end
def clear
delete('/messages')
end
end
# spec/spec_helper.rb
RSpec.configure do |config|
let(:mail) { MailCatcherHelper.new }
config.before(:each) do
mail.clear
end
end
# spec/features/signup_spec.rb
RSpec.describe 'User Signup', type: :feature do
let(:mail) { MailCatcherHelper.new }
before { mail.clear }
it 'sends verification email' do
visit signup_path
fill_in 'Email', with: 'user@example.com'
click_button 'Sign Up'
sleep 1 # Wait for email
email = mail.latest_email_to('user@example.com')
expect(email).to be_present
expect(email['subject']).to include('Verify')
end
end