Back to API Reference

Code Examples

Complete working examples for integrating MailCatcher NG with your application in Python, JavaScript, and Ruby.

Python

Setup

Install the required HTTP client library:

Install dependencies
pip install requests

List All Messages

python
import requests

# Get all messages. The timeout stops the script from hanging forever when
# MailCatcher is not running.
response = requests.get('http://127.0.0.1:1080/messages', timeout=5)
# Fail loudly on an HTTP error instead of trying to decode an error page as JSON.
response.raise_for_status()
messages = response.json()

print(f"Total messages: {len(messages)}")

# Print a short summary of each message
for msg in messages:
    print(f"[{msg['id']}] {msg['subject']}")
    print(f"    From: {msg['from']}")
    print(f"    To: {msg['to']}")

Get Message Details

python
import requests
import json

# Get specific message metadata. The timeout avoids hanging when the server is
# down; raise_for_status() reports an unknown id as an HTTP error instead of a
# confusing JSON decode failure.
response = requests.get('http://127.0.0.1:1080/messages/1.json', timeout=5)
response.raise_for_status()
message = response.json()

print(json.dumps(message, indent=2))

# Get HTML content
html_response = requests.get('http://127.0.0.1:1080/messages/1.html', timeout=5)
html_content = html_response.text

# Get plain text
plain_response = requests.get('http://127.0.0.1:1080/messages/1.plain', timeout=5)
plain_content = plain_response.text

print("HTML Content:")
# Show a 500-character preview; add the ellipsis only when text was actually cut
preview = html_content[:500]
if len(html_content) > 500:
    preview += "..."
print(preview)

Extract Verification Codes & Tokens

python
import requests

# Extract OTP code. `params=` lets requests build and escape the query string.
otp_response = requests.get(
    'http://127.0.0.1:1080/messages/1/extract',
    params={'type': 'otp'},
    timeout=5,
)
otp_response.raise_for_status()
otp_codes = otp_response.json()

if otp_codes:
    print(f"OTP Code: {otp_codes[0]['value']}")

# Extract verification link
link_response = requests.get(
    'http://127.0.0.1:1080/messages/1/extract',
    params={'type': 'link'},
    timeout=5,
)
link_response.raise_for_status()
links = link_response.json()

# An empty list simply prints nothing
for link in links:
    print(f"Link: {link['value']}")
    print(f"Context: {link['context']}")

# Get all parsed data (recommended approach)
parsed_response = requests.get(
    'http://127.0.0.1:1080/messages/1/parsed.json',
    timeout=5,
)
parsed_response.raise_for_status()
parsed = parsed_response.json()

# .get() prints None for any field the message does not contain
print(f"OTP: {parsed.get('otp_code')}")
print(f"Verification URL: {parsed.get('verification_url')}")
print(f"Reset Token: {parsed.get('reset_token')}")
print(f"Unsubscribe: {parsed.get('unsubscribe_link')}")

Automated Testing Example

python
import requests
import time

class MailCatcherClient:
    """Small client for the MailCatcher HTTP API, intended for test suites.

    Args:
        base_url: Root URL of the MailCatcher server. A trailing slash is
            removed so request paths never contain a double slash.
        request_timeout: Seconds to wait for each HTTP request.
        session: Optional object exposing ``get``/``delete`` like
            ``requests.Session`` (enables connection reuse or a stand-in
            during tests). Defaults to the ``requests`` module itself.
    """

    def __init__(self, base_url='http://127.0.0.1:1080', request_timeout=5,
                 session=None):
        self.base_url = base_url.rstrip('/')
        self.request_timeout = request_timeout
        self._http = session if session is not None else requests

    def _get_json(self, path):
        """GET ``path`` below the base URL and return the decoded JSON body.

        Raises whatever ``raise_for_status()`` raises on a non-2xx response,
        so HTTP errors are not mistaken for malformed JSON.
        """
        response = self._http.get(
            f'{self.base_url}{path}', timeout=self.request_timeout
        )
        response.raise_for_status()
        return response.json()

    def get_latest_messages(self, count=5):
        """Return the last ``count`` messages of the ``/messages`` listing.

        Returns an empty list when ``count`` is zero or negative; a plain
        ``messages[-0:]`` slice would return every message instead.
        """
        if count <= 0:
            return []
        return self._get_json('/messages')[-count:]

    def wait_for_message(self, recipient, timeout=10, poll_interval=0.5):
        """Poll until a message addressed to ``recipient`` shows up.

        The inbox is always checked at least once, even when ``timeout`` is 0.

        Args:
            recipient: Value looked up in each message's ``to`` field.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between polls.

        Returns:
            The first matching message dict from the listing.

        Raises:
            TimeoutError: No matching message arrived within ``timeout``.
        """
        # A monotonic clock keeps the deadline correct across system clock changes.
        deadline = time.monotonic() + timeout
        while True:
            for msg in self._get_json('/messages'):
                # NOTE(review): membership test assumes `to` holds bare
                # addresses in the same form as `recipient` - confirm.
                if recipient in msg['to']:
                    return msg

            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"No email received for {recipient} within {timeout}s"
                )
            time.sleep(poll_interval)

    def extract_verification_link(self, message_id):
        """Return ``verification_url`` from the message's parsed data, or None."""
        data = self._get_json(f'/messages/{message_id}/parsed.json')
        return data.get('verification_url')

    def clear_messages(self):
        """Delete every message on the server."""
        response = self._http.delete(
            f'{self.base_url}/messages', timeout=self.request_timeout
        )
        response.raise_for_status()

# Usage in tests
client = MailCatcherClient()

try:
    # Wait for signup email
    signup_email = client.wait_for_message('user@example.com', timeout=5)
    print(f"Signup email received: {signup_email['subject']}")

    # Extract verification link
    verify_link = client.extract_verification_link(signup_email['id'])
    print(f"Verification link: {verify_link}")
finally:
    # Clean up even when the wait times out, so stale mail cannot leak into
    # the next test
    client.clear_messages()

Real-Time Monitoring with WebSocket

Monitor incoming messages in real time:

Install WebSocket client
pip install websocket-client
python
import websocket
import json
import threading

def monitor_emails(url="ws://127.0.0.1:1080/messages"):
    """Print MailCatcher events received over a WebSocket until it closes.

    Blocks the calling thread (``run_forever``), so it is normally used as a
    thread target.

    Args:
        url: WebSocket endpoint to connect to.
    """

    def on_message(ws, message):
        # Each frame is a JSON object whose 'type' selects the event kind.
        data = json.loads(message)

        if data['type'] == 'add':
            msg = data['message']
            print("New message received:")
            print(f"  From: {msg['from']}")
            print(f"  To: {msg['to']}")
            print(f"  Subject: {msg['subject']}")

        elif data['type'] == 'remove':
            print(f"Message {data['id']} deleted")

        elif data['type'] == 'clear':
            print("All messages cleared")

    def on_error(ws, error):
        print(f"WebSocket error: {error}")

    def on_close(ws, close_status_code, close_msg):
        print("WebSocket connection closed")

    def on_open(ws):
        print("Connected to MailCatcher WebSocket")

    ws = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )

    ws.run_forever()

# Start the monitor on a daemon thread so it never keeps the process alive
watcher = threading.Thread(daemon=True, target=monitor_emails)
watcher.start()

# Your application code here
print("Monitoring emails...")
try:
    # Block until the monitor ends; Ctrl+C stops the wait
    watcher.join()
except KeyboardInterrupt:
    print("Stopped monitoring")

JavaScript

Setup

No installation is needed: use the native Fetch API (available in modern browsers and Node.js 18+), or install axios if you prefer:

Install axios (optional)
npm install axios

List All Messages

javascript
// Using Fetch API
/**
 * Fetch every captured message and log a short summary of each one.
 * Failures (network or HTTP) are logged, not rethrown.
 */
async function getMessages() {
  try {
    const response = await fetch('http://127.0.0.1:1080/messages');
    // fetch() only rejects on network failure; HTTP errors need an explicit check
    if (!response.ok) {
      throw new Error(`MailCatcher responded with HTTP ${response.status}`);
    }
    const messages = await response.json();

    console.log(`Total messages: ${messages.length}`);

    for (const msg of messages) {
      console.log(`[${msg.id}] ${msg.subject}`);
      console.log(`    From: ${msg.from}`);
      console.log(`    To: ${msg.to}`);
    }
  } catch (error) {
    console.error('Error fetching messages:', error);
  }
}

getMessages();

Get Message Details

javascript
// NOTE: top-level `await` requires an ES module (or wrap this in an async function).
// Get message metadata
const messageId = 1;

const metadata = await fetch(
  `http://127.0.0.1:1080/messages/${messageId}.json`
).then(r => r.json());

console.log('Subject:', metadata.subject);
console.log('From:', metadata.from);

// Get HTML content
const htmlContent = await fetch(
  `http://127.0.0.1:1080/messages/${messageId}.html`
).then(r => r.text());

// Display in iframe
const iframe = document.createElement('iframe');
// Email HTML is untrusted: an empty sandbox stops its scripts from running
// with this page's origin.
iframe.setAttribute('sandbox', '');
// The DOM property is lowercase `srcdoc`; assigning `srcDoc` just creates an
// unused property and leaves the iframe empty.
iframe.srcdoc = htmlContent;
document.body.appendChild(iframe);

// Get plain text
const plainText = await fetch(
  `http://127.0.0.1:1080/messages/${messageId}.plain`
).then(r => r.text());

console.log(plainText);

Extract Verification Codes & Links

javascript
/**
 * Fetch the OTP extraction results for a message.
 * Resolves to the `value` of the first result, or undefined when there is none.
 */
async function getOTPCode(messageId) {
  const endpoint = `http://127.0.0.1:1080/messages/${messageId}/extract?type=otp`;
  const codes = await fetch(endpoint).then((res) => res.json());
  const firstCode = codes[0];
  return firstCode?.value;
}

/**
 * Fetch the link extraction results for a message.
 * Resolves to the `value` of the first result, or undefined when there is none.
 */
async function getVerificationLink(messageId) {
  const endpoint = `http://127.0.0.1:1080/messages/${messageId}/extract?type=link`;
  const links = await fetch(endpoint).then((res) => res.json());
  const firstLink = links[0];
  return firstLink?.value;
}

/**
 * Get all parsed data for a message (recommended).
 * Resolves to the decoded JSON body of `/messages/:id/parsed.json`.
 */
async function getParsedData(messageId) {
  const endpoint = `http://127.0.0.1:1080/messages/${messageId}/parsed.json`;
  const response = await fetch(endpoint);
  const parsed = await response.json();
  return parsed;
}

// Usage: fetch the parsed data once, then log each field with its label
const parsed = await getParsedData(1);
const report = [
  ['OTP:', parsed.otp_code],
  ['Verify URL:', parsed.verification_url],
  ['Reset Token:', parsed.reset_token],
  ['All Links:', parsed.all_links],
];
for (const [label, value] of report) {
  console.log(label, value);
}

Testing Helper Class

javascript
/**
 * Test helper around the MailCatcher HTTP API.
 * Every request rejects on a non-2xx response instead of handing an error
 * body to the caller.
 */
class MailCatcherHelper {
  /** @param {string} baseUrl Root URL of the MailCatcher server. */
  constructor(baseUrl = 'http://127.0.0.1:1080') {
    this.baseUrl = baseUrl;
  }

  /**
   * Send a request below the base URL; internal helper shared by all methods.
   * @returns {Promise<Response>} The response, guaranteed to have `ok === true`.
   * @throws {Error} When the server answers with a non-2xx status.
   */
  async _request(path, options) {
    const response = await fetch(`${this.baseUrl}${path}`, options);
    // fetch() resolves on HTTP errors too, so check the status explicitly
    if (!response.ok) {
      throw new Error(`MailCatcher request ${path} failed with HTTP ${response.status}`);
    }
    return response;
  }

  /** Resolve to the decoded `/messages` listing. */
  async getMessages() {
    const response = await this._request('/messages');
    return response.json();
  }

  /** Resolve to the decoded metadata of one message. */
  async getMessage(id) {
    const response = await this._request(`/messages/${id}.json`);
    return response.json();
  }

  /**
   * Poll until a message whose `to` includes `recipient` arrives.
   * The inbox is always checked at least once, even when `timeout` is 0.
   * @param {string} recipient Value looked up in each message's `to`.
   * @param {number} timeout Maximum time to keep polling, in milliseconds.
   * @param {number} pollInterval Pause between polls, in milliseconds.
   * @throws {Error} When no matching message arrives in time.
   */
  async waitForMessage(recipient, timeout = 10000, pollInterval = 500) {
    const deadline = Date.now() + timeout;

    for (;;) {
      const messages = await this.getMessages();
      const found = messages.find(m => m.to.includes(recipient));

      if (found) return found;
      if (Date.now() >= deadline) break;

      await new Promise(r => setTimeout(r, pollInterval));
    }

    throw new Error(`No email for ${recipient} within ${timeout}ms`);
  }

  /** Resolve to `verification_url` from the message's parsed data. */
  async extractVerificationLink(messageId) {
    const response = await this._request(`/messages/${messageId}/parsed.json`);
    const parsed = await response.json();
    return parsed.verification_url;
  }

  /** Delete every message; resolves to the DELETE response. */
  async clearMessages() {
    return this._request('/messages', { method: 'DELETE' });
  }
}

// Usage in tests
const mail = new MailCatcherHelper();

try {
  // Wait for signup email
  const signupEmail = await mail.waitForMessage('user@test.com');
  console.log('Email received:', signupEmail.subject);

  // Extract link
  const verifyLink = await mail.extractVerificationLink(signupEmail.id);
  console.log('Verify link:', verifyLink);
} finally {
  // Clean up even when the wait times out, so stale mail cannot leak into
  // the next test
  await mail.clearMessages();
}

Real-Time Monitoring with WebSocket

javascript
/**
 * Logs MailCatcher WebSocket events and forwards new messages to an optional
 * callback registered with `onNewMessage()`.
 */
class MailMonitor {
  /** @param {string} baseUrl HTTP(S) root URL; the scheme is switched to ws(s). */
  constructor(baseUrl = 'http://127.0.0.1:1080') {
    // Kept in its own field: reusing the `onNewMessage` name for the callback
    // would shadow the registration method below.
    this._newMessageHandler = null;
    // Anchored so only the scheme is rewritten (http -> ws, https -> wss)
    this.ws = new WebSocket(baseUrl.replace(/^http/, 'ws') + '/messages');
    this.setupListeners();
  }

  /** Attach the message handler that logs each event type. */
  setupListeners() {
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      if (data.type === 'add') {
        console.log('📧 New message:', data.message.subject);
        // No-op until a callback has been registered
        this._newMessageHandler?.(data.message);
      } else if (data.type === 'remove') {
        console.log('🗑️ Message deleted:', data.id);
      } else if (data.type === 'clear') {
        console.log('🧹 All messages cleared');
      }
    };
  }

  /**
   * Register (or replace) the callback invoked with each new message.
   * @param {(message: object) => void} callback
   * @returns {MailMonitor} This monitor, to allow chaining.
   */
  onNewMessage(callback) {
    this._newMessageHandler = callback;
    return this;
  }

  /** Close the underlying WebSocket. */
  close() {
    this.ws.close();
  }
}

// Usage: onNewMessage() returns the monitor, so creation and registration chain
const monitor = new MailMonitor().onNewMessage((msg) => {
  console.log('Received:', msg.subject);
});

// Stop monitoring
// monitor.close();

Ruby

Setup

Install the HTTP client gem:

Gemfile
gem 'httparty'
Then install it with Bundler
bundle install

List All Messages

ruby
require 'httparty'

# Minimal HTTParty client bound to the local MailCatcher server.
class MailCatcher
  include HTTParty
  base_uri 'http://127.0.0.1:1080'
end

# Get all messages
messages = MailCatcher.get('/messages')

puts "Total messages: #{messages.length}"

messages.each do |msg|
  puts "[#{msg['id']}] #{msg['subject']}"
  # Array() accepts a single address string as well as a list of addresses
  puts "    From: #{Array(msg['from']).join(', ')}"
  puts "    To: #{Array(msg['to']).join(', ')}"
end

Get Message Details

ruby
require 'httparty'
require 'json'

# Minimal HTTParty client bound to the local MailCatcher server.
class MailCatcher
  include HTTParty
  base_uri 'http://127.0.0.1:1080'
end

# Get message metadata
message = MailCatcher.get('/messages/1.json')

puts "Subject: #{message['subject']}"
# Array() accepts a single address string as well as a list of addresses
puts "From: #{Array(message['from']).first}"
puts "To: #{Array(message['to']).first}"

# Get HTML content
html = MailCatcher.get('/messages/1.html')

# Get plain text
plain = MailCatcher.get('/messages/1.plain')

# Save the raw HTML response body to a file
File.write('email.html', html.body)

Extract Verification Codes & Links

ruby
require 'httparty'

# Minimal HTTParty client bound to the local MailCatcher server.
class MailCatcher
  include HTTParty
  base_uri 'http://127.0.0.1:1080'
end

message_id = 1

# Extract OTP
otp_data = MailCatcher.get("/messages/#{message_id}/extract", query: { type: 'otp' })
otp = otp_data.first['value'] if otp_data.any?
puts "OTP: #{otp}"

# Extract verification link
links = MailCatcher.get("/messages/#{message_id}/extract", query: { type: 'link' })
verify_link = links.first['value'] if links.any?
puts "Verify Link: #{verify_link}"

# Get all parsed data (recommended)
parsed = MailCatcher.get("/messages/#{message_id}/parsed.json")

puts "Verification URL: #{parsed['verification_url']}"
puts "OTP Code: #{parsed['otp_code']}"
puts "Reset Token: #{parsed['reset_token']}"
puts "Unsubscribe: #{parsed['unsubscribe_link']}"

# Iterate all links; Array() turns a missing 'all_links' key into an empty
# list instead of raising NoMethodError on nil
Array(parsed['all_links']).each do |link|
  puts "#{link['text']}: #{link['href']}"
end

Testing Helper Class

ruby
require 'httparty'

# Test helper around the MailCatcher HTTP API.
class MailCatcherClient
  include HTTParty
  base_uri 'http://127.0.0.1:1080'

  # Returns the last +count+ messages of the /messages listing.
  def get_latest_messages(count = 5)
    get('/messages').last(count)
  end

  # Polls until a message whose 'to' includes +recipient+ arrives and returns it.
  #
  # timeout:       maximum number of seconds to keep polling
  # poll_interval: seconds to sleep between polls
  #
  # Raises RuntimeError when no matching message arrives in time.
  def wait_for_message(recipient, timeout: 10, poll_interval: 0.5)
    # A monotonic clock keeps the deadline correct across system clock changes
    deadline = monotonic_now + timeout

    loop do
      message = get('/messages').find { |m| m['to'].include?(recipient) }
      return message if message

      raise "No email for #{recipient} after #{timeout}s" if monotonic_now > deadline

      sleep(poll_interval)
    end
  end

  # Returns 'verification_url' from the message's parsed data.
  def get_verification_link(message_id)
    parsed_data(message_id)['verification_url']
  end

  # Returns 'otp_code' from the message's parsed data.
  def get_otp_code(message_id)
    parsed_data(message_id)['otp_code']
  end

  # Deletes every message on the server.
  def clear_messages
    delete('/messages')
  end

  private

  # HTTParty defines its verbs on the class; these wrappers expose them to
  # instance methods.
  def get(path)
    self.class.get(path)
  end

  def delete(path)
    self.class.delete(path)
  end

  # Fetches /messages/:id/parsed.json.
  def parsed_data(message_id)
    get("/messages/#{message_id}/parsed.json")
  end

  # Seconds from a clock that never jumps backwards.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Usage in tests
client = MailCatcherClient.new

begin
  # Wait for email
  email = client.wait_for_message('user@example.com')
  puts "Email received: #{email['subject']}"

  # Extract verification link
  verify_link = client.get_verification_link(email['id'])
  puts "Verification link: #{verify_link}"

  # Get OTP
  otp = client.get_otp_code(email['id'])
  puts "OTP: #{otp}"
ensure
  # Clean up even when the wait raises, so stale mail cannot leak into the
  # next test
  client.clear_messages
end

RSpec Integration

ruby
# spec/support/mail_catcher.rb
require 'httparty'

# RSpec support helper around the MailCatcher HTTP API.
class MailCatcherHelper
  include HTTParty
  base_uri 'http://127.0.0.1:1080'

  # Returns the most recent message whose 'to' includes +recipient+, or nil.
  def latest_email_to(recipient)
    # HTTParty defines get/delete on the class, not on instances, so a bare
    # `get` here would raise NoMethodError.
    messages = self.class.get('/messages')
    # Walk the listing from its end so the newest match wins
    messages.reverse_each.find { |msg| msg['to'].include?(recipient) }
  end

  # Deletes every message on the server.
  def clear
    self.class.delete('/messages')
  end
end

# spec/spec_helper.rb
RSpec.configure do |config|
  # `let` is only available inside example groups, not in this configure
  # block, so the hook builds its own helper to empty the inbox before
  # every example.
  config.before(:each) do
    MailCatcherHelper.new.clear
  end
end

# spec/features/signup_spec.rb
RSpec.describe 'User Signup', type: :feature do
  let(:mail) { MailCatcherHelper.new }

  before { mail.clear }

  it 'sends verification email' do
    visit signup_path
    fill_in 'Email', with: 'user@example.com'
    click_button 'Sign Up'

    # Poll for up to ~5 seconds instead of a fixed sleep: the test passes as
    # soon as the email arrives and tolerates slower delivery.
    email = nil
    20.times do
      email = mail.latest_email_to('user@example.com')
      break if email

      sleep 0.25
    end

    expect(email).to be_present
    expect(email['subject']).to include('Verify')
  end
end
end