Documentation Index
Fetch the complete documentation index at: https://agno-v2-rbac-doc-update.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Steps support three HITL modes: confirmation (approve/reject before execution), user input (collect parameters before execution), and output review (review/edit/reject after execution).
All HITL settings are configured via HumanReview.
Confirmation
Pause before executing a step. The user confirms to proceed or rejects to skip/cancel.
from agno.workflow import Workflow, OnReject
from agno.workflow.step import Step
from agno.workflow.types import HumanReview
from agno.db.sqlite import SqliteDb
# Review settings for the one gated step: ask for confirmation before it
# executes, and skip it (rather than cancel the run) when the user rejects.
confirm_before_processing = HumanReview(
    requires_confirmation=True,
    confirmation_message="Process sensitive data?",
    on_reject=OnReject.skip,
)

pipeline_steps = [
    Step(name="fetch_data", agent=fetch_agent),
    Step(
        name="process_data",
        agent=process_agent,
        human_review=confirm_before_processing,
    ),
    Step(name="save_results", agent=save_agent),
]

workflow = Workflow(
    name="data_pipeline",
    db=SqliteDb(db_file="workflow.db"),
    steps=pipeline_steps,
)

run_output = workflow.run("Process user data")

if run_output.is_paused:
    # Resolve every pending confirmation, then resume the paused run.
    for req in run_output.steps_requiring_confirmation:
        print(f"Step: {req.step_name}")
        print(f"Message: {req.confirmation_message}")
        answer = input("Confirm? (y/n): ")
        if answer.lower() != "y":
            req.reject()
        else:
            req.confirm()
    run_output = workflow.continue_run(run_output)

print(run_output.content)
Parameters
| Field | Type | Description |
|---|---|---|
| requires_confirmation | bool | Pause for user confirmation before execution |
| confirmation_message | str | Message shown to the user |
| on_reject | OnReject | Action when rejected: skip (default), cancel |
OnReject Options
| Value | Behavior |
|---|---|
| OnReject.skip | Skip this step and continue with the next (default) |
| OnReject.cancel | Cancel the entire workflow |
| OnReject.retry | Re-execute the step. Used with output review |
User Input
Collect parameters from the user before step execution. Input values are passed to the step via step_input.additional_data["user_input"].
from agno.workflow import Workflow
from agno.workflow.step import Step
from agno.workflow.types import HumanReview, StepInput, StepOutput, UserInputField
from agno.db.sqlite import SqliteDb
def process_with_params(step_input: StepInput) -> StepOutput:
    """Summarise the processing settings collected from the user.

    Reads the ``"user_input"`` entry of ``step_input.additional_data`` and
    falls back to ``threshold=0.5`` and ``mode="fast"`` for missing keys.
    """
    collected = step_input.additional_data.get("user_input", {})
    threshold, mode = collected.get("threshold", 0.5), collected.get("mode", "fast")
    summary = f"Processed with threshold={threshold}, mode={mode}"
    return StepOutput(content=summary)
# Fields requested from the user before the "process" step executes.
processing_fields = [
    UserInputField(
        name="threshold",
        field_type="float",
        description="Processing threshold (0.0-1.0)",
        required=True,
    ),
    UserInputField(
        name="mode",
        field_type="str",
        description="Mode: 'fast' or 'accurate'",
        required=True,
    ),
    UserInputField(
        name="batch_size",
        field_type="int",
        description="Records per batch",
        required=False,
    ),
]

collect_processing_params = HumanReview(
    requires_user_input=True,
    user_input_message="Configure processing:",
    user_input_schema=processing_fields,
)

workflow = Workflow(
    name="configurable_pipeline",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(name="analyze", agent=analyze_agent),
        Step(
            name="process",
            executor=process_with_params,
            human_review=collect_processing_params,
        ),
        Step(name="report", agent=report_agent),
    ],
)
def _parse_field_value(raw, field_type):
    """Convert console text to the Python type named by ``field_type``.

    Raises ``ValueError`` when ``raw`` is not a valid int/float literal.
    Any type other than "int", "float" or "bool" is returned as the raw string.
    """
    if field_type == "int":
        return int(raw)
    if field_type == "float":
        return float(raw)
    if field_type == "bool":
        return raw.lower() in ("true", "yes", "1")
    return raw


run_output = workflow.run("Process Q4 data")

if run_output.is_paused:
    for req in run_output.steps_requiring_user_input:
        print(f"Step: {req.step_name}")
        print(f"Message: {req.user_input_message}")

        values = {}
        for field in req.user_input_schema:
            marker = "*" if field.required else ""
            prompt = f"{field.name}{marker} ({field.field_type}): "

            # Keep asking until the answer is usable: required fields may not
            # be left blank, and numeric fields must parse.
            while True:
                value = input(prompt)
                if not value:
                    if field.required:
                        print(f"{field.name} is required.")
                        continue
                    break  # optional field left blank: omit it from the input
                try:
                    values[field.name] = _parse_field_value(value, field.field_type)
                except ValueError:
                    print(f"Please enter a valid {field.field_type} for {field.name}.")
                    continue
                break

        req.set_user_input(**values)

    run_output = workflow.continue_run(run_output)

print(run_output.content)
Parameters
| Field | Type | Description |
|---|---|---|
| requires_user_input | bool | Pause to collect user input before execution |
| user_input_message | str | Message shown to the user |
| user_input_schema | List[UserInputField] | Schema defining expected input fields |
UserInputField
| Field | Type | Description |
|---|---|---|
| name | str | Field name (key in user input dict) |
| field_type | str | Type: str, int, float, bool |
| description | str | Description shown to user |
| required | bool | Whether field is required (default: True) |
| allowed_values | List[Any] | Optional list of valid values |
User input is available in the step function via step_input.additional_data["user_input"]:
def my_step(step_input: StepInput) -> StepOutput:
    """Report the ``threshold`` and ``mode`` values collected from the user.

    Either value is ``None`` when the user input does not contain that key.
    """
    collected = step_input.additional_data.get("user_input", {})
    threshold, mode = (collected.get(key) for key in ("threshold", "mode"))
    return StepOutput(content=f"Done with {threshold}, {mode}")
For agent-based steps, user input is automatically appended to the message.
Output Review
Pause after a step executes so the user can review the output before the workflow continues. If the user rejects and on_reject is set to OnReject.retry, the step re-executes (up to max_retries). See the dedicated Output Review page for full details, including editing output and rejecting with feedback.
from agno.workflow import Workflow, OnReject
from agno.workflow.step import Step
from agno.workflow.types import HumanReview
from agno.db.sqlite import SqliteDb
# Review the draft after it is generated; a rejection regenerates it,
# with max_retries set to 3.
draft_review = HumanReview(
    requires_output_review=True,
    output_review_message="Review the draft. Approve to continue or reject to regenerate.",
    on_reject=OnReject.retry,
    max_retries=3,
)

workflow = Workflow(
    name="content_pipeline",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(name="research", agent=research_agent),
        Step(name="draft", agent=draft_agent, human_review=draft_review),
        Step(name="publish", agent=publish_agent),
    ],
)

run_output = workflow.run("Write a blog post about AI agents")

# A rejected draft pauses the run again after the retry, so keep looping
# until the run is no longer paused.
while run_output.is_paused:
    for req in run_output.steps_requiring_output_review:
        print(f"Output: {req.step_output.content}")
        print(f"Message: {req.output_review_message}")

        approved = input("Approve? (y/n): ").lower() == "y"
        if approved:
            req.confirm()
            continue

        feedback = input("Feedback (optional): ").strip()
        # Only pass the feedback keyword when the user actually typed something.
        reject_kwargs = {"feedback": feedback} if feedback else {}
        req.reject(**reject_kwargs)

    run_output = workflow.continue_run(run_output)

print(run_output.content)
Parameters
| Field | Type | Description |
|---|---|---|
| requires_output_review | bool | Pause after execution for user review |
| output_review_message | str | Message shown during review |
| on_reject | OnReject | Action when rejected. Use OnReject.retry to re-execute |
| max_retries | int | Max re-executions on rejection (default: 3) |
Timeout
Set a deadline for the user to respond. If it expires, on_timeout fires automatically. See the dedicated Timeout page for full details.
from agno.workflow import OnTimeout
# Output review that is auto-approved when the deadline passes unanswered.
timed_review = HumanReview(
    requires_output_review=True,
    output_review_message="Review the report.",
    on_reject=OnReject.retry,
    max_retries=2,
    timeout=120,  # 2 minutes
    on_timeout=OnTimeout.approve,
)

Step(name="review_report", agent=report_agent, human_review=timed_review)
| OnTimeout Value | Behavior |
|---|---|
| OnTimeout.approve | Auto-approve and continue |
| OnTimeout.reject | Auto-reject (triggers retry if on_reject=OnReject.retry) |
| OnTimeout.cancel | Cancel the workflow |
The @pause Decorator
Use the @pause decorator to configure HITL directly on functions:
from agno.workflow.decorators import pause
from agno.workflow.types import StepInput, StepOutput, UserInputField
@pause(requires_confirmation=True, confirmation_message="Execute this step?")
def step_with_confirmation(step_input: StepInput) -> StepOutput:
    """Return a fixed message; the decorator configures a confirmation pause."""
    result = StepOutput(content="Executed after confirmation")
    return result
@pause(
    requires_user_input=True,
    user_input_message="Enter parameters:",
    user_input_schema=[UserInputField(name="value", field_type="str", required=True)],
)
def step_with_input(step_input: StepInput) -> StepOutput:
    """Echo back the ``value`` field collected from the user.

    Indexes ``additional_data["user_input"]["value"]`` directly, so a missing
    key raises ``KeyError``.
    """
    collected = step_input.additional_data["user_input"]
    value = collected["value"]
    return StepOutput(content=f"Received: {value}")
# Decorator config is auto-detected when used in a Step
workflow = Workflow(
    steps=[
        Step(name="confirm_step", executor=step_with_confirmation),
        Step(name="input_step", executor=step_with_input),
    ],
    # ... remaining Workflow arguments (e.g. name, db) go here
)
Streaming
Handle HITL in streaming workflows:
from agno.run.workflow import StepPausedEvent
def _latest_run():
    """Return the last entry of ``runs`` on the workflow's current session."""
    return workflow.get_session().runs[-1]


for event in workflow.run("input", stream=True, stream_events=True):
    if not isinstance(event, StepPausedEvent):
        continue
    print(f"Paused at: {event.step_name}")

run_output = _latest_run()

while run_output.is_paused:
    for req in run_output.steps_requiring_confirmation:
        req.confirm()

    # Consume the resumed stream to completion; the events themselves are
    # not needed here.
    for _ in workflow.continue_run(run_output, stream=True, stream_events=True):
        pass

    run_output = _latest_run()
Developer Resources