-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathautomatic-triage.py
More file actions
661 lines (541 loc) · 23.6 KB
/
automatic-triage.py
File metadata and controls
661 lines (541 loc) · 23.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
import json
import requests
import sys
import os
import csv
import argparse
import logging
from datetime import datetime
from urllib.parse import urlparse
# File and directory layout for inputs and outputs.
CONFIG_FILE = 'config.json'  # fallback credential store (env vars take precedence)
REPOS_FILE = 'repos.csv'     # one repository URL per line, no header
OUTPUT_DIR = 'batch_output'  # logs and JSON triage reports are written here
# Root of the Mobb REST API.
MOBB_API_BASE = 'https://api.mobb.ai'
# Tag priority order (lower index = higher priority)
TAG_PRIORITY = [
    'FALSE_POSITIVE',
    'SUPPRESSED',
    'TEST_CODE',
    'VENDOR_CODE',
    'AUXILIARY_CODE',
    'AUTOGENERATED_CODE',
]
# Mapping from Mobb tag to GitHub dismissed_reason.
# Every Mobb tag collapses onto one of the three reason strings used below.
MOBB_TAG_TO_GITHUB_REASON = {
    'FALSE_POSITIVE': 'false positive',
    'SUPPRESSED': "won't fix",
    'TEST_CODE': 'used in tests',
    'VENDOR_CODE': "won't fix",
    'AUXILIARY_CODE': "won't fix",
    'AUTOGENERATED_CODE': "won't fix",
}
# Hardcoded dismiss comments for non-FP tag types.
# (FALSE_POSITIVE comments are fetched per-issue from the Mobb FP-summary API.)
MOBB_TAG_COMMENTS = {
    'VENDOR_CODE': (
        "Mobb has marked this as vendor code. This means the issue is located in external "
        "libraries or dependencies that are not owned or maintained by your team."
    ),
    'TEST_CODE': (
        "Mobb has marked this as test code. This issue resides in a test-specific path or "
        "context used to support validation scenarios. Since this code is isolated from "
        "production environments, it does not impact the security posture of the live application."
    ),
    'AUXILIARY_CODE': (
        "Mobb has marked this as auxiliary code. This refers to files included in the codebase "
        "that do not impact the application's runtime behavior or security."
    ),
    'AUTOGENERATED_CODE': (
        "Mobb has marked this as autogenerated code. This means it was created by a tool or "
        "framework during a build process rather than manually written."
    ),
    'SUPPRESSED': (
        "Mobb has marked this as suppressed. This status indicates the finding was already "
        "silenced or dismissed within the original scan report."
    ),
}
# Setup logging.
# The output dir must exist before the FileHandler below can open its log file.
os.makedirs(OUTPUT_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(OUTPUT_DIR, 'automatic_triage.log')),
        logging.StreamHandler()
    ]
)
class RepoInfo:
    """Parses a GitHub or GitHub Enterprise URL into owner/repo/api_base.

    Attributes:
        original_url: the URL exactly as supplied.
        domain:       hostname portion of the URL.
        owner, repo:  first two path segments; a trailing ".git" on the repo
                      segment is stripped so GitHub REST paths are correct.
        owner_repo:   convenience "owner/repo" string.
        api_base:     REST API root — api.github.com for github.com,
                      https://<host>/api/v3 otherwise (GitHub Enterprise).

    Raises:
        ValueError: if the URL path does not contain at least owner/repo.
    """
    def __init__(self, url):
        self.original_url = url
        parsed = urlparse(url)
        self.domain = parsed.netloc
        path_parts = parsed.path.strip('/').split('/')
        if len(path_parts) >= 2:
            self.owner = path_parts[0]
            self.repo = path_parts[1]
            # Clone-style URLs often end in ".git"; the GitHub REST API
            # expects the bare repository name, so strip the suffix.
            if self.repo.endswith('.git'):
                self.repo = self.repo[:-len('.git')]
            self.owner_repo = f"{self.owner}/{self.repo}"
        else:
            raise ValueError(f"Invalid repository URL format: {url}")
        if self.domain == 'github.com':
            self.api_base = 'https://api.github.com'
        else:
            # GitHub Enterprise Server exposes its REST API under /api/v3.
            self.api_base = f"https://{self.domain}/api/v3"

    def __str__(self):
        return self.owner_repo
class TriageResults:
    """Accumulates per-issue triage outcomes and pipeline errors in memory."""

    def __init__(self):
        self.results = []
        self.errors = []

    def add_result(self, fix_report_id, issue_id, vendor_instance_id, resolved_tag,
                   status, dismissed_reason=None, dismissed_comment=None, note=None):
        # Record one per-issue outcome; status is one of
        # 'dismissed', 'skipped', 'failed', 'dry_run'.
        entry = {
            'fix_report_id': fix_report_id,
            'issue_id': issue_id,
            'vendor_instance_id': vendor_instance_id,
            'resolved_tag': resolved_tag,
            'status': status,
            'dismissed_reason': dismissed_reason,
            'dismissed_comment': dismissed_comment,
            'note': note,
            'timestamp': datetime.now().isoformat()
        }
        self.results.append(entry)

    def add_error(self, fix_report_id, stage, message):
        # Record a pipeline-level failure and surface it in the log immediately.
        self.errors.append({
            'fix_report_id': fix_report_id,
            'stage': stage,
            'message': message,
            'timestamp': datetime.now().isoformat()
        })
        logging.error(f"Error in fix report '{fix_report_id}' at {stage}: {message}")
# Module-level accumulator: every processing function records its outcomes
# here so the final report can be generated in one place at the end.
triage_results = TriageResults()
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_config():
    """Load GITHUB_PAT and MOBB_API_TOKEN from env vars or config.json fallback.

    Environment variables take precedence; config.json is only consulted for
    keys the environment did not provide.

    Returns:
        dict with keys 'github_pat' and 'mobb_api_token', or None if either
        credential could not be found (an error is logged in that case).
    """
    config = {}
    github_pat = os.getenv('GITHUB_PAT')
    mobb_token = os.getenv('MOBB_API_TOKEN')
    if github_pat:
        config['github_pat'] = github_pat
    if mobb_token:
        config['mobb_api_token'] = mobb_token
    # Only touch the fallback file when the environment left a gap.
    if ('github_pat' not in config or 'mobb_api_token' not in config) and os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                file_config = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # Previously a malformed/unreadable config.json crashed with a raw
            # traceback; degrade to an empty fallback and let the checks below
            # report exactly which credential is missing.
            logging.error(f"Could not read {CONFIG_FILE}: {e}")
            file_config = {}
        if 'github_pat' not in config:
            config['github_pat'] = file_config.get('GITHUB_PAT')
        if 'mobb_api_token' not in config:
            config['mobb_api_token'] = file_config.get('MOBB_API_TOKEN')
    if not config.get('github_pat'):
        logging.error("GITHUB_PAT not found. Set env var GITHUB_PAT or add GITHUB_PAT to config.json.")
        return None
    if not config.get('mobb_api_token'):
        logging.error("MOBB_API_TOKEN not found. Set env var MOBB_API_TOKEN or add MOBB_API_TOKEN to config.json.")
        return None
    return config
# ---------------------------------------------------------------------------
# Input loading
# ---------------------------------------------------------------------------
def load_repo_urls():
    """Read repository URLs from repos.csv (one per line, no header)."""
    if not os.path.exists(REPOS_FILE):
        logging.error(f"Repository file not found: {REPOS_FILE}")
        return None
    try:
        # Take the first CSV column of each non-empty row, trimmed.
        with open(REPOS_FILE, 'r', newline='') as f:
            urls = [row[0].strip() for row in csv.reader(f) if row and row[0].strip()]
    except Exception as e:
        logging.error(f"Failed to read {REPOS_FILE}: {e}")
        return None
    if urls:
        return urls
    logging.error(f"No repository URLs found in {REPOS_FILE}.")
    return None
# ---------------------------------------------------------------------------
# Mobb API helpers
# ---------------------------------------------------------------------------
def get_mobb_headers(mobb_api_token):
    """Build the request headers for authenticated Mobb REST calls."""
    headers = {'Accept': 'application/json'}
    headers['x-mobb-key'] = mobb_api_token
    return headers
def normalize_url(url):
    """Normalize a URL for comparison: strip whitespace, trailing slashes, lowercase."""
    cleaned = url.strip().rstrip('/')
    return cleaned.lower()
def fetch_fix_report_id_for_repo(repo_url, mobb_headers):
    """
    Find the most recent active fix report for repo_url using a two-step approach:
    Step A: GET /api/rest/active-reports — returns all non-expired fix reports,
    already sorted latest → earliest. Extracts the ordered list of IDs.
    Step B: For each ID (latest first), GET /api/rest/fix-reports/{id} to read the
    repo.originalUrl field. Returns the first ID whose normalised URL matches
    repo_url. Stops as soon as a match is found to minimise API calls.
    Returns the fix report ID string, or None if not found.
    """
    target = normalize_url(repo_url)
    # Step A: fetch all active report IDs ordered latest → earliest
    active_url = f"{MOBB_API_BASE}/api/rest/active-reports"
    try:
        response = requests.get(active_url, headers=mobb_headers, timeout=30)
        response.raise_for_status()
        data = response.json()
    # ValueError also covers malformed JSON bodies: on requests versions
    # before 2.27, .json() raises json.JSONDecodeError (a ValueError) that is
    # NOT a RequestException subclass and would otherwise crash the run.
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f"[{repo_url}] Failed to fetch active reports: {e}")
        return None
    try:
        active_records = data.get('fixReport', [])
    except (AttributeError, TypeError) as e:
        logging.error(f"[{repo_url}] Unexpected active-reports response structure: {e}")
        return None
    if not active_records:
        logging.warning(f"[{repo_url}] No active fix reports exist in this Mobb account.")
        return None
    logging.info(f"[{repo_url}] {len(active_records)} active fix report(s) to search.")
    # Step B: lazily fetch details per ID and return the first URL match
    details_base = f"{MOBB_API_BASE}/api/rest/fix-reports"
    for i, record in enumerate(active_records, 1):
        fix_report_id = record.get('id')
        if not fix_report_id:
            continue
        try:
            resp = requests.get(f"{details_base}/{fix_report_id}", headers=mobb_headers, timeout=30)
            resp.raise_for_status()
            detail = resp.json()
        # Same rationale as above: tolerate malformed JSON as well as
        # transport errors, and just move on to the next candidate.
        except (requests.exceptions.RequestException, ValueError) as e:
            logging.warning(f"[{repo_url}] Could not fetch details for {fix_report_id}: {e} — skipping.")
            continue
        try:
            fix_reports = detail.get('fixReport', [])
            if not fix_reports:
                continue
            repo_field = fix_reports[0].get('repo') or {}
            original_url = repo_field.get('originalUrl', '')
        except (KeyError, IndexError, TypeError):
            continue
        logging.info(
            f"[{repo_url}] Checking ({i}/{len(active_records)}) "
            f"{fix_report_id} -> '{original_url}'"
        )
        if normalize_url(original_url) == target:
            logging.info(
                f"[{repo_url}] Match found: {fix_report_id} "
                f"(created {record.get('createdOn')})"
            )
            return fix_report_id
    logging.warning(f"No active fix report found for repo URL: {repo_url}")
    return None
def resolve_tag_priority(tags, priority=None):
    """
    Given a list of tag value strings, return the single highest-priority tag.
    Priority: FALSE_POSITIVE > SUPPRESSED > TEST_CODE > VENDOR_CODE > AUXILIARY_CODE > AUTOGENERATED_CODE

    Args:
        tags: iterable of tag value strings found on an issue.
        priority: optional custom priority list (highest priority first);
            defaults to the module-level TAG_PRIORITY order above.

    Returns None if no known tags are present.
    """
    order = TAG_PRIORITY if priority is None else priority
    # Set membership keeps the scan O(len(order)) regardless of tag count.
    tag_set = set(tags)
    for tag in order:
        if tag in tag_set:
            return tag
    return None
def fetch_all_irrelevant_issues(fix_report_id, mobb_headers):
    """
    Step 2: Fetch all issues for the fix report using cursor-based pagination.
    Returns a list of classified dicts: {id, vendorInstanceId, fpId, resolved_tag}.
    Skips issues with no tags or unrecognised tags.

    Returns None when the Mobb API cannot be queried or returns an unexpected
    shape, so callers can distinguish "error" from an empty issue list.
    """
    all_issues = []
    page_num = 0
    # Cursor = last issue id seen on the previous page; None on the first request.
    cursor_issue_id = None
    while True:
        page_num += 1
        params = {'fixReportId': fix_report_id}
        if cursor_issue_id:
            params['issueId'] = cursor_issue_id
        url = f"{MOBB_API_BASE}/api/rest/v5/issues"
        try:
            response = requests.get(url, headers=mobb_headers, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            logging.error(f"[{fix_report_id}] Failed to fetch issues (page {page_num}): {e}")
            return None
        try:
            api_data = data.get('getIssuesApiV5', {})
            issues = api_data.get('vulnerability_report_issue', [])
            has_next_page = api_data.get('hasNextPage', False)
        except (AttributeError, TypeError) as e:
            logging.error(f"[{fix_report_id}] Unexpected issues response structure (page {page_num}): {e}")
            return None
        logging.info(f"[{fix_report_id}] Page {page_num}: fetched {len(issues)} issues (hasNextPage={has_next_page})")
        for issue in issues:
            tags_raw = issue.get('vulnerabilityReportIssueTags', [])
            if not tags_raw:
                continue  # No tags = not an irrelevant issue
            # Flatten tag objects to their value strings, dropping empty/missing ones.
            tag_values = [t.get('vulnerability_report_issue_tag_value') for t in tags_raw if t.get('vulnerability_report_issue_tag_value')]
            resolved_tag = resolve_tag_priority(tag_values)
            if not resolved_tag:
                logging.warning(f"[{fix_report_id}] Issue {issue.get('id')} has unrecognised tags {tag_values}, skipping.")
                continue
            all_issues.append({
                'id': issue.get('id'),
                'vendorInstanceId': issue.get('vendorInstanceId'),
                'fpId': issue.get('fpId'),
                'resolved_tag': resolved_tag,
            })
        if not has_next_page or not issues:
            break
        # Advance the cursor to the last issue on this page for the next request.
        cursor_issue_id = issues[-1].get('id')
        if not cursor_issue_id:
            logging.error(f"[{fix_report_id}] Could not determine cursor for next page, stopping pagination.")
            break
    logging.info(f"[{fix_report_id}] Total irrelevant issues found: {len(all_issues)}")
    return all_issues
def fetch_fp_summary(fp_id, fix_report_id, mobb_headers):
    """
    Step 3 (FALSE_POSITIVE only): Fetch the short FP description from Mobb.
    Returns the fpDescriptionShort string or None on failure.
    """
    endpoint = f"{MOBB_API_BASE}/api/rest/fp-summary"
    try:
        resp = requests.get(endpoint, headers=mobb_headers, params={'fpId': fp_id}, timeout=30)
        resp.raise_for_status()
        payload = resp.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"[{fix_report_id}] Failed to fetch FP summary for fpId={fp_id}: {e}")
        return None
    # Dig out the short description; any missing key or wrong shape is logged.
    try:
        return payload['getFpSummary']['fpDescriptionShort']
    except (KeyError, TypeError) as e:
        logging.error(f"[{fix_report_id}] Could not parse fpDescriptionShort for fpId={fp_id}: {e}")
        return None
# ---------------------------------------------------------------------------
# GitHub API helpers
# ---------------------------------------------------------------------------
def get_github_headers(github_pat):
    """Build the request headers for authenticated GitHub REST calls."""
    headers = {'Accept': 'application/json'}
    headers['Authorization'] = f"Bearer {github_pat}"
    headers['Content-Type'] = 'application/json'
    return headers
def dismiss_github_alert(repo_info, vendor_instance_id, dismissed_reason, dismissed_comment,
                         github_headers, dry_run=False):
    """
    Step 4: Dismiss a single GitHub code scanning alert.

    Sends PATCH /repos/{owner}/{repo}/code-scanning/alerts/{alert_number} with
    state='dismissed'. In dry-run mode only the intended action is logged.

    Returns ('dismissed', None), ('dry_run', None), ('skipped', reason_str),
    or ('failed', error_str).
    """
    alert_url = f"{repo_info.api_base}/repos/{repo_info.owner_repo}/code-scanning/alerts/{vendor_instance_id}"
    if dry_run:
        # The original implementation duplicated the entire log message inside
        # a ternary spliced between implicitly-concatenated f-strings; compute
        # the (possibly truncated) comment preview once instead.
        if len(dismissed_comment) > 80:
            preview = f"{dismissed_comment[:80]}..."
        else:
            preview = dismissed_comment
        logging.info(
            f"[DRY RUN] Would dismiss alert #{vendor_instance_id} in {repo_info.owner_repo} "
            f"| reason='{dismissed_reason}' | comment='{preview}'"
        )
        return 'dry_run', None
    payload = {
        'state': 'dismissed',
        'dismissed_reason': dismissed_reason,
        'dismissed_comment': dismissed_comment,
        'create_request': True,
    }
    try:
        response = requests.patch(alert_url, headers=github_headers, json=payload, timeout=30)
    except requests.exceptions.RequestException as e:
        return 'failed', f"Request error: {e}"
    if response.status_code == 200:
        logging.info(f"Dismissed alert #{vendor_instance_id} in {repo_info.owner_repo} (reason: {dismissed_reason})")
        return 'dismissed', None
    if response.status_code == 400:
        # GitHub returns 400 with a message when the alert is already
        # dismissed; treat that specific case as a skip, not a failure.
        try:
            msg = response.json().get('message', '')
        except Exception:
            msg = response.text
        if 'already dismissed' in msg.lower():
            logging.warning(f"Alert #{vendor_instance_id} in {repo_info.owner_repo} is already dismissed — skipping.")
            return 'skipped', 'Alert is already dismissed'
        return 'failed', f"HTTP 400: {msg}"
    return 'failed', f"HTTP {response.status_code}: {response.text[:200]}"
# ---------------------------------------------------------------------------
# Core per-report processing
# ---------------------------------------------------------------------------
def process_fix_report(fix_report_id, repo_info, config, dry_run=False):
    """Orchestrate triage steps for a single fix report ID against a known repo.

    For each tagged ("irrelevant") issue in the fix report, resolve the GitHub
    dismissed_reason/comment from the Mobb tag and dismiss the corresponding
    GitHub code scanning alert. All outcomes and errors are recorded on the
    module-level `triage_results` accumulator; nothing is returned.
    """
    logging.info(f"{'[DRY RUN] ' if dry_run else ''}Processing fix report: {fix_report_id}")
    mobb_headers = get_mobb_headers(config['mobb_api_token'])
    github_headers = get_github_headers(config['github_pat'])
    # Step 1: Fetch all irrelevant issues
    issues = fetch_all_irrelevant_issues(fix_report_id, mobb_headers)
    if issues is None:
        # None signals an API failure (distinct from an empty issue list).
        triage_results.add_error(fix_report_id, 'fetch_issues', 'Failed to fetch issues from Mobb API')
        return
    if not issues:
        logging.info(f"[{fix_report_id}] No irrelevant issues found — nothing to triage.")
        return
    logging.info(f"[{fix_report_id}] Processing {len(issues)} irrelevant issue(s) for {repo_info.owner_repo}...")
    # Steps 3 & 4: Resolve comment and dismiss each issue
    for issue in issues:
        issue_id = issue['id']
        vendor_instance_id = issue['vendorInstanceId']
        fp_id = issue['fpId']
        resolved_tag = issue['resolved_tag']
        # Guard: skip if no vendorInstanceId (can't reference the GitHub alert)
        if vendor_instance_id is None:
            logging.warning(
                f"[{fix_report_id}] Issue {issue_id} has no vendorInstanceId (tag={resolved_tag}) — skipping."
            )
            triage_results.add_result(
                fix_report_id, issue_id, vendor_instance_id, resolved_tag,
                status='skipped', note='vendorInstanceId is null'
            )
            continue
        # Step 3: Resolve dismiss comment
        dismissed_reason = MOBB_TAG_TO_GITHUB_REASON[resolved_tag]
        if resolved_tag == 'FALSE_POSITIVE':
            # FP comments come from the per-issue Mobb FP summary rather than
            # the hardcoded MOBB_TAG_COMMENTS table used for other tags.
            if not fp_id:
                logging.error(
                    f"[{fix_report_id}] Issue {issue_id} is FALSE_POSITIVE but fpId is null — skipping."
                )
                triage_results.add_result(
                    fix_report_id, issue_id, vendor_instance_id, resolved_tag,
                    status='skipped', note='FALSE_POSITIVE with null fpId'
                )
                continue
            dismissed_comment = fetch_fp_summary(fp_id, fix_report_id, mobb_headers)
            if not dismissed_comment:
                logging.error(
                    f"[{fix_report_id}] Could not retrieve FP summary for issue {issue_id} "
                    f"(fpId={fp_id}) — skipping this issue."
                )
                triage_results.add_result(
                    fix_report_id, issue_id, vendor_instance_id, resolved_tag,
                    status='skipped', note='FP summary fetch failed'
                )
                continue
        else:
            dismissed_comment = MOBB_TAG_COMMENTS[resolved_tag]
        # Step 4: Dismiss the GitHub alert
        status, note = dismiss_github_alert(
            repo_info, vendor_instance_id, dismissed_reason, dismissed_comment,
            github_headers, dry_run=dry_run
        )
        triage_results.add_result(
            fix_report_id, issue_id, vendor_instance_id, resolved_tag,
            status=status,
            dismissed_reason=dismissed_reason,
            dismissed_comment=dismissed_comment,
            note=note,
        )
# ---------------------------------------------------------------------------
# Final report
# ---------------------------------------------------------------------------
def generate_final_report(dry_run=False):
    """Write a JSON triage report and print a summary to stdout."""
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_path = os.path.join(OUTPUT_DIR, f"triage_report_{stamp}.json")
    # Tally outcomes once from the flat status list.
    statuses = [entry['status'] for entry in triage_results.results]
    total = len(statuses)
    dismissed = statuses.count('dismissed')
    dry_run_count = statuses.count('dry_run')
    skipped = statuses.count('skipped')
    failed = statuses.count('failed')
    actioned = dry_run_count if dry_run else dismissed
    success_rate = "0%" if total == 0 else f"{(actioned / total * 100):.1f}%"
    report_data = {
        'summary': {
            'dry_run': dry_run,
            'total_issues': total,
            'dismissed': dismissed,
            'dry_run_planned': dry_run_count,
            'skipped': skipped,
            'failed': failed,
            'success_rate': success_rate,
        },
        'results': triage_results.results,
        'errors': triage_results.errors,
        'generated_at': datetime.now().isoformat(),
    }
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report_data, f, indent=2)
    mode_label = "DRY RUN COMPLETE" if dry_run else "TRIAGE COMPLETE"
    print("\n" + "=" * 60)
    print(f"AUTOMATIC TRIAGE — {mode_label}")
    print("=" * 60)
    print(f"Total issues processed : {total}")
    if dry_run:
        print(f"Would be dismissed : {dry_run_count}")
    else:
        print(f"Dismissed : {dismissed}")
    print(f"Skipped : {skipped}")
    print(f"Failed : {failed}")
    print(f"Success rate : {success_rate}")
    print(f"\nDetailed report saved to: {report_path}")
    print("=" * 60)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: load config and repo list, then triage each repository.

    Exits with status 1 when configuration or the repository list cannot be
    loaded. Per-repository failures are logged and skipped so one bad repo
    does not abort the whole batch.
    """
    parser = argparse.ArgumentParser(
        description='Automatic triage: dismiss irrelevant GHAS alerts using Mobb fix report data.'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview what would be dismissed without making any changes to GitHub.'
    )
    args = parser.parse_args()
    dry_run = args.dry_run
    print("Mobb Automatic Triage Pipeline")
    print("==============================")
    if dry_run:
        print("*** DRY RUN MODE — no GitHub alerts will be modified ***\n")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Step 1: Load configuration
    print("1. Loading configuration...")
    config = load_config()
    if not config:
        sys.exit(1)
    # Step 2: Load repository URLs
    print("2. Loading repository URLs...")
    repo_urls = load_repo_urls()
    if not repo_urls:
        sys.exit(1)
    print(f" Found {len(repo_urls)} repository URL(s) to process.")
    for url in repo_urls:
        print(f" - {url}")
    mobb_headers = get_mobb_headers(config['mobb_api_token'])
    # Step 3: Process each repository
    print("\n3. Processing repositories...")
    for i, repo_url in enumerate(repo_urls, 1):
        print(f"\n[{i}/{len(repo_urls)}] Repository: {repo_url}")
        # Build RepoInfo from the URL
        try:
            repo_info = RepoInfo(repo_url)
        except ValueError as e:
            logging.error(f"Invalid repository URL '{repo_url}': {e} — skipping.")
            continue
        # Discover the most recent active fix report for this repo
        print(f" Searching for active fix report...")
        fix_report_id = fetch_fix_report_id_for_repo(repo_url, mobb_headers)
        if not fix_report_id:
            logging.warning(f"No active fix report found for {repo_url} — skipping.")
            continue
        print(f" Found fix report: {fix_report_id}")
        # Catch-all so an unexpected failure in one report is recorded and
        # the remaining repositories still get processed.
        try:
            process_fix_report(fix_report_id, repo_info, config, dry_run=dry_run)
        except Exception as e:
            triage_results.add_error(fix_report_id, 'general', f"Unexpected error: {e}")
            logging.error(f"Unexpected error processing fix report {fix_report_id}: {e}")
    # Step 4: Generate final report
    print("\n4. Generating final report...")
    generate_final_report(dry_run=dry_run)

if __name__ == '__main__':
    main()