# treeloom

> Language-agnostic Code Property Graph library for Python. Parses source code via tree-sitter, builds a unified graph (AST + control flow + data flow + call graph), and provides taint analysis, pattern matching, and visualization. Supports Python, JavaScript, TypeScript, Go, Java, C, C++, and Rust. 1123 tests. Latest release: 0.3.0 (main branch includes additional unreleased features).

All public types are importable from the top-level `treeloom` package. Install with `pip install treeloom[languages]` for all grammar packages, or `pip install treeloom` for core only (no parsing without grammars).

## Core Workflow

```python
from pathlib import Path
from treeloom import (
    CPGBuilder, BuildProgressCallback, BuildTimeoutError,
    TaintPolicy, TaintLabel, NodeKind, to_json, from_json, generate_html,
)

# 1. Build
cpg = CPGBuilder().add_directory(Path("src/")).build()

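# 2. Annotate (consumer metadata, separate from structural attrs)
for node in cpg.nodes(kind=NodeKind.PARAMETER):
    # example choice: treat every parameter as an entry point (read by `sources` in step 3)
    cpg.annotate_node(node.id, "role", "entry_point")
for node in cpg.nodes(kind=NodeKind.CALL):
    if node.name in ("exec", "eval"):
        cpg.annotate_node(node.id, "role", "sink")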

# 3. Analyze
policy = TaintPolicy(
    sources=lambda n: TaintLabel("user_input", n.id) if cpg.get_annotation(n.id, "role") == "entry_point" else None,
    sinks=lambda n: cpg.get_annotation(n.id, "role") == "sink",
    sanitizers=lambda n: cpg.get_annotation(n.id, "role") == "sanitizer",
)
result = cpg.taint(policy)
for path in result.unsanitized_paths():
    print(f"{path.source.name} -> {path.sink.name}")

# 4. Export
json_str = to_json(cpg)                       # round-trips with from_json()
html = generate_html(cpg, title="My CPG")      # self-contained HTML with Cytoscape.js
```

## CPGBuilder

```python
class CPGBuilder:
    def __init__(
        self,
        registry: LanguageRegistry | None = None,
        progress: BuildProgressCallback | None = None,
        timeout: float | None = None,
    ) -> None
    def add_file(self, path: Path) -> CPGBuilder
    def add_directory(self, path: Path, exclude: list[str] | None = None) -> CPGBuilder
    def add_source(self, source: bytes, filename: str, language: str | None = None) -> CPGBuilder
    def build(self) -> CodePropertyGraph
    def rebuild(self, changed: list[Path] | None = None) -> CodePropertyGraph
```

`progress` is a `BuildProgressCallback` (`Callable[[str, str], None]`) called at phase boundaries (e.g., parse, visit, CFG, call resolution). `timeout` limits wall-clock build time in seconds; raises `BuildTimeoutError` if exceeded. Both `BuildProgressCallback` and `BuildTimeoutError` are exported from the top-level `treeloom` package.
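
As a rough illustration of the callback contract described above, the sketch below runs a list of phases, reports each one through a `Callable[[str, str], None]`, and checks a wall-clock deadline between phases. `run_phases`, `PhaseTimeout`, and the meaning given to the two string arguments (phase name, detail message) are assumptions made for this example. The real builder and `BuildTimeoutError` may behave differently.

```python
import time
from typing import Callable, Optional

# Same shape as BuildProgressCallback: two strings in, nothing out.
ProgressCallback = Callable[[str, str], None]


class PhaseTimeout(Exception):
    """Hypothetical stand-in for BuildTimeoutError."""


def run_phases(
    phases: list,
    progress: Optional[ProgressCallback] = None,
    timeout: Optional[float] = None,
) -> None:
    """Run (name, work) pairs in order, reporting progress and enforcing a deadline."""
    deadline = None if timeout is None else time.monotonic() + timeout
    for name, work in phases:
        # The deadline is only checked at phase boundaries in this sketch.
        if deadline is not None and time.monotonic() > deadline:
            raise PhaseTimeout(f"timed out before phase {name!r}")
        if progress is not None:
            progress(name, "starting")
        work()


events = []
run_phases(
    [("parse", lambda: None), ("visit", lambda: None)],
    progress=lambda phase, detail: events.append((phase, detail)),
)
print(events)
```

A callback with this signature can be passed as `CPGBuilder(progress=...)`. Checking the deadline only between phases is a simplification chosen for the sketch.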

`rebuild(changed=...)` re-parses only the listed files, removing their old nodes and edges and re-visiting them. Unchanged nodes, edges, and annotations are preserved. When `changed` is `None`, `rebuild()` uses SHA-256 content hashing to detect which files changed since the last build. The `treeloom watch` command uses incremental rebuild.
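
To make the hash-based change detection concrete, here is a minimal stand-alone sketch that compares stored SHA-256 digests against current file contents. The function names and the `added` / `modified` / `removed` result shape are illustrative assumptions, not treeloom's internal representation.

```python
import hashlib


def content_hash(data: bytes) -> str:
    """SHA-256 hex digest of a file's raw bytes."""
    return hashlib.sha256(data).hexdigest()


def detect_changes(previous: dict, current: dict) -> dict:
    """Compare {path: digest} from the last build with {path: bytes} on disk now."""
    current_hashes = {path: content_hash(data) for path, data in current.items()}
    return {
        "added": sorted(p for p in current_hashes if p not in previous),
        "modified": sorted(
            p for p in current_hashes if p in previous and previous[p] != current_hashes[p]
        ),
        "removed": sorted(p for p in previous if p not in current_hashes),
    }


old = {"a.py": content_hash(b"x = 1\n"), "b.py": content_hash(b"y = 2\n")}
new = {"a.py": b"x = 1\n", "b.py": b"y = 3\n", "c.py": b"z = 0\n"}
print(detect_changes(old, new))
```

Hashing content rather than comparing modification times means a file that was touched but not edited does not trigger a re-parse.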

Default directory exclusions: `__pycache__`, `node_modules`, `.git`, `venv`, `.venv`. The `exclude` parameter accepts gitignore-style patterns.

Language is auto-detected from the file extension. `LanguageRegistry.default()` registers all visitors whose grammar packages are installed.

## CodePropertyGraph

```python
class CodePropertyGraph:
    # Node access
    def node(self, node_id: NodeId) -> CpgNode | None
    def nodes(self, kind: NodeKind | None = None, file: Path | None = None) -> Iterator[CpgNode]
    def edges(self, kind: EdgeKind | None = None) -> Iterator[CpgEdge]

    # Traversal
    def successors(self, node_id: NodeId, edge_kind: EdgeKind | None = None) -> list[CpgNode]
    def predecessors(self, node_id: NodeId, edge_kind: EdgeKind | None = None) -> list[CpgNode]

    # Scope navigation
    def scope_of(self, node_id: NodeId) -> CpgNode | None
    def children_of(self, node_id: NodeId) -> list[CpgNode]

    # Annotations (stored separately from CpgNode.attrs)
    def annotate_node(self, node_id: NodeId, key: str, value: Any) -> None
    def annotate_edge(self, source: NodeId, target: NodeId, key: str, value: Any) -> None
    def get_annotation(self, node_id: NodeId, key: str) -> Any | None
    def get_edge_annotation(self, source: NodeId, target: NodeId, key: str) -> Any | None
    def annotations_for(self, node_id: NodeId) -> dict[str, Any]

    # Mutation (for incremental rebuild)
    def remove_node(self, node_id: NodeId) -> None        # cascading: also removes all edges touching this node
    def remove_edge(self, source: NodeId, target: NodeId, kind: EdgeKind | None = None) -> None
    def nodes_for_file(self, file: Path) -> list[NodeId]   # all node IDs originating from the given source file

    # Analysis
    def query(self) -> GraphQuery
    def taint(self, policy: TaintPolicy) -> TaintResult

    # Serialization (round-trip: from_dict(to_dict()) == equivalent graph)
    def to_dict(self) -> dict[str, Any]
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> CodePropertyGraph

    # Properties
    @property
    def node_count(self) -> int
    @property
    def edge_count(self) -> int
    @property
    def files(self) -> list[Path]
```

## Data Model

```python
@dataclass(frozen=True, slots=True)
class SourceLocation:
    file: Path
    line: int       # 1-based
    column: int = 0 # 0-based

@dataclass(frozen=True, slots=True)
class NodeId:
    _value: str     # opaque -- never parse or construct directly

class NodeKind(str, Enum):
    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    PARAMETER = "parameter"
    VARIABLE = "variable"
    CALL = "call"
    LITERAL = "literal"
    RETURN = "return"
    IMPORT = "import"
    BRANCH = "branch"
    LOOP = "loop"
    BLOCK = "block"

@dataclass
class CpgNode:
    id: NodeId
    kind: NodeKind
    name: str
    location: SourceLocation | None
    scope: NodeId | None = None
    attrs: dict[str, Any] = field(default_factory=dict)  # structural metadata from visitors, NOT consumer data

class EdgeKind(str, Enum):
    # AST
    CONTAINS = "contains"
    HAS_PARAMETER = "has_parameter"
    HAS_RETURN_TYPE = "has_return_type"
    # Control flow
    FLOWS_TO = "flows_to"
    BRANCHES_TO = "branches_to"
    # Data flow
    DATA_FLOWS_TO = "data_flows_to"
    DEFINED_BY = "defined_by"
    USED_BY = "used_by"
    # Call graph
    CALLS = "calls"
    RESOLVES_TO = "resolves_to"
    # Module
    IMPORTS = "imports"

@dataclass(frozen=True, slots=True)
class CpgEdge:
    source: NodeId
    target: NodeId
    kind: EdgeKind
    attrs: dict[str, Any]
```

Common CpgNode.attrs keys by kind:

| NodeKind | Common attrs |
|----------|-------------|
| FUNCTION | is_async, is_method, is_static, decorators (list of strings) |
| PARAMETER | type_annotation, position, default_value |
| VARIABLE | type_annotation, is_global, is_field, inferred_type (from constructor assignments, e.g. `"Dog"`) |
| CALL | args_count, is_method_call, receiver, receiver_inferred_type (from type inference, e.g. `"Dog"`) |
| CLASS | bases (list of base class name strings, e.g. `["Animal"]`) |
| LITERAL | literal_type (str/int/float/bool/none), raw_value |
| IMPORT | module, names, is_from, alias |
| BRANCH | branch_type (if/elif/switch/match), has_else |
| LOOP | loop_type (for/while/do_while), iterator_var |

## Taint Analysis

```python
@dataclass(frozen=True)
class TaintLabel:
    name: str           # e.g. "user_input", "env_var"
    origin: NodeId      # the node that introduced this taint
    field_path: str | None = None  # e.g. "password" for field-sensitive taint on obj.password
    attrs: dict = field(default_factory=dict, compare=False)  # consumer metadata, excluded from hash/eq

@dataclass
class TaintPolicy:
    sources: Callable[[CpgNode], TaintLabel | None]
    sinks: Callable[[CpgNode], bool]
    sanitizers: Callable[[CpgNode], bool]
    propagators: list[TaintPropagator] = field(default_factory=list)
    implicit_param_sources: bool = False  # auto-seed all PARAMETER nodes as taint sources with label "param:{name}"

@dataclass
class TaintPropagator:
    match: Callable[[CpgNode], bool]
    param_to_return: bool = True
    param_to_param: dict[int, int] | None = None
    params_to_return: list[int] | None = None  # specific param positions; takes precedence over param_to_return

@dataclass
class TaintPath:
    source: CpgNode
    sink: CpgNode
    intermediates: list[CpgNode]    # full path including source and sink
    labels: frozenset[TaintLabel]
    is_sanitized: bool
    sanitizers: list[CpgNode]

@dataclass
class TaintResult:
    paths: list[TaintPath]
    def paths_to_sink(self, sink_id: NodeId) -> list[TaintPath]
    def paths_from_source(self, source_id: NodeId) -> list[TaintPath]
    def unsanitized_paths(self) -> list[TaintPath]
    def sanitized_paths(self) -> list[TaintPath]
    def labels_at(self, node_id: NodeId) -> frozenset[TaintLabel]
    def edge_labels(self, source: NodeId, target: NodeId) -> frozenset[TaintLabel]
    def apply_to(self, cpg: CodePropertyGraph) -> None
```

The taint engine uses worklist-based forward analysis over DATA_FLOWS_TO edges. Sanitizers mark paths as sanitized but do not stop propagation. Inter-procedural flow uses function summaries computed from param-to-return DFG paths. When `implicit_param_sources=True`, all PARAMETER nodes are automatically seeded as taint sources with labels of the form `param:{name}`.
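
The worklist idea can be sketched on a toy graph. The code below is a simplified stand-in, not treeloom's engine: nodes are plain strings, the graph is a successor dict, and each fact is a `(label, sanitized)` pair. It shows the two properties stated above: propagation runs forward to a fixpoint, and a sanitizer flips the `sanitized` flag without stopping the flow.

```python
from collections import deque


def propagate(dfg: dict, sources: dict, sanitizers: set) -> dict:
    """Forward worklist propagation of (label, sanitized) facts over data-flow edges."""
    facts = {node: set() for node in dfg}
    worklist = deque()
    for node, label in sources.items():
        facts.setdefault(node, set()).add((label, False))
        worklist.append(node)
    while worklist:
        node = worklist.popleft()
        through_sanitizer = node in sanitizers
        # Facts leaving a sanitizer are marked sanitized but still flow onward.
        outgoing = {(label, done or through_sanitizer) for label, done in facts[node]}
        for succ in dfg.get(node, []):
            new_facts = outgoing - facts.setdefault(succ, set())
            if new_facts:
                facts[succ] |= new_facts
                worklist.append(succ)  # revisit only when something changed
    return facts


dfg = {
    "user": ["escape", "fmt"],
    "escape": ["run_query"],
    "fmt": ["os_system"],
    "run_query": [],
    "os_system": [],
}
facts = propagate(dfg, sources={"user": "user_input"}, sanitizers={"escape"})
sinks = ["run_query", "os_system"]
unsanitized = [s for s in sinks if any(not done for _, done in facts[s])]
print(unsanitized)
```

Because the fact sets only grow and are finite, the loop terminates even on graphs with cycles.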

**Field-sensitive propagation**: When `TaintLabel.field_path` is set, the engine tracks taint at the field level. When taint flows through a DATA_FLOWS_TO edge that has a `field_name` attr (from attribute access), object-level taint narrows to field-level, and labels whose field does not match are filtered out. The engine falls back to object-level propagation when field information is ambiguous.
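
One possible reading of these rules, written as a single edge-transfer function. `Label` is a stand-in for `TaintLabel`, and treating an edge without `field_name` as the "ambiguous" case is an assumption of this sketch; the engine's actual rules may be more detailed.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Label:
    """Stand-in for TaintLabel with only the fields this sketch needs."""
    name: str
    field_path: Optional[str] = None


def flow_through_edge(label: Label, field_name: Optional[str]) -> Optional[Label]:
    """Return the label that survives the edge, or None if it is filtered out."""
    if field_name is None:
        return label                                  # no field info: propagate as-is
    if label.field_path is None:
        return replace(label, field_path=field_name)  # object-level narrows to the field
    if label.field_path == field_name:
        return label                                  # same field: keep
    return None                                       # different field: filtered


whole_object = Label("user_input")
only_password = Label("user_input", field_path="password")

print(flow_through_edge(whole_object, "password"))
print(flow_through_edge(only_password, "username"))
print(flow_through_edge(only_password, None))
```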

**Stamping taint onto the graph**: After running taint, call `result.apply_to(cpg)` to annotate every tainted node/edge in the CPG. This makes the graph self-describing, so any node inspection, subgraph extraction, or serialization carries taint status:

```python
result = cpg.taint(policy)
result.apply_to(cpg)  # stamps tainted, taint_labels, taint_role, taint_sanitized
# Now: cpg.get_annotation(node.id, "tainted") -> True/False
# Subgraphs carry annotations: cpg.query().subgraph(func.id, max_depth=5)
```

## Function Summaries

```python
@dataclass
class FunctionSummary:
    function_id: NodeId
    function_name: str
    params_to_return: list[int]                  # 0-based param positions that flow to return
    params_to_sinks: dict[int, list[NodeId]]     # params that flow to internal sinks
    introduces_taint: bool                        # function reads from an external source

def compute_summaries(cpg: CodePropertyGraph) -> dict[NodeId, FunctionSummary]
```

Summaries are computed once per `build()` and cached. They enable inter-procedural taint without full function inlining. Both `FunctionSummary` and `compute_summaries` are exported from the top-level `treeloom` package.
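
A minimal sketch of how a `params_to_return` list could be derived from param-to-return data-flow paths. The graph encoding (a successor dict keyed by string names) and the helper name are assumptions for illustration; `compute_summaries` itself works on a real CPG.

```python
from collections import deque


def params_reaching_return(dfg: dict, params: list, returns: set) -> list:
    """0-based positions of parameters whose value can reach a return node."""
    positions = []
    for position, param in enumerate(params):
        seen = {param}
        queue = deque([param])
        while queue:  # plain BFS over data-flow successors
            node = queue.popleft()
            for succ in dfg.get(node, []):
                if succ not in seen:
                    seen.add(succ)
                    queue.append(succ)
        if seen & returns:
            positions.append(position)
    return positions


# Models a function like:
#   def build(table, user_id, debug):
#       query = "SELECT * FROM " + table + " WHERE id=" + user_id
#       return query
dfg = {"table": ["query"], "user_id": ["query"], "query": ["return"], "debug": []}
summary = params_reaching_return(dfg, ["table", "user_id", "debug"], {"return"})
print(summary)
```

At a call site, a summary like this lets taint on argument 0 or 1 be copied to the call's result without re-walking the callee's body.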

## Stdlib Propagation Models

```python
from pathlib import Path
from treeloom import TaintPolicy, load_models, list_builtin_models, load_model_file

# Load all Python stdlib propagators
propagators = load_models(["python-stdlib"])

# Use in a taint policy (my_sources, my_sinks, my_sanitizers are your own callables)
policy = TaintPolicy(
    sources=my_sources,
    sinks=my_sinks,
    sanitizers=my_sanitizers,
    propagators=propagators,
)

# See available model names
names = list_builtin_models()  # e.g. ["python-stdlib"]

# Load a specific YAML model file
propagators = load_model_file(Path("custom-models.yaml"))
```

Models are YAML files in `src/treeloom/models/builtin/`. The `python-stdlib` model covers: json, pickle, os.path, subprocess, urllib.parse, base64, shlex, builtins, and string/dict methods. Each model entry maps a function name to `TaintPropagator` fields (param_to_return, param_to_param). All three functions are exported from the top-level `treeloom` package.

## Reachability

```python
from treeloom import EdgeKind, forward_reachable, backward_reachable

# BFS forward from a node, optionally filtered to specific edge kinds
reachable: set[CpgNode] = forward_reachable(cpg, node_id, edge_kinds=frozenset({EdgeKind.DATA_FLOWS_TO}))

# BFS backward (predecessors)
ancestors: set[CpgNode] = backward_reachable(cpg, node_id)
```

These are convenience wrappers around `cpg.query().reachable_from()` and `cpg.query().reaching()`.
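
For readers unfamiliar with edge-filtered BFS, here is a stand-alone sketch over `(source, target, kind)` tuples. It is not the library's implementation. In particular, leaving the start node out of the result is a choice made here, and the library's behavior on that point is not specified above.

```python
from collections import deque
from typing import Optional


def forward(edges: list, start: str, kinds: Optional[frozenset] = None) -> set:
    """Nodes reachable from `start`, following only edges whose kind is allowed."""
    adjacency = {}
    for source, target, kind in edges:
        if kinds is None or kind in kinds:
            adjacency.setdefault(source, []).append(target)
    seen = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for succ in adjacency.get(node, []):
            if succ != start and succ not in seen:
                seen.add(succ)
                queue.append(succ)
    return seen


def backward(edges: list, start: str, kinds: Optional[frozenset] = None) -> set:
    """Nodes that can reach `start`: the same BFS on reversed edges."""
    return forward([(t, s, k) for s, t, k in edges], start, kinds)


edges = [
    ("a", "b", "data_flows_to"),
    ("b", "c", "calls"),
    ("b", "d", "data_flows_to"),
]
print(forward(edges, "a"))
print(forward(edges, "a", frozenset({"data_flows_to"})))
print(backward(edges, "d"))
```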

## Query API

```python
class GraphQuery:
    def paths_between(self, source: NodeId, target: NodeId, cutoff: int = 10) -> list[list[CpgNode]]
    def reachable_from(self, node_id: NodeId, edge_kinds: frozenset[EdgeKind] | None = None) -> set[CpgNode]
    def reaching(self, node_id: NodeId, edge_kinds: frozenset[EdgeKind] | None = None) -> set[CpgNode]
    def paths_to_sink(self, sink_id: NodeId, edge_kinds: frozenset[EdgeKind] | None = None, cutoff: int = 20) -> list[list[CpgNode]]
    def node_at(self, file: Path, line: int) -> CpgNode | None
    def nodes_in_file(self, file: Path) -> list[CpgNode]
    def nodes_in_scope(self, scope_id: NodeId) -> list[CpgNode]
    def subgraph(self, root: NodeId, edge_kinds: frozenset[EdgeKind] | None = None, max_depth: int = 10) -> CodePropertyGraph
    def match_chain(self, pattern: ChainPattern) -> list[list[CpgNode]]
```
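
The `cutoff` parameters bound path enumeration, which can otherwise blow up on dense graphs. The sketch below enumerates simple paths with a depth limit. Interpreting `cutoff` as the maximum number of edges in a path is an assumption, and the adjacency-dict encoding is purely illustrative.

```python
def simple_paths(adjacency: dict, source: str, target: str, cutoff: int = 10) -> list:
    """All cycle-free paths from source to target using at most `cutoff` edges."""
    paths = []

    def walk(path: list) -> None:
        node = path[-1]
        if node == target:
            paths.append(list(path))
            return
        if len(path) - 1 >= cutoff:  # edge budget used up
            return
        for succ in adjacency.get(node, []):
            if succ not in path:  # keep the path simple (no repeated nodes)
                path.append(succ)
                walk(path)
                path.pop()

    walk([source])
    return paths


adjacency = {"a": ["b", "c"], "b": ["d"], "c": ["b", "d"], "d": []}
print(simple_paths(adjacency, "a", "d"))
print(simple_paths(adjacency, "a", "d", cutoff=2))
```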

## Pattern Matching

```python
@dataclass
class StepMatcher:
    kind: NodeKind | None = None
    name_pattern: str | None = None       # regex against node.name
    annotation_key: str | None = None
    annotation_value: Any = None
    wildcard: bool = False                # matches 0+ intermediate nodes

@dataclass
class ChainPattern:
    steps: list[StepMatcher]
    edge_kind: EdgeKind | None = None     # restrict traversal to this edge type
```

Example -- find parameter-to-exec paths via data flow:

```python
from treeloom import ChainPattern, StepMatcher, NodeKind, EdgeKind

pattern = ChainPattern(
    steps=[
        StepMatcher(kind=NodeKind.PARAMETER),
        StepMatcher(wildcard=True),
        StepMatcher(kind=NodeKind.CALL, name_pattern=r"exec|eval|os\.system"),
    ],
    edge_kind=EdgeKind.DATA_FLOWS_TO,
)
matches = cpg.query().match_chain(pattern)
```
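
To show what "wildcard matches 0+ intermediate nodes" means operationally, here is a small backtracking matcher over a toy graph. `Node`, `Step`, the `max_len` guard, and the use of `re.search` for `name_pattern` are assumptions of this sketch and may not mirror how `match_chain` is implemented.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Node:
    kind: str
    name: str


@dataclass(frozen=True)
class Step:
    kind: Optional[str] = None
    name_pattern: Optional[str] = None
    wildcard: bool = False


def step_matches(step: Step, node: Node) -> bool:
    if step.kind is not None and node.kind != step.kind:
        return False
    if step.name_pattern is not None and not re.search(step.name_pattern, node.name):
        return False
    return True


def match_chain(nodes: dict, adjacency: dict, steps: list, max_len: int = 20) -> list:
    """Return lists of node ids that satisfy the steps in order."""
    results = []

    def extend(path: list, index: int) -> None:
        if index == len(steps):
            results.append(list(path))
            return
        if len(path) >= max_len:
            return
        if path:
            candidates = [n for n in adjacency.get(path[-1], []) if n not in path]
        else:
            candidates = list(nodes)  # first step may start anywhere
        step = steps[index]
        if step.wildcard:
            extend(path, index + 1)          # wildcard consumes zero nodes
            for node_id in candidates:       # or one more node, then tries again
                path.append(node_id)
                extend(path, index)
                path.pop()
            return
        for node_id in candidates:
            if step_matches(step, nodes[node_id]):
                path.append(node_id)
                extend(path, index + 1)
                path.pop()

    extend([], 0)
    return results


nodes = {
    "p": Node("parameter", "cmd"),
    "v": Node("variable", "full_cmd"),
    "c": Node("call", "os.system"),
    "q": Node("parameter", "n"),
    "r": Node("call", "print"),
}
adjacency = {"p": ["v"], "v": ["c"], "q": ["r"]}
pattern = [
    Step(kind="parameter"),
    Step(wildcard=True),
    Step(kind="call", name_pattern=r"exec|eval|os\.system"),
]
matches = match_chain(nodes, adjacency, pattern)
print(matches)
```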

## Export

```python
# JSON (full round-trip including annotations)
from treeloom import EdgeKind, NodeKind, to_json, from_json
json_str = to_json(cpg, indent=2)
restored = from_json(json_str)

# Graphviz DOT (optional edge/node kind filtering)
from treeloom import to_dot
dot = to_dot(cpg, edge_kinds=frozenset({EdgeKind.DATA_FLOWS_TO}))

# Interactive HTML (Cytoscape.js + Dagre layout)
from treeloom import generate_html, Overlay, OverlayStyle, VisualizationLayer
overlay = Overlay(name="Analysis")  # populate node_styles / edge_styles as needed
html = generate_html(cpg, overlays=[overlay], title="Analysis")

# Exclude noisy node kinds (imports typically dominate the graph)
html = generate_html(cpg, exclude_kinds=frozenset({NodeKind.IMPORT, NodeKind.LITERAL}))
# Default layers: Structure, Data Flow, Control Flow, Call Graph (all ON), Imports (OFF)
```

`generate_html` signature:

```python
def generate_html(
    cpg: CodePropertyGraph,
    layers: list[VisualizationLayer] | None = None,
    overlays: list[Overlay] | None = None,
    title: str = "Code Property Graph",
    exclude_kinds: frozenset[NodeKind] | None = None,
) -> str
```

The `exclude_kinds` parameter removes those node kinds entirely from the output, along with any edges whose source or target is an excluded node.
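
The filtering rule amounts to dropping nodes by kind and then dropping every edge that lost an endpoint. A stand-alone sketch follows. The dict and tuple encodings are assumptions for illustration and are unrelated to how `generate_html` stores the graph.

```python
def drop_kinds(nodes: dict, edges: list, exclude: frozenset) -> tuple:
    """Remove nodes of excluded kinds, plus any edge touching a removed node."""
    kept_nodes = {node_id: kind for node_id, kind in nodes.items() if kind not in exclude}
    kept_edges = [(s, t) for s, t in edges if s in kept_nodes and t in kept_nodes]
    return kept_nodes, kept_edges


nodes = {"mod": "module", "imp": "import", "fn": "function", "lit": "literal"}
edges = [("mod", "imp"), ("mod", "fn"), ("lit", "fn")]
kept_nodes, kept_edges = drop_kinds(nodes, edges, frozenset({"import", "literal"}))
print(kept_nodes, kept_edges)
```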

## Overlay System

```python
@dataclass
class OverlayStyle:
    color: str | None = None        # CSS color
    shape: str | None = None        # Cytoscape node shape
    size: int | None = None
    line_style: str | None = None   # "solid", "dashed", "dotted"
    width: float | None = None      # edge width
    label: str | None = None        # tooltip
    opacity: float | None = None    # 0.0-1.0

@dataclass
class Overlay:
    name: str
    description: str = ""
    default_visible: bool = True
    node_styles: dict[NodeId, OverlayStyle] = field(default_factory=dict)
    edge_styles: dict[tuple[NodeId, NodeId], OverlayStyle] = field(default_factory=dict)
```

Example -- color unsanitized sinks red:

```python
overlay = Overlay(name="Security")
for path in result.unsanitized_paths():
    overlay.node_styles[path.sink.id] = OverlayStyle(color="#E53935", label="SINK")
    overlay.node_styles[path.source.id] = OverlayStyle(color="#FF9800", label="SOURCE")
html = generate_html(cpg, overlays=[overlay],
                     exclude_kinds=frozenset({NodeKind.IMPORT, NodeKind.LITERAL}))
```

## CLI

All CLI operations work on CPG JSON files produced by `treeloom build`. Output formats where applicable: `table` (default), `json`, `csv`, `tsv`, `jsonl`.

`treeloom build` -- parse source and write a CPG JSON file:

```
treeloom build src/ -o cpg.json
treeloom build src/ -o cpg.json --progress --language python --language java
treeloom build src/ -o cpg.json --exclude "**/*.generated.py"
treeloom build src/ -o cpg.json --progress --timeout 300
```

`treeloom info` -- display node/edge statistics:

```
treeloom info cpg.json
treeloom info cpg.json --json
```

`treeloom query` -- filter nodes:

```
treeloom query cpg.json --kind function --name "handle_.*"
treeloom query cpg.json --kind call --file routes.py --output-format json
treeloom query cpg.json --scope my_function --count
treeloom query cpg.json --annotation role --annotation-value sink
```

`treeloom edges` -- filter edges:

```
treeloom edges cpg.json --kind data_flows_to
treeloom edges cpg.json --source "user_input" --target "execute"
treeloom edges cpg.json --kind calls --output-format csv --limit 100
```

`treeloom taint` -- YAML-driven taint analysis:

```
treeloom taint cpg.json --policy policy.yaml
treeloom taint cpg.json --policy base.yaml --policy overrides.yaml --show-sanitized
treeloom taint cpg.json --policy policy.yaml --apply -o annotated.json
treeloom taint cpg.json --policy policy.yaml --json
```

`treeloom annotate` -- apply YAML annotation rules:

```
treeloom annotate cpg.json --rules rules.yaml -o annotated.json
```

`treeloom pattern` -- match declarative node chains:

```
treeloom pattern cpg.json --pattern pattern.yaml
treeloom pattern cpg.json --pattern pattern.yaml --json --limit 20
```

`treeloom subgraph` -- extract a focused subgraph:

```
treeloom subgraph cpg.json --function handle_request -o sub.json
treeloom subgraph cpg.json --class UserController --depth 5 -o sub.json
treeloom subgraph cpg.json --file routes.py -o sub.json
treeloom subgraph cpg.json --root "function:src/api.py:42:0:1" -o sub.json
```

`treeloom diff` -- compare two CPGs:

```
treeloom diff old.json new.json
treeloom diff old.json new.json --match-by-basename --strip-prefix /home/ci/build/
```

`treeloom viz` -- interactive HTML visualization:

```
treeloom viz cpg.json
treeloom viz cpg.json -o report.html --title "My App" --open
treeloom viz cpg.json --exclude-kind import --exclude-kind literal
```

`treeloom dot` -- Graphviz DOT export:

```
treeloom dot cpg.json | dot -Tsvg > graph.svg
treeloom dot cpg.json --edge-kind data_flows_to --edge-kind calls -o flow.dot
```

`treeloom serve` -- local HTTP JSON API:

```
treeloom serve cpg.json
treeloom serve cpg.json --host 0.0.0.0 --port 9090
# Endpoints: GET /nodes, /edges, /node?id=..., /subgraph?root=...&depth=...
```
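
When calling these endpoints from a script, query values such as node IDs need percent-encoding because they can contain `:` and `/`. The helper below is a hypothetical client-side convenience, not part of treeloom. The host is a placeholder, the port is taken from the `--port 9090` example, and the node ID string is copied from the `treeloom subgraph --root` example.

```python
from urllib.parse import urlencode


def endpoint_url(base: str, path: str, **params: object) -> str:
    """Build a request URL with properly encoded query parameters."""
    query = urlencode(params)
    return f"{base.rstrip('/')}{path}" + (f"?{query}" if query else "")


url = endpoint_url(
    "http://127.0.0.1:9090",
    "/subgraph",
    root="function:src/api.py:42:0:1",
    depth=3,
)
print(url)
```

The resulting string can be passed to `urllib.request.urlopen` or any HTTP client once the server is running.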

`treeloom watch` -- rebuild on file changes:

```
treeloom watch src/ -o cpg.json --interval 5
```

`treeloom completions` -- shell completion scripts:

```
treeloom completions bash >> ~/.bashrc
treeloom completions zsh >> ~/.zshrc
treeloom completions fish > ~/.config/fish/completions/treeloom.fish
```

`treeloom config` -- show resolved configuration.

## YAML Schemas

**Taint policy** (`--policy` for `treeloom taint`):

```yaml
sources:
  - kind: parameter          # match by node kind
    label: user_input        # label name attached to taint (default: "tainted")
  - kind: call
    name: "request\\..*"     # regex against node name
  - kind: variable
    attr:
      is_global: true        # match against CpgNode.attrs keys

sinks:
  - kind: call
    name: "execute|exec|eval"

sanitizers:
  - kind: call
    name: "escape|sanitize|validate"

propagators:                 # optional: describe how taint flows through a function
  - name: "format"
    param_to_return: true
```
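
The matching rules in this schema (`kind`, `name`, `attr`) can be read as a conjunction of checks. The sketch below evaluates one rule, already parsed into a dict, against a node represented as a dict. Using `re.search` (rather than a full match) for `name` is an assumption, since the schema only says "regex against node name".

```python
import re


def rule_matches(rule: dict, node: dict) -> bool:
    """True when every condition present in the rule holds for the node."""
    if "kind" in rule and node["kind"] != rule["kind"]:
        return False
    if "name" in rule and not re.search(rule["name"], node["name"]):
        return False
    for key, expected in rule.get("attr", {}).items():
        if node.get("attrs", {}).get(key) != expected:
            return False
    return True


source_rule = {"kind": "call", "name": r"request\..*"}
global_rule = {"kind": "variable", "attr": {"is_global": True}}

form_read = {"kind": "call", "name": "request.form.get", "attrs": {}}
config_var = {"kind": "variable", "name": "CONFIG", "attrs": {"is_global": True}}
local_var = {"kind": "variable", "name": "tmp", "attrs": {"is_global": False}}

print(rule_matches(source_rule, form_read))
print(rule_matches(global_rule, config_var))
print(rule_matches(global_rule, local_var))
```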

**Annotation rules** (`--rules` for `treeloom annotate`):

```yaml
annotations:
  - match:
      kind: call
      name: "execute|exec"
    set:
      role: sink
      cwe_id: 78
  - match:
      kind: parameter
      attr:
        is_method: true
    set:
      role: entry_point
```

**Pattern file** (`--pattern` for `treeloom pattern`):

```yaml
steps:
  - kind: parameter
  - wildcard: true
  - kind: call
    name: "exec|eval|os\\.system"
edge_kind: data_flows_to     # optional; omit to traverse all edge types
```

## Supported Languages

| Language | Extensions | Grammar Package |
|----------|-----------|----------------|
| Python | .py, .pyi | tree-sitter-python |
| JavaScript | .js, .mjs, .cjs | tree-sitter-javascript |
| TypeScript | .ts, .tsx | tree-sitter-typescript |
| Go | .go | tree-sitter-go |
| Java | .java | tree-sitter-java |
| C | .c, .h | tree-sitter-c |
| C++ | .cpp, .cxx, .cc, .hpp, .hxx, .hh | tree-sitter-cpp |
| Rust | .rs | tree-sitter-rust |
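
Extension-based detection reduces to a lookup over the table above. The sketch below mirrors those extensions. The lower-case language identifiers (for example `"cpp"`) are assumptions, and treeloom's registry may use different names.

```python
from pathlib import Path
from typing import Optional

EXTENSION_TO_LANGUAGE = {
    ".py": "python", ".pyi": "python",
    ".js": "javascript", ".mjs": "javascript", ".cjs": "javascript",
    ".ts": "typescript", ".tsx": "typescript",
    ".go": "go",
    ".java": "java",
    ".c": "c", ".h": "c",
    ".cpp": "cpp", ".cxx": "cpp", ".cc": "cpp",
    ".hpp": "cpp", ".hxx": "cpp", ".hh": "cpp",
    ".rs": "rust",
}


def detect_language(path: Path) -> Optional[str]:
    """Map a file's final suffix to a language name, or None if unsupported."""
    return EXTENSION_TO_LANGUAGE.get(path.suffix)


print(detect_language(Path("src/app/routes.py")))
print(detect_language(Path("include/util.h")))
print(detect_language(Path("README.md")))
```

Note that `.h` maps to C in the table, so C++ headers using that extension would be treated as C under this scheme.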

## Gotchas

- **Annotations vs attrs**: `CpgNode.attrs` is structural metadata from language visitors. Consumer metadata (roles, CWE IDs, domains) goes through `cpg.annotate_node()`, which stores it in a separate dict. Never put consumer data in attrs.
- **Lines are 1-based**: `SourceLocation.line` is 1-based (matching editors).
- **NodeId is opaque**: Never parse or construct NodeIds. Only use IDs returned by the builder or graph methods.
- **Round-trip contract**: `from_dict(to_dict())` and `from_json(to_json())` always produce equivalent graphs including all annotations.
- **MultiDiGraph**: Multiple edge kinds can exist between the same node pair. The backend uses NetworkX MultiDiGraph.
- **Field sensitivity**: `obj.field_a` and `obj.field_b` are tracked as separate variable nodes. Chained attribute receivers like `request.form.attr` resolve recursively through DFG edges (added in 0.2.5).
- **String formatting generates DFG**: `.format()`, `%` operator, and f-strings all produce DATA_FLOWS_TO edges (fixed in 0.2.2 -- this was the primary cause of taint false-negatives).
- **Chained method calls tracked**: `.format().fetchone()` and similar chains propagate taint across the full chain (fixed in 0.2.3).
- **Decorators captured**: Decorator names are stored in `function_node.attrs["decorators"]` as a list of strings (e.g. `["app.route"]`). Useful for identifying Flask/FastAPI route handlers.
- **Grammar packages are optional**: Core library works without them. Missing grammars produce clear ImportError messages, not crashes.
- **No network calls**: treeloom is fully offline. No telemetry, no downloads at runtime.
- **Backend-agnostic**: The public API never leaks NetworkX types. All return types are treeloom's own dataclasses and Python builtins.
- **Stdlib propagation models**: `load_models(["python-stdlib"])` returns `TaintPropagator` instances for common stdlib functions. Pass to `TaintPolicy(propagators=...)`. Use `list_builtin_models()` to discover available model names. Models are YAML in `src/treeloom/models/builtin/`.
- **Incremental rebuild**: `CPGBuilder.rebuild(changed=[path])` re-parses only listed files; pass `changed=None` for automatic SHA-256-based change detection. `remove_node()` cascades to remove all touching edges. Annotations on unchanged nodes are preserved across rebuilds.
- **Type inference is best-effort**: The Python visitor tracks simple constructor assignments (`d = Dog()`) and sets `inferred_type` on VARIABLE nodes and `receiver_inferred_type` on CALL nodes. Class `attrs["bases"]` lists base class names for MRO-based method resolution. No cross-module tracking, no complex expression inference, flat type map only.
- **Field-sensitive taint via TaintLabel.field_path**: When set, the engine narrows object-level taint to field-level when flowing through attribute access edges (DATA_FLOWS_TO with `field_name` attr). Mismatching fields are filtered. Falls back to object-level when ambiguous.
- **emit_data_flow accepts keyword attrs**: `emitter.emit_data_flow(src, tgt, field_name="x")` attaches metadata to DATA_FLOWS_TO edges. Used internally for field-sensitive taint tracking.
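
As an illustration of the decorator note above, the sketch below picks out likely route handlers from function records whose `attrs["decorators"]` is a list of strings. The records are plain dicts standing in for FUNCTION nodes, and the suffix list (including how FastAPI-style decorators such as `router.post` would be recorded) is an assumption; only `"app.route"` appears in this document.

```python
# Hypothetical decorator suffixes that suggest an HTTP route handler.
ROUTE_SUFFIXES = (".route", ".get", ".post", ".put", ".delete")


def is_route_handler(attrs: dict) -> bool:
    """True if any recorded decorator name ends with a route-like suffix."""
    return any(name.endswith(ROUTE_SUFFIXES) for name in attrs.get("decorators", []))


functions = [
    {"name": "index", "attrs": {"decorators": ["app.route"], "is_async": False}},
    {"name": "helper", "attrs": {"decorators": []}},
    {"name": "create_item", "attrs": {"decorators": ["router.post", "requires_auth"]}},
]
handlers = [f["name"] for f in functions if is_route_handler(f["attrs"])]
print(handlers)
```

With a real graph, the same predicate could be applied to `node.attrs` for each node from `cpg.nodes(kind=NodeKind.FUNCTION)`, with the result recorded through `cpg.annotate_node()` rather than written back into attrs.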
