Build graph routes for agent workflows¶
Graph-based workflows in ADK define agent logic as a graph of execution nodes and edges, allowing you to build more reliable processes that combine artificial intelligence (AI) reasoning and code logic. These workflows allow you to create logical routes of execution nodes that can encapsulate code functions, AI-powered agents, Tools, and human input. By explicitly mapping out routing logic, this approach allows you to define a specific, step-wise process workflow in code, providing improved precision and reliability over purely prompt-based agents.
In ADK Go, workflow graphs are expressed through three composable workflow
agent types rather than an edges array DSL. Each type maps to a common
graph topology:
| Graph topology | Go workflow agent |
|---|---|
| Ordered sequence of nodes | sequentialagent |
| Parallel fan-out across nodes | parallelagent |
| Repeated execution / loop with exit condition | loopagent |
Agents are composed by nesting them in each other's SubAgents field.
Data flows between steps through session state: a step writes its output
to a named key with llmagent.Config.OutputKey, and downstream steps
read it by referencing {key} in their Instruction template.
Figure 1. Visualization of a task graph and the routing code to implement it.
The advantage of using a graph-based agent workflow is the significant increase in control, predictability, and reliability over prompt-based agents. By defining the overall process workflow in code, you gain more control over how tasks are routed and executed. This structured node definition improves the predictability of agents and enhances reliability for complex tasks that require defined steps and process management.
Get started with graph-based workflows in ADK by checking out Graph-based agent workflows.
Nodes¶
A graph is composed of execution nodes. These nodes can be Agents, ADK Tools, human input tasks, or code functions you write. Nodes can take inputs from previously executed nodes, and emit data through Event objects.
The following shows a simple FunctionNode that handles text inputs and sends a text output:
In Go, a workflow step (node) is an agent.Agent whose Run function
yields *session.Event values. The following custom Run function acts
as a node that transforms its input and writes the result to session state:
// upperCaseRun is a custom Run function that acts as a workflow step (node).
// It reads the user content, transforms it, writes to state, and yields an
// event — equivalent to a Python FunctionNode that returns Event(output=...).
func upperCaseRun(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
var inputText string
if ctx.UserContent() != nil {
for _, p := range ctx.UserContent().Parts {
inputText += p.Text
}
}
result := strings.ToUpper(inputText)
if err := ctx.Session().State().Set("upper_result", result); err != nil {
yield(nil, fmt.Errorf("state.Set: %w", err))
return
}
yield(&session.Event{
LLMResponse: model.LLMResponse{
Content: &genai.Content{
Parts: []*genai.Part{{Text: result}},
},
},
}, nil)
}
}
For more information about transferring data between nodes, see Data handling for agent workflows.
Workflow graphs syntax¶
You define a graph by composing workflow agents. This section provides an overview of the common routing patterns.
Caution: Workflow agent limitations
You can add LlmAgents to graph-based workflows, however they must be configured for single-turn (task) mode. For more information about agent modes, see Build collaborative agent teams.
Route sequences¶
A sequential route runs each node once, in the listed order.
The edges array uses the START keyword to indicate the beginning of a
graph execution, with each listed node executed in sequence:
sequentialagent.New accepts a list of SubAgents and runs them in the
listed order — one after another, passing session state between steps:
// newSequentialNodes builds a two-step sequential workflow.
// It is the Go equivalent of:
//
// edges=[("START", task_A_node, task_B_node)]
//
// The sequentialagent runs each SubAgent once, in the listed order.
func newSequentialNodes(ctx context.Context) (agent.Agent, error) {
geminiModel, err := gemini.NewModel(ctx, modelName, &genai.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("gemini.NewModel: %w", err)
}
taskA, err := llmagent.New(llmagent.Config{
Name: "task_A_agent",
Model: geminiModel,
Description: "Performs task A.",
Instruction: "Summarise the user request in one sentence.",
OutputKey: "task_a_result",
})
if err != nil {
return nil, fmt.Errorf("taskA: %w", err)
}
taskB, err := llmagent.New(llmagent.Config{
Name: "task_B_agent",
Model: geminiModel,
Description: "Performs task B using task A output.",
Instruction: "Translate this summary into French: {task_a_result}",
})
if err != nil {
return nil, fmt.Errorf("taskB: %w", err)
}
return sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "sequential_workflow",
Description: "Runs task A then task B in order.",
SubAgents: []agent.Agent{taskA, taskB},
},
})
}
Route branches and conditional execution¶
In Python, branching is handled by a FunctionNode that returns an
Event(route=...) value, which the edges dict dispatches to different nodes.
def router(node_input: str):
"""Route to task B or C based on node_input."""
if condition(node_input):
return Event(route="RUN_TASK_C")
return Event(route="RUN_TASK_B")
task_B_node = Agent(name="task_B_agent") # An agent to execute node B
def task_C_node(node_input: str):
"""A FunctionNode to execute node C."""
return Event(output="Task C completed")
root_agent = Workflow(
name="routing_workflow",
edges=[
("START", task_A_node, router),
(router,
{
# "route value": node_to_run
"RUN_TASK_B": task_B_node,
"RUN_TASK_C": task_C_node,
},
),
],
)
ADK Go does not have Event(route=...) conditional dispatch. The
equivalent pattern is to encode the routing decision in session state
(via OutputKey) and let downstream agents inspect it in their
Instruction template, or to use a loopagent with Escalate-based
exit for loop-until-done patterns. See the
loop and escalation example below for the
canonical Go approach to conditional routing.
Parallel tasks: fan out and join paths¶
You can create graphs that split execution across multiple, parallel nodes, and typically you need to assemble the output of each node for further processing. This task execution pattern has two stages. The workflow first fans out when it starts multiple parallel tasks, and then it re-joins those paths when those tasks are completed before proceeding to the next step.
Figure 2. The output of parallel task nodes can be assembled and joined before passing results to the next step.
You accomplish the join step by using a JoinNode object, which waits for each parallel task to complete and then passes the collection of outputs from these nodes to the next node.
from google.adk.workflow import JoinNode
my_join_node = JoinNode(name="my_join_node")
edges=[
("START", parallel_task_A, my_join_node),
("START", parallel_task_B, my_join_node),
("START", parallel_task_C, my_join_node),
(my_join_node, final_task_D),
]
Caution: Stuck JoinNode from incomplete nodes
The JoinNode object proceeds only after all its upstream nodes have provided an Event output. If one of the upstream nodes fails to provide output, the JoinNode is stuck and workflow execution stops. Make sure to include failsafe output from any node that outputs to a JoinNode.
In Go, the fan-out and join pattern is expressed by nesting a
parallelagent as the first sub-agent of a sequentialagent. Each
parallel branch writes its output to a unique OutputKey in session
state. After all branches complete, the synthesis agent reads those keys
through its Instruction template — providing the same join semantics
as JoinNode:
// newParallelFanOut builds a workflow that fans out across three parallel
// research agents and then re-joins by passing their OutputKey results into
// a single synthesis agent via session state — the Go equivalent of using a
// JoinNode followed by a final task.
//
// Python equivalent:
//
// edges=[
// ("START", research_A, join_node),
// ("START", research_B, join_node),
// ("START", research_C, join_node),
// (join_node, synthesis_agent),
// ]
func newParallelFanOut(ctx context.Context) (agent.Agent, error) {
geminiModel, err := gemini.NewModel(ctx, modelName, &genai.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("gemini.NewModel: %w", err)
}
researchA, err := llmagent.New(llmagent.Config{
Name: "research_agent_A",
Model: geminiModel,
Description: "Researches topic A.",
Instruction: "Give a 1-sentence fact about renewable energy.",
OutputKey: "result_A",
})
if err != nil {
return nil, fmt.Errorf("researchA: %w", err)
}
researchB, err := llmagent.New(llmagent.Config{
Name: "research_agent_B",
Model: geminiModel,
Description: "Researches topic B.",
Instruction: "Give a 1-sentence fact about electric vehicles.",
OutputKey: "result_B",
})
if err != nil {
return nil, fmt.Errorf("researchB: %w", err)
}
researchC, err := llmagent.New(llmagent.Config{
Name: "research_agent_C",
Model: geminiModel,
Description: "Researches topic C.",
Instruction: "Give a 1-sentence fact about carbon capture.",
OutputKey: "result_C",
})
if err != nil {
return nil, fmt.Errorf("researchC: %w", err)
}
// parallelagent runs researchA, researchB, and researchC concurrently.
// Each agent writes its output to a distinct key in session state.
parallelResearch, err := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{
Name: "parallel_research",
Description: "Runs three research agents in parallel.",
SubAgents: []agent.Agent{researchA, researchB, researchC},
},
})
if err != nil {
return nil, fmt.Errorf("parallelagent: %w", err)
}
// synthesisAgent reads all three results from state and produces a report.
// This is the "join" step: it only runs after parallelResearch completes.
synthesisAgent, err := llmagent.New(llmagent.Config{
Name: "synthesis_agent",
Model: geminiModel,
Instruction: `Combine the following research results into one paragraph:
A: {result_A}
B: {result_B}
C: {result_C}`,
Description: "Synthesises outputs from the parallel research agents.",
})
if err != nil {
return nil, fmt.Errorf("synthesisAgent: %w", err)
}
// The top-level sequentialagent guarantees that synthesis only starts
// after all parallel branches have completed.
return sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "fan_out_workflow",
Description: "Parallel research followed by synthesis.",
SubAgents: []agent.Agent{parallelResearch, synthesisAgent},
},
})
}
Caution: Parallel agent isolation
Each sub-agent in a parallelagent runs in an isolated branch
context. They share the underlying session state for writes, but
they do not see each other's in-progress events. Use distinct
OutputKey values to avoid write collisions between branches.
Nested workflows¶
When building more complex workflows, you may want to encapsulate the functionality for specific tasks into reusable workflows. One or more workflow agents can be used as a sub-agent within another workflow agent to accomplish this goal.
Figure 3. Nested workflow agents as sub-agents inside a parent workflow.
from google.adk import Workflow
root_agent = Workflow(
name="parent_workflow",
edges=[
("START", task_A1, router),
(router, {
"RUN_WORKFLOW_B": workflow_B,
"RUN_WORKFLOW_C": workflow_C,
},
),
],
)
Nested workflow data output¶
Output for nested Workflow objects works slightly differently from individual nodes. When the nested workflow completes one of its nodes, it transmits data to the next node in the nested workflow's graph and the system bubbles up the Event for that node to the parent workflow for process traceability. When the nested workflow completes the last node in its process, the parent node extracts data from the final leaf nodes and emits it as the output of the nested workflow.
In Go, any workflow agent (sequential, parallel, or loop) can be
provided as a SubAgent of another workflow agent. The nested workflow
runs to completion as a single logical step from the parent's perspective.
State written by steps inside the nested workflow is immediately visible
to steps that follow it in the parent:
// newNestedWorkflows shows how to use one workflow agent as a sub-agent of
// another — the Go equivalent of nesting Workflow objects as nodes.
//
// Python equivalent:
//
// root_agent = Workflow(
// name="parent_workflow",
// edges=[("START", task_A1, nested_workflow_B)],
// )
func newNestedWorkflows(ctx context.Context) (agent.Agent, error) {
geminiModel, err := gemini.NewModel(ctx, modelName, &genai.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("gemini.NewModel: %w", err)
}
// --- Inner workflow B ---
innerStep1, err := llmagent.New(llmagent.Config{
Name: "inner_step_1",
Model: geminiModel,
Description: "First step of the inner workflow.",
Instruction: "Translate the user's request into Spanish.",
OutputKey: "spanish_text",
})
if err != nil {
return nil, fmt.Errorf("innerStep1: %w", err)
}
innerStep2, err := llmagent.New(llmagent.Config{
Name: "inner_step_2",
Model: geminiModel,
Description: "Second step of the inner workflow.",
Instruction: "Now translate this Spanish text back into English: {spanish_text}",
})
if err != nil {
return nil, fmt.Errorf("innerStep2: %w", err)
}
// workflowB is a self-contained sequential workflow used as a sub-agent.
workflowB, err := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "workflow_B",
Description: "Translates to Spanish then back to English.",
SubAgents: []agent.Agent{innerStep1, innerStep2},
},
})
if err != nil {
return nil, fmt.Errorf("workflowB: %w", err)
}
// --- Outer step that runs before workflowB ---
taskA1, err := llmagent.New(llmagent.Config{
Name: "task_A1",
Model: geminiModel,
Description: "Prepares the input for the nested workflow.",
Instruction: "Summarise the user request in one sentence.",
OutputKey: "task_a1_result",
})
if err != nil {
return nil, fmt.Errorf("taskA1: %w", err)
}
// parentWorkflow runs taskA1, then hands off to workflowB as a single node.
return sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "parent_workflow",
Description: "Runs a pre-processing step then a nested workflow.",
SubAgents: []agent.Agent{taskA1, workflowB},
},
})
}
Loop and escalation exit¶
A loop repeats a set of steps until a termination condition is met. In Python
this is a cycle in the edges graph that routes back to an earlier node; in
Go it is a loopagent that stops when any sub-agent sets
EventActions.Escalate = true or when MaxIterations is reached.
def router(node_input: str):
"""Route to task B or C based on node_input."""
if condition(node_input):
return Event(route="RUN_TASK_C")
return Event(route="RUN_TASK_B")
root_agent = Workflow(
name="routing_workflow",
edges=[
("START", task_A_node, router),
(router,
{
"RUN_TASK_B": task_B_node,
"RUN_TASK_C": task_C_node,
},
),
],
)
loopagent repeatedly runs its SubAgents in order. A sub-agent (or a
tool called by a sub-agent) signals termination by setting
ctx.Actions().Escalate = true. The loop exits immediately after the
current iteration completes. This is the idiomatic Go equivalent of
conditional routing back to an earlier node:
// ExitLoopArgs is the (empty) input struct for the exitLoop tool.
type ExitLoopArgs struct{}
// ExitLoopResults is the (empty) output struct for the exitLoop tool.
type ExitLoopResults struct{}
// exitLoop signals the loopagent to stop by setting Escalate = true on the
// current event's actions. This is the Go equivalent of routing to an exit
// node in a Python conditional-branch graph.
func exitLoop(ctx tool.Context, _ ExitLoopArgs) (ExitLoopResults, error) {
ctx.Actions().Escalate = true
return ExitLoopResults{}, nil
}
// newLoopEscalate builds a workflow that iteratively refines a document and
// exits the loop when the critic is satisfied.
//
// Python equivalent of the conditional routing pattern:
//
// edges=[
// ("START", critic_node, router),
// (router, {"DONE": exit_node, "REFINE": refiner_node}),
// (refiner_node, critic_node), # loop back
// ]
func newLoopEscalate(ctx context.Context) (agent.Agent, error) {
geminiModel, err := gemini.NewModel(ctx, modelName, &genai.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("gemini.NewModel: %w", err)
}
const (
stateDoc = "current_draft"
stateCrit = "criticism"
donePhrase = "No major issues found."
)
exitLoopTool, err := functiontool.New(
functiontool.Config{
Name: "exitLoop",
Description: "Call this tool ONLY when the critique says the document needs no further changes.",
},
exitLoop,
)
if err != nil {
return nil, fmt.Errorf("functiontool.New: %w", err)
}
// criticAgent reviews the current draft and either provides feedback or
// writes donePhrase when no further changes are needed.
criticAgent, err := llmagent.New(llmagent.Config{
Name: "critic_agent",
Model: geminiModel,
Description: "Reviews the draft and suggests improvements, or signals completion.",
Instruction: fmt.Sprintf(`Review this draft:
"""{%s}"""
If it needs improvement, provide 1-2 specific suggestions.
If it is good enough, respond exactly with: "%s"`, stateDoc, donePhrase),
OutputKey: stateCrit,
})
if err != nil {
return nil, fmt.Errorf("criticAgent: %w", err)
}
// refinerAgent applies the critique or calls exitLoop if done.
refinerAgent, err := llmagent.New(llmagent.Config{
Name: "refiner_agent",
Model: geminiModel,
Description: "Refines the draft or exits the loop when the critique signals completion.",
Instruction: fmt.Sprintf(`Current draft:
"""{%s}"""
Critique: {%s}
If the critique is exactly "%s", call the exitLoop tool.
Otherwise apply the suggestions and output the improved draft.`, stateDoc, stateCrit, donePhrase),
Tools: []tool.Tool{exitLoopTool},
OutputKey: stateDoc,
})
if err != nil {
return nil, fmt.Errorf("refinerAgent: %w", err)
}
// loopagent repeatedly runs [criticAgent → refinerAgent] until either
// refinerAgent calls exitLoop (Escalate = true) or MaxIterations is reached.
refinementLoop, err := loopagent.New(loopagent.Config{
MaxIterations: 5,
AgentConfig: agent.Config{
Name: "refinement_loop",
Description: "Iteratively refines the draft until the critic is satisfied.",
SubAgents: []agent.Agent{criticAgent, refinerAgent},
},
})
if err != nil {
return nil, fmt.Errorf("loopagent: %w", err)
}
// initialWriterAgent produces the first draft before the loop starts.
initialWriterAgent, err := llmagent.New(llmagent.Config{
Name: "initial_writer_agent",
Model: geminiModel,
Description: "Writes the first draft.",
Instruction: "Write a 2-3 sentence draft about the user's topic.",
OutputKey: stateDoc,
})
if err != nil {
return nil, fmt.Errorf("initialWriterAgent: %w", err)
}
// The top-level sequentialagent runs the writer once, then hands off to
// the loop, which runs until escalation or max iterations.
return sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "iterative_writer",
Description: "Writes then iteratively refines a document.",
SubAgents: []agent.Agent{initialWriterAgent, refinementLoop},
},
})
}