Building Production-Ready MCP Servers for SOC Automation - A Complete Guide

Transform your Security Operations Center with AI-driven Model Context Protocol integrations

Table of Contents

  1. Introduction
  2. Understanding Model Context Protocol
  3. The SOC Automation Challenge
  4. MCP Architecture for Security Operations
  5. Implementation: Panther SIEM MCP Server
  6. Implementation: GitLab MCP Server
  7. Secure Credential Management
  8. Complete Investigation Workflow
  9. Deployment and Security Considerations
  10. Advanced Use Cases
  11. Best Practices and Lessons Learned
  12. Future Roadmap

Introduction

Modern Security Operations Centers (SOCs) face an overwhelming challenge: the exponential growth in security alerts far exceeds the capacity of human analysts to investigate them thoroughly. Traditional manual investigation processes that once took 2-8 hours per incident can now be reduced to 10-30 minutes through intelligent automation.

This comprehensive guide demonstrates how Model Context Protocol (MCP) servers can revolutionize SOC operations by enabling AI assistants to directly interact with security tools, correlate events across platforms, and provide detailed analysis that rivals expert human investigation.

Understanding Model Context Protocol

What Makes MCP Different

Model Context Protocol (MCP) is a standardized framework that enables AI assistants to dynamically discover and interact with external tools and services. Unlike traditional REST APIs that require specific client implementations, MCP provides:

  • Dynamic Tool Discovery: AI assistants can discover available tools and their capabilities at runtime
  • Contextual Workflows: Maintains conversation context across multiple tool interactions
  • Standardized Interface: Universal protocol that works across different AI platforms
  • Secure Integration: Built-in patterns for authentication and authorization

MCP vs Traditional API Integration

Traditional Approach:
AI Assistant → Custom Integration → API → Data Source

MCP Approach:
AI Assistant → MCP Client → MCP Server → API → Data Source
                ↑                    ↑
        Universal Protocol    Tool Discovery & Context

The key advantage is that once you build an MCP server for a tool, any MCP-compatible AI assistant can use it without additional integration work.
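
To make this concrete, here is a minimal FastMCP sketch (the tool below is a placeholder, not part of the servers built later): registering a function with @mcp.tool() is essentially the only integration work required, and any MCP-compatible client can then discover and call it.

#!/usr/bin/env python3
"""Minimal illustrative MCP server: one registered tool, reusable by any MCP client."""

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("example-tool")

@mcp.tool()
async def echo(message: str) -> str:
    """Placeholder tool: echoes its input back to the calling assistant."""
    return f"echo: {message}"

if __name__ == "__main__":
    mcp.run(transport="stdio")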

The SOC Automation Challenge

Current State of SOC Operations

Modern SOCs typically handle:

  • 1000+ alerts per day from various security tools
  • Multiple disconnected platforms (SIEM, ticketing, code repositories, cloud services)
  • Manual correlation processes that are time-intensive and error-prone
  • Alert fatigue leading to overlooked genuine threats

Traditional Investigation Process

A typical security alert investigation involves:

  1. Alert Triage (15-30 minutes)
    • Review alert details and severity
    • Check for similar historical incidents
    • Determine initial scope
  2. Data Gathering (1-2 hours)
    • Query SIEM for related events
    • Check network logs and firewall rules
    • Review user activity and authentication logs
  3. Context Analysis (30-60 minutes)
    • Correlate events across timeframes
    • Identify potential attack patterns
    • Check for indicators of compromise
  4. Root Cause Investigation (1-3 hours)
    • Review recent system changes
    • Analyze deployment logs and code changes
    • Check for configuration modifications
  5. Documentation and Response (30-45 minutes)
    • Document findings and timeline
    • Implement containment measures
    • Create incident reports

Total Time: 3-7 hours per incident

MCP-Enabled Investigation Process

With MCP automation:

  1. Automated Triage (2-5 minutes)
    • AI assistant analyzes alert context
    • Automatically queries related data sources
    • Provides initial severity assessment
  2. Intelligent Correlation (5-10 minutes)
    • Cross-platform event correlation
    • Timeline reconstruction
    • Pattern recognition across tools
  3. Root Cause Analysis (5-15 minutes)
    • Automated code change analysis
    • Pipeline and deployment correlation
    • Configuration drift detection
  4. Report Generation (1-2 minutes)
    • Comprehensive incident timeline
    • Evidence compilation
    • Recommended actions

Total Time: 13-32 minutes per incident

MCP Architecture for Security Operations

High-Level Architecture

graph TB
    subgraph "AI Assistant Layer"
        A[AI Assistant<br/>Cursor/Claude] --> B[MCP Client]
    end
    
    subgraph "MCP Server Layer"
        B --> C[Panther SIEM MCP]
        B --> D[GitLab MCP]
        B --> E[Additional MCPs]
    end
    
    subgraph "Security Tools"
        C --> F[Panther SIEM<br/>Data Lake]
        D --> G[GitLab API<br/>Code & Pipelines]
        E --> H[Other Security<br/>Tools]
    end
    
    subgraph "Credential Management"
        I[macOS Keychain]
        J[AWS Secrets Manager]
        K[OAuth 2.1 Flows]
    end
    
    C -.-> I
    D -.-> I
    E -.-> J
    E -.-> K

Core Components

  • MCP Servers: Lightweight programs that expose specific security tool capabilities
  • Tool Discovery: Dynamic registration and capability advertisement
  • Context Management: Maintains investigation state across multiple interactions
  • Security Layer: Secure credential storage and access control
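
To show the Tool Discovery component in action, here is a rough client-side sketch, assuming the reference MCP Python SDK's stdio client API and a hypothetical server script path; it enumerates whatever tools a server advertises at runtime instead of relying on hard-coded integrations.

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def discover_tools():
    # Launch the MCP server as a subprocess over stdio (script path is illustrative)
    server_params = StdioServerParameters(command="python", args=["panther_mcp.py"])
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(f"{tool.name}: {tool.description}")

asyncio.run(discover_tools())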

Implementation: Panther SIEM MCP Server

Overview

The Panther SIEM MCP server enables AI assistants to query security logs using natural language. It translates conversational requests into SQL queries and executes them against Panther’s GraphQL API.

Complete Implementation

#!/usr/bin/env python3
"""
Panther SIEM MCP Server using FastMCP

This MCP server provides tools to query Panther SIEM's data lake using SQL queries.
Uses FastMCP for automatic protocol handling.
"""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
import keyring
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from mcp.server.fastmcp import FastMCP

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("panther-mcp")

class PantherClient:
    """Client for interacting with Panther SIEM's GraphQL API"""
    
    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key
        self.transport = AIOHTTPTransport(
            url=api_url,
            headers={"X-API-Key": api_key}
        )
        self.client = Client(transport=self.transport, fetch_schema_from_transport=True)
        
        # GraphQL queries
        self.issue_query = gql("""
            mutation IssueQuery($sql: String!) {
                executeDataLakeQuery(input: { sql: $sql }) {
                    id
                }
            }
        """)
        
        self.get_query_results = gql("""
            query GetQueryResults($id: ID!, $cursor: String) {
                dataLakeQuery(id: $id) {
                    message
                    status
                    results(input: { cursor: $cursor }) {
                        edges {
                            node
                        }
                        pageInfo {
                            endCursor
                            hasNextPage
                        }
                    }
                }
            }
        """)
    
    async def execute_query(self, sql: str) -> Dict[str, Any]:
        """Execute a SQL query against Panther's data lake"""
        try:
            # Issue the query
            logger.info(f"Executing query: {sql[:100]}...")
            mutation_data = await self.client.execute_async(
                self.issue_query,
                variable_values={"sql": sql}
            )
            
            query_id = mutation_data["executeDataLakeQuery"]["id"]
            logger.info(f"Query issued with ID: {query_id}")
            
            # Poll for results with pagination
            all_results = []
            has_more = True
            cursor = None
            total_fetched = 0
            
            while has_more:
                query_data = await self.client.execute_async(
                    self.get_query_results,
                    variable_values={
                        "id": query_id,
                        "cursor": cursor
                    }
                )
                
                status = query_data["dataLakeQuery"]["status"]
                message = query_data["dataLakeQuery"]["message"]
                
                if status == "running":
                    logger.info(f"Query still running: {message}")
                    await asyncio.sleep(2)
                    continue
                
                if status != "succeeded":
                    logger.error(f"Query failed: {message}")
                    raise Exception(f"Query failed with status '{status}': {message}")
                
                # Extract results from current page
                edges = query_data["dataLakeQuery"]["results"]["edges"]
                page_results = [edge["node"] for edge in edges]
                all_results.extend(page_results)
                total_fetched += len(page_results)
                
                # Check pagination
                page_info = query_data["dataLakeQuery"]["results"]["pageInfo"]
                has_more = page_info["hasNextPage"]
                cursor = page_info["endCursor"]
                
                logger.info(f"Fetched {len(page_results)} results (total: {total_fetched})")
            
            return {
                "query_id": query_id,
                "status": "succeeded",
                "total_results": len(all_results),
                "results": all_results,
                "sql": sql
            }
            
        except Exception as e:
            logger.error(f"Error executing query: {str(e)}")
            return {
                "query_id": None,
                "status": "error",
                "error": str(e),
                "total_results": 0,
                "results": [],
                "sql": sql
            }

# Initialize FastMCP server
mcp = FastMCP("panther-siem")

# Global Panther client
_panther_client = None

def get_panther_client():
    """Get or create Panther client with stored credentials"""
    global _panther_client
    
    if _panther_client is None:
        service = "PantherSIEM"
        api_url = keyring.get_password(service, "PANTHER_API_URL")
        api_key = keyring.get_password(service, "PANTHER_API_KEY")
        
        if not api_url or not api_key:
            raise Exception("Panther credentials not found in Keychain. Run store_panther_credentials.py first.")
        
        _panther_client = PantherClient(api_url, api_key)
    
    return _panther_client

@mcp.tool()
async def execute_sql_query(sql: str) -> str:
    """
    Execute a SQL query against Panther SIEM's data lake
    
    Args:
        sql: The SQL query to execute against the data lake
    
    Returns:
        JSON string with query results and metadata
    """
    panther_client = get_panther_client()
    result = await panther_client.execute_query(sql)
    
    if result["status"] == "error":
        return f"Query execution failed: {result['error']}"
    
    # Format results for display
    response = f"Query executed successfully!\n\n"
    response += f"SQL: {result['sql']}\n"
    response += f"Query ID: {result['query_id']}\n"
    response += f"Total Results: {result['total_results']}\n\n"
    
    if result['results']:
        response += "Results:\n"
        response += json.dumps(result['results'], indent=2)
    else:
        response += "No results returned."
    
    return response

@mcp.tool()
async def list_available_tables() -> str:
    """
    Get a list of commonly available tables in Panther's data lake
    
    Returns:
        List of common Panther table names
    """
    tables = [
        "sample_logs_table_name",
        # Add more tables as needed
    ]
    
    response = "Common Panther Data Lake Tables:\n\n"
    for table in tables:
        response += f"{table}\n"
    
    response += "\nNote: Available tables depend on your Panther configuration and data sources."
    
    return response

@mcp.tool()
async def get_table_schema(table_name: str) -> str:
    """
    Get the schema/structure of a specific table
    
    Args:
        table_name: Name of the table to get schema for (e.g., table_name.aws_alb)
    
    Returns:
        Table schema information
    """
    panther_client = get_panther_client()
    
    # Query to get table schema
    schema_sql = f"DESCRIBE TABLE {table_name}"
    
    result = await panther_client.execute_query(schema_sql)
    
    if result["status"] == "error":
        return f"Failed to get schema for table '{table_name}': {result['error']}"
    
    response = f"Schema for table: {table_name}\n\n"
    
    if result['results']:
        response += "Columns:\n"
        response += json.dumps(result['results'], indent=2)
    else:
        response += "No schema information available."
    
    return response

def main():
    """Main entry point for the MCP server"""
    logger.info("Starting Panther MCP Server using FastMCP...")
    mcp.run(transport="stdio")

if __name__ == "__main__":
    main()
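
Before wiring the server into an AI assistant, the tool function can be exercised directly. A quick smoke-test sketch follows; the panther_mcp module name is an assumption, and it requires the Keychain credentials described in the next section.

# smoke_test_panther.py - illustrative; assumes credentials are already stored
import asyncio
from panther_mcp import execute_sql_query  # module name is an assumption

async def main():
    # Any small query works; this one just confirms connectivity and authentication
    print(await execute_sql_query("SELECT CURRENT_TIMESTAMP"))

asyncio.run(main())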

Credential Storage Implementation

#!/usr/bin/env python3
"""
store_panther_credentials.py

Securely store Panther SIEM credentials in macOS Keychain
"""

import keyring
import getpass

def main():
    service = "PantherSIEM"

    # Prompt for URL (visible)
    print("Panther API URL format: https://[your-endpoint].amazonaws.com/v1/public/graphql")
    print("Get the endpoint from Panther API Playground")
    api_url = input("Enter Panther API URL: ").strip()
    if not api_url:
        print("No URL provided; aborting.")
        return

    # Prompt for API key (hidden)
    api_key = getpass.getpass("Enter Panther API Key: ").strip()
    if not api_key:
        print("No API key provided; aborting.")
        return

    try:
        keyring.set_password(service, "PANTHER_API_URL", api_url)
        keyring.set_password(service, "PANTHER_API_KEY", api_key)
        print(f"✅ Stored Panther credentials under Keychain service '{service}'.")
    except Exception as e:
        print(f"❌ Failed to store credentials: {e}")

if __name__ == "__main__":
    main()
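
A quick way to confirm the entries were written is to read them back with the same keyring API, printing only whether each value exists rather than the secret itself.

import keyring

service = "PantherSIEM"
for key in ("PANTHER_API_URL", "PANTHER_API_KEY"):
    print(key, "present" if keyring.get_password(service, key) else "missing")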

Implementation: GitLab MCP Server

Overview

The GitLab MCP server enables investigation of code changes, merge requests, and CI/CD pipeline activities that may be related to security incidents.

Complete Implementation

#!/usr/bin/env python3
"""
GitLab MCP Server using FastMCP

This MCP server provides tools to interact with GitLab API for SOC investigations.
"""

import asyncio
import json
import logging
import ssl
import keyring
import aiohttp
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gitlab-mcp")

# FastMCP server instance
mcp = FastMCP("gitlab")

def get_gitlab_credentials():
    """
    Retrieve GitLab API URL and Token from macOS Keychain.
    """
    service = "GitLab"
    url = keyring.get_password(service, "GITLAB_API_URL")
    token = keyring.get_password(service, "GITLAB_API_TOKEN")
    if not url or not token:
        raise Exception(
            "GitLab credentials not found in Keychain. "
            "Please run store_gitlab_creds.py first."
        )
    return url, token

# Retrieve credentials at startup
GITLAB_API_URL, GITLAB_API_TOKEN = get_gitlab_credentials()

async def gitlab_request(endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Internal helper to perform a GET against the GitLab API.
    """
    headers = {"PRIVATE-TOKEN": GITLAB_API_TOKEN}
    url = f"{GITLAB_API_URL}/{endpoint}"

    ssl_context = ssl.create_default_context()
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        async with session.get(url, params=params, timeout=60) as response:
            if response.status != 200:
                text = await response.text()
                raise Exception(
                    f"GitLab API request failed ({response.status}): {text}"
                )
            return await response.json()

@mcp.tool()
async def list_merge_requests(created_after: str, created_before: str) -> str:
    """
    List merge requests within a specific time window for security investigation
    
    Args:
        created_after: Start time in ISO format (e.g., '2024-01-15T00:00:00Z')
        created_before: End time in ISO format (e.g., '2024-01-15T23:59:59Z')
    
    Returns:
        JSON string with merge request details
    """
    try:
        params = {
            "created_after": created_after, 
            "created_before": created_before, 
            "scope": "all"
        }
        logger.info(f"Fetching MRs between {created_after} and {created_before}")
        mrs = await gitlab_request("merge_requests", params=params)
        
        if not mrs:
            return "No merge requests found in the specified time frame."
        
        simplified = [
            {
                "iid": mr["iid"], 
                "title": mr["title"], 
                "state": mr["state"],
                "project_id": mr["project_id"], 
                "author": mr["author"]["username"],
                "created_at": mr["created_at"], 
                "web_url": mr["web_url"]
            }
            for mr in mrs
        ]
        
        return json.dumps({
            "total_mrs": len(simplified), 
            "merge_requests": simplified
        }, indent=2)
        
    except Exception as e:
        logger.error(f"Error fetching merge requests: {e}")
        return f"Error fetching merge requests: {e}"

@mcp.tool()
async def get_merge_request_details(project_id: int, mr_iid: int) -> str:
    """
    Get detailed information about a specific merge request
    
    Args:
        project_id: GitLab project ID
        mr_iid: Merge request internal ID
    
    Returns:
        JSON string with detailed MR information
    """
    try:
        endpoint = f"projects/{project_id}/merge_requests/{mr_iid}"
        logger.info(f"Fetching details for MR {mr_iid} in project {project_id}")
        details = await gitlab_request(endpoint)
        
        result = {
            "id": details["id"], 
            "iid": details["iid"], 
            "title": details["title"],
            "state": details["state"], 
            "author": details["author"]["username"],
            "created_at": details["created_at"], 
            "merged_by": details.get("merged_by", {}).get("username"),
            "merged_at": details.get("merged_at"), 
            "web_url": details["web_url"],
            "description": details["description"], 
            "changes_count": details["changes_count"]
        }
        
        return json.dumps(result, indent=2)
        
    except Exception as e:
        logger.error(f"Error fetching MR details: {e}")
        return f"Error fetching MR details: {e}"

@mcp.tool()
async def list_pipeline_failures(project_id: int, updated_after: str) -> str:
    """
    List failed pipelines for a project within a time window
    
    Args:
        project_id: GitLab project ID
        updated_after: Start time in ISO format
    
    Returns:
        JSON string with failed pipeline information
    """
    try:
        params = {"updated_after": updated_after, "status": "failed"}
        logger.info(f"Fetching pipeline failures for project {project_id} after {updated_after}")
        pipelines = await gitlab_request(f"projects/{project_id}/pipelines", params)
        
        if not pipelines:
            return "No pipeline failures found in the specified time frame."
        
        return json.dumps({
            "total_failures": len(pipelines), 
            "pipelines": pipelines
        }, indent=2)
        
    except Exception as e:
        logger.error(f"Error fetching pipeline failures: {e}")
        return f"Error fetching pipeline failures: {e}"

@mcp.tool()
async def review_pipeline_job(job_url: str) -> str:
    """
    Review a specific pipeline job and its execution logs
    
    Args:
        job_url: Full GitLab job URL (e.g., https://gitlab.com/group/project/-/jobs/12345)
    
    Returns:
        JSON string with job details and logs
    """
    try:
        import re
        match = re.match(r"https://gitlab.com/(.+)/-/jobs/(\d+)", job_url)
        if not match:
            return "Invalid GitLab job URL format."
        
        project_path, job_id = match.groups()
        project_encoded = project_path.replace("/", "%2F")

        logger.info(f"Reviewing job {job_id} in project {project_path}")
        details = await gitlab_request(f"projects/{project_encoded}/jobs/{job_id}")

        # Fetch trace logs
        ssl_ctx = ssl.create_default_context()
        connector = aiohttp.TCPConnector(ssl=ssl_ctx)
        async with aiohttp.ClientSession(
            headers={"PRIVATE-TOKEN": GITLAB_API_TOKEN}, 
            connector=connector
        ) as session:
            trace_resp = await session.get(
                f"{GITLAB_API_URL}/projects/{project_encoded}/jobs/{job_id}/trace"
            )
            trace = await trace_resp.text()

        result = {
            "id": details["id"], 
            "name": details["name"], 
            "status": details["status"],
            "trace_log": trace[-5000:]  # Last 5000 characters
        }
        
        return json.dumps(result, indent=2)
        
    except Exception as e:
        logger.error(f"Error reviewing pipeline job: {e}")
        return f"Error reviewing pipeline job: {e}"

@mcp.tool()
async def review_merge_request_and_pipeline(project_id: int, mr_iid: int) -> str:
    """
    Comprehensive analysis of merge request and its associated pipeline traces
    
    Args:
        project_id: GitLab project ID
        mr_iid: Merge request internal ID
    
    Returns:
        JSON string with MR details and all pipeline job traces
    """
    try:
        mr = await gitlab_request(f"projects/{project_id}/merge_requests/{mr_iid}")
        pipeline = mr.get("pipeline")
        traces = None
        
        if pipeline:
            pid = pipeline["id"]
            jobs = await gitlab_request(f"projects/{project_id}/pipelines/{pid}/jobs")
            traces = {}
            
            ssl_ctx = ssl.create_default_context()
            connector = aiohttp.TCPConnector(ssl=ssl_ctx)
            async with aiohttp.ClientSession(
                headers={"PRIVATE-TOKEN": GITLAB_API_TOKEN}, 
                connector=connector
            ) as session:
                for job in jobs:
                    jid = job["id"]
                    resp = await session.get(
                        f"{GITLAB_API_URL}/projects/{project_id}/jobs/{jid}/trace"
                    )
                    log = await resp.text()
                    traces[job["name"]] = log[-5000:]  # Last 5000 characters
        
        return json.dumps({
            "merge_request": mr, 
            "pipeline_trace": traces
        }, indent=2)
        
    except Exception as e:
        logger.error(f"Error reviewing MR & pipeline: {e}")
        return f"Error reviewing MR & pipeline: {e}"

def main():
    logger.info("Starting GitLab MCP Server using FastMCP...")
    mcp.run(transport="stdio")

if __name__ == "__main__":
    main()

GitLab Credential Storage

#!/usr/bin/env python3
"""
store_gitlab_creds.py

Store GitLab API URL and personal access token in the macOS Keychain
"""

import keyring
import getpass

def main():
    service = "GitLab"
    
    # GitLab API URL is typically constant
    api_url = "https://gitlab.com/api/v4"
    
    try:
        keyring.set_password(service, "GITLAB_API_URL", api_url)
        print(f"✅ Stored GitLab API URL: {api_url}")
    except Exception as e:
        print(f"❌ Failed to store API URL: {e}")
        return

    # Prompt for Token (hidden)
    print("\nCreate a Personal Access Token at: https://gitlab.com/-/profile/personal_access_tokens")
    print("Required scopes: api, read_repository, read_user")
    token = getpass.getpass("Enter GitLab Personal Access Token: ").strip()
    if not token:
        print("No token provided; aborting.")
        return

    try:
        keyring.set_password(service, "GITLAB_API_TOKEN", token)
        print(f"✅ Stored GitLab credentials under Keychain service '{service}'.")
    except Exception as e:
        print(f"❌ Failed to store API token: {e}")

if __name__ == "__main__":
    main()

Secure Credential Management

Local Storage with macOS Keychain

The implementations above use macOS Keychain for secure credential storage:

Advantages:

  • OS-level encryption and access control
  • Integration with system authentication
  • No plaintext secrets in configuration files
  • Automatic credential lifecycle management

Security Features:

  • Encrypted storage using system keys
  • Access control via user authentication
  • Audit logging of credential access
  • Integration with system security policies

Enterprise Considerations

For production enterprise deployments, consider:

AWS Secrets Manager Integration:

import boto3
import json

def get_credentials_from_aws_secrets():
    """Retrieve credentials from AWS Secrets Manager"""
    client = boto3.client('secretsmanager', region_name='us-west-2')
    
    try:
        response = client.get_secret_value(SecretId='prod/soc/panther-credentials')
        secrets = json.loads(response['SecretString'])
        return secrets['api_url'], secrets['api_key']
    except Exception as e:
        logger.error(f"Failed to retrieve credentials from AWS Secrets Manager: {e}")
        raise

OAuth 2.1 Implementation for Remote MCPs:

import secrets
import hashlib
import base64
from datetime import datetime, timedelta

class OAuth21Handler:
    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret
    
    def generate_pkce_challenge(self):
        """Generate PKCE code verifier and challenge for OAuth 2.1"""
        code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')
        
        code_challenge = base64.urlsafe_b64encode(
            hashlib.sha256(code_verifier.encode('utf-8')).digest()
        ).decode('utf-8').rstrip('=')
        
        return code_verifier, code_challenge
    
    async def exchange_code_for_token(self, authorization_code: str, code_verifier: str):
        """Exchange authorization code for access token"""
        # Implementation depends on your OAuth provider
        pass
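
The exchange itself is left provider-specific above. For a standards-compliant OAuth 2.1 token endpoint it typically looks like the generic sketch below; the token_url and redirect_uri values are placeholders you would replace with your provider's details.

import aiohttp

async def exchange_code_for_token_generic(
    token_url: str,
    client_id: str,
    authorization_code: str,
    code_verifier: str,
    redirect_uri: str,
) -> dict:
    """Generic OAuth 2.1 authorization-code + PKCE token exchange (illustrative)."""
    data = {
        "grant_type": "authorization_code",
        "code": authorization_code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "code_verifier": code_verifier,  # PKCE: proves possession of the original verifier
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(token_url, data=data) as resp:
            resp.raise_for_status()
            return await resp.json()  # typically contains access_token, expires_in, ...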

Complete Investigation Workflow

Example: Database Breach Investigation

Let’s walk through a complete investigation using both MCP servers:

Step 1: Alert Received

Alert: "Unusual database access pattern detected"
Time: 2024-01-15 14:30:00 UTC
Source: Database monitoring system
IP: 203.0.113.42

Step 2: AI-Driven Initial Analysis

User Query:

"Investigate this database alert - check for related events and recent code changes"

AI Assistant Workflow:

  1. Query Panther for Related Events:
    
    SELECT 
     eventTime,
     eventName,
     sourceIPAddress,
     userIdentity,
     requestParameters,
     responseElements,
     errorCode
    FROM table_name.aws_cloudtrail 
    WHERE p_event_time BETWEEN '2024-01-15 14:00:00' AND '2024-01-15 15:00:00'
     AND (
         eventName LIKE '%RDS%' OR 
         eventName LIKE '%Database%' OR
         sourceIPAddress = '203.0.113.42' OR
         requestParameters LIKE '%database%'
     )
    ORDER BY eventTime;
    
  2. Check Recent Code Changes:
    
    # AI calls GitLab MCP
    await list_merge_requests(
     created_after="2024-01-14T00:00:00Z",
     created_before="2024-01-15T16:00:00Z"
    )
    
  3. Analyze Pipeline Activities:
    
    # For each relevant project
    await list_pipeline_failures(
     project_id=12345,
     updated_after="2024-01-14T00:00:00Z"
    )
    

Step 3: Correlation and Analysis

The AI assistant automatically correlates (see the sketch after this list):

  • Database access anomalies from SIEM data
  • Recent deployment activities from GitLab
  • Failed pipeline jobs that might indicate security issues
  • Code changes in database-related modules
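
A minimal sketch of that merging step, assuming the field names returned by the tools above (eventTime/eventName from Panther, created_at/iid/title from GitLab):

from datetime import datetime

def build_timeline(siem_events: list, merge_requests: list) -> list:
    """Merge Panther events and GitLab MRs into one chronological timeline."""
    def parse(ts: str) -> datetime:
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    entries = [
        {"time": e["eventTime"], "source": "panther", "summary": e["eventName"]}
        for e in siem_events
    ] + [
        {"time": mr["created_at"], "source": "gitlab",
         "summary": f"MR !{mr['iid']}: {mr['title']}"}
        for mr in merge_requests
    ]
    return sorted(entries, key=lambda entry: parse(entry["time"]))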

Step 4: Automated Report Generation

## Security Incident Report: Database Access Anomaly

### Executive Summary
Detected unusual database access pattern from IP 203.0.113.42 at 14:30 UTC.
Investigation reveals correlation with recent deployment of MR #1234.

### Timeline
- **14:20 UTC**: Merge Request #1234 merged to production
- **14:25 UTC**: Database migration pipeline executed
- **14:30 UTC**: Anomalous database queries detected
- **14:32 UTC**: Security scanning job failed (bypassed)

### Root Cause Analysis
1. **Code Change**: MR #1234 introduced new database query in user module
2. **Pipeline Failure**: Security scan detected potential SQL injection but was bypassed
3. **Access Pattern**: New query pattern triggered monitoring alerts
4. **False Positive**: Investigation confirms legitimate new feature deployment

### Evidence
- Panther Query ID: abc123-def456-ghi789
- GitLab MR: #1234 (Project ID: 12345)
- Pipeline Job: https://gitlab.com/company/app/-/jobs/67890

### Recommendation
- **Immediate**: Alert closed as false positive
- **Short-term**: Review pipeline bypass procedures
- **Long-term**: Enhance SQL injection detection rules

### Investigation Duration: 8 minutes

Deployment and Security Considerations

Production Deployment Architecture

graph TB
    subgraph "DMZ"
        A[Load Balancer] --> B[Nginx Reverse Proxy]
    end
    
    subgraph "Application Tier"
        B --> C[MCP Server Pod 1]
        B --> D[MCP Server Pod 2]
        B --> E[MCP Server Pod N]
    end
    
    subgraph "Security Layer"
        F[AWS WAF]
        G[VPC Security Groups]
        H[IAM Roles]
    end
    
    subgraph "Data Layer"
        I[AWS Secrets Manager]
        J[CloudWatch Logs]
        K[Audit Trail]
    end
    
    A --> F
    C --> I
    D --> I
    E --> I
    
    C --> J
    D --> J
    E --> J
    
    C --> K
    D --> K
    E --> K

Security Best Practices

Network Security

# Nginx configuration for MCP server
# Assumes these are defined at the http{} level elsewhere in the config:
#   limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
#   upstream mcp-servers { server 127.0.0.1:8080; }
server {
    listen 443 ssl http2;
    server_name mcp.internal.company.com;
    
    # SSL configuration
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
    
    # Security headers
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-Content-Type-Options nosniff;
    add_header X-Frame-Options DENY;
    add_header X-XSS-Protection "1; mode=block";
    
    # IP whitelist
    allow 10.0.0.0/8;
    allow 192.168.0.0/16;
    deny all;
    
    location / {
        proxy_pass http://mcp-servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Rate limiting
        limit_req zone=api burst=20 nodelay;
    }
}

Application Security

# Enhanced security middleware
# Note: `request`, SECRET_KEY, and the `mcp` instance are assumed to be provided
# by the hosting HTTP framework, configuration, and server module; this decorator
# illustrates the signature-validation pattern.
import hashlib
import hmac
import time
from functools import wraps

def require_valid_signature(f):
    """Validate request signatures to prevent tampering"""
    @wraps(f)
    async def decorated_function(*args, **kwargs):
        # Extract signature from request headers
        signature = request.headers.get('X-Signature')
        timestamp = request.headers.get('X-Timestamp')
        
        # Verify timestamp to prevent replay attacks
        if not timestamp or abs(time.time() - int(timestamp)) > 300:
            raise Exception("Request timestamp invalid or too old")
        
        # Verify signature
        expected_signature = hmac.new(
            SECRET_KEY.encode(),
            f"{timestamp}{request.data}".encode(),
            hashlib.sha256
        ).hexdigest()
        
        if not hmac.compare_digest(signature, expected_signature):
            raise Exception("Invalid request signature")
        
        return await f(*args, **kwargs)
    return decorated_function

@mcp.tool()
@require_valid_signature
async def secure_tool(parameter: str) -> str:
    """Tool with enhanced security validation"""
    # Implementation here
    pass

Monitoring and Alerting

import hashlib
import secrets
import time

import structlog
from prometheus_client import Counter, Histogram

# `request`, process_request(), and the `mcp` instance are placeholders for the
# transport layer, the tool's actual work, and the FastMCP server defined earlier.

# Metrics
tool_requests = Counter('mcp_tool_requests_total', 'Total tool requests', ['tool', 'status'])
tool_duration = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool'])
security_events = Counter('mcp_security_events_total', 'Security events', ['event_type'])

logger = structlog.get_logger()

@mcp.tool()
async def monitored_tool(parameter: str) -> str:
    """Tool with comprehensive monitoring"""
    start_time = time.time()
    request_id = secrets.token_urlsafe(16)
    
    logger.info(
        "Tool execution started",
        request_id=request_id,
        tool="monitored_tool",
        parameter_hash=hashlib.md5(parameter.encode()).hexdigest(),
        user_agent=request.headers.get('User-Agent', 'unknown')
    )
    
    try:
        with tool_duration.labels(tool='monitored_tool').time():
            result = await process_request(parameter)
        
        tool_requests.labels(tool='monitored_tool', status='success').inc()
        
        logger.info(
            "Tool execution completed",
            request_id=request_id,
            duration=time.time() - start_time,
            result_size=len(result)
        )
        
        return result
        
    except Exception as e:
        tool_requests.labels(tool='monitored_tool', status='error').inc()
        security_events.labels(event_type='tool_execution_error').inc()
        
        logger.error(
            "Tool execution failed",
            request_id=request_id,
            error=str(e),
            duration=time.time() - start_time
        )
        raise

Advanced Use Cases

1. Automated Threat Hunting

Behavioral Analysis Queries:

@mcp.tool()
async def hunt_anomalous_user_behavior(time_window: str = "7 days") -> str:
    """Hunt for anomalous user behaviors using ML-driven queries"""
    
    sql_query = f"""
    WITH user_baselines AS (
        SELECT 
            userIdentity:userName as user_name,
            COUNT(DISTINCT sourceIPAddress) as avg_ip_count,
            COUNT(DISTINCT eventName) as avg_action_count,
            ARRAY_AGG(DISTINCT sourceIPAddress) as typical_ips
        FROM table_name.aws_cloudtrail 
        WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '30 days'
            AND p_event_time < CURRENT_TIMESTAMP - INTERVAL '{time_window}'
        GROUP BY userIdentity:userName
    ),
    recent_activity AS (
        SELECT 
            userIdentity:userName as user_name,
            COUNT(DISTINCT sourceIPAddress) as recent_ip_count,
            COUNT(DISTINCT eventName) as recent_action_count,
            ARRAY_AGG(DISTINCT sourceIPAddress) as recent_ips,
            ARRAY_AGG(DISTINCT eventName) as recent_actions
        FROM table_name.aws_cloudtrail 
        WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '{time_window}'
        GROUP BY userIdentity:userName
    )
    SELECT 
        r.user_name,
        b.avg_ip_count,
        r.recent_ip_count,
        b.avg_action_count,
        r.recent_action_count,
        (r.recent_ip_count::float / NULLIF(b.avg_ip_count, 0)) as ip_anomaly_ratio,
        (r.recent_action_count::float / NULLIF(b.avg_action_count, 0)) as action_anomaly_ratio,
        ARRAY_TO_STRING(
            ARRAY(SELECT UNNEST(r.recent_ips) EXCEPT SELECT UNNEST(b.typical_ips)), 
            ', '
        ) as new_ips,
        r.recent_actions,
        r.recent_ips
    FROM recent_activity r
    LEFT JOIN user_baselines b ON r.user_name = b.user_name
    WHERE (r.recent_ip_count::float / NULLIF(b.avg_ip_count, 0)) > 2.0
        OR (r.recent_action_count::float / NULLIF(b.avg_action_count, 0)) > 2.0
        OR b.user_name IS NULL  -- New users
    ORDER BY ip_anomaly_ratio DESC, action_anomaly_ratio DESC;
    """
    
    return await execute_sql_query(sql_query)

2. Supply Chain Security Monitoring

Detection of Suspicious Dependencies:

from datetime import datetime, timedelta

@mcp.tool()
async def monitor_supply_chain_changes(project_id: int, days_back: int = 7) -> str:
    """Monitor for suspicious changes in project dependencies"""
    
    # Get recent merge requests
    created_after = (datetime.utcnow() - timedelta(days=days_back)).isoformat() + "Z"
    created_before = datetime.utcnow().isoformat() + "Z"
    
    mrs = await list_merge_requests(created_after, created_before)
    mr_data = json.loads(mrs)
    
    suspicious_changes = []
    
    for mr in mr_data.get("merge_requests", []):
        if mr["project_id"] == project_id:
            # Get detailed MR information
            details = await get_merge_request_details(project_id, mr["iid"])
            mr_details = json.loads(details)
            
            # Check for dependency-related changes
            if any(keyword in mr_details["title"].lower() for keyword in 
                   ["dependency", "package", "npm", "pip", "maven", "gradle"]):
                
                # Get pipeline information
                pipeline_info = await review_merge_request_and_pipeline(
                    project_id, mr["iid"]
                )
                
                suspicious_changes.append({
                    "merge_request": mr_details,
                    "pipeline_analysis": pipeline_info,
                    "risk_factors": analyze_dependency_risk(mr_details)
                })
    
    return json.dumps({
        "monitoring_period": f"{days_back} days",
        "project_id": project_id,
        "suspicious_changes": suspicious_changes,
        "recommendations": generate_supply_chain_recommendations(suspicious_changes)
    }, indent=2)

def analyze_dependency_risk(mr_details):
    """Analyze risk factors in dependency changes"""
    risk_factors = []
    
    title = mr_details["title"].lower()
    description = mr_details["description"].lower()
    
    # Check for high-risk patterns
    if "urgent" in title or "hotfix" in title:
        risk_factors.append("Urgent/hotfix dependency change")
    
    if "security" in description and "update" in description:
        risk_factors.append("Security-related dependency update")
    
    if mr_details["changes_count"] > 100:
        risk_factors.append("Large number of file changes")
    
    return risk_factors
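
The recommendations helper referenced above is not defined in the snippet; a minimal placeholder that works from the risk factors already collected might look like this:

def generate_supply_chain_recommendations(suspicious_changes):
    """Placeholder: turn collected risk factors into analyst-facing recommendations."""
    recommendations = []
    for change in suspicious_changes:
        mr = change["merge_request"]
        for factor in change["risk_factors"]:
            recommendations.append(f"Review MR !{mr['iid']} ({mr['title']}): {factor}")
    if not recommendations:
        recommendations.append("No high-risk dependency changes detected in this window.")
    return recommendations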

3. Compliance Automation

SOX Compliance Monitoring:

from datetime import datetime

@mcp.tool()
async def generate_sox_compliance_report(start_date: str, end_date: str) -> str:
    """Generate SOX compliance report for financial data access"""
    
    sox_query = f"""
    SELECT 
        userIdentity:userName as user,
        eventName as action,
        eventTime as timestamp,
        sourceIPAddress as source_ip,
        requestParameters:bucketName as resource,
        requestParameters:key as object_key,
        CASE 
            WHEN errorCode IS NULL THEN 'SUCCESS'
            ELSE CONCAT('FAILED: ', errorCode)
        END as status,
        userIdentity:sessionContext:sessionIssuer:userName as assumed_role,
        'SOX_FINANCIAL_DATA_ACCESS' as compliance_type
    FROM table_name.aws_cloudtrail 
    WHERE p_event_time BETWEEN '{start_date}' AND '{end_date}'
        AND (
            eventName IN ('GetObject', 'PutObject', 'DeleteObject', 'ListObjects') OR
            eventName LIKE '%Database%' OR
            eventName LIKE '%RDS%'
        )
        AND (
            requestParameters:bucketName LIKE '%financial%' OR
            requestParameters:bucketName LIKE '%audit%' OR
            requestParameters:bucketName LIKE '%sox%' OR
            requestParameters:key LIKE '%financial%'
        )
    ORDER BY eventTime DESC;
    """
    
    result = await execute_sql_query(sox_query)
    
    # Parse results and generate compliance metrics
    query_result = json.loads(result.split("Results:\n")[1] if "Results:\n" in result else "[]")
    
    compliance_metrics = {
        "reporting_period": {"start": start_date, "end": end_date},
        "total_access_events": len(query_result),
        "unique_users": len(set(event.get("user", "") for event in query_result)),
        "failed_access_attempts": len([e for e in query_result if "FAILED" in e.get("status", "")]),
        "after_hours_access": analyze_after_hours_access(query_result),
        "privileged_access": analyze_privileged_access(query_result),
        "detailed_events": query_result
    }
    
    return json.dumps(compliance_metrics, indent=2)

def analyze_after_hours_access(events):
    """Analyze access events outside business hours"""
    after_hours = []
    
    for event in events:
        timestamp = event.get("timestamp", "")
        if timestamp:
            # Parse timestamp and check if outside 9-5 business hours
            try:
                dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                hour = dt.hour
                
                if hour < 9 or hour > 17:  # Outside 9 AM - 5 PM
                    after_hours.append(event)
            except:
                pass
    
    return {
        "count": len(after_hours),
        "events": after_hours[:10]  # Return first 10 for review
    }

def analyze_privileged_access(events):
    """Analyze access by privileged users or roles"""
    privileged_keywords = ["admin", "root", "superuser", "privileged", "elevated"]
    privileged_access = []
    
    for event in events:
        user = event.get("user", "").lower()
        role = event.get("assumed_role", "").lower()
        
        if any(keyword in user or keyword in role for keyword in privileged_keywords):
            privileged_access.append(event)
    
    return {
        "count": len(privileged_access),
        "events": privileged_access[:10]
    }

Best Practices and Lessons Learned

1. Error Handling and Resilience

import asyncio
import logging
from typing import Optional

import aiohttp
import backoff

logger = logging.getLogger("resilient-mcp")

# ValidationError, AuthenticationError, and RateLimitError are assumed to be
# application-defined exception types raised by the underlying query layer, and
# `mcp` refers to the FastMCP instance created in the server modules above.

class ResilientMCPTool:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        base=2
    )
    async def resilient_api_call(self, endpoint: str, params: Optional[dict] = None):
        """API call with exponential backoff retry logic"""
        try:
            return await self.make_api_call(endpoint, params)
        except Exception as e:
            logger.error(f"API call failed after retries: {e}")
            return {"error": f"Service temporarily unavailable: {str(e)}"}
    
    @mcp.tool()
    async def robust_investigation_tool(self, query: str) -> str:
        """Investigation tool with comprehensive error handling"""
        try:
            # Validate input
            if not query or len(query.strip()) == 0:
                return "Error: Query cannot be empty"
            
            if len(query) > 10000:
                return "Error: Query too long (max 10,000 characters)"
            
            # Sanitize SQL to prevent injection
            sanitized_query = self.sanitize_sql(query)
            
            # Execute with timeout
            result = await asyncio.wait_for(
                self.execute_query(sanitized_query),
                timeout=300  # 5-minute timeout
            )
            
            return self.format_result(result)
            
        except asyncio.TimeoutError:
            return "Error: Query execution timed out (5 minutes)"
        except ValidationError as e:
            return f"Error: Invalid query format: {str(e)}"
        except AuthenticationError:
            return "Error: Authentication failed. Please check credentials."
        except RateLimitError as e:
            return f"Error: Rate limit exceeded. Retry after {e.retry_after} seconds."
        except Exception as e:
            logger.error(f"Unexpected error in investigation tool: {e}")
            return f"Error: Unexpected issue occurred. Please try again."
    
    def sanitize_sql(self, query: str) -> str:
        """Sanitize SQL query to prevent injection attacks"""
        # Remove dangerous keywords
        dangerous_keywords = [
            'DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'INSERT', 
            'UPDATE', 'GRANT', 'REVOKE', 'EXEC', 'EXECUTE'
        ]
        
        query_upper = query.upper()
        for keyword in dangerous_keywords:
            if keyword in query_upper:
                raise ValidationError(f"Dangerous SQL keyword '{keyword}' not allowed")
        
        return query

2. Performance Optimization

import asyncio
import hashlib
import logging

import aiohttp
from aiohttp_retry import RetryClient, ExponentialRetry
from cachetools import TTLCache

logger = logging.getLogger("optimized-mcp")

class OptimizedMCPServer:
    def __init__(self):
        # Connection pooling
        self.connector = aiohttp.TCPConnector(
            limit=100,  # Total connection pool size
            limit_per_host=30,  # Connections per host
            ttl_dns_cache=300,  # DNS cache TTL
            use_dns_cache=True,
        )
        
        # Retry configuration
        retry_options = ExponentialRetry(
            attempts=3, 
            start_timeout=1,
            max_timeout=10
        )
        
        self.session = RetryClient(
            connector=self.connector,
            retry_options=retry_options,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        
        # Result caching
        self.cache = TTLCache(maxsize=1000, ttl=300)  # 5-minute TTL
    
    async def cached_query(self, sql: str) -> dict:
        """Execute query with result caching"""
        # Generate cache key
        cache_key = hashlib.md5(sql.encode()).hexdigest()
        
        # Check cache first
        if cache_key in self.cache:
            logger.info(f"Returning cached result for query: {sql[:50]}...")
            return self.cache[cache_key]
        
        # Execute query
        result = await self.execute_query(sql)
        
        # Cache successful results
        if result.get("status") == "succeeded":
            self.cache[cache_key] = result
        
        return result
    
    async def batch_requests(self, requests: list) -> list:
        """Execute multiple requests concurrently"""
        semaphore = asyncio.Semaphore(10)  # Limit concurrent requests
        
        async def limited_request(request):
            async with semaphore:
                return await self.process_request(request)
        
        tasks = [limited_request(req) for req in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)

3. Testing Strategies

import json

import pytest
from unittest.mock import AsyncMock, patch, MagicMock

# Assumes the servers are importable as panther_mcp and gitlab_mcp (matching the
# patch targets below); SecurityInvestigation, SQLSanitizer, and the credential
# helpers are illustrative wrappers around the code shown earlier.
from panther_mcp import execute_sql_query
from gitlab_mcp import list_merge_requests

class TestMCPServers:
    
    @pytest.mark.asyncio
    async def test_panther_query_execution(self):
        """Test Panther MCP query execution"""
        # Mock successful response
        mock_client = AsyncMock()
        mock_client.execute_query.return_value = {
            "status": "succeeded",
            "query_id": "test-123",
            "total_results": 2,
            "results": [
                {"eventName": "ConsoleLogin", "sourceIPAddress": "192.168.1.1"},
                {"eventName": "AssumeRole", "sourceIPAddress": "192.168.1.1"}
            ],
            "sql": "SELECT * FROM test_table"
        }
        
        with patch('panther_mcp.get_panther_client', return_value=mock_client):
            result = await execute_sql_query("SELECT * FROM test_table")
        
        assert "Query executed successfully" in result
        assert "test-123" in result
        assert "Total Results: 2" in result
        mock_client.execute_query.assert_called_once_with("SELECT * FROM test_table")
    
    @pytest.mark.asyncio
    async def test_gitlab_merge_request_listing(self):
        """Test GitLab MCP merge request listing"""
        # Mock GitLab API response
        mock_response = [
            {
                "iid": 123,
                "title": "Security fix for authentication",
                "state": "merged",
                "project_id": 456,
                "author": {"username": "security-team"},
                "created_at": "2024-01-15T10:00:00Z",
                "web_url": "https://gitlab.com/test/project/-/merge_requests/123"
            }
        ]
        
        with patch('gitlab_mcp.gitlab_request', return_value=mock_response):
            result = await list_merge_requests(
                "2024-01-15T00:00:00Z", 
                "2024-01-15T23:59:59Z"
            )
        
        result_data = json.loads(result)
        assert result_data["total_mrs"] == 1
        assert result_data["merge_requests"][0]["iid"] == 123
        assert "Security fix" in result_data["merge_requests"][0]["title"]
    
    @pytest.mark.integration
    async def test_complete_investigation_workflow(self):
        """Integration test for complete investigation workflow"""
        # Mock both services
        with patch('panther_mcp.PantherClient') as mock_panther, \
             patch('gitlab_mcp.gitlab_request') as mock_gitlab:
            
            # Setup mocks
            mock_panther.return_value.execute_query.return_value = {
                "status": "succeeded",
                "results": [{"event": "test_event"}]
            }
            
            mock_gitlab.return_value = [{"iid": 1, "title": "Test MR"}]
            
            # Test investigation workflow
            investigation = SecurityInvestigation()
            result = await investigation.investigate_alert("test-alert-123")
            
            assert result["status"] == "completed"
            assert "recommendations" in result
            assert len(result["evidence"]) > 0
    
    def test_credential_storage(self):
        """Test secure credential storage"""
        with patch('keyring.set_password') as mock_set, \
             patch('keyring.get_password') as mock_get:
            
            # Test storing credentials
            mock_get.return_value = None
            store_credentials("test_service", "test_key", "test_value")
            mock_set.assert_called_with("test_service", "test_key", "test_value")
            
            # Test retrieving credentials
            mock_get.return_value = "test_value"
            result = get_credentials("test_service", "test_key")
            assert result == "test_value"
    
    @pytest.mark.parametrize("sql_input,expected_error", [
        ("DROP TABLE users;", "Dangerous SQL keyword 'DROP' not allowed"),
        ("SELECT * FROM logs; DELETE FROM users;", "Dangerous SQL keyword 'DELETE' not allowed"),
        ("SELECT * FROM logs", None),  # Should pass
    ])
    def test_sql_sanitization(self, sql_input, expected_error):
        """Test SQL injection prevention"""
        sanitizer = SQLSanitizer()
        
        if expected_error:
            with pytest.raises(ValidationError, match=expected_error):
                sanitizer.sanitize_sql(sql_input)
        else:
            result = sanitizer.sanitize_sql(sql_input)
            assert result == sql_input

Future Roadmap

1. Slack Integration MCP

Real-time SOC Communication:

@mcp.tool()
async def escalate_to_soc_team(alert_details: dict, urgency: str) -> str:
    """Escalate security incident to SOC team via Slack"""
    
    channel = "#soc-alerts" if urgency == "high" else "#soc-general"
    
    message = {
        "text": f"🚨 Security Alert - {urgency.upper()} Priority",
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": f"Security Incident: {alert_details['id']}"}
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Severity:* {alert_details['severity']}"},
                    {"type": "mrkdwn", "text": f"*Source:* {alert_details['source']}"},
                    {"type": "mrkdwn", "text": f"*Time:* {alert_details['timestamp']}"},
                    {"type": "mrkdwn", "text": f"*Affected Systems:* {alert_details['systems']}"}
                ]
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "Acknowledge"},
                        "value": f"ack_{alert_details['id']}",
                        "action_id": "acknowledge_alert"
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "Investigate"},
                        "value": f"investigate_{alert_details['id']}",
                        "action_id": "start_investigation"
                    }
                ]
            }
        ]
    }
    
    response = await slack_client.post_message(channel, message)
    return f"Alert escalated to {channel}, message ID: {response['ts']}"

@mcp.tool()
async def request_analyst_approval(action: str, details: dict) -> str:
    """Request approval from human analyst for sensitive actions"""
    
    approval_message = {
        "text": f"🔐 Approval Required: {action}",
        "blocks": [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Action:* {action}\n*Details:* {details}"}
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "✅ Approve"},
                        "style": "primary",
                        "value": f"approve_{action}",
                        "action_id": "approve_action"
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "❌ Deny"},
                        "style": "danger",
                        "value": f"deny_{action}",
                        "action_id": "deny_action"
                    }
                ]
            }
        ]
    }
    
    # Send to security team lead
    response = await slack_client.post_message("#soc-approvals", approval_message)
    
    # Wait for response (with timeout)
    approval_status = await wait_for_approval(response['ts'], timeout=300)
    return f"Action {action}: {approval_status}"
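
A note on the helpers used above: slack_client (an async wrapper around the Slack Web API) and wait_for_approval are not part of the Slack SDK itself. One hedged way to implement wait_for_approval is to poll the posted message for ✅/❌ reactions with slack_sdk's AsyncWebClient; interactive button callbacks are more robust but require a publicly reachable interactivity endpoint. The channel ID constant and credential names below are assumptions:

import time
import asyncio
from slack_sdk.web.async_client import AsyncWebClient

SOC_APPROVALS_CHANNEL_ID = "C0123456789"  # hypothetical channel ID for #soc-approvals

async def wait_for_approval(message_ts: str, timeout: int = 300,
                            poll_interval: int = 10,
                            channel_id: str = SOC_APPROVALS_CHANNEL_ID) -> str:
    """Poll a Slack message for approve/deny reactions (sketch, not production code)."""
    # get_credentials() is the keyring helper from the credential management section;
    # the "slack"/"bot_token" names are assumptions.
    client = AsyncWebClient(token=get_credentials("slack", "bot_token"))
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        resp = await client.reactions_get(channel=channel_id, timestamp=message_ts)
        reactions = {r["name"] for r in resp["message"].get("reactions", [])}
        if "white_check_mark" in reactions:   # ✅ reaction
            return "approved"
        if "x" in reactions:                  # ❌ reaction
            return "denied"
        await asyncio.sleep(poll_interval)

    return "timed out - no analyst response"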

2. N8N Workflow Integration

Complete Autonomous Investigation Pipeline:

# N8N Workflow: Autonomous SOC Investigation
name: "soc_autonomous_investigation"
description: "Fully automated security incident investigation and response"

triggers:
  - name: "webhook_alert"
    type: "webhook"
    path: "/security/alert"
    
  - name: "scheduled_hunting"
    type: "cron"
    schedule: "0 */4 * * *"  # Every 4 hours

nodes:
  - name: "parse_alert"
    type: "function"
    code: |
      // Parse incoming alert data
      const alert = items[0].json;
      return [{
        json: {
          alert_id: alert.id,
          severity: alert.severity,
          source: alert.source,
          timestamp: alert.timestamp,
          indicators: alert.indicators || []
        }
      }];

  - name: "mcp_siem_initial_query"
    type: "http_request"
    method: "POST"
    url: "{{ $node['mcp_server'].json['panther_endpoint'] }}"
    headers:
      Authorization: "Bearer {{ $node['auth'].json['token'] }}"
    body:
      tool: "execute_sql_query"
      parameters:
        sql: |
          SELECT * FROM table_name.aws_cloudtrail 
          WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
          AND (
            sourceIPAddress = '{{ $node["parse_alert"].json["indicators"][0] }}' OR
            userIdentity:userName = '{{ $node["parse_alert"].json["indicators"][1] }}'
          )
          ORDER BY eventTime DESC;

  - name: "mcp_gitlab_check_changes"
    type: "http_request"
    method: "POST"
    url: "{{ $node['mcp_server'].json['gitlab_endpoint'] }}"
    body:
      tool: "list_merge_requests"
      parameters:
        created_after: "{{ $now.minus({hours: 2}).toISO() }}"
        created_before: "{{ $now.toISO() }}"

  - name: "ai_correlation_analysis"
    type: "openai"
    model: "gpt-4"
    prompt: |
      Analyze the following security data and determine:
      1. Severity level (LOW/MEDIUM/HIGH/CRITICAL)
      2. Whether this is a true positive or false positive
      3. Recommended actions
      4. Evidence summary
      
      SIEM Data: {{ $node["mcp_siem_initial_query"].json }}
      Code Changes: {{ $node["mcp_gitlab_check_changes"].json }}
      Original Alert: {{ $node["parse_alert"].json }}

  - name: "decision_branch"
    type: "if"
    conditions:
      - field: "{{ $node['ai_correlation_analysis'].json['severity'] }}"
        operation: "equal"
        value: "CRITICAL"

  - name: "escalate_critical"
    type: "http_request"
    url: "{{ $node['mcp_server'].json['slack_endpoint'] }}"
    body:
      tool: "escalate_to_soc_team"
      parameters:
        alert_details: "{{ $node['parse_alert'].json }}"
        urgency: "high"

  - name: "automated_response"
    type: "http_request"
    url: "{{ $node['mcp_server'].json['response_endpoint'] }}"
    body:
      tool: "execute_automated_response"
      parameters:
        action: "isolate_user"
        target: "{{ $node['parse_alert'].json['indicators'][1] }}"

  - name: "generate_report"
    type: "function"
    code: |
      const analysis = items[0].json;
      const report = {
        incident_id: analysis.alert_id,
        investigation_summary: analysis.summary,
        timeline: analysis.timeline,
        evidence: analysis.evidence,
        recommendations: analysis.recommendations,
        automation_actions: analysis.actions_taken,
        investigation_duration: analysis.duration,
        analyst_review_required: analysis.severity === 'CRITICAL'
      };
      return [{ json: report }];

  - name: "store_investigation"
    type: "database"
    operation: "insert"
    table: "security_investigations"
    data: "{{ $node['generate_report'].json }}"
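
Before connecting a real alert source, the webhook trigger makes it easy to exercise the whole pipeline with a synthetic alert. Below is a hedged example of pushing a test event into the /security/alert path; the n8n host and exact webhook URL are assumptions about your deployment, and the field names match the parse_alert node above:

import httpx

# Hypothetical n8n webhook URL; n8n serves production webhooks under /webhook/<path>.
N8N_WEBHOOK_URL = "https://n8n.example.internal/webhook/security/alert"

test_alert = {
    "id": "test-alert-123",
    "severity": "HIGH",
    "source": "panther",
    "timestamp": "2024-01-01T12:00:00Z",
    # Indicator order matches the SIEM query: [source IP, user name]
    "indicators": ["203.0.113.7", "deploy-bot"],
}

response = httpx.post(N8N_WEBHOOK_URL, json=test_alert, timeout=30)
response.raise_for_status()
print(f"Workflow triggered: HTTP {response.status_code}")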

3. Machine Learning Integration

Predictive Threat Analysis:

@mcp.tool()
async def predict_attack_progression(current_indicators: list, historical_data: dict) -> str:
    """Use ML models to predict likely attack progression"""
    
    # Feature extraction from current indicators
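    # NOTE: extract_threat_features and the other helpers used below
    # (load_attack_progression_model, classify_attack_stage, generate_countermeasures,
    # find_similar_attacks) are placeholders for an organization-specific ML pipeline
    # and are not implemented in this guide.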
    features = extract_threat_features(current_indicators)
    
    # Load pre-trained attack progression model
    model = load_attack_progression_model()
    
    # Generate predictions
    predictions = model.predict_progression(features)
    
    # Calculate risk scores
    risk_assessment = {
        "current_stage": classify_attack_stage(current_indicators),
        "predicted_next_steps": predictions["next_actions"],
        "time_to_critical": predictions["time_estimate"],
        "confidence_score": predictions["confidence"],
        "recommended_countermeasures": generate_countermeasures(predictions),
        "historical_similar_attacks": find_similar_attacks(features)
    }
    
    return json.dumps(risk_assessment, indent=2)

@mcp.tool()
async def behavioral_anomaly_detection(entity_type: str, entity_id: str, time_window: str) -> str:
    """Detect behavioral anomalies using unsupervised learning"""
    
    # Query historical behavior patterns
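    # NOTE: entity_id and time_window are interpolated directly here for brevity;
    # in production, validate them with the SQLSanitizer described earlier (or use
    # bound query parameters) before embedding them in SQL.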
    historical_query = f"""
    SELECT 
        eventName,
        sourceIPAddress,
        userAgent,
        HOUR(eventTime) as hour_of_day,
        DAYOFWEEK(eventTime) as day_of_week,
        COUNT(*) as frequency
    FROM table_name.aws_cloudtrail 
    WHERE userIdentity:userName = '{entity_id}'
        AND p_event_time >= CURRENT_TIMESTAMP - INTERVAL '30 days'
        AND p_event_time < CURRENT_TIMESTAMP - INTERVAL '{time_window}'
    GROUP BY eventName, sourceIPAddress, userAgent, hour_of_day, day_of_week;
    """
    
    historical_data = await execute_sql_query(historical_query)
    
    # Query recent activity
    recent_query = f"""
    SELECT 
        eventName,
        sourceIPAddress,
        userAgent,
        HOUR(eventTime) as hour_of_day,
        DAYOFWEEK(eventTime) as day_of_week,
        COUNT(*) as frequency
    FROM table_name.aws_cloudtrail 
    WHERE userIdentity:userName = '{entity_id}'
        AND p_event_time >= CURRENT_TIMESTAMP - INTERVAL '{time_window}'
    GROUP BY eventName, sourceIPAddress, userAgent, hour_of_day, day_of_week;
    """
    
    recent_data = await execute_sql_query(recent_query)
    
    # Perform anomaly detection
    anomalies = detect_anomalies_ml(historical_data, recent_data)
    
    return json.dumps({
        "entity_id": entity_id,
        "entity_type": entity_type,
        "analysis_window": time_window,
        "anomalies_detected": len(anomalies),
        "anomaly_details": anomalies,
        "risk_score": calculate_risk_score(anomalies),
        "recommended_actions": recommend_actions(anomalies)
    }, indent=2)
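
The heavy lifting in behavioral_anomaly_detection is delegated to detect_anomalies_ml (with calculate_risk_score and recommend_actions likewise left organization-specific). As a hedged illustration, detect_anomalies_ml could be an IsolationForest trained on the 30-day baseline and scored against the recent rows; the column names follow the SQL aliases above, and the feature encoding is deliberately simplified:

import pandas as pd
from sklearn.ensemble import IsolationForest

def detect_anomalies_ml(historical_data: list, recent_data: list) -> list:
    """Flag recent behavior rows that deviate from the 30-day baseline (sketch only)."""
    hist = pd.DataFrame(historical_data)
    recent = pd.DataFrame(recent_data)
    if hist.empty or recent.empty:
        return []

    # Hash the categorical columns into rough numeric features (simplified encoding;
    # a real pipeline would use proper categorical encoders).
    categorical = ["eventName", "sourceIPAddress", "userAgent"]
    for df in (hist, recent):
        for col in categorical:
            df[col + "_hash"] = df[col].astype(str).map(lambda v: hash(v) % 10_000)

    features = ["hour_of_day", "day_of_week", "frequency"] + [c + "_hash" for c in categorical]

    model = IsolationForest(contamination=0.05, random_state=42)
    model.fit(hist[features])

    recent = recent.copy()
    recent["anomaly_score"] = model.decision_function(recent[features])
    recent["is_anomaly"] = model.predict(recent[features]) == -1
    return recent[recent["is_anomaly"]].to_dict(orient="records")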

Conclusion

The implementation of MCP servers for SOC automation represents a fundamental shift in how security operations can be conducted. Through the detailed implementations shown in this guide, we’ve demonstrated how AI assistants can be transformed from passive tools into active partners in security investigation and response.

Key Achievements

  1. Dramatic Efficiency Gains: Investigation times reduced from hours to minutes
  2. Enhanced Accuracy: Automated correlation reduces human error and overlooked evidence
  3. Scalable Operations: Handle increased alert volumes without a proportional increase in staffing
  4. Consistent Quality: Standardized investigation procedures and comprehensive documentation

Technical Innovation Highlights

  • Secure Integration Patterns: Production-ready credential management and access control
  • Asynchronous Processing: Efficient handling of long-running SIEM queries
  • Multi-platform Correlation: Seamless integration between security and development tools
  • Natural Language Interface: Complex security queries expressed in conversational language

Looking Forward

The MCP ecosystem for security automation is rapidly evolving. The patterns and implementations shared in this guide provide a solid foundation for:

  • Expanding Tool Coverage: Additional integrations with SOAR platforms, threat intelligence feeds, and cloud security tools
  • Enhanced AI Capabilities: Integration with specialized security AI models and behavioral analytics
  • Enterprise Adoption: Scalable, secure deployment patterns for large organizations
  • Community Collaboration: Open-source ecosystem development for security-focused MCP implementations

Getting Started

To begin implementing MCP automation in your SOC:

  1. Start Small: Begin with read-only integrations to build confidence (a minimal example follows this list)
  2. Focus on High-Value Use Cases: Target repetitive, time-intensive investigation tasks
  3. Ensure Security: Implement proper authentication, authorization, and audit logging
  4. Measure Impact: Track metrics such as mean time to respond (MTTR), alert handling capacity, and analyst satisfaction
  5. Iterate and Expand: Gradually add more sophisticated automation based on demonstrated value
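
As a concrete starting point for step 1, a first MCP server can expose a single read-only tool. The sketch below only fetches an alert's details over a hypothetical SIEM REST endpoint and performs no write actions; the URL, header, and credential names are assumptions, and get_credentials is the keyring helper from earlier in this guide:

import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("soc-read-only")

@mcp.tool()
async def get_alert_details(alert_id: str) -> str:
    """Read-only lookup of a single alert from the SIEM. No write actions."""
    # Hypothetical endpoint and credential names; adapt to your SIEM's API.
    # get_credentials() comes from the credential management section of this guide.
    api_key = get_credentials("panther", "api_key")
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"https://your-siem.example.com/alerts/{alert_id}",
            headers={"X-API-Key": api_key},
        )
        resp.raise_for_status()
        return resp.text

if __name__ == "__main__":
    mcp.run()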

The journey toward AI-assisted security operations begins with building the right integration foundations. MCP servers provide that foundation, enabling a future where human analysts can focus on strategic threat analysis while AI handles the routine investigation workload.

Resources

  • Complete Source Code: All implementations provided in this guide
  • Deployment Templates: Infrastructure-as-code for secure production deployment
  • Testing Frameworks: Comprehensive test suites for validation and regression testing
  • Community: Active development community for security-focused MCP implementations

The future of SOC operations is collaborative human-AI teams enabled by robust, secure automation platforms. Start building yours today.


This guide represents real-world production experience building MCP integrations for security operations. The implementations have been battle-tested in high-volume SOC environments and continue to evolve based on operational feedback and emerging threats.

This post is licensed under CC BY 4.0 by the author.