Building Production-Ready MCP Servers for SOC Automation - A Complete Guide
Transform your Security Operations Center with AI-driven Model Context Protocol integrations
Table of Contents
- Introduction
- Understanding Model Context Protocol
- The SOC Automation Challenge
- MCP Architecture for Security Operations
- Implementation: Panther SIEM MCP Server
- Implementation: GitLab MCP Server
- Secure Credential Management
- Complete Investigation Workflow
- Deployment and Security Considerations
- Advanced Use Cases
- Best Practices and Lessons Learned
- Future Roadmap
Introduction
Modern Security Operations Centers (SOCs) face an overwhelming challenge: the exponential growth in security alerts far exceeds the capacity of human analysts to investigate them thoroughly. Traditional manual investigation processes that once took 2-8 hours per incident can now be reduced to 10-30 minutes through intelligent automation.
This comprehensive guide demonstrates how Model Context Protocol (MCP) servers can revolutionize SOC operations by enabling AI assistants to directly interact with security tools, correlate events across platforms, and provide detailed analysis that rivals expert human investigation.
Understanding Model Context Protocol
What Makes MCP Different
Model Context Protocol (MCP) is a standardized framework that enables AI assistants to dynamically discover and interact with external tools and services. Unlike traditional REST APIs that require specific client implementations, MCP provides:
- Dynamic Tool Discovery: AI assistants can discover available tools and their capabilities at runtime
- Contextual Workflows: Maintains conversation context across multiple tool interactions
- Standardized Interface: Universal protocol that works across different AI platforms
- Secure Integration: Built-in patterns for authentication and authorization
MCP vs Traditional API Integration
Traditional Approach:
AI Assistant → Custom Integration → API → Data Source
MCP Approach:
AI Assistant → MCP Client → MCP Server → API → Data Source
(MCP Client = universal protocol; MCP Server = tool discovery & context)
The key advantage is that once you build an MCP server for a tool, any MCP-compatible AI assistant can use it without additional integration work.
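As a concrete illustration, a minimal FastMCP server (the framework used for the implementations later in this guide) only declares its tools; protocol handling and capability advertisement come for free. The tool name and stubbed behavior below are illustrative and not part of the later implementations:

#!/usr/bin/env python3
# Minimal MCP server sketch: any MCP-compatible client can discover and call this tool.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("example-security-tool")

@mcp.tool()
async def lookup_ip_reputation(ip_address: str) -> str:
    """Return a (stubbed) reputation verdict for an IP address."""
    # A real server would call a threat-intelligence API here.
    return f"No reputation data available for {ip_address} (stub response)"

if __name__ == "__main__":
    mcp.run(transport="stdio")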
The SOC Automation Challenge
Current State of SOC Operations
Modern SOCs typically handle:
- 1000+ alerts per day from various security tools
- Multiple disconnected platforms (SIEM, ticketing, code repositories, cloud services)
- Manual correlation processes that are time-intensive and error-prone
- Alert fatigue leading to overlooked genuine threats
Traditional Investigation Process
A typical security alert investigation involves:
1. Alert Triage (15-30 minutes)
   - Review alert details and severity
   - Check for similar historical incidents
   - Determine initial scope
2. Data Gathering (1-2 hours)
   - Query SIEM for related events
   - Check network logs and firewall rules
   - Review user activity and authentication logs
3. Context Analysis (30-60 minutes)
   - Correlate events across timeframes
   - Identify potential attack patterns
   - Check for indicators of compromise
4. Root Cause Investigation (1-3 hours)
   - Review recent system changes
   - Analyze deployment logs and code changes
   - Check for configuration modifications
5. Documentation and Response (30-45 minutes)
   - Document findings and timeline
   - Implement containment measures
   - Create incident reports
Total Time: 3-7 hours per incident
MCP-Enabled Investigation Process
With MCP automation:
1. Automated Triage (2-5 minutes)
   - AI assistant analyzes alert context
   - Automatically queries related data sources
   - Provides initial severity assessment
2. Intelligent Correlation (5-10 minutes)
   - Cross-platform event correlation
   - Timeline reconstruction
   - Pattern recognition across tools
3. Root Cause Analysis (5-15 minutes)
   - Automated code change analysis
   - Pipeline and deployment correlation
   - Configuration drift detection
4. Report Generation (1-2 minutes)
   - Comprehensive incident timeline
   - Evidence compilation
   - Recommended actions
Total Time: 13-32 minutes per incident
MCP Architecture for Security Operations
High-Level Architecture
graph TB
subgraph "AI Assistant Layer"
A[AI Assistant<br/>Cursor/Claude] --> B[MCP Client]
end
subgraph "MCP Server Layer"
B --> C[Panther SIEM MCP]
B --> D[GitLab MCP]
B --> E[Additional MCPs]
end
subgraph "Security Tools"
C --> F[Panther SIEM<br/>Data Lake]
D --> G[GitLab API<br/>Code & Pipelines]
E --> H[Other Security<br/>Tools]
end
subgraph "Credential Management"
I[macOS Keychain]
J[AWS Secrets Manager]
K[OAuth 2.1 Flows]
end
C -.-> I
D -.-> I
E -.-> J
E -.-> K
Core Components
- MCP Servers: Lightweight programs that expose specific security tool capabilities
- Tool Discovery: Dynamic registration and capability advertisement
- Context Management: Maintains investigation state across multiple interactions
- Security Layer: Secure credential storage and access control
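To see discovery from the client side, the sketch below uses the MCP Python SDK to spawn one of this guide's servers over stdio and list its advertised tools. The import paths and session API reflect the current Python SDK and may differ between versions, so treat this as an assumption to verify against your installation; the panther_mcp.py filename is hypothetical:

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def discover_tools():
    # Assumes the Panther server from this guide is saved as panther_mcp.py
    params = StdioServerParameters(command="python", args=["panther_mcp.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(tool.name, "-", tool.description)

asyncio.run(discover_tools())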
Implementation: Panther SIEM MCP Server
Overview
The Panther SIEM MCP server enables AI assistants to query security logs using natural language. It translates conversational requests into SQL queries and executes them against Panther’s GraphQL API.
Complete Implementation
#!/usr/bin/env python3
"""
Panther SIEM MCP Server using FastMCP
This MCP server provides tools to query Panther SIEM's data lake using SQL queries.
Uses FastMCP for automatic protocol handling.
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
import keyring
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from mcp.server.fastmcp import FastMCP
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("panther-mcp")
class PantherClient:
"""Client for interacting with Panther SIEM's GraphQL API"""
def __init__(self, api_url: str, api_key: str):
self.api_url = api_url
self.api_key = api_key
self.transport = AIOHTTPTransport(
url=api_url,
headers={"X-API-Key": api_key}
)
self.client = Client(transport=self.transport, fetch_schema_from_transport=True)
# GraphQL queries
self.issue_query = gql("""
mutation IssueQuery($sql: String!) {
executeDataLakeQuery(input: { sql: $sql }) {
id
}
}
""")
self.get_query_results = gql("""
query GetQueryResults($id: ID!, $cursor: String) {
dataLakeQuery(id: $id) {
message
status
results(input: { cursor: $cursor }) {
edges {
node
}
pageInfo {
endCursor
hasNextPage
}
}
}
}
""")
async def execute_query(self, sql: str) -> Dict[str, Any]:
"""Execute a SQL query against Panther's data lake"""
try:
# Issue the query
logger.info(f"Executing query: {sql[:100]}...")
mutation_data = await self.client.execute_async(
self.issue_query,
variable_values={"sql": sql}
)
query_id = mutation_data["executeDataLakeQuery"]["id"]
logger.info(f"Query issued with ID: {query_id}")
# Poll for results with pagination
all_results = []
has_more = True
cursor = None
total_fetched = 0
while has_more:
query_data = await self.client.execute_async(
self.get_query_results,
variable_values={
"id": query_id,
"cursor": cursor
}
)
status = query_data["dataLakeQuery"]["status"]
message = query_data["dataLakeQuery"]["message"]
if status == "running":
logger.info(f"Query still running: {message}")
await asyncio.sleep(2)
continue
if status != "succeeded":
logger.error(f"Query failed: {message}")
raise Exception(f"Query failed with status '{status}': {message}")
# Extract results from current page
edges = query_data["dataLakeQuery"]["results"]["edges"]
page_results = [edge["node"] for edge in edges]
all_results.extend(page_results)
total_fetched += len(page_results)
# Check pagination
page_info = query_data["dataLakeQuery"]["results"]["pageInfo"]
has_more = page_info["hasNextPage"]
cursor = page_info["endCursor"]
logger.info(f"Fetched {len(page_results)} results (total: {total_fetched})")
return {
"query_id": query_id,
"status": "succeeded",
"total_results": len(all_results),
"results": all_results,
"sql": sql
}
except Exception as e:
logger.error(f"Error executing query: {str(e)}")
return {
"query_id": None,
"status": "error",
"error": str(e),
"total_results": 0,
"results": [],
"sql": sql
}
# Initialize FastMCP server
mcp = FastMCP("panther-siem")
# Global Panther client
_panther_client = None
def get_panther_client():
"""Get or create Panther client with stored credentials"""
global _panther_client
if _panther_client is None:
service = "PantherSIEM"
api_url = keyring.get_password(service, "PANTHER_API_URL")
api_key = keyring.get_password(service, "PANTHER_API_KEY")
if not api_url or not api_key:
raise Exception("Panther credentials not found in Keychain. Run store_panther_credentials.py first.")
_panther_client = PantherClient(api_url, api_key)
return _panther_client
@mcp.tool()
async def execute_sql_query(sql: str) -> str:
"""
Execute a SQL query against Panther SIEM's data lake
Args:
sql: The SQL query to execute against the data lake
Returns:
JSON string with query results and metadata
"""
panther_client = get_panther_client()
result = await panther_client.execute_query(sql)
if result["status"] == "error":
return f"Query execution failed: {result['error']}"
# Format results for display
response = f"Query executed successfully!\n\n"
response += f"SQL: {result['sql']}\n"
response += f"Query ID: {result['query_id']}\n"
response += f"Total Results: {result['total_results']}\n\n"
if result['results']:
response += "Results:\n"
response += json.dumps(result['results'], indent=2)
else:
response += "No results returned."
return response
@mcp.tool()
async def list_available_tables() -> str:
"""
Get a list of commonly available tables in Panther's data lake
Returns:
List of common Panther table names
"""
tables = [
"sample_logs_table_name",
# Add more tables as needed
]
response = "Common Panther Data Lake Tables:\n\n"
for table in tables:
response += f"• {table}\n"
response += "\nNote: Available tables depend on your Panther configuration and data sources."
return response
@mcp.tool()
async def get_table_schema(table_name: str) -> str:
"""
Get the schema/structure of a specific table
Args:
table_name: Name of the table to get schema for (e.g., table_name.aws_alb)
Returns:
Table schema information
"""
panther_client = get_panther_client()
# Query to get table schema
schema_sql = f"DESCRIBE TABLE {table_name}"
result = await panther_client.execute_query(schema_sql)
if result["status"] == "error":
return f"Failed to get schema for table '{table_name}': {result['error']}"
response = f"Schema for table: {table_name}\n\n"
if result['results']:
response += "Columns:\n"
response += json.dumps(result['results'], indent=2)
else:
response += "No schema information available."
return response
def main():
"""Main entry point for the MCP server"""
logger.info("Starting Panther MCP Server using FastMCP...")
mcp.run(transport="stdio")
if __name__ == "__main__":
main()
Credential Storage Implementation
#!/usr/bin/env python3
"""
store_panther_credentials.py
Securely store Panther SIEM credentials in macOS Keychain
"""
import keyring
import getpass
def main():
service = "PantherSIEM"
# Prompt for URL (visible)
print("Panther API URL format: https://[your-endpoint].amazonaws.com/v1/public/graphql")
print("Get the endpoint from Panther API Playground")
api_url = input("Enter Panther API URL: ").strip()
if not api_url:
print("No URL provided; aborting.")
return
# Prompt for API key (hidden)
api_key = getpass.getpass("Enter Panther API Key: ").strip()
if not api_key:
print("No API key provided; aborting.")
return
try:
keyring.set_password(service, "PANTHER_API_URL", api_url)
keyring.set_password(service, "PANTHER_API_KEY", api_key)
print(f"✅ Stored Panther credentials under Keychain service '{service}'.")
except Exception as e:
print(f"❌ Failed to store credentials: {e}")
if __name__ == "__main__":
main()
Implementation: GitLab MCP Server
Overview
The GitLab MCP server enables investigation of code changes, merge requests, and CI/CD pipeline activities that may be related to security incidents.
Complete Implementation
#!/usr/bin/env python3
"""
GitLab MCP Server using FastMCP
This MCP server provides tools to interact with GitLab API for SOC investigations.
"""
import asyncio
import json
import logging
import ssl
import keyring
import aiohttp
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gitlab-mcp")
# FastMCP server instance
mcp = FastMCP("gitlab")
def get_gitlab_credentials():
"""
Retrieve GitLab API URL and Token from macOS Keychain.
"""
service = "GitLab"
url = keyring.get_password(service, "GITLAB_API_URL")
token = keyring.get_password(service, "GITLAB_API_TOKEN")
if not url or not token:
raise Exception(
"GitLab credentials not found in Keychain. "
"Please run store_gitlab_creds.py first."
)
return url, token
# Retrieve credentials at startup
GITLAB_API_URL, GITLAB_API_TOKEN = get_gitlab_credentials()
async def gitlab_request(endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Internal helper to perform a GET against the GitLab API.
"""
headers = {"PRIVATE-TOKEN": GITLAB_API_TOKEN}
url = f"{GITLAB_API_URL}/{endpoint}"
ssl_context = ssl.create_default_context()
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
async with session.get(url, params=params, timeout=60) as response:
if response.status != 200:
text = await response.text()
raise Exception(
f"GitLab API request failed ({response.status}): {text}"
)
return await response.json()
@mcp.tool()
async def list_merge_requests(created_after: str, created_before: str) -> str:
"""
List merge requests within a specific time window for security investigation
Args:
created_after: Start time in ISO format (e.g., '2024-01-15T00:00:00Z')
created_before: End time in ISO format (e.g., '2024-01-15T23:59:59Z')
Returns:
JSON string with merge request details
"""
try:
params = {
"created_after": created_after,
"created_before": created_before,
"scope": "all"
}
logger.info(f"Fetching MRs between {created_after} and {created_before}")
mrs = await gitlab_request("merge_requests", params=params)
if not mrs:
return "No merge requests found in the specified time frame."
simplified = [
{
"iid": mr["iid"],
"title": mr["title"],
"state": mr["state"],
"project_id": mr["project_id"],
"author": mr["author"]["username"],
"created_at": mr["created_at"],
"web_url": mr["web_url"]
}
for mr in mrs
]
return json.dumps({
"total_mrs": len(simplified),
"merge_requests": simplified
}, indent=2)
except Exception as e:
logger.error(f"Error fetching merge requests: {e}")
return f"Error fetching merge requests: {e}"
@mcp.tool()
async def get_merge_request_details(project_id: int, mr_iid: int) -> str:
"""
Get detailed information about a specific merge request
Args:
project_id: GitLab project ID
mr_iid: Merge request internal ID
Returns:
JSON string with detailed MR information
"""
try:
endpoint = f"projects/{project_id}/merge_requests/{mr_iid}"
logger.info(f"Fetching details for MR {mr_iid} in project {project_id}")
details = await gitlab_request(endpoint)
result = {
"id": details["id"],
"iid": details["iid"],
"title": details["title"],
"state": details["state"],
"author": details["author"]["username"],
"created_at": details["created_at"],
"merged_by": details.get("merged_by", {}).get("username"),
"merged_at": details.get("merged_at"),
"web_url": details["web_url"],
"description": details["description"],
"changes_count": details["changes_count"]
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error fetching MR details: {e}")
return f"Error fetching MR details: {e}"
@mcp.tool()
async def list_pipeline_failures(project_id: int, updated_after: str) -> str:
"""
List failed pipelines for a project within a time window
Args:
project_id: GitLab project ID
updated_after: Start time in ISO format
Returns:
JSON string with failed pipeline information
"""
try:
params = {"updated_after": updated_after, "status": "failed"}
logger.info(f"Fetching pipeline failures for project {project_id} after {updated_after}")
pipelines = await gitlab_request(f"projects/{project_id}/pipelines", params)
if not pipelines:
return "No pipeline failures found in the specified time frame."
return json.dumps({
"total_failures": len(pipelines),
"pipelines": pipelines
}, indent=2)
except Exception as e:
logger.error(f"Error fetching pipeline failures: {e}")
return f"Error fetching pipeline failures: {e}"
@mcp.tool()
async def review_pipeline_job(job_url: str) -> str:
"""
Review a specific pipeline job and its execution logs
Args:
job_url: Full GitLab job URL (e.g., https://gitlab.com/group/project/-/jobs/12345)
Returns:
JSON string with job details and logs
"""
try:
import re
match = re.match(r"https://gitlab.com/(.+)/-/jobs/(\d+)", job_url)
if not match:
return "Invalid GitLab job URL format."
project_path, job_id = match.groups()
project_encoded = project_path.replace("/", "%2F")
logger.info(f"Reviewing job {job_id} in project {project_path}")
details = await gitlab_request(f"projects/{project_encoded}/jobs/{job_id}")
# Fetch trace logs
ssl_ctx = ssl.create_default_context()
connector = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(
headers={"PRIVATE-TOKEN": GITLAB_API_TOKEN},
connector=connector
) as session:
trace_resp = await session.get(
f"{GITLAB_API_URL}/projects/{project_encoded}/jobs/{job_id}/trace"
)
trace = await trace_resp.text()
result = {
"id": details["id"],
"name": details["name"],
"status": details["status"],
"trace_log": trace[-5000:] # Last 5000 characters
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error reviewing pipeline job: {e}")
return f"Error reviewing pipeline job: {e}"
@mcp.tool()
async def review_merge_request_and_pipeline(project_id: int, mr_iid: int) -> str:
"""
Comprehensive analysis of merge request and its associated pipeline traces
Args:
project_id: GitLab project ID
mr_iid: Merge request internal ID
Returns:
JSON string with MR details and all pipeline job traces
"""
try:
mr = await gitlab_request(f"projects/{project_id}/merge_requests/{mr_iid}")
pipeline = mr.get("pipeline")
traces = None
if pipeline:
pid = pipeline["id"]
jobs = await gitlab_request(f"projects/{project_id}/pipelines/{pid}/jobs")
traces = {}
ssl_ctx = ssl.create_default_context()
connector = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(
headers={"PRIVATE-TOKEN": GITLAB_API_TOKEN},
connector=connector
) as session:
for job in jobs:
jid = job["id"]
resp = await session.get(
f"{GITLAB_API_URL}/projects/{project_id}/jobs/{jid}/trace"
)
log = await resp.text()
traces[job["name"]] = log[-5000:] # Last 5000 characters
return json.dumps({
"merge_request": mr,
"pipeline_trace": traces
}, indent=2)
except Exception as e:
logger.error(f"Error reviewing MR & pipeline: {e}")
return f"Error reviewing MR & pipeline: {e}"
def main():
logger.info("Starting GitLab MCP Server using FastMCP...")
mcp.run(transport="stdio")
if __name__ == "__main__":
main()
GitLab Credential Storage
#!/usr/bin/env python3
"""
store_gitlab_creds.py
Store GitLab API URL and personal access token in the macOS Keychain
"""
import keyring
import getpass
def main():
service = "GitLab"
# GitLab API URL is typically constant
api_url = "https://gitlab.com/api/v4"
try:
keyring.set_password(service, "GITLAB_API_URL", api_url)
print(f"✅ Stored GitLab API URL: {api_url}")
except Exception as e:
print(f"❌ Failed to store API URL: {e}")
return
# Prompt for Token (hidden)
print("\nCreate a Personal Access Token at: https://gitlab.com/-/profile/personal_access_tokens")
print("Required scopes: api, read_repository, read_user")
token = getpass.getpass("Enter GitLab Personal Access Token: ").strip()
if not token:
print("No token provided; aborting.")
return
try:
keyring.set_password(service, "GITLAB_API_TOKEN", token)
print(f"✅ Stored GitLab credentials under Keychain service '{service}'.")
except Exception as e:
print(f"❌ Failed to store API token: {e}")
if __name__ == "__main__":
main()
Secure Credential Management
Local Storage with macOS Keychain
The implementations above use macOS Keychain for secure credential storage:
Advantages:
- OS-level encryption and access control
- Integration with system authentication
- No plaintext secrets in configuration files
- Automatic credential lifecycle management
Security Features:
- Encrypted storage using system keys
- Access control via user authentication
- Audit logging of credential access
- Integration with system security policies
Enterprise Considerations
For production enterprise deployments, consider:
AWS Secrets Manager Integration:
import boto3
import json
def get_credentials_from_aws_secrets():
"""Retrieve credentials from AWS Secrets Manager"""
client = boto3.client('secretsmanager', region_name='us-west-2')
try:
response = client.get_secret_value(SecretId='prod/soc/panther-credentials')
secrets = json.loads(response['SecretString'])
return secrets['api_url'], secrets['api_key']
except Exception as e:
logger.error(f"Failed to retrieve credentials from AWS Secrets Manager: {e}")
raise
OAuth 2.1 Implementation for Remote MCPs:
import secrets
import hashlib
import base64
from datetime import datetime, timedelta
class OAuth21Handler:
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
def generate_pkce_challenge(self):
"""Generate PKCE code verifier and challenge for OAuth 2.1"""
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)
).decode('utf-8').rstrip('=')
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode('utf-8')).digest()
).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
async def exchange_code_for_token(self, authorization_code: str, code_verifier: str):
"""Exchange authorization code for access token"""
# Implementation depends on your OAuth provider
pass
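The exchange_code_for_token stub above is provider-specific, but the OAuth 2.1 shape is always a POST to the provider's token endpoint that includes the PKCE code_verifier. A generic sketch, with the token URL, client ID, and redirect URI supplied by the caller:

import aiohttp

async def exchange_code_for_token(token_url: str, client_id: str,
                                  authorization_code: str, code_verifier: str,
                                  redirect_uri: str) -> dict:
    """Exchange an authorization code for tokens (generic OAuth 2.1 + PKCE flow)."""
    payload = {
        "grant_type": "authorization_code",
        "code": authorization_code,
        "code_verifier": code_verifier,  # proves possession of the PKCE verifier
        "client_id": client_id,
        "redirect_uri": redirect_uri,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(token_url, data=payload) as resp:
            resp.raise_for_status()
            # Typically contains access_token, refresh_token, and expires_in
            return await resp.json()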
Complete Investigation Workflow
Example: Database Breach Investigation
Let’s walk through a complete investigation using both MCP servers:
Step 1: Alert Received
Alert: "Unusual database access pattern detected"
Time: 2024-01-15 14:30:00 UTC
Source: Database monitoring system
IP: 203.0.113.42
Step 2: AI-Driven Initial Analysis
User Query:
"Investigate this database alert - check for related events and recent code changes"
AI Assistant Workflow:
- Query Panther for Related Events:
SELECT
  eventTime, eventName, sourceIPAddress, userIdentity,
  requestParameters, responseElements, errorCode
FROM table_name.aws_cloudtrail
WHERE p_event_time BETWEEN '2024-01-15 14:00:00' AND '2024-01-15 15:00:00'
  AND (
    eventName LIKE '%RDS%'
    OR eventName LIKE '%Database%'
    OR sourceIPAddress = '203.0.113.42'
    OR requestParameters LIKE '%database%'
  )
ORDER BY eventTime;
- Check Recent Code Changes:
# AI calls GitLab MCP
await list_merge_requests(
    created_after="2024-01-14T00:00:00Z",
    created_before="2024-01-15T16:00:00Z"
)
- Analyze Pipeline Activities:
# For each relevant project
await list_pipeline_failures(
    project_id=12345,
    updated_after="2024-01-14T00:00:00Z"
)
Step 3: Correlation and Analysis
The AI assistant automatically correlates the following (a minimal correlation sketch follows this list):
- Database access anomalies from SIEM data
- Recent deployment activities from GitLab
- Failed pipeline jobs that might indicate security issues
- Code changes in database-related modules
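A minimal sketch of that correlation step, assuming the SIEM events and merge requests have already been fetched by the tools above; the field names (eventTime, sourceIPAddress, created_at, iid) mirror the examples in this guide but will vary with your log schema:

def build_incident_timeline(siem_events: list, merge_requests: list) -> list:
    """Merge SIEM events and GitLab MRs into a single chronological timeline."""
    timeline = []
    for event in siem_events:
        timeline.append({
            "time": event.get("eventTime"),
            "source": "panther",
            "summary": f"{event.get('eventName')} from {event.get('sourceIPAddress')}",
        })
    for mr in merge_requests:
        timeline.append({
            "time": mr.get("created_at"),
            "source": "gitlab",
            "summary": f"MR !{mr.get('iid')}: {mr.get('title')} ({mr.get('state')})",
        })
    # ISO-8601 timestamps sort correctly as plain strings; missing times sort first
    return sorted(timeline, key=lambda item: item.get("time") or "")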
Step 4: Automated Report Generation
## Security Incident Report: Database Access Anomaly
### Executive Summary
Detected unusual database access pattern from IP 203.0.113.42 at 14:30 UTC.
Investigation reveals correlation with recent deployment of MR #1234.
### Timeline
- **14:20 UTC**: Merge Request #1234 merged to production
- **14:25 UTC**: Database migration pipeline executed
- **14:30 UTC**: Anomalous database queries detected
- **14:32 UTC**: Security scanning job failed (bypassed)
### Root Cause Analysis
1. **Code Change**: MR #1234 introduced new database query in user module
2. **Pipeline Failure**: Security scan detected potential SQL injection but was bypassed
3. **Access Pattern**: New query pattern triggered monitoring alerts
4. **False Positive**: Investigation confirms legitimate new feature deployment
### Evidence
- Panther Query ID: abc123-def456-ghi789
- GitLab MR: #1234 (Project ID: 12345)
- Pipeline Job: https://gitlab.com/company/app/-/jobs/67890
### Recommendation
- **Immediate**: Alert closed as false positive
- **Short-term**: Review pipeline bypass procedures
- **Long-term**: Enhance SQL injection detection rules
### Investigation Duration: 8 minutes
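A short sketch of how a report in the format above could be assembled from the correlated timeline. The argument structure (summary, evidence, and recommendation lists) is an assumption for illustration, not part of the MCP servers themselves:

def render_incident_report(incident_id: str, summary: str, timeline: list,
                           evidence: list, recommendations: list) -> str:
    """Render a markdown incident report from investigation artifacts."""
    lines = [f"## Security Incident Report: {incident_id}", "",
             "### Executive Summary", summary, "", "### Timeline"]
    for entry in timeline:
        lines.append(f"- **{entry['time']}**: {entry['summary']}")
    lines += ["", "### Evidence"]
    lines += [f"- {item}" for item in evidence]
    lines += ["", "### Recommendations"]
    lines += [f"- {item}" for item in recommendations]
    return "\n".join(lines)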
Deployment and Security Considerations
Production Deployment Architecture
graph TB
subgraph "DMZ"
A[Load Balancer] --> B[Nginx Reverse Proxy]
end
subgraph "Application Tier"
B --> C[MCP Server Pod 1]
B --> D[MCP Server Pod 2]
B --> E[MCP Server Pod N]
end
subgraph "Security Layer"
F[AWS WAF]
G[VPC Security Groups]
H[IAM Roles]
end
subgraph "Data Layer"
I[AWS Secrets Manager]
J[CloudWatch Logs]
K[Audit Trail]
end
A --> F
C --> I
D --> I
E --> I
C --> J
D --> J
E --> J
C --> K
D --> K
E --> K
Security Best Practices
Network Security
# Nginx configuration for MCP server
server {
listen 443 ssl http2;
server_name mcp.internal.company.com;
# SSL configuration
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
# Security headers
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
add_header X-Content-Type-Options nosniff;
add_header X-Frame-Options DENY;
add_header X-XSS-Protection "1; mode=block";
# IP whitelist
allow 10.0.0.0/8;
allow 192.168.0.0/16;
deny all;
location / {
proxy_pass http://mcp-servers;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Rate limiting
limit_req zone=api burst=20 nodelay;
}
}
Application Security
# Enhanced security middleware
import hashlib
import hmac
import time
from functools import wraps
def require_valid_signature(f):
"""Validate request signatures to prevent tampering"""
@wraps(f)
async def decorated_function(*args, **kwargs):
# Extract signature from request headers
signature = request.headers.get('X-Signature')
timestamp = request.headers.get('X-Timestamp')
# Verify timestamp to prevent replay attacks
if not timestamp or abs(time.time() - int(timestamp)) > 300:
raise Exception("Request timestamp invalid or too old")
# Verify signature
expected_signature = hmac.new(
SECRET_KEY.encode(),
f"{timestamp}{request.data}".encode(),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
raise Exception("Invalid request signature")
return await f(*args, **kwargs)
return decorated_function
@mcp.tool()
@require_valid_signature
async def secure_tool(parameter: str) -> str:
"""Tool with enhanced security validation"""
# Implementation here
pass
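For the middleware above to accept a request, the caller must send matching X-Timestamp and X-Signature headers computed with the shared secret. A client-side sketch, assuming the server concatenates the timestamp and raw request body exactly as shown above:

import hashlib
import hmac
import time

def sign_request(secret_key: str, body: str) -> dict:
    """Produce the X-Timestamp / X-Signature headers expected by the middleware above."""
    timestamp = str(int(time.time()))
    signature = hmac.new(
        secret_key.encode(),
        f"{timestamp}{body}".encode(),
        hashlib.sha256,
    ).hexdigest()
    return {"X-Timestamp": timestamp, "X-Signature": signature}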
Monitoring and Alerting
import structlog
import time
from prometheus_client import Counter, Histogram
# Metrics
tool_requests = Counter('mcp_tool_requests_total', 'Total tool requests', ['tool', 'status'])
tool_duration = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool'])
security_events = Counter('mcp_security_events_total', 'Security events', ['event_type'])
logger = structlog.get_logger()
@mcp.tool()
async def monitored_tool(parameter: str) -> str:
"""Tool with comprehensive monitoring"""
start_time = time.time()
request_id = secrets.token_urlsafe(16)
logger.info(
"Tool execution started",
request_id=request_id,
tool="monitored_tool",
parameter_hash=hashlib.md5(parameter.encode()).hexdigest(),
user_agent=request.headers.get('User-Agent', 'unknown')
)
try:
with tool_duration.labels(tool='monitored_tool').time():
result = await process_request(parameter)
tool_requests.labels(tool='monitored_tool', status='success').inc()
logger.info(
"Tool execution completed",
request_id=request_id,
duration=time.time() - start_time,
result_size=len(result)
)
return result
except Exception as e:
tool_requests.labels(tool='monitored_tool', status='error').inc()
security_events.labels(event_type='tool_execution_error').inc()
logger.error(
"Tool execution failed",
request_id=request_id,
error=str(e),
duration=time.time() - start_time
)
raise
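These counters and histograms only become useful once Prometheus can scrape them. prometheus_client can serve a /metrics endpoint from a background thread alongside the stdio transport; the port number here is arbitrary:

from prometheus_client import start_http_server

def start_metrics_endpoint(port: int = 9100):
    """Expose /metrics for Prometheus scraping on the given port."""
    start_http_server(port)  # runs the HTTP listener in a background thread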
Advanced Use Cases
1. Automated Threat Hunting
Behavioral Analysis Queries:
@mcp.tool()
async def hunt_anomalous_user_behavior(time_window: str = "7 days") -> str:
"""Hunt for anomalous user behaviors using ML-driven queries"""
sql_query = f"""
WITH user_baselines AS (
SELECT
userIdentity:userName as user_name,
COUNT(DISTINCT sourceIPAddress) as avg_ip_count,
COUNT(DISTINCT eventName) as avg_action_count,
ARRAY_AGG(DISTINCT sourceIPAddress) as typical_ips
FROM table_name.aws_cloudtrail
WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '30 days'
AND p_event_time < CURRENT_TIMESTAMP - INTERVAL '{time_window}'
GROUP BY userIdentity:userName
),
recent_activity AS (
SELECT
userIdentity:userName as user_name,
COUNT(DISTINCT sourceIPAddress) as recent_ip_count,
COUNT(DISTINCT eventName) as recent_action_count,
ARRAY_AGG(DISTINCT sourceIPAddress) as recent_ips,
ARRAY_AGG(DISTINCT eventName) as recent_actions
FROM table_name.aws_cloudtrail
WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '{time_window}'
GROUP BY userIdentity:userName
)
SELECT
r.user_name,
b.avg_ip_count,
r.recent_ip_count,
b.avg_action_count,
r.recent_action_count,
(r.recent_ip_count::float / NULLIF(b.avg_ip_count, 0)) as ip_anomaly_ratio,
(r.recent_action_count::float / NULLIF(b.avg_action_count, 0)) as action_anomaly_ratio,
ARRAY_TO_STRING(
ARRAY(SELECT UNNEST(r.recent_ips) EXCEPT SELECT UNNEST(b.typical_ips)),
', '
) as new_ips,
r.recent_actions,
r.recent_ips
FROM recent_activity r
LEFT JOIN user_baselines b ON r.user_name = b.user_name
WHERE (r.recent_ip_count::float / NULLIF(b.avg_ip_count, 0)) > 2.0
OR (r.recent_action_count::float / NULLIF(b.avg_action_count, 0)) > 2.0
OR b.user_name IS NULL -- New users
ORDER BY ip_anomaly_ratio DESC, action_anomaly_ratio DESC;
"""
return await execute_sql_query(sql_query)
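Because time_window is interpolated directly into the SQL string, it should be validated before use. A minimal sketch that accepts only simple "<number> <unit>" intervals (the accepted units are an assumption; adjust to your data lake's interval syntax):

import re

def validate_time_window(time_window: str) -> str:
    """Allow only simple interval strings such as '7 days' or '24 hours'."""
    cleaned = time_window.strip()
    if not re.fullmatch(r"\d{1,3} (minutes|hours|days)", cleaned):
        raise ValueError(f"Unsupported time window: {time_window!r}")
    return cleaned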
2. Supply Chain Security Monitoring
Detection of Suspicious Dependencies:
@mcp.tool()
async def monitor_supply_chain_changes(project_id: int, days_back: int = 7) -> str:
"""Monitor for suspicious changes in project dependencies"""
# Get recent merge requests
created_after = (datetime.utcnow() - timedelta(days=days_back)).isoformat() + "Z"
created_before = datetime.utcnow().isoformat() + "Z"
mrs = await list_merge_requests(created_after, created_before)
mr_data = json.loads(mrs)
suspicious_changes = []
for mr in mr_data.get("merge_requests", []):
if mr["project_id"] == project_id:
# Get detailed MR information
details = await get_merge_request_details(project_id, mr["iid"])
mr_details = json.loads(details)
# Check for dependency-related changes
if any(keyword in mr_details["title"].lower() for keyword in
["dependency", "package", "npm", "pip", "maven", "gradle"]):
# Get pipeline information
pipeline_info = await review_merge_request_and_pipeline(
project_id, mr["iid"]
)
suspicious_changes.append({
"merge_request": mr_details,
"pipeline_analysis": pipeline_info,
"risk_factors": analyze_dependency_risk(mr_details)
})
return json.dumps({
"monitoring_period": f"{days_back} days",
"project_id": project_id,
"suspicious_changes": suspicious_changes,
"recommendations": generate_supply_chain_recommendations(suspicious_changes)
}, indent=2)
def analyze_dependency_risk(mr_details):
"""Analyze risk factors in dependency changes"""
risk_factors = []
title = mr_details["title"].lower()
description = mr_details["description"].lower()
# Check for high-risk patterns
if "urgent" in title or "hotfix" in title:
risk_factors.append("Urgent/hotfix dependency change")
if "security" in description and "update" in description:
risk_factors.append("Security-related dependency update")
if mr_details["changes_count"] > 100:
risk_factors.append("Large number of file changes")
return risk_factors
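The generate_supply_chain_recommendations helper referenced above is not shown in this guide; a simple rule-based sketch might look like the following (the wording is illustrative):

def generate_supply_chain_recommendations(suspicious_changes: list) -> list:
    """Turn flagged dependency changes into reviewer-facing recommendations."""
    recommendations = []
    for change in suspicious_changes:
        mr = change["merge_request"]
        for risk in change.get("risk_factors", []):
            recommendations.append(f"Review MR !{mr['iid']} ({mr['title']}): {risk}")
    if not recommendations:
        recommendations.append("No high-risk dependency changes detected in this window.")
    return recommendations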
3. Compliance Automation
SOX Compliance Monitoring:
@mcp.tool()
async def generate_sox_compliance_report(start_date: str, end_date: str) -> str:
"""Generate SOX compliance report for financial data access"""
sox_query = f"""
SELECT
userIdentity:userName as user,
eventName as action,
eventTime as timestamp,
sourceIPAddress as source_ip,
requestParameters:bucketName as resource,
requestParameters:key as object_key,
CASE
WHEN errorCode IS NULL THEN 'SUCCESS'
ELSE CONCAT('FAILED: ', errorCode)
END as status,
userIdentity:sessionContext:sessionIssuer:userName as assumed_role,
'SOX_FINANCIAL_DATA_ACCESS' as compliance_type
FROM table_name.aws_cloudtrail
WHERE p_event_time BETWEEN '{start_date}' AND '{end_date}'
AND (
eventName IN ('GetObject', 'PutObject', 'DeleteObject', 'ListObjects') OR
eventName LIKE '%Database%' OR
eventName LIKE '%RDS%'
)
AND (
requestParameters:bucketName LIKE '%financial%' OR
requestParameters:bucketName LIKE '%audit%' OR
requestParameters:bucketName LIKE '%sox%' OR
requestParameters:key LIKE '%financial%'
)
ORDER BY eventTime DESC;
"""
result = await execute_sql_query(sox_query)
# Parse results and generate compliance metrics
query_result = json.loads(result.split("Results:\n")[1] if "Results:\n" in result else "[]")
compliance_metrics = {
"reporting_period": {"start": start_date, "end": end_date},
"total_access_events": len(query_result),
"unique_users": len(set(event.get("user", "") for event in query_result)),
"failed_access_attempts": len([e for e in query_result if "FAILED" in e.get("status", "")]),
"after_hours_access": analyze_after_hours_access(query_result),
"privileged_access": analyze_privileged_access(query_result),
"detailed_events": query_result
}
return json.dumps(compliance_metrics, indent=2)
def analyze_after_hours_access(events):
"""Analyze access events outside business hours"""
after_hours = []
for event in events:
timestamp = event.get("timestamp", "")
if timestamp:
# Parse timestamp and check if outside 9-5 business hours
try:
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
hour = dt.hour
if hour < 9 or hour > 17: # Outside 9 AM - 5 PM
after_hours.append(event)
except:
pass
return {
"count": len(after_hours),
"events": after_hours[:10] # Return first 10 for review
}
def analyze_privileged_access(events):
"""Analyze access by privileged users or roles"""
privileged_keywords = ["admin", "root", "superuser", "privileged", "elevated"]
privileged_access = []
for event in events:
user = event.get("user", "").lower()
role = event.get("assumed_role", "").lower()
if any(keyword in user or keyword in role for keyword in privileged_keywords):
privileged_access.append(event)
return {
"count": len(privileged_access),
"events": privileged_access[:10]
}
Best Practices and Lessons Learned
1. Error Handling and Resilience
import asyncio
from typing import Optional
import backoff
class ResilientMCPTool:
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
@backoff.on_exception(
backoff.expo,
(aiohttp.ClientError, asyncio.TimeoutError),
max_tries=3,
base=2
)
async def resilient_api_call(self, endpoint: str, params: Optional[dict] = None):
"""API call with exponential backoff retry logic"""
try:
return await self.make_api_call(endpoint, params)
except Exception as e:
logger.error(f"API call failed after retries: {e}")
return {"error": f"Service temporarily unavailable: {str(e)}"}
@mcp.tool()
async def robust_investigation_tool(self, query: str) -> str:
"""Investigation tool with comprehensive error handling"""
try:
# Validate input
if not query or len(query.strip()) == 0:
return "Error: Query cannot be empty"
if len(query) > 10000:
return "Error: Query too long (max 10,000 characters)"
# Sanitize SQL to prevent injection
sanitized_query = self.sanitize_sql(query)
# Execute with timeout
result = await asyncio.wait_for(
self.execute_query(sanitized_query),
timeout=300 # 5-minute timeout
)
return self.format_result(result)
except asyncio.TimeoutError:
return "Error: Query execution timed out (5 minutes)"
except ValidationError as e:
return f"Error: Invalid query format: {str(e)}"
except AuthenticationError:
return "Error: Authentication failed. Please check credentials."
except RateLimitError as e:
return f"Error: Rate limit exceeded. Retry after {e.retry_after} seconds."
except Exception as e:
logger.error(f"Unexpected error in investigation tool: {e}")
return f"Error: Unexpected issue occurred. Please try again."
def sanitize_sql(self, query: str) -> str:
"""Sanitize SQL query to prevent injection attacks"""
# Remove dangerous keywords
dangerous_keywords = [
'DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'INSERT',
'UPDATE', 'GRANT', 'REVOKE', 'EXEC', 'EXECUTE'
]
query_upper = query.upper()
for keyword in dangerous_keywords:
if keyword in query_upper:
raise ValidationError(f"Dangerous SQL keyword '{keyword}' not allowed")
return query
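Keyword blocklists are easy to bypass and can also produce false positives (a column named updated_at contains UPDATE, for example). A stricter, allowlist-style sketch that accepts only a single read-only statement:

import re

def enforce_select_only(query: str) -> str:
    """Accept exactly one statement that starts with SELECT or WITH; reject everything else."""
    stripped = query.strip().rstrip(";")
    if ";" in stripped:
        raise ValueError("Multiple SQL statements are not allowed")
    if not re.match(r"(?is)^\s*(select|with)\b", stripped):
        raise ValueError("Only read-only SELECT queries are allowed")
    return stripped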
2. Performance Optimization
import aiohttp
from aiohttp_retry import RetryClient, ExponentialRetry
from cachetools import TTLCache
import hashlib
class OptimizedMCPServer:
def __init__(self):
# Connection pooling
self.connector = aiohttp.TCPConnector(
limit=100, # Total connection pool size
limit_per_host=30, # Connections per host
ttl_dns_cache=300, # DNS cache TTL
use_dns_cache=True,
)
# Retry configuration
retry_options = ExponentialRetry(
attempts=3,
start_timeout=1,
max_timeout=10
)
self.session = RetryClient(
connector=self.connector,
retry_options=retry_options,
timeout=aiohttp.ClientTimeout(total=30)
)
# Result caching
self.cache = TTLCache(maxsize=1000, ttl=300) # 5-minute TTL
async def cached_query(self, sql: str) -> dict:
"""Execute query with result caching"""
# Generate cache key
cache_key = hashlib.md5(sql.encode()).hexdigest()
# Check cache first
if cache_key in self.cache:
logger.info(f"Returning cached result for query: {sql[:50]}...")
return self.cache[cache_key]
# Execute query
result = await self.execute_query(sql)
# Cache successful results
if result.get("status") == "succeeded":
self.cache[cache_key] = result
return result
async def batch_requests(self, requests: list) -> list:
"""Execute multiple requests concurrently"""
semaphore = asyncio.Semaphore(10) # Limit concurrent requests
async def limited_request(request):
async with semaphore:
return await self.process_request(request)
tasks = [limited_request(req) for req in requests]
return await asyncio.gather(*tasks, return_exceptions=True)
3. Testing Strategies
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
import json
class TestMCPServers:
@pytest.mark.asyncio
async def test_panther_query_execution(self):
"""Test Panther MCP query execution"""
# Mock successful response
mock_client = AsyncMock()
mock_client.execute_query.return_value = {
"status": "succeeded",
"query_id": "test-123",
"total_results": 2,
"results": [
{"eventName": "ConsoleLogin", "sourceIPAddress": "192.168.1.1"},
{"eventName": "AssumeRole", "sourceIPAddress": "192.168.1.1"}
],
"sql": "SELECT * FROM test_table"
}
with patch('panther_mcp.get_panther_client', return_value=mock_client):
result = await execute_sql_query("SELECT * FROM test_table")
assert "Query executed successfully" in result
assert "test-123" in result
assert "Total Results: 2" in result
mock_client.execute_query.assert_called_once_with("SELECT * FROM test_table")
@pytest.mark.asyncio
async def test_gitlab_merge_request_listing(self):
"""Test GitLab MCP merge request listing"""
# Mock GitLab API response
mock_response = [
{
"iid": 123,
"title": "Security fix for authentication",
"state": "merged",
"project_id": 456,
"author": {"username": "security-team"},
"created_at": "2024-01-15T10:00:00Z",
"web_url": "https://gitlab.com/test/project/-/merge_requests/123"
}
]
with patch('gitlab_mcp.gitlab_request', return_value=mock_response):
result = await list_merge_requests(
"2024-01-15T00:00:00Z",
"2024-01-15T23:59:59Z"
)
result_data = json.loads(result)
assert result_data["total_mrs"] == 1
assert result_data["merge_requests"][0]["iid"] == 123
assert "Security fix" in result_data["merge_requests"][0]["title"]
@pytest.mark.integration
async def test_complete_investigation_workflow(self):
"""Integration test for complete investigation workflow"""
# Mock both services
with patch('panther_mcp.PantherClient') as mock_panther, \
patch('gitlab_mcp.gitlab_request') as mock_gitlab:
# Setup mocks
mock_panther.return_value.execute_query.return_value = {
"status": "succeeded",
"results": [{"event": "test_event"}]
}
mock_gitlab.return_value = [{"iid": 1, "title": "Test MR"}]
# Test investigation workflow
investigation = SecurityInvestigation()
result = await investigation.investigate_alert("test-alert-123")
assert result["status"] == "completed"
assert "recommendations" in result
assert len(result["evidence"]) > 0
def test_credential_storage(self):
"""Test secure credential storage"""
with patch('keyring.set_password') as mock_set, \
patch('keyring.get_password') as mock_get:
# Test storing credentials
mock_get.return_value = None
store_credentials("test_service", "test_key", "test_value")
mock_set.assert_called_with("test_service", "test_key", "test_value")
# Test retrieving credentials
mock_get.return_value = "test_value"
result = get_credentials("test_service", "test_key")
assert result == "test_value"
@pytest.mark.parametrize("sql_input,expected_error", [
("DROP TABLE users;", "Dangerous SQL keyword 'DROP' not allowed"),
("SELECT * FROM logs; DELETE FROM users;", "Dangerous SQL keyword 'DELETE' not allowed"),
("SELECT * FROM logs", None), # Should pass
])
def test_sql_sanitization(self, sql_input, expected_error):
"""Test SQL injection prevention"""
sanitizer = SQLSanitizer()
if expected_error:
with pytest.raises(ValidationError, match=expected_error):
sanitizer.sanitize_sql(sql_input)
else:
result = sanitizer.sanitize_sql(sql_input)
assert result == sql_input
Future Roadmap
1. Slack Integration MCP
Real-time SOC Communication:
@mcp.tool()
async def escalate_to_soc_team(alert_details: dict, urgency: str) -> str:
"""Escalate security incident to SOC team via Slack"""
channel = "#soc-alerts" if urgency == "high" else "#soc-general"
message = {
"text": f"🚨 Security Alert - {urgency.upper()} Priority",
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": f"Security Incident: {alert_details['id']}"}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Severity:* {alert_details['severity']}"},
{"type": "mrkdwn", "text": f"*Source:* {alert_details['source']}"},
{"type": "mrkdwn", "text": f"*Time:* {alert_details['timestamp']}"},
{"type": "mrkdwn", "text": f"*Affected Systems:* {alert_details['systems']}"}
]
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Acknowledge"},
"value": f"ack_{alert_details['id']}",
"action_id": "acknowledge_alert"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Investigate"},
"value": f"investigate_{alert_details['id']}",
"action_id": "start_investigation"
}
]
}
]
}
response = await slack_client.post_message(channel, message)
return f"Alert escalated to {channel}, message ID: {response['ts']}"
@mcp.tool()
async def request_analyst_approval(action: str, details: dict) -> str:
"""Request approval from human analyst for sensitive actions"""
approval_message = {
"text": f"🔐 Approval Required: {action}",
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*Action:* {action}\n*Details:* {details}"}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "✅ Approve"},
"style": "primary",
"value": f"approve_{action}",
"action_id": "approve_action"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "❌ Deny"},
"style": "danger",
"value": f"deny_{action}",
"action_id": "deny_action"
}
]
}
]
}
# Send to security team lead
response = await slack_client.post_message("#soc-approvals", approval_message)
# Wait for response (with timeout)
approval_status = await wait_for_approval(response['ts'], timeout=300)
return f"Action {action}: {approval_status}"
2. N8N Workflow Integration
Complete Autonomous Investigation Pipeline:
# N8N Workflow: Autonomous SOC Investigation
name: "soc_autonomous_investigation"
description: "Fully automated security incident investigation and response"
triggers:
- name: "webhook_alert"
type: "webhook"
path: "/security/alert"
- name: "scheduled_hunting"
type: "cron"
schedule: "0 */4 * * *" # Every 4 hours
nodes:
- name: "parse_alert"
type: "function"
code: |
// Parse incoming alert data
const alert = items[0].json;
return [{
json: {
alert_id: alert.id,
severity: alert.severity,
source: alert.source,
timestamp: alert.timestamp,
indicators: alert.indicators || []
}
}];
- name: "mcp_siem_initial_query"
type: "http_request"
method: "POST"
url: "{{ $node['mcp_server'].json['panther_endpoint'] }}"
headers:
Authorization: "Bearer {{ $node['auth'].json['token'] }}"
body:
tool: "execute_sql_query"
parameters:
sql: |
SELECT * FROM table_name.aws_cloudtrail
WHERE p_event_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND (
sourceIPAddress = '{{ $node["parse_alert"].json["indicators"][0] }}' OR
userIdentity:userName = '{{ $node["parse_alert"].json["indicators"][1] }}'
)
ORDER BY eventTime DESC;
- name: "mcp_gitlab_check_changes"
type: "http_request"
method: "POST"
url: "{{ $node['mcp_server'].json['gitlab_endpoint'] }}"
body:
tool: "list_merge_requests"
parameters:
created_after: "{{ $now.minus({hours: 2}).toISO() }}"
created_before: "{{ $now.toISO() }}"
- name: "ai_correlation_analysis"
type: "openai"
model: "gpt-4"
prompt: |
Analyze the following security data and determine:
1. Severity level (LOW/MEDIUM/HIGH/CRITICAL)
2. Whether this is a true positive or false positive
3. Recommended actions
4. Evidence summary
SIEM Data: {{ $node["mcp_siem_initial_query"].json }}
Code Changes: {{ $node["mcp_gitlab_check_changes"].json }}
Original Alert: {{ $node["parse_alert"].json }}
- name: "decision_branch"
type: "if"
conditions:
- field: "{{ $node['ai_correlation_analysis'].json['severity'] }}"
operation: "equal"
value: "CRITICAL"
- name: "escalate_critical"
type: "http_request"
url: "{{ $node['mcp_server'].json['slack_endpoint'] }}"
body:
tool: "escalate_to_soc_team"
parameters:
alert_details: "{{ $node['parse_alert'].json }}"
urgency: "high"
- name: "automated_response"
type: "http_request"
url: "{{ $node['mcp_server'].json['response_endpoint'] }}"
body:
tool: "execute_automated_response"
parameters:
action: "isolate_user"
target: "{{ $node['parse_alert'].json['indicators'][1] }}"
- name: "generate_report"
type: "function"
code: |
const analysis = items[0].json;
const report = {
incident_id: analysis.alert_id,
investigation_summary: analysis.summary,
timeline: analysis.timeline,
evidence: analysis.evidence,
recommendations: analysis.recommendations,
automation_actions: analysis.actions_taken,
investigation_duration: analysis.duration,
analyst_review_required: analysis.severity === 'CRITICAL'
};
return [{ json: report }];
- name: "store_investigation"
type: "database"
operation: "insert"
table: "security_investigations"
data: "{{ $node['generate_report'].json }}"
3. Machine Learning Integration
Predictive Threat Analysis:
@mcp.tool()
async def predict_attack_progression(current_indicators: list, historical_data: dict) -> str:
"""Use ML models to predict likely attack progression"""
# Feature extraction from current indicators
features = extract_threat_features(current_indicators)
# Load pre-trained attack progression model
model = load_attack_progression_model()
# Generate predictions
predictions = model.predict_progression(features)
# Calculate risk scores
risk_assessment = {
"current_stage": classify_attack_stage(current_indicators),
"predicted_next_steps": predictions["next_actions"],
"time_to_critical": predictions["time_estimate"],
"confidence_score": predictions["confidence"],
"recommended_countermeasures": generate_countermeasures(predictions),
"historical_similar_attacks": find_similar_attacks(features)
}
return json.dumps(risk_assessment, indent=2)
@mcp.tool()
async def behavioral_anomaly_detection(entity_type: str, entity_id: str, time_window: str) -> str:
"""Detect behavioral anomalies using unsupervised learning"""
# Query historical behavior patterns
historical_query = f"""
SELECT
eventName,
sourceIPAddress,
userAgent,
HOUR(eventTime) as hour_of_day,
DAYOFWEEK(eventTime) as day_of_week,
COUNT(*) as frequency
FROM table_name.aws_cloudtrail
WHERE userIdentity:userName = '{entity_id}'
AND p_event_time >= CURRENT_TIMESTAMP - INTERVAL '30 days'
AND p_event_time < CURRENT_TIMESTAMP - INTERVAL '{time_window}'
GROUP BY eventName, sourceIPAddress, userAgent, hour_of_day, day_of_week;
"""
historical_data = await execute_sql_query(historical_query)
# Query recent activity
recent_query = f"""
SELECT
eventName,
sourceIPAddress,
userAgent,
HOUR(eventTime) as hour_of_day,
DAYOFWEEK(eventTime) as day_of_week,
COUNT(*) as frequency
FROM table_name.aws_cloudtrail
WHERE userIdentity:userName = '{entity_id}'
AND p_event_time >= CURRENT_TIMESTAMP - INTERVAL '{time_window}'
GROUP BY eventName, sourceIPAddress, userAgent, hour_of_day, day_of_week;
"""
recent_data = await execute_sql_query(recent_query)
# Perform anomaly detection
anomalies = detect_anomalies_ml(historical_data, recent_data)
return json.dumps({
"entity_id": entity_id,
"entity_type": entity_type,
"analysis_window": time_window,
"anomalies_detected": len(anomalies),
"anomaly_details": anomalies,
"risk_score": calculate_risk_score(anomalies),
"recommended_actions": recommend_actions(anomalies)
}, indent=2)
Conclusion
The implementation of MCP servers for SOC automation represents a fundamental shift in how security operations can be conducted. Through the detailed implementations shown in this guide, we’ve demonstrated how AI assistants can be transformed from passive tools into active partners in security investigation and response.
Key Achievements
- Dramatic Efficiency Gains: Investigation times reduced from hours to minutes
- Enhanced Accuracy: Automated correlation reduces human error and oversight
- Scalable Operations: Handle increased alert volumes without proportional staffing
- Consistent Quality: Standardized investigation procedures and comprehensive documentation
Technical Innovation Highlights
- Secure Integration Patterns: Production-ready credential management and access control
- Asynchronous Processing: Efficient handling of long-running SIEM queries
- Multi-platform Correlation: Seamless integration between security and development tools
- Natural Language Interface: Complex security queries expressed in conversational language
Looking Forward
The MCP ecosystem for security automation is rapidly evolving. The patterns and implementations shared in this guide provide a solid foundation for:
- Expanding Tool Coverage: Additional integrations with SOAR platforms, threat intelligence feeds, and cloud security tools
- Enhanced AI Capabilities: Integration with specialized security AI models and behavioral analytics
- Enterprise Adoption: Scalable, secure deployment patterns for large organizations
- Community Collaboration: Open-source ecosystem development for security-focused MCP implementations
Getting Started
To begin implementing MCP automation in your SOC:
- Start Small: Begin with read-only integrations to build confidence
- Focus on High-Value Use Cases: Target repetitive, time-intensive investigation tasks
- Ensure Security: Implement proper authentication, authorization, and audit logging
- Measure Impact: Track metrics like MTTR, alert handling capacity, and analyst satisfaction
- Iterate and Expand: Gradually add more sophisticated automation based on demonstrated value
The journey toward AI-assisted security operations begins with building the right integration foundations. MCP servers provide that foundation, enabling a future where human analysts can focus on strategic threat analysis while AI handles the routine investigation workload.
Resources
- Complete Source Code: All implementations provided in this guide
- Deployment Templates: Infrastructure-as-code for secure production deployment
- Testing Frameworks: Comprehensive test suites for validation and regression testing
- Community: Active development community for security-focused MCP implementations
The future of SOC operations is collaborative human-AI teams enabled by robust, secure automation platforms. Start building yours today.
This guide represents real-world production experience building MCP integrations for security operations. The implementations have been battle-tested in high-volume SOC environments and continue to evolve based on operational feedback and emerging threats.