-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
95 lines (78 loc) · 2.9 KB
/
parser.py
File metadata and controls
95 lines (78 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Parse syzkaller reproducer text files into structured records."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
# Finds a syzkaller dashboard bug URL; group 1 is the lowercase-hex bug id.
BUG_URL_RE = re.compile(r"https://syzkaller\.appspot\.com/bug\?id=([0-9a-f]+)")
# Matches the start of a call line, up to and including the opening "(":
#   group 1 - optional assignment target such as "r0" (from "r0 = ..."), else None
#   group 2 - call name, optionally followed by a "$variant" suffix
CALL_RE = re.compile(r"^\s*(?:(r\d+)\s*=\s*)?([a-zA-Z_][a-zA-Z0-9_]*(?:\$[a-zA-Z0-9_]+)?)\s*\(")
# Matches a whole-word "r<digits>" token; group 1 holds the digits only.
RESOURCE_USE_RE = re.compile(r"\br(\d+)\b")
@dataclass
class Call:
    """One call line extracted from a reproducer program.

    Attributes:
        name: Base call name, e.g. "socket".
        variant: Text after the "$", e.g. "nl_netfilter", or None when the
            call has no variant suffix.
        raw: Full call identifier as written, e.g. "socket$nl_netfilter".
        defines: Resource the call assigns, e.g. "r0", or None.
        uses: Resources referenced in the call's arguments.
    """

    name: str
    variant: str | None
    raw: str
    defines: str | None
    uses: tuple[str, ...]
@dataclass
class Program:
    """Structured record for one reproducer file.

    Attributes:
        path: Source file path, stored as a string.
        bug_id: Bug identifier associated with the program, or None.
        options: Options mapping associated with the program.
        calls: The program's calls; defaults to a fresh empty list.
    """

    path: str
    bug_id: str | None
    options: dict
    calls: list[Call] = field(default_factory=list)

    @property
    def syscall_names(self) -> list[str]:
        """Return the ``name`` of every call, in list order."""
        return [call.name for call in self.calls]

    @property
    def syscall_variants(self) -> list[str]:
        """Return the ``raw`` identifier of every call, in list order."""
        return [call.raw for call in self.calls]
def _split_name(raw: str) -> tuple[str, str | None]:
if "$" in raw:
base, variant = raw.split("$", 1)
return base, variant
return raw, None
def parse_file(path: str | Path) -> Program:
    """Parse one reproducer text file into a ``Program``.

    Each line is handled as follows:

    * blank lines are skipped;
    * ``#`` comment lines are searched for the dashboard bug URL (first hit
      wins) and for a ``#{...}`` JSON options header (first non-empty result
      wins); comment lines never produce a call;
    * any other line that matches ``CALL_RE`` becomes a ``Call``; lines that
      do not match are ignored.

    Args:
        path: Location of the reproducer file.

    Returns:
        A ``Program`` whose ``path`` is ``str(Path(path))``. ``bug_id`` is
        None and ``options`` is ``{}`` when the corresponding header is
        missing or cannot be decoded.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    p = Path(path)
    # Decode explicitly as UTF-8 so results do not depend on the host locale;
    # undecodable bytes are replaced rather than aborting the parse.
    text = p.read_text(encoding="utf-8", errors="replace")

    bug_id: str | None = None
    options: dict = {}
    calls: list[Call] = []

    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue

        if stripped.startswith("#"):
            if bug_id is None:
                url_match = BUG_URL_RE.search(stripped)
                if url_match:
                    bug_id = url_match.group(1)
                    continue
            if stripped.startswith("#{") and not options:
                try:
                    # Drop the leading "#" and decode the rest as JSON.
                    options = json.loads(stripped[1:])
                except json.JSONDecodeError:
                    # Best effort: a malformed header leaves options empty.
                    pass
            continue

        call_match = CALL_RE.match(line)
        if not call_match:
            continue

        raw = call_match.group(2)
        name, variant = _split_name(raw)

        # CALL_RE ends on the opening "(", so the argument text starts at the
        # last matched character. The "rN =" assignment target sits before it
        # and is therefore never counted as a use.
        args_blob = line[call_match.end() - 1:]

        # De-duplicated and sorted as strings (so "r10" sorts before "r2").
        # NOTE(review): every whole-word rN token in the argument text is
        # counted, including ones inside inline markers such as "<r0=>" -
        # confirm that is intended.
        uses = tuple(sorted(
            {f"r{use.group(1)}" for use in RESOURCE_USE_RE.finditer(args_blob)}
        ))

        calls.append(Call(
            name=name,
            variant=variant,
            raw=raw,
            defines=call_match.group(1),
            uses=uses,
        ))

    return Program(path=str(p), bug_id=bug_id, options=options, calls=calls)
def iter_corpus(files_dir: str | Path) -> Iterable[Program]:
    """Yield a parsed ``Program`` for every ``.txt`` entry in *files_dir*.

    Entries are visited in sorted path order; the directory is not searched
    recursively. An entry that fails to parse is reported on stdout and
    skipped, so one bad file does not stop the whole run.

    Args:
        files_dir: Directory containing the reproducer text files.

    Yields:
        One ``Program`` per successfully parsed ``.txt`` entry.

    Raises:
        OSError: If *files_dir* cannot be listed.
    """
    for entry in sorted(Path(files_dir).iterdir()):
        if entry.suffix != ".txt":
            continue
        try:
            program = parse_file(entry)
        except Exception as e:  # keep indexing robust
            print(f"parse error {entry.name}: {e}")
        else:
            # Yield outside the try body so that only parse failures are
            # caught; an exception thrown into the generator at this point
            # must not be swallowed and reported as a parse error.
            yield program