Reactive / Proactive Agent

Demo tecnica de un agente que responde en el turno actual mientras las operaciones HTTP siguen trabajando en background. El chat visible no se bloquea y el cierre vuelve una sola vez cuando el lote termina.

Demo reproducible

Donde verlo bien: esta pagina de GitHub Pages es la documentacion principal. El README del repositorio queda como portada corta para llegar aqui; aqui se ve el video y todo el contenido tecnico en una sola pagina.

Que ocurre en una interaccion

Lo que ve el usuario Lo que hace el runtime
"Te aviso al cerrar." Registra la tool call y reserva una operacion.
El chat sigue libre. Lanza la HTTP en un worker en background.
Puede hacer otra pregunta. Mantiene contexto operativo sin ensuciar el chat.
Recibe un unico "Listo" al final. Agrega resultados y decide proactive vs next-turn.
flowchart LR
    U["Usuario"] --> M["LLM: responde o llama tool"]
    M -->|Sin tool| A["Respuesta normal"]
    M -->|Con tool| R["Respuesta inmediata de aceptacion"]
    R --> B["BackgroundTaskRunner"]
    B --> S["RuntimeState acumula pending updates"]
    S --> D{"¿El lote ya termino y la sesion esta libre?"}
    D -->|Si| P["Follow-up proactivo unico"]
    D -->|No| N["Resincronizacion en el siguiente turno"]
        

Que problema resuelve

Muchas demos de agentes hacen una de estas dos cosas: bloquean la respuesta del assistant hasta que termina la integracion tecnica, o mezclan mensajes internos, resultados parciales y ruido operativo dentro del chat visible.

Este repo ensena una tercera via: respuesta reactiva inmediata, trabajo tecnico desacoplado del turno visible, estado operativo fuera del hilo de chat, y cierre agregado y deterministicamente controlado.

Respuesta inmediata

El assistant acepta la solicitud en el turno actual sin esperar a que termine la integracion tecnica.

Trabajo desacoplado

La operacion HTTP se ejecuta fuera del hilo visible y mantiene estado operativo independiente.

Cierre unico

Los resultados se acumulan y vuelven como un unico cierre proactivo o se resincronizan en el siguiente turno.

Piezas reutilizables

La UI sirve para hacer visible el comportamiento, pero la parte reutilizable del repo esta en el esqueleto para declarar tools HTTP asincronas con decoradores y enchufarlas a un runtime conversacional estable.

Pieza Para que sirve Si quieres reaprovechar este repo
tools/async_http.py Decorador @async_http_tool, metadata y registro de tools. Empieza aqui.
execution/background_tasks.py Worker loop, concurrencia, retries y callback de fin. Reutiliza tal cual.
runtime/conversation.py Contrato reactivo/proactivo y decision de entrega final. Reutiliza tal cual.
state/runtime_state.py Estado por sesion, operaciones, pending updates y trazas. Reutiliza tal cual.
tools/access_tool.py Ejemplo real de una tool batch declarada con decorador. Copia este patron.
adapters.py Adaptador real de Responses y adaptador mock. Reutiliza segun tu proveedor.

Contrato activo

El usuario pide una operacion que puede requerir una tool HTTP asincrona.
El assistant responde en el turno actual y reserva la operacion.
El worker ejecuta la HTTP en background con retries configurados.
El runtime acumula resultados en pending_updates_by_session.
Cuando el lote drena, emite un follow-up proactivo si la sesion esta libre.
Si la sesion esta ocupada, resincroniza el cierre como contexto efimero en el siguiente turno.

Reglas protegidas

Maquina de estados

stateDiagram-v2
    [*] --> accepted
    accepted --> running
    running --> retrying
    retrying --> running
    running --> queued_for_sync
    running --> failed
    queued_for_sync --> proactively_notified
    queued_for_sync --> consumed_next_turn
        

Estados observables por operacion

Modos de entrega

Resumen en UI

Arquitectura

flowchart TB
    UI["demo_page.py + demo_server.py"] --> RT["ConversationRuntime"]
    RT --> AD["Responses adapter<br/>real o mock"]
    RT --> REG["ToolRegistry"]
    REG --> TOOL["Tool declarada con @async_http_tool"]
    RT --> ST["RuntimeState"]
    RT --> BG["BackgroundTaskRunner"]
    BG --> HTTP["HTTP externa o mock service"]
    BG --> ST
    ST --> RT
    RT --> UI
        

Secuencia observable

sequenceDiagram
    autonumber
    actor U as Usuario
    participant UI as Demo UI
    participant RT as ConversationRuntime
    participant LLM as Responses adapter
    participant BG as BackgroundTaskRunner
    participant API as HTTP externa
    participant ST as RuntimeState

    U->>UI: "Dale acceso a ana@example.com..."
    UI->>RT: handle_user_turn(...)
    RT->>LLM: generate_turn(tools=provider_definitions)
    LLM-->>RT: tool_call provision_access_async
    RT->>ST: accept_operation(...)
    RT->>BG: submit_item(...)
    RT-->>UI: "Te aviso al cerrar."
    BG->>API: POST /mock/access/provision
    API-->>BG: success / retry / error final
    BG->>ST: finish_processing(...)
    ST-->>RT: hay pending updates
    alt lote drenado y sesion libre
        RT-->>UI: follow-up proactivo unico
    else sesion ocupada
        RT->>ST: conservar pending updates
        U->>UI: siguiente mensaje
        UI->>RT: nuevo turno
        RT->>RT: inyectar contexto efimero
        RT-->>UI: respuesta con resincronizacion natural
    end
        
Pieza Rol
tools/async_http.py Decorador @async_http_tool, metadata y registro de tools.
execution/background_tasks.py Worker loop, concurrencia, retries y callback de fin.
runtime/conversation.py Contrato reactivo/proactivo y decision de entrega final.
state/runtime_state.py Estado por sesion, operaciones, pending updates y trazas.
tools/access_tool.py Ejemplo real de una tool batch declarada con decorador.
adapters.py Adaptador real de Responses y adaptador mock.

Patron con decoradores

La tool no conoce el chat, los locks de turno, el agregado final ni la politica proactive/next-turn. Solo declara su contrato de entrada, construye el RequestSpec y define mensajes de aceptacion, exito y error.

@async_http_tool(
    name="provision_access_async",
    description="Provisiona accesos a aplicaciones corporativas en segundo plano.",
    schema={...},
    batch_field="accesses",
    acceptance_message=_batch_acceptance_message,
    success_message=lambda item, outcome: (
        f"el acceso de {item['user_email']} a {item['application']} con rol '{item['role']}' ya esta activo."
    ),
    error_message=lambda item, outcome: (
        f"no he podido activar el acceso de {item['user_email']} a {item['application']}: {outcome.user_message}"
    ),
    max_attempts=config.max_retries,
    backoff_seconds=config.retry_backoff_seconds,
)
def build_access_request(item: dict) -> RequestSpec:
    return RequestSpec(...)

Como crear otra tool sin tocar el runtime

  1. Crear una nueva funcion decorada con @async_http_tool.
  2. Definir su schema, mensajes y politica de retry.
  3. Construir el RequestSpec.
  4. Registrarla en ToolRegistry.
  5. Dejar intactos ConversationRuntime, BackgroundTaskRunner y RuntimeState.

Ejemplo minimo para una tool de tickets

from config import AppConfig
from domain import RequestSpec
from tools.async_http import ToolRegistry, async_http_tool


def register_ticket_tool(config: AppConfig, registry: ToolRegistry) -> None:
    @async_http_tool(
        name="create_ticket_async",
        description="Crea tickets IT en background y avisa al usuario al cerrar.",
        schema={
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "priority": {"type": "string"},
            },
            "required": ["title", "priority"],
        },
        acceptance_message=lambda args: (
            f"Creo el ticket '{args['title']}' con prioridad '{args['priority']}'. Te aviso al cerrar."
        ),
        success_message=lambda args, outcome: (
            f"el ticket '{args['title']}' ya esta creado."
        ),
        error_message=lambda args, outcome: (
            f"no he podido crear el ticket '{args['title']}': {outcome.user_message}"
        ),
        validation_error_message=lambda missing: (
            "Para crear el ticket necesito el titulo y la prioridad."
        ),
        max_attempts=2,
        backoff_seconds=0.5,
    )
    def build_ticket_request(args: dict) -> RequestSpec:
        return RequestSpec(
            method="POST",
            url=config.ticket_api_url,
            json_body={
                "title": args["title"],
                "priority": args["priority"],
            },
            headers={"Content-Type": "application/json"},
            timeout_seconds=3.0,
        )

    registry.register(build_ticket_request)

Cuando usar batch_field

Usa batch_field cuando una sola tool call del modelo deba representar varias operaciones tecnicas independientes.

Que cambia y que no

Cambias No cambias
name, description, schema Locking de turnos
Construccion del RequestSpec Tracking de inflight_count
Mensajes de aceptacion y cierre Politica proactive vs next-turn
max_attempts, backoff_seconds y batch_field Agregado de pending_updates, snapshots y trazas

Quickstart

Desde cero:

python3 -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
./run_demo.sh

La demo local queda en http://127.0.0.1:8010. Si no hay configuracion real del modelo, arranca igualmente en modo mock.

Modos de ejecucion y configuracion

Plug-and-play

No requiere credenciales:

./run_demo.sh

Mock explicito

DEMO_MODEL_MODE=mock ./run_demo.sh

Real

DEMO_MODEL_MODE=real ./run_demo.sh

Configuracion del modelo real

El runtime soporta estos caminos:

  1. RESPONSES_CONFIG_PATH=/ruta/a/config.toml
  2. .local.responses.toml en la raiz del repo.
  3. Variables RESPONSES_*.
  4. OPENAI_API_KEY o AZURE_OPENAI_API_KEY junto con RESPONSES_BASE_URL.

Opcion A: archivo TOML

cp config.example.toml ./.local.responses.toml
export RESPONSES_CONFIG_PATH=$PWD/.local.responses.toml
DEMO_MODEL_MODE=real ./run_demo.sh

Opcion B: OpenAI por variables

export DEMO_MODEL_MODE=real
export RESPONSES_PROVIDER=openai
export RESPONSES_BASE_URL=https://api.openai.com/v1
export OPENAI_API_KEY=tu_api_key
export RESPONSES_MODEL=gpt-5-mini
./run_demo.sh

Opcion C: Azure por variables

export DEMO_MODEL_MODE=real
export RESPONSES_PROVIDER=azure
export RESPONSES_BASE_URL=https://TU-RECURSO.openai.azure.com/openai/v1
export AZURE_OPENAI_API_KEY=tu_api_key
export RESPONSES_MODEL=gpt-5-mini
./run_demo.sh

Si prefieres una variable neutral, tambien puedes usar RESPONSES_API_KEY.

Que probar en la demo

Que deberias observar:

  1. La aceptacion llega antes de que termine la HTTP.
  2. El panel interno se mueve mientras el chat sigue libre.
  3. Puedes hacer otra pregunta mientras quedan operaciones en vuelo.
  4. El lote emite un unico cierre final.

Validacion

Suite principal

python3 -m unittest discover -s tests

Checks dirigidos de servidor y UI

python3 -m unittest tests.test_demo_server tests.test_demo_view

Smoke directo

python3 -m demo_server

E2E contra la demo viva

python3 test_e2e.py

Estructura del repo

Archivo o modulo Rol
agent.pyEnsamblaje del agente y wiring del runtime.
adapters.pyAdaptador real de Responses y adaptador mock.
tools/async_http.pyDecorador reusable y ToolRegistry.
tools/access_tool.pyTool real de ejemplo basada en el decorador.
runtime/conversation.pyOrquestacion reactiva/proactiva.
execution/background_tasks.pyWorkers async, retries y callback de fin.
state/runtime_state.pyEstado por sesion, operaciones y pending updates.
demo_server.pyBackend HTTP local.
demo_page.pyUI inline de la demo.
test_e2e.pyFlujo observable end-to-end.
tests/Contratos puntuales del runtime, servidor y UI.

Limites

Donde mirar primero si quieres reutilizarlo

  1. tools/async_http.py
  2. tools/access_tool.py
  3. execution/background_tasks.py
  4. runtime/conversation.py
  5. state/runtime_state.py
  6. test_e2e.py

Si despues de leer esas seis piezas aun hay algo importante que solo se entiende mirando detalles secundarios, entonces esta documentacion todavia se puede mejorar.