Microsoft's AI Stack Decoded, Part 5: Multi-Agent Workflows

Orchestrating multiple AI agents for complex processes. Graph-based workflows, checkpointing, human approval gates, and production patterns.

When One Agent Isn’t Enough

A single agent can answer questions. But real enterprise processes need:

  • Multiple specialized agents working together
  • Explicit control over execution order
  • Checkpoints to resume long-running processes
  • Human approval at critical decision points
  • Parallel execution for performance
  • Error handling and rollback

This is where workflows come in.

This is Part 5 — the finale of the Microsoft AI Stack series.


Graph-Based Workflows

Agent Framework uses a graph model for workflows. Each node is a function (AI-powered or not), and edges define the execution flow:

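from typing import List

from agent_framework import Workflow

# Assumed to be defined elsewhere: the Document, Clause, RiskReport and Summary
# types, plus extraction_agent, risk_agent and summary_agent.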

workflow = Workflow("ContractReview")

@workflow.node
async def extract_clauses(contract: Document) -> List[Clause]:
    """Extract all clauses from the contract."""
    return await extraction_agent.run(contract)

@workflow.node
async def assess_risk(clauses: List[Clause]) -> RiskReport:
    """Analyze clauses for risk factors."""
    return await risk_agent.run(clauses)

@workflow.node
async def generate_summary(risk_report: RiskReport) -> Summary:
    """Create executive summary."""
    return await summary_agent.run(risk_report)

# Define the graph
workflow.add_edge("extract_clauses", "assess_risk")
workflow.add_edge("assess_risk", "generate_summary")

# Run (from inside an async function; uploaded_contract is the Document to review)
result = await workflow.run(contract=uploaded_contract)

Why Graphs?

Graphs give you:

  • Visibility — See exactly what will execute
  • Debugging — Know which node failed
  • Parallelism — Independent nodes run concurrently
  • Checkpointing — Resume from any node

A Real Example: Procurement Review

Let’s build a complete procurement document review workflow:

from agent_framework import Workflow, Condition
from agents import (
    classifier_agent,  # used by classify_document below
    document_parser,
    compliance_checker,
    pricing_analyst,
    legal_reviewer,
    summary_writer,
)

workflow = Workflow("ProcurementReview")

# ═══════════════════════════════════════════════════════════
# Stage 1: Parse and classify the document
# ═══════════════════════════════════════════════════════════

@workflow.node
async def parse_document(document: bytes, filename: str) -> ParsedDocument:
    """Extract text and structure from the procurement document."""
    return await document_parser.run(
        f"Parse this procurement document: {filename}",
        attachments=[document]
    )

@workflow.node
async def classify_document(parsed: ParsedDocument) -> DocumentType:
    """Determine document type: RFP, RFQ, Contract, Amendment, etc."""
    doc_type = await classifier_agent.run(
        f"Classify this document:\n{parsed.summary}"
    )
    return DocumentType(doc_type)

# ═══════════════════════════════════════════════════════════
# Stage 2: Parallel analysis (these run concurrently)
# ═══════════════════════════════════════════════════════════

@workflow.node
async def check_compliance(parsed: ParsedDocument) -> ComplianceReport:
    """Check FAR/DFAR compliance requirements."""
    return await compliance_checker.run(
        f"""Review for FAR/DFAR compliance:
        
        Key areas:
        - Required clauses (FAR 52.XXX)
        - Small business requirements
        - Cybersecurity requirements (CMMC if DoD)
        - Labor law compliance
        
        Document: {parsed.full_text}"""
    )

@workflow.node  
async def analyze_pricing(parsed: ParsedDocument) -> PricingAnalysis:
    """Analyze pricing structure and competitiveness."""
    return await pricing_analyst.run(
        f"""Analyze pricing:
        
        - Identify pricing model (FFP, T&M, Cost-Plus)
        - Compare to similar contracts (if available)
        - Flag unusual terms
        - Calculate total ceiling value
        
        Document: {parsed.full_text}"""
    )

@workflow.node
async def review_legal(parsed: ParsedDocument) -> LegalReview:
    """Review for legal risks and non-standard terms."""
    return await legal_reviewer.run(
        f"""Legal review:
        
        - Identify non-standard clauses
        - Flag liability concerns
        - Review IP provisions
        - Check termination clauses
        - Assess indemnification terms
        
        Document: {parsed.full_text}"""
    )

# ═══════════════════════════════════════════════════════════
# Stage 3: Human review gate
# ═══════════════════════════════════════════════════════════

@workflow.node(requires_approval=True)
async def human_review_gate(
    compliance: ComplianceReport,
    pricing: PricingAnalysis,
    legal: LegalReview
) -> ApprovalDecision:
    """Pause for human review of all findings."""
    
    # This creates a task in the review queue.
    # calculate_risk is assumed to be a helper defined elsewhere.
    all_clear = not (compliance.issues or pricing.concerns or legal.risks)
    return {
        "compliance_issues": compliance.issues,
        "pricing_concerns": pricing.concerns,
        "legal_risks": legal.risks,
        "overall_risk": calculate_risk(compliance, pricing, legal),
        "recommendation": "Proceed" if all_clear else "Review Required"
    }

# ═══════════════════════════════════════════════════════════
# Stage 4: Generate final output
# ═══════════════════════════════════════════════════════════

@workflow.node
async def generate_summary(
    parsed: ParsedDocument,
    compliance: ComplianceReport,
    pricing: PricingAnalysis,
    legal: LegalReview,
    approval: ApprovalDecision
) -> FinalReport:
    """Generate executive summary with all findings."""
    return await summary_writer.run(
        f"""Create executive summary:
        
        Document: {parsed.summary}
        Compliance: {compliance.summary}
        Pricing: {pricing.summary}
        Legal: {legal.summary}
        Human Review: {approval.notes}
        
        Format as 1-page brief for procurement officer."""
    )

@workflow.node
async def notify_stakeholders(report: FinalReport) -> None:
    """Send notifications to relevant parties."""
    await email_service.send(
        to=report.stakeholders,
        subject=f"Procurement Review Complete: {report.document_name}",
        body=report.summary
    )

# ═══════════════════════════════════════════════════════════
# Define the graph
# ═══════════════════════════════════════════════════════════

# Sequential: parse → classify
workflow.add_edge("parse_document", "classify_document")

# Parallel: classify → [compliance, pricing, legal]
workflow.add_edge("classify_document", "check_compliance")
workflow.add_edge("classify_document", "analyze_pricing")
workflow.add_edge("classify_document", "review_legal")

# Converge: [compliance, pricing, legal] → human review
workflow.add_edge("check_compliance", "human_review_gate")
workflow.add_edge("analyze_pricing", "human_review_gate")
workflow.add_edge("review_legal", "human_review_gate")

# After approval: generate summary → notify
workflow.add_edge("human_review_gate", "generate_summary")
workflow.add_edge("generate_summary", "notify_stakeholders")

Visual Representation

                        ┌─────────────────────┐
                        │   parse_document    │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │  classify_document  │
                        └──────────┬──────────┘
                                   │
           ┌───────────────────────┼───────────────────────┐
           │                       │                       │
           ▼                       ▼                       ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│  check_compliance   │ │   analyze_pricing   │ │    review_legal     │
└──────────┬──────────┘ └──────────┬──────────┘ └──────────┬──────────┘
           │                       │                       │
           └───────────────────────┼───────────────────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │  human_review_gate  │ ← 🧑 Pauses here
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │  generate_summary   │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │ notify_stakeholders │
                        └─────────────────────┘
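
To make the "independent nodes run concurrently" idea concrete, here is a minimal sketch that derives execution layers from the edge list of the procurement graph. It uses only Python's standard-library graphlib and does not touch the Agent Framework API; the helper name execution_layers is hypothetical.

```python
from graphlib import TopologicalSorter

# Edges copied from the procurement workflow: (upstream, downstream).
EDGES = [
    ("parse_document", "classify_document"),
    ("classify_document", "check_compliance"),
    ("classify_document", "analyze_pricing"),
    ("classify_document", "review_legal"),
    ("check_compliance", "human_review_gate"),
    ("analyze_pricing", "human_review_gate"),
    ("review_legal", "human_review_gate"),
    ("human_review_gate", "generate_summary"),
    ("generate_summary", "notify_stakeholders"),
]


def execution_layers(edges):
    """Group nodes into layers; every node in a layer has all its inputs ready."""
    sorter = TopologicalSorter()
    for upstream, downstream in edges:
        sorter.add(downstream, upstream)  # downstream depends on upstream
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle

    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        layers.append(ready)
        sorter.done(*ready)
    return layers


if __name__ == "__main__":
    for index, layer in enumerate(execution_layers(EDGES), start=1):
        print(f"Layer {index}: {', '.join(layer)}")
```

The third layer contains the three analysis nodes, which is why a scheduler can start them together once classify_document finishes.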

Human-in-the-Loop Patterns

Approval Gates

@workflow.node(requires_approval=True)
async def approve_purchase(amount: float, vendor: str) -> ApprovalResult:
    """Pause for human approval; purchases under $10k are flagged for auto-approval."""
    return {
        "amount": amount,
        "vendor": vendor,
        "auto_approve": amount < 10000
    }

# Handle the approval
@workflow.on_approval("approve_purchase")
async def handle_approval(context, approved: bool, approver: str, notes: str):
    if approved:
        return context.continue_to("process_purchase")
    else:
        return context.continue_to("notify_rejection", reason=notes)

Review Queues

Approvals appear in a review queue:

# Get pending approvals
pending = await workflow.get_pending_approvals(
    assignee="procurement-officers@agency.gov"
)

for approval in pending:
    print(f"Workflow: {approval.workflow_id}")
    print(f"Node: {approval.node_name}")
    print(f"Data: {approval.context}")
    print(f"Waiting since: {approval.created_at}")

# Approve or reject
await workflow.resolve_approval(
    workflow_id="abc123",
    approved=True,
    approver="jsmith@agency.gov",
    notes="Approved per policy exception memo dated 4/10/26"
)
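
One way to picture what a review queue does under the hood is a table of parked futures: the workflow awaits a future, and a reviewer's decision completes it. The sketch below is an in-memory illustration only. ReviewQueue, PendingApproval and their methods are hypothetical names, not the Agent Framework API, and a production queue would persist its state.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class PendingApproval:
    workflow_id: str
    node_name: str
    context: dict
    decision: asyncio.Future = field(repr=False)


class ReviewQueue:
    """Hypothetical in-memory queue; a real one would persist to durable storage."""

    def __init__(self):
        self._pending = {}

    async def request(self, workflow_id, node_name, context):
        """Called by the workflow: park here until a human resolves the approval."""
        future = asyncio.get_running_loop().create_future()
        self._pending[workflow_id] = PendingApproval(
            workflow_id, node_name, context, future
        )
        return await future

    def pending(self):
        return list(self._pending.values())

    def resolve(self, workflow_id, approved, approver, notes=""):
        """Called by the reviewer: wake the parked workflow with the decision."""
        item = self._pending.pop(workflow_id)
        item.decision.set_result(
            {"approved": approved, "approver": approver, "notes": notes}
        )


async def demo():
    queue = ReviewQueue()
    waiting = asyncio.create_task(
        queue.request("abc123", "human_review_gate", {"overall_risk": "LOW"})
    )
    await asyncio.sleep(0)  # let the workflow task reach the gate
    pending_ids = [item.workflow_id for item in queue.pending()]
    queue.resolve("abc123", approved=True, approver="jsmith@agency.gov")
    return pending_ids, await waiting


if __name__ == "__main__":
    print(asyncio.run(demo()))
```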

Timeout and Escalation

@workflow.node(
    requires_approval=True,
    timeout_hours=24,
    escalation_path=["supervisor@agency.gov", "director@agency.gov"]
)
async def urgent_approval(request: UrgentRequest) -> ApprovalResult:
    """Urgent requests escalate if not handled in 24 hours."""
    return request
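
The escalation behavior can be sketched with plain asyncio: wait for the decision with a timeout, and on each timeout notify the next person in the path. This is an assumption about how such a feature might work, not the framework's implementation; wait_with_escalation and notify are hypothetical names, and the timeout is in seconds so the demo finishes quickly.

```python
import asyncio


async def wait_with_escalation(decision, escalation_path, timeout_seconds, notify):
    """Wait for a decision; after each timeout, notify the next escalation contact."""
    for next_assignee in [*escalation_path, None]:
        try:
            # shield() keeps the pending decision alive when wait_for times out
            return await asyncio.wait_for(asyncio.shield(decision), timeout_seconds)
        except asyncio.TimeoutError:
            if next_assignee is None:
                raise  # path exhausted: surface the timeout to the caller
            await notify(next_assignee)


async def demo():
    decision = asyncio.get_running_loop().create_future()
    notified = []

    async def notify(assignee):
        notified.append(assignee)
        if assignee == "director@agency.gov":
            decision.set_result("approved")  # the director responds right away

    result = await wait_with_escalation(
        decision,
        ["supervisor@agency.gov", "director@agency.gov"],
        timeout_seconds=0.01,
        notify=notify,
    )
    return result, notified


if __name__ == "__main__":
    print(asyncio.run(demo()))
```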

Checkpointing and Recovery

Long-running workflows need to survive failures:

import os

# Enable checkpointing
workflow = Workflow(
    "ProcurementReview",
    checkpoint_store=AzureBlobCheckpointStore(
        connection_string=os.environ["STORAGE_CONNECTION"]
    )
)

# Run with automatic checkpointing
run_id = await workflow.run(document=contract)

# Later: resume from checkpoint (after crash, restart, etc.)
result = await workflow.resume(run_id=run_id)
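
The core of checkpoint-and-resume is small: persist each node's output as soon as it succeeds, and on restart skip any node whose output is already saved. The sketch below shows that idea with a local JSON file and synchronous steps for brevity. FileCheckpointStore and run_pipeline are hypothetical stand-ins, not the blob-backed store shown in the example.

```python
import json
import tempfile
from pathlib import Path


class FileCheckpointStore:
    """Hypothetical store: one JSON file per run, holding completed node outputs."""

    def __init__(self, directory):
        self.directory = Path(directory)

    def load(self, run_id):
        path = self.directory / f"{run_id}.json"
        return json.loads(path.read_text()) if path.exists() else {}

    def save(self, run_id, completed):
        path = self.directory / f"{run_id}.json"
        path.write_text(json.dumps(completed))


def run_pipeline(run_id, steps, store, initial):
    """Run steps in order, skipping any whose output is already checkpointed."""
    completed = store.load(run_id)
    value = initial
    for name, func in steps:
        if name in completed:
            value = completed[name]  # reuse the saved output instead of re-running
            continue
        value = func(value)
        completed[name] = value
        store.save(run_id, completed)  # checkpoint after every successful step
    return value


def demo():
    calls = []
    state = {"fail_once": True}

    def parse(text):
        calls.append("parse")
        return text.upper()

    def classify(text):
        calls.append("classify")
        if state["fail_once"]:
            state["fail_once"] = False
            raise RuntimeError("simulated crash")
        return f"{text}:CONTRACT"

    steps = [("parse", parse), ("classify", classify)]
    with tempfile.TemporaryDirectory() as directory:
        store = FileCheckpointStore(directory)
        try:
            run_pipeline("run-1", steps, store, "msa")
        except RuntimeError:
            pass  # the process "crashed" after parse was checkpointed
        result = run_pipeline("run-1", steps, store, "msa")  # resume
    return result, calls


if __name__ == "__main__":
    print(demo())
```

On the second call, parse is not executed again; only classify is retried.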

Manual Checkpoints

@workflow.node
async def long_analysis(data: LargeDataset) -> AnalysisResult:
    results = []
    
    for i, chunk in enumerate(data.chunks):
        result = await analyze_chunk(chunk)
        results.append(result)
        
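        # Checkpoint after every 100th chunk (i is zero-based, so test i + 1;
        # testing i % 100 would also checkpoint right after the first chunk)
        if (i + 1) % 100 == 0: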
            await workflow.checkpoint(
                progress=i,
                partial_results=results
            )
    
    return combine_results(results)

Parallel Execution

Nodes that do not depend on each other run concurrently:

# These three run in parallel (no edges between them)
workflow.add_edge("classify", "check_compliance")
workflow.add_edge("classify", "analyze_pricing")
workflow.add_edge("classify", "review_legal")

# With explicit parallelism control
workflow = Workflow(
    "HighVolumeProcessing",
    max_parallel_nodes=10,  # Limit concurrent execution
    rate_limit_per_minute=100  # Throttle API calls
)
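
A concurrency cap like max_parallel_nodes is commonly built on a semaphore. The following sketch shows that technique in plain asyncio, assuming nothing about how the framework implements it; run_bounded is a hypothetical helper, and rate limiting per minute is not covered here.

```python
import asyncio


async def run_bounded(jobs, max_parallel):
    """Run coroutine factories concurrently, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job):
        async with semaphore:  # blocks while max_parallel jobs are in flight
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo():
    running = 0
    peak = 0

    async def node(index):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # track the highest observed concurrency
        await asyncio.sleep(0.01)  # stand-in for an agent call
        running -= 1
        return index * 2

    jobs = [lambda i=i: node(i) for i in range(10)]
    results = await run_bounded(jobs, max_parallel=3)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```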

Fan-Out / Fan-In

@workflow.node
async def split_document(document: Document) -> List[Section]:
    """Split into sections for parallel processing."""
    return document.sections

@workflow.node(map_over="sections")
async def analyze_section(section: Section) -> SectionAnalysis:
    """Runs once per section, in parallel."""
    return await analyzer_agent.run(section)

@workflow.node
async def combine_analyses(analyses: List[SectionAnalysis]) -> FullAnalysis:
    """Combine all section analyses."""
    return FullAnalysis(sections=analyses)

# Fan-out: split → [analyze each]
# Fan-in: [all analyses] → combine
workflow.add_edge("split_document", "analyze_section")
workflow.add_edge("analyze_section", "combine_analyses")
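
Stripped of the framework, fan-out and fan-in reduce to "start one task per item, then wait for all of them". A minimal sketch with asyncio.gather follows; the dictionary-based sections and the word-count analysis are placeholders chosen only to keep the example self-contained.

```python
import asyncio


async def analyze_section(section):
    """Stand-in for an agent call on one section."""
    await asyncio.sleep(0)
    return {"title": section["title"], "word_count": len(section["text"].split())}


async def fan_out_fan_in(sections):
    # Fan-out: one coroutine per section; gather preserves input order.
    analyses = await asyncio.gather(*(analyze_section(s) for s in sections))
    # Fan-in: combine the per-section results into one summary.
    return {
        "sections": analyses,
        "total_words": sum(a["word_count"] for a in analyses),
    }


if __name__ == "__main__":
    sample = [
        {"title": "Scope", "text": "deliver the system"},
        {"title": "Pricing", "text": "firm fixed price contract"},
    ]
    print(asyncio.run(fan_out_fan_in(sample)))
```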

Conditional Routing

Route to a different node depending on an upstream node's result:

@workflow.node
async def assess_risk(document: Document) -> RiskLevel:
    """Determine risk level."""
    return await risk_agent.run(document)

@workflow.node
async def standard_review(document: Document) -> Review:
    """Standard review process."""
    return await standard_agent.run(document)

@workflow.node
async def enhanced_review(document: Document) -> Review:
    """Enhanced review for high-risk items."""
    return await enhanced_agent.run(document)

# Conditional routing
workflow.add_edge(
    "assess_risk",
    Condition(
        when=lambda result: result.level == "HIGH",
        then="enhanced_review",
        otherwise="standard_review"
    )
)
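
To show what a conditional edge has to do at run time, here is a small stand-alone model of it: a predicate plus two target node names. This Condition class is a hypothetical re-creation for illustration and should not be read as the framework's own class; RiskLevel is likewise a placeholder type.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Condition:
    when: Callable[[Any], bool]
    then: str
    otherwise: str

    def next_node(self, result):
        """Return the name of the node to run after seeing `result`."""
        return self.then if self.when(result) else self.otherwise


@dataclass
class RiskLevel:
    level: str


route = Condition(
    when=lambda result: result.level == "HIGH",
    then="enhanced_review",
    otherwise="standard_review",
)

if __name__ == "__main__":
    print(route.next_node(RiskLevel("HIGH")))
    print(route.next_node(RiskLevel("LOW")))
```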

Error Handling

@workflow.node(
    retries=3,
    retry_delay_seconds=60,
    fallback="manual_processing"
)
async def call_external_api(data: InputData) -> APIResult:
    """Calls external API with retry logic."""
    return await external_api.call(data)

@workflow.node
async def manual_processing(data: InputData, error: Exception) -> APIResult:
    """Fallback when API fails."""
    # Create ticket for manual processing
    await create_support_ticket(data, error)
    return APIResult(status="pending_manual")
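
The retry-then-fallback behavior can be approximated in a few lines of asyncio. In this sketch, retries=3 is taken to mean one initial attempt plus three retries, which is an assumption about the option's semantics; run_with_retries is a hypothetical helper, and the delay defaults to zero so the demo runs instantly.

```python
import asyncio


async def run_with_retries(func, fallback, data, retries=3, retry_delay_seconds=0.0):
    """Try func once plus `retries` more times, then hand the last error to fallback."""
    last_error = None
    for attempt in range(1 + retries):
        try:
            return await func(data)
        except Exception as error:  # a real runner would catch narrower errors
            last_error = error
            if attempt < retries:
                await asyncio.sleep(retry_delay_seconds)
    return await fallback(data, last_error)


async def demo():
    attempts = 0

    async def call_external_api(data):
        nonlocal attempts
        attempts += 1
        raise ConnectionError("service unavailable")

    async def manual_processing(data, error):
        return {"status": "pending_manual", "reason": str(error)}

    result = await run_with_retries(call_external_api, manual_processing, {"id": 1})
    return result, attempts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```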

Compensation (Rollback)

@workflow.node(compensation="undo_reservation")
async def reserve_resource(resource_id: str) -> Reservation:
    """Reserve a resource."""
    return await resource_api.reserve(resource_id)

@workflow.compensation("reserve_resource")
async def undo_reservation(reservation: Reservation):
    """Release the reservation if workflow fails."""
    await resource_api.release(reservation.id)
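
Compensation is usually implemented as a saga: remember an undo action for every step that succeeded, and if a later step fails, run the undo actions in reverse order. The sketch below illustrates that pattern in plain asyncio under that assumption; run_with_compensation and the resource names are hypothetical.

```python
import asyncio


async def run_with_compensation(steps):
    """Run (action, compensation) pairs; on failure undo completed steps in reverse."""
    completed = []
    try:
        for action, compensation in steps:
            result = await action()
            completed.append((compensation, result))
    except Exception:
        for compensation, result in reversed(completed):
            if compensation is not None:
                await compensation(result)
        raise  # re-raise so the caller still sees the original failure
    return [result for _, result in completed]


async def demo():
    log = []

    def reserve(resource_id):
        async def action():
            log.append(f"reserve {resource_id}")
            return resource_id
        return action

    async def release(resource_id):
        log.append(f"release {resource_id}")

    async def charge():
        raise RuntimeError("payment declined")

    steps = [(reserve("room-1"), release), (reserve("room-2"), release), (charge, None)]
    try:
        await run_with_compensation(steps)
    except RuntimeError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```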

Observability

Traces

Every workflow run generates traces:

from opentelemetry import trace

# Traces automatically include:
# - Workflow start/end
# - Each node execution
# - Agent calls within nodes
# - Tool invocations
# - Human approval wait times
# - Errors and retries

Metrics

# Built-in metrics
workflow_duration_seconds
workflow_success_rate
node_execution_time_seconds
human_approval_wait_time_seconds
parallel_utilization_percent
checkpoint_size_bytes
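
As a rough illustration of how a metric such as node_execution_time_seconds could be collected, the sketch below wraps a node in a timing decorator and stores samples in an in-process dictionary. The METRICS registry and timed_node decorator are hypothetical; a real deployment would export to a metrics backend instead.

```python
import asyncio
import functools
import time
from collections import defaultdict

# Hypothetical in-process registry: metric name -> node name -> list of samples.
METRICS = defaultdict(lambda: defaultdict(list))


def timed_node(func):
    """Record the wall-clock duration of each call, including failed ones."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            METRICS["node_execution_time_seconds"][func.__name__].append(elapsed)

    return wrapper


@timed_node
async def classify_document(text):
    await asyncio.sleep(0.01)  # stand-in for an agent call
    return "Contract"


if __name__ == "__main__":
    print(asyncio.run(classify_document("sample text")))
    print(dict(METRICS["node_execution_time_seconds"]))
```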

Dashboard

Workflow: ProcurementReview
Run ID: abc123
Status: ⏳ Waiting for human approval

Nodes:
  ✅ parse_document (2.3s)
  ✅ classify_document (1.1s)
  ✅ check_compliance (8.4s)
  ✅ analyze_pricing (6.2s)  
  ✅ review_legal (7.8s)
  ⏳ human_review_gate (waiting 4h 23m)
  ⬜ generate_summary
  ⬜ notify_stakeholders

Assigned to: procurement-officers@agency.gov
Escalation in: 19h 37m

Production Patterns

Pattern 1: Document Processing Pipeline

# High-volume document processing
workflow = Workflow(
    "DocumentIngestion",
    max_parallel_nodes=20,
    checkpoint_interval_seconds=300
)

# Stages: Upload → Extract → Classify → Index → Notify

Pattern 2: Approval Chain

# Multi-level approval workflow
# Junior → Senior → Director (for high-value items)
workflow = Workflow("PurchaseApproval")

@workflow.node
async def route_by_amount(request: PurchaseRequest) -> str:
    """Return the name of the approval node that should handle this request."""
    if request.amount < 10000:
        return "junior_approval"
    elif request.amount < 100000:
        return "senior_approval"
    else:
        return "director_approval"

Pattern 3: Scheduled Workflows

# Daily report generation
from agent_framework.triggers import Schedule

@workflow.trigger(Schedule.daily(hour=6, tz="America/New_York"))
async def generate_daily_report():
    data = await fetch_daily_data()
    return await workflow.run(data=data)

The Complete Picture

Over this series, we’ve covered:

  1. Part 1: The full Microsoft AI stack — models to agents to governance
  2. Part 2: Agent Framework — building individual agents with code
  3. Part 3: Copilot Studio — building agents without code
  4. Part 4: Governance — Content Safety, Purview, Defender, Entra
  5. Part 5: Workflows — orchestrating agents for complex processes

Together, this is the platform for enterprise AI:

┌─────────────────────────────────────────────────────────┐
│                      Enterprise AI                      │
├─────────────────────────────────────────────────────────┤
│  Workflows (orchestration, human-in-loop)               │
├─────────────────────────────────────────────────────────┤
│  Agents (Copilot Studio + Agent Framework)              │
├─────────────────────────────────────────────────────────┤
│  Governance (Content Safety, Purview, Defender, Entra)  │
├─────────────────────────────────────────────────────────┤
│  Models (GPT, Phi, MAI, Florence, KOSMOS)               │
├─────────────────────────────────────────────────────────┤
│  Infrastructure (Azure AI Foundry)                      │
└─────────────────────────────────────────────────────────┘

Microsoft is the only vendor with the complete stack. That’s the thesis.


This concludes the “Microsoft AI Stack Decoded” series. If you found this valuable, share it with your team. The enterprises that understand this stack will move faster than those still assembling point solutions.

This post is licensed under CC BY 4.0 by the author.