Data handling for agent workflows
Structuring and managing data between agents and graph-based nodes is critical for building reliable processes with ADK. This guide explains data handling within graph-based workflows and collaboration agents, including how information is transmitted and received between graph nodes. It covers the essential parameters for passing data, content, and state, and explains how to implement structured data transfer for both function and agent nodes using data format schemas and specific instruction syntax.
Workflow data flow
Within a graph-based workflow, nodes pass data to downstream steps through session state. All execution nodes in a workflow can read from and write to session state; a step writes its output to a named key, and the next step reads it by referencing that key in its configuration.
In Python, data is exchanged between graph nodes using Events. The key parameters for node data handling are:
- output: Parameter for passing information between nodes.
- message: Data intended as a response to a user.
- state: Data automatically persisted across nodes via Events throughout an ADK session.
In ADK Go, workflow data flow is managed through session state rather than through Event fields. The two primary mechanisms are:
- OutputKey on llmagent.Config: after each turn, the framework automatically captures the agent's final text response and writes it to session state under the key you specify. Downstream agents read it by placing {key} in their Instruction template, the same curly-brace syntax as Python.
- ctx.Session().State().Set / .Get: for custom Run functions and tools that need to write or read arbitrary values from state directly.
State keys may carry a prefix that controls their lifetime and scope:
| Prefix constant | Prefix string | Scope |
|---|---|---|
| session.KeyPrefixApp | "app:" | Shared across all users and sessions for the app |
| session.KeyPrefixUser | "user:" | Tied to the user, shared across their sessions |
| session.KeyPrefixTemp | "temp:" | Discarded after the current invocation ends |
| (none) | (no prefix) | Persists for the lifetime of the session |
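The prefix rules in the table can be sketched in plain Python. The `scope_of` helper and the `PREFIX_SCOPES` mapping below are hypothetical names introduced only for illustration; they are not part of ADK, and only the prefix strings themselves come from the table.

```python
# Hypothetical illustration, not ADK code: classify a state key by its prefix.
# Only the prefix strings ("app:", "user:", "temp:") come from the table.
PREFIX_SCOPES = {
    "app:": "app",
    "user:": "user",
    "temp:": "temp",
}


def scope_of(key: str) -> str:
    """Return the scope implied by a key's prefix, or "session" if it has none."""
    for prefix, scope in PREFIX_SCOPES.items():
        if key.startswith(prefix):
            return scope
    return "session"


for example_key in ("app:global_counter", "user:login_count", "temp:scratch", "attempts"):
    print(example_key, "->", scope_of(example_key))
```

A key such as `attempts`, which carries no prefix, falls through to the session scope.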
Node output
Each step in a workflow produces output by writing to session state. In Python,
a function node returns or yields an Event(output=...). In Go, an
llmagent step writes via OutputKey, and a custom Run function writes via
ctx.Session().State().Set.
Use the return or yield syntax to hand off data to the next node:
from google.adk import Event

def my_function_node(node_input: str):
    output_value = node_input.upper()
    return Event(output=output_value)  # e.g. "THE RESULT" when node_input is "The Result"
Use return when the node emits a single Event that needs no further
processing. Use yield when the data requires additional processing or when
the node generates more than one data item; a node can contain more than one
yield statement. Each yield adds to the list of data objects on the Event
that is passed to the next node in the graph. A bare return or yield
(without a value) passes None to the next node.
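The difference between the two styles can be illustrated without ADK. The sketch below uses a stand-in `Event` dataclass and a hypothetical `collect_events` helper; neither is the ADK implementation, and the real runner may handle node results differently.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Event:
    """Stand-in for illustration only; this is not google.adk.Event."""
    output: Optional[Any] = None
    message: Optional[str] = None


def returning_node(node_input: str):
    # A single result that needs no further processing: return it.
    return Event(output=node_input.upper())


def yielding_node(node_input: str):
    # Several items emitted over time: yield each one.
    yield Event(message="Working...")
    yield Event(output=node_input.upper())


def collect_events(node, node_input):
    """Hypothetical helper: run a node and normalise its result to a list."""
    result = node(node_input)
    if inspect.isgenerator(result):
        return list(result)
    return [result]


print(collect_events(returning_node, "the result"))
print(collect_events(yielding_node, "the result"))
```

Both nodes end up producing the same output value; the generator version simply emits a user-facing message first.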
Use OutputKey on llmagent.Config to automatically save an agent's
response to session state. For custom Run functions, call
ctx.Session().State().Set directly:
// OutputKey is the primary mechanism for passing data between llmagent steps in
// a sequential workflow. When OutputKey is set, the framework automatically
// saves the agent's final text response to session state under that key after
// each turn. Downstream agents read the value by referencing {key} in their
// Instruction template.
//
// This is the Go equivalent of the Python Event(output=...) pattern:
//
//	def my_function_node():
//	    return Event(output="The Result")
func newOutputKeyPipeline(ctx context.Context, geminiModel model.LLM) (agent.Agent, error) {
	// step1 writes its response to state["upper_result"].
	step1, err := llmagent.New(llmagent.Config{
		Name:        "step_1",
		Model:       geminiModel,
		Description: "Transforms the user's text.",
		Instruction: "Convert the user's message to uppercase. Output only the transformed text.",
		OutputKey:   "upper_result", // framework saves final response here
	})
	if err != nil {
		return nil, fmt.Errorf("step1: %w", err)
	}

	// step2 reads state["upper_result"] via the {upper_result} template placeholder.
	step2, err := llmagent.New(llmagent.Config{
		Name:        "step_2",
		Model:       geminiModel,
		Description: "Reports the transformed text.",
		Instruction: "The transformed text is: {upper_result}. Report it to the user.",
	})
	if err != nil {
		return nil, fmt.Errorf("step2: %w", err)
	}

	return sequentialagent.New(sequentialagent.Config{
		AgentConfig: agent.Config{
			Name:      "output_key_pipeline",
			SubAgents: []agent.Agent{step1, step2},
		},
	})
}
For custom Run functions acting as workflow steps:
// customRunNode is a workflow step (node) implemented as a custom Run function.
// It transforms its input, writes the result to session state, and yields a
// session.Event — the Go equivalent of a Python FunctionNode that returns
// Event(output=value).
//
// Data written by custom Run functions via ctx.Session().State().Set is
// immediately available to the next step through the {key} Instruction template.
func customRunNode(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
	return func(yield func(*session.Event, error) bool) {
		// Read the user's original message.
		var inputText string
		if ctx.UserContent() != nil {
			for _, p := range ctx.UserContent().Parts {
				inputText += p.Text
			}
		}

		// Transform and persist to state so downstream steps can read it.
		result := strings.ToUpper(strings.TrimSpace(inputText))
		if err := ctx.Session().State().Set("upper_result", result); err != nil {
			yield(nil, fmt.Errorf("state.Set upper_result: %w", err))
			return
		}

		// Yield the transformed text as the event output.
		yield(&session.Event{
			LLMResponse: model.LLMResponse{
				Content: &genai.Content{
					Parts: []*genai.Part{{Text: result}},
				},
			},
		}, nil)
	}
}
Node output: passing structured data
You can pass longer, structured data in a serializable format:
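As one illustration of what a serializable payload can look like, the sketch below builds a structured value from a dataclass and confirms that it survives a JSON round trip. The `CityTime` fields mirror the example used later in this guide. Passing the resulting dictionary as `Event(output=...)` is an assumption about usage, not something this section confirms.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class CityTime:
    time_info: str  # time information
    city: str       # city name


# Convert the structured value to plain, JSON-compatible data.
payload = asdict(CityTime(time_info="10:10 AM", city="Paris"))

# A JSON round trip is a quick check that the payload is serializable.
encoded = json.dumps(payload)
decoded = json.loads(encoded)

print(encoded)
```

If `json.dumps` raises a `TypeError`, the payload contains a value (for example an open file or a custom object) that would need converting before it is handed to another node.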
Caution: Event.output limitation
A node may emit only one Event.output payload per execution. A node can still contain more than one yield, but yielding two or more Events that each carry an output results in a runtime error.
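The single-payload rule can be modelled as a simple check. This is a sketch under stated assumptions: `Event` is a stand-in dataclass, `check_single_output` is a hypothetical helper, an output of `None` is treated as "no output", and `ValueError` stands in for whatever error ADK actually raises.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional


@dataclass
class Event:
    """Stand-in for illustration only; this is not google.adk.Event."""
    output: Optional[Any] = None
    message: Optional[str] = None


def check_single_output(events: Iterable[Event]) -> List[Event]:
    """Hypothetical check: allow many events, but at most one with an output."""
    collected = list(events)
    with_output = [e for e in collected if e.output is not None]
    if len(with_output) > 1:
        # ValueError is an assumption; the source only says "runtime error".
        raise ValueError(
            f"node emitted {len(with_output)} output payloads; only one is allowed"
        )
    return collected


# Allowed: one message event plus one output event.
check_single_output([Event(message="Working..."), Event(output="done")])
```

Under this model, extra yields are fine as long as only one of them carries an output.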
In Go, structured data is stored as separate keys in session state, each
written by its producing step using OutputKey or State().Set. There is
no single-payload restriction; each key is an independent write. Downstream
agents access individual fields by placing {key} in their Instruction.
For example, to pass both a city name and a time value to the next step:
// step1 writes two separate keys to state:
// state["city_name"] = "Paris" (via OutputKey on an llmagent)
// state["city_time"] = "10:10 AM" (via OutputKey on another llmagent,
// or State().Set in a custom Run func)
//
// step2 reads both via its Instruction template:
// Instruction: "It is {city_time} in {city_name} right now."
User-facing messages
Use the message parameter of an Event to send a response to a user rather than pass data to the next node:
In Go, a workflow step sends a user-facing message by yielding a
session.Event whose LLMResponse.Content contains the text. The runner
surfaces this to the caller as a normal agent response:
// messageOutputNode is a workflow step that emits a progress message for the
// user. In Python this would be:
//
//	async def user_message(node_input: str):
//	    yield Event(message="Beginning research process...")
//
// In Go, the equivalent is to yield a session.Event whose LLMResponse.Content
// contains the text intended for the user. The step does not need to write to
// state; the runner surfaces the event text to the caller as a partial response.
func messageOutputNode(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
	return func(yield func(*session.Event, error) bool) {
		yield(&session.Event{
			LLMResponse: model.LLMResponse{
				Content: &genai.Content{
					Parts: []*genai.Part{{Text: "Beginning research process..."}},
				},
			},
		}, nil)
	}
}
Session state and state scopes
Session state persists data across the steps of a workflow and across turns within a session. State key prefixes control how long values live and who can see them.
Use the state parameter of an Event to maintain values across nodes. Nodes can modify state values, and the modified state values are available to downstream nodes:
async def init_state_node(attempts: int = 0):
    yield Event(
        state={
            "attempts": attempts,
        },
    )

async def task_attempt_node(node_input: Content, attempts: int):
    yield Event(
        state={
            "attempts": attempts + 1,
        },
    )

async def read_state_node(ctx: Context):
    print(f"attempts state: {ctx.state}")  # attempts state: attempts: 1

root_agent = Workflow(
    name="root_agent",
    edges=[("START", init_state_node, task_attempt_node, read_state_node)],
)
Caution: state property data limitations
The state parameter should not be used to persist large amounts of data between nodes. Use artifacts or other data persistence mechanisms, such as database Tools, to persist large data resources during the life cycle of a Workflow.
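The way state values carry forward between nodes can be pictured as a series of dictionary updates. The sketch below is a simplified model, not ADK's implementation: `apply_state_deltas` is a hypothetical helper, and it assumes that a later write to a key overwrites the earlier value, which is consistent with the example above where the last node sees `attempts` equal to 1.

```python
from typing import Any, Dict, Iterable


def apply_state_deltas(
    initial: Dict[str, Any], deltas: Iterable[Dict[str, Any]]
) -> Dict[str, Any]:
    """Hypothetical model: merge each node's state update into the session state."""
    state = dict(initial)  # copy so the caller's dictionary is not mutated
    for delta in deltas:
        state.update(delta)  # assumption: later writes win for the same key
    return state


# init_state_node writes attempts=0, then task_attempt_node writes attempts + 1.
deltas = [{"attempts": 0}, {"attempts": 0 + 1}]
final_state = apply_state_deltas({}, deltas)
print(final_state)
```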
In Go, state is written with ctx.Session().State().Set(key, value) and
read with .Get(key). The session package defines prefix constants that
map to the same lifetime scopes as Python's state parameter:
// stateScopes shows how session-state key prefixes control the lifetime and
// visibility of stored values across workflow steps.
//
// Available prefixes (defined as constants in the session package):
//
// session.KeyPrefixApp ("app:") – shared across all users and sessions for the app
// session.KeyPrefixUser ("user:") – tied to the user, shared across their sessions
// session.KeyPrefixTemp ("temp:") – discarded when the current invocation ends
//
// Keys with no prefix persist for the lifetime of the session.
//
// This is the Go equivalent of the Python Event(state={...}) pattern:
//
//	async def init_state_node(attempts: int = 0):
//	    yield Event(state={"attempts": attempts})
func stateScopes(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
	return func(yield func(*session.Event, error) bool) {
		st := ctx.Session().State()

		// Session-scoped (no prefix): persists for the life of this session.
		if err := st.Set("attempts", 0); err != nil {
			yield(nil, fmt.Errorf("state.Set attempts: %w", err))
			return
		}

		// App-scoped: shared across all users and sessions for this app.
		if err := st.Set(session.KeyPrefixApp+"global_counter", 42); err != nil {
			yield(nil, fmt.Errorf("state.Set app:global_counter: %w", err))
			return
		}

		// User-scoped: shared across all sessions belonging to this user.
		if err := st.Set(session.KeyPrefixUser+"login_count", 1); err != nil {
			yield(nil, fmt.Errorf("state.Set user:login_count: %w", err))
			return
		}

		// Temp-scoped: discarded after this invocation (single request/response) ends.
		if err := st.Set(session.KeyPrefixTemp+"scratch", "ephemeral"); err != nil {
			yield(nil, fmt.Errorf("state.Set temp:scratch: %w", err))
			return
		}

		yield(&session.Event{
			LLMResponse: model.LLMResponse{
				Content: &genai.Content{
					Parts: []*genai.Part{{Text: "State initialised."}},
				},
			},
		}, nil)
	}
}
Caution: state data limitations
Session state is a lightweight key-value store. Do not use it to persist large payloads such as file contents or binary data. Use ADK artifacts or external storage tools instead.
Constrain node data with schemas
You can set input and output data schemas to constrain the data formats accepted and produced by any agent node.
Use input_schema and output_schema with a class that extends
BaseModel to constrain any agent's input and output:
from datetime import date

from google.adk import Agent
from pydantic import BaseModel

class FlightSearchInput(BaseModel):
    origin: str           # Airport code, e.g. "SFO"
    destination: str      # Airport code, e.g. "CDG"
    departure_date: date  # e.g. date(2026, 3, 15)
    passengers: int = 1   # Number of passengers

class FlightSearchOutput(BaseModel):
    flights: list[Flight]  # Flight is assumed to be a BaseModel defined elsewhere
    cheapest_price: float

flight_searcher = Agent(
    name="flight_searcher",
    instruction="Search for available flights.",
    input_schema=FlightSearchInput,
    output_schema=FlightSearchOutput,
    tools=[search_flights_api],  # assumed to be defined elsewhere
    mode="single_turn",
    ...
)

assistant = Agent(
    name="assistant",
    instruction="You help users plan trips.",
    sub_agents=[flight_searcher],
    ...
)
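To make concrete what an input schema constrains, the sketch below checks a raw dictionary against the same fields as `FlightSearchInput` using only the standard library. The `parse_flight_search_input` function is hypothetical and much simpler than Pydantic's validation; it is not how ADK or Pydantic enforce schemas internally.

```python
from datetime import date
from typing import Any, Dict

REQUIRED_FIELDS = ("origin", "destination", "departure_date")


def parse_flight_search_input(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical validator mirroring the FlightSearchInput fields."""
    for field in REQUIRED_FIELDS:
        if field not in raw:
            raise ValueError(f"missing required field: {field}")

    # passengers is optional and defaults to 1, as in the schema.
    passengers = raw.get("passengers", 1)
    if isinstance(passengers, bool) or not isinstance(passengers, int):
        raise ValueError("passengers must be an integer")

    return {
        "origin": str(raw["origin"]),
        "destination": str(raw["destination"]),
        # Dates arrive as ISO strings (YYYY-MM-DD) in this sketch.
        "departure_date": date.fromisoformat(raw["departure_date"]),
        "passengers": passengers,
    }


parsed = parse_flight_search_input(
    {"origin": "SFO", "destination": "CDG", "departure_date": "2026-03-15"}
)
print(parsed)
```

A request that omits a required field, or supplies a non-integer passenger count, is rejected before any search is attempted.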
In Go, schemas are defined as *genai.Schema values and assigned to
InputSchema and OutputSchema on llmagent.Config.
- InputSchema: constrains what the agent accepts when called as a sub-agent. The caller must provide a JSON object matching this schema.
- OutputSchema: forces the model to reply with a JSON object matching the schema. When OutputSchema is set, the agent cannot use tools.
// newSchemaAgentPipeline builds a two-agent sequential pipeline where the first
// agent has a structured InputSchema and OutputSchema and the second reads the
// first agent's output from session state.
//
// InputSchema constrains what the agent accepts when called as a sub-agent
// tool. OutputSchema forces the model to reply with a JSON object matching the
// provided schema — equivalent to Pydantic BaseModel in Python:
//
//	class FlightSearchOutput(BaseModel):
//	    origin: str
//	    destination: str
//
//	flight_searcher = Agent(
//	    input_schema=FlightSearchInput,
//	    output_schema=FlightSearchOutput,
//	    ...
//	)
func newSchemaAgentPipeline(ctx context.Context, geminiModel model.LLM) (agent.Agent, error) {
	// InputSchema: defines the expected JSON shape when this agent is invoked
	// as a sub-agent / tool by another agent.
	flightInputSchema := &genai.Schema{
		Type:        genai.TypeObject,
		Description: "Input for a flight search request.",
		Properties: map[string]*genai.Schema{
			"origin": {
				Type:        genai.TypeString,
				Description: "Departure airport code, e.g. SFO.",
			},
			"destination": {
				Type:        genai.TypeString,
				Description: "Arrival airport code, e.g. CDG.",
			},
			"departure_date": {
				Type:        genai.TypeString,
				Description: "Departure date in YYYY-MM-DD format.",
			},
		},
		Required: []string{"origin", "destination", "departure_date"},
	}

	// OutputSchema: forces the model to reply with a JSON object matching
	// this schema. When OutputSchema is set the agent cannot use tools.
	flightOutputSchema := &genai.Schema{
		Type:        genai.TypeObject,
		Description: "Result of a flight search.",
		Properties: map[string]*genai.Schema{
			"cheapest_price": {
				Type:        genai.TypeString,
				Description: "Cheapest available fare as a formatted string, e.g. '$450'.",
			},
			"flight_count": {
				Type:        genai.TypeString,
				Description: "Number of matching flights found.",
			},
		},
		Required: []string{"cheapest_price", "flight_count"},
	}

	flightSearchAgent, err := llmagent.New(llmagent.Config{
		Name:        "flight_searcher",
		Model:       geminiModel,
		Description: "Searches for available flights and returns structured results.",
		Instruction: `You are a flight-search assistant.
Given a search request, respond ONLY with a JSON object.
Estimate the cheapest price and count of available flights.`,
		InputSchema:  flightInputSchema,
		OutputSchema: flightOutputSchema,
		OutputKey:    "flight_search_result", // saves JSON string to state
	})
	if err != nil {
		return nil, fmt.Errorf("flightSearchAgent: %w", err)
	}

	// The assistant agent reads the structured result from state via the
	// {flight_search_result} template placeholder in its Instruction.
	assistantAgent, err := llmagent.New(llmagent.Config{
		Name:        "trip_assistant",
		Model:       geminiModel,
		Description: "Summarises flight search results for the user.",
		Instruction: `You help users plan trips.
The flight search returned this result: {flight_search_result}
Summarise the cheapest option for the user in a friendly sentence.`,
	})
	if err != nil {
		return nil, fmt.Errorf("assistantAgent: %w", err)
	}

	return sequentialagent.New(sequentialagent.Config{
		AgentConfig: agent.Config{
			Name:        "flight_booking_pipeline",
			Description: "Searches for flights then presents the results to the user.",
			SubAgents:   []agent.Agent{flightSearchAgent, assistantAgent},
		},
	})
}
Access structured data in agents
When structured data has been written to session state by a previous step, a
downstream agent can reference it in its Instruction using {key} template
placeholders. The ADK framework substitutes the current value of state[key]
into the instruction before it is sent to the model.
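The substitution step can be sketched in a few lines of plain Python. The `render_instruction` function below is a hypothetical stand-in, not the framework's implementation. In particular, leaving an unknown placeholder untouched is an assumption made for this sketch; the source does not say how ADK treats a key that is missing from state.

```python
import re
from typing import Any, Mapping

# Matches {key}, where key may include a scope prefix such as "user:".
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_:]*)\}")


def render_instruction(template: str, state: Mapping[str, Any]) -> str:
    """Hypothetical sketch: replace each {key} with the value of state[key]."""

    def replace(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key in state:
            return str(state[key])
        # Assumption: keep the placeholder as-is when the key is not in state.
        return match.group(0)

    return _PLACEHOLDER.sub(replace, template)


state = {"city_name": "Paris", "time_info": "10:10 AM"}
rendered = render_instruction("It is {time_info} in {city_name} right now.", state)
print(rendered)
```

Because substitution happens before the instruction reaches the model, the model only ever sees the rendered text, never the placeholders.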
Use the curly-brace { } syntax to select properties from the input
schema, or < > to select a property and also qualify it by the name
of the source node:
class CityTime(BaseModel):
    time_info: str  # time information
    city: str       # city name

def lookup_time_function(city: str):
    """Simulate returning the current time in the specified city."""
    return Event(output=CityTime(time_info='10:10 AM', city=city))

city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-flash-latest",
    input_schema=CityTime,
    # data selection based on class and parameter
    # instruction="""
    #     Return a sentence in the following format:
    #     It is {CityTime.time_info} in {CityTime.city} right now.
    # """,
    # more restrictive data selection based on source node name
    instruction="""
        Return a sentence in the following format:
        It is <CityTime.time_info from lookup_time_function> in
        <CityTime.city from lookup_time_function> right now.
    """,
)

root_agent = Workflow(
    name="root_agent",
    edges=[
        (START, city_generator_agent, lookup_time_function, city_report_agent)
    ],
)
In Go, each state key is referenced independently using {key} in the
Instruction field. There is no class-scoped or source-node-qualified
syntax; instead, use distinct, descriptive key names and write each value
to its own key via OutputKey or State().Set:
// newCityTimePipeline demonstrates how an agent reads structured data that was
// written to session state by the previous step using the {key} template syntax
// in its Instruction — the Go equivalent of:
//
//	city_report_agent = Agent(
//	    input_schema=CityTime,
//	    instruction="It is {CityTime.time_info} in {CityTime.city} right now.",
//	)
//
// In Go, each field is a separate state key. The {key} placeholder in
// Instruction is replaced by the ADK framework with the current value of
// state["key"] before the instruction is sent to the model.
func newCityTimePipeline(ctx context.Context, geminiModel model.LLM) (agent.Agent, error) {
	// cityGeneratorAgent writes the city name to state["city_name"].
	cityGeneratorAgent, err := llmagent.New(llmagent.Config{
		Name:        "city_generator_agent",
		Model:       geminiModel,
		Description: "Returns the name of a random city.",
		Instruction: "Output only the name of one well-known city. Nothing else.",
		OutputKey:   "city_name",
	})
	if err != nil {
		return nil, fmt.Errorf("cityGeneratorAgent: %w", err)
	}

	// lookupTimeAgent reads state["city_name"] via {city_name} and writes the
	// current time to state["time_info"].
	lookupTimeAgent, err := llmagent.New(llmagent.Config{
		Name:        "lookup_time_agent",
		Model:       geminiModel,
		Description: "Returns the current time in the city from the previous step.",
		Instruction: "What time is it right now in {city_name}? Output only the time, e.g. '10:10 AM'.",
		OutputKey:   "time_info",
	})
	if err != nil {
		return nil, fmt.Errorf("lookupTimeAgent: %w", err)
	}

	// cityReportAgent reads both state["city_name"] and state["time_info"] via
	// the {city_name} and {time_info} placeholders in its Instruction.
	cityReportAgent, err := llmagent.New(llmagent.Config{
		Name:        "city_report_agent",
		Model:       geminiModel,
		Description: "Reports the city and the current time.",
		Instruction: "Return a sentence in the following format: It is {time_info} in {city_name} right now.",
	})
	if err != nil {
		return nil, fmt.Errorf("cityReportAgent: %w", err)
	}

	return sequentialagent.New(sequentialagent.Config{
		AgentConfig: agent.Config{
			Name:        "city_time_pipeline",
			Description: "Generates a city name, looks up the time, then reports both.",
			SubAgents:   []agent.Agent{cityGeneratorAgent, lookupTimeAgent, cityReportAgent},
		},
	})
}
For a complete but simplified version of this workflow, see Graph-based agent workflows.