Building a Research Assistant
This guide builds a research assistant that accumulates documents in a ContextPool and answers questions by querying the pool with the LLM — presenting lightweight descriptions to decide which items are relevant, then injecting only those contents into the answer prompt.
It covers all five pygents abstractions working together in one coherent example:
| Pattern | Where |
|---|---|
| Tool-driven flow: tools return Turn objects | think, select_context |
| ContextQueue for conversation memory | All reading tools via context injection |
| ContextPool for document accumulation | fetch_document returns items; the agent stores them |
| Context injection: typed parameters, no manual wiring | pool: ContextPool, memory: ContextQueue in think, select_context, and answer |
| LLM-driven selective retrieval | select_context sends descriptions only; answer reads content |
The code blocks below are meant to be combined into one script.
Full implementation
import asyncio
import logging
from py_ai_toolkit import PyAIToolkit
from py_ai_toolkit.core.domain.interfaces import LLMConfig
from pydantic import BaseModel
from pygents import Agent, ContextPool, ContextQueue, Turn, tool
from pygents.context import ContextItem
logger = logging.getLogger(__name__)
toolkit = PyAIToolkit(main_model_config=LLMConfig())
DOCUMENTS = {
"q3-earnings": {
"description": "Q3 earnings report — revenue $42M (+18% YoY), gross margin 68%, guidance raised",
"body": "Revenue: $42M. Gross margin: 68%. Net income: $8.2M. "
"Guidance for Q4: $46–48M. Key driver: enterprise tier growth (+34%).",
},
"product-roadmap": {
"description": "2025 product roadmap — three new integrations, mobile app beta, AI features",
"body": "Q1: Slack and Jira integrations. Q2: Mobile app public beta. "
"Q3: AI-assisted workflows. Q4: Enterprise audit logs.",
},
"incident-2024-11": {
"description": "Incident report Nov 2024 — 47-minute outage, root cause: DB connection pool exhaustion",
"body": "Duration: 47 min. Impact: 12% of API requests failed. "
"Root cause: connection pool limit hit during traffic spike. "
"Fix: pool size doubled; circuit breaker added.",
},
"hiring-plan": {
"description": "2025 hiring plan — 8 engineers, 3 sales, 2 support; budget $3.2M",
"body": "Engineering: 4 backend, 2 frontend, 1 ML, 1 DevOps. "
"Sales: 2 AEs, 1 SDR. Support: 2 CSMs. Total budget: $3.2M.",
},
"security-audit": {
"description": "Security audit results — SOC 2 Type II passed, 3 low-severity findings",
"body": "Result: SOC 2 Type II certified. Findings: (1) session tokens not rotated on role change — fixed; "
"(2) verbose error messages in staging — fixed; (3) MFA not enforced for API keys — in progress.",
},
}
class ContextSelection(BaseModel):
    """IDs of the pooled items directly relevant to the question."""

    relevant_ids: list[str]


@tool()
async def fetch_document(doc_id: str) -> ContextItem:
    """Fetch a document. The agent stores it in the context pool."""
    doc = DOCUMENTS.get(doc_id)
    if doc is None:
        raise ValueError(f"Unknown document: {doc_id!r}")
    return ContextItem(
        id=doc_id,
        description=doc["description"],
        content=doc["body"],
    )


def latest_user_message(memory: ContextQueue) -> str:
    for item in reversed(memory.items):
        if isinstance(item.content, str) and item.content.startswith("User:"):
            return item.content.removeprefix("User:").strip()
    return ""


@tool()
async def think(pool: ContextPool, memory: ContextQueue) -> Turn:
    """Check if the pool has context; route to select_context or answer directly."""
    if not pool:
        return Turn(answer, kwargs={"relevant_ids": []})
    return Turn(select_context)


@think.before_invoke
async def log_pool_state(**kwargs) -> None:
    pool = kwargs.get("pool")
    if pool:
        logger.debug("think: pool has %d items\n%s", len(pool), pool.catalogue())


@tool()
async def select_context(pool: ContextPool, memory: ContextQueue) -> Turn:
    """Send descriptions to the LLM, collect relevant ids, hand off to answer."""
    question = latest_user_message(memory)
    response = await toolkit.asend(
        response_model=ContextSelection,
        template=(
            "Question: {{ question }}\n\n"
            "Available context items (id + one-line description only):\n{{ catalogue }}\n\n"
            "Return only the IDs of items directly relevant to answering this question."
        ),
        question=question,
        catalogue=pool.catalogue(),
    )
    return Turn(answer, kwargs={"relevant_ids": response.content.relevant_ids})


@tool()
async def answer(pool: ContextPool, memory: ContextQueue, relevant_ids: list[str]):
    """Stream the final answer from selected pool content."""
    question = latest_user_message(memory)
    pool_ids = {item.id for item in pool.items}
    selected = [pool.get(id) for id in relevant_ids if id in pool_ids]
    if selected:
        context_block = "\n\n".join(
            f"[{item.id}] {item.description}\n{item.content}"
            for item in selected
        )
    else:
        context_block = "(no relevant context found)"
    full_text = ""
    async for response in toolkit.stream(
        prompt=(
            "Answer this question using only the context below. "
            "Be concise and cite the source id.\n\n"
            "Question: {{ question }}\n\n"
            "Context:\n{{ context_block }}"
        ),
        question=question,
        context_block=context_block,
    ):
        full_text += response.content
        yield response.content
    yield ContextItem(content=f"Assistant: {full_text}")  # id=None → auto-appended to context_queue


memory = ContextQueue(limit=30)


@memory.after_append
async def log_memory(queue, appended_items, current) -> None:
    logger.debug("ContextQueue: %d items", len(current))


agent = Agent(
"researcher",
"Answers questions from a document pool",
[fetch_document, think, select_context, answer],
context_queue=memory,
context_pool=ContextPool(limit=50),
)


async def ask(question: str, doc_ids: list[str]) -> None:
    await agent.context_queue.append(ContextItem(content=f"User: {question}"))
    for doc_id in doc_ids:
        await agent.put(Turn(fetch_document, kwargs={"doc_id": doc_id}))
    await agent.put(Turn(think))
    async for turn, value in agent.run():
        if isinstance(value, str):
            print(value, end="", flush=True)
    print()


asyncio.run(ask(
question="What caused the November outage and has it been fixed?",
doc_ids=list(DOCUMENTS.keys()),
))
Install
Setup
Configure the LLM:
import logging
from py_ai_toolkit import PyAIToolkit
from py_ai_toolkit.core.domain.interfaces import LLMConfig
logger = logging.getLogger(__name__)
toolkit = PyAIToolkit(main_model_config=LLMConfig())
LLM configuration
LLMConfig() with no arguments reads from the environment. Set LLM_MODEL, LLM_API_KEY, and optionally LLM_BASE_URL (see py-ai-toolkit Getting Started).
Document store
In production this would call an API, database, or vector store. For this example, documents live in a plain dict.
DOCUMENTS = {
"q3-earnings": {
"description": "Q3 earnings report — revenue $42M (+18% YoY), gross margin 68%, guidance raised",
"body": "Revenue: $42M. Gross margin: 68%. Net income: $8.2M. "
"Guidance for Q4: $46–48M. Key driver: enterprise tier growth (+34%).",
},
"product-roadmap": {
"description": "2025 product roadmap — three new integrations, mobile app beta, AI features",
"body": "Q1: Slack and Jira integrations. Q2: Mobile app public beta. "
"Q3: AI-assisted workflows. Q4: Enterprise audit logs.",
},
"incident-2024-11": {
"description": "Incident report Nov 2024 — 47-minute outage, root cause: DB connection pool exhaustion",
"body": "Duration: 47 min. Impact: 12% of API requests failed. "
"Root cause: connection pool limit hit during traffic spike. "
"Fix: pool size doubled; circuit breaker added.",
},
"hiring-plan": {
"description": "2025 hiring plan — 8 engineers, 3 sales, 2 support; budget $3.2M",
"body": "Engineering: 4 backend, 2 frontend, 1 ML, 1 DevOps. "
"Sales: 2 AEs, 1 SDR. Support: 2 CSMs. Total budget: $3.2M.",
},
"security-audit": {
"description": "Security audit results — SOC 2 Type II passed, 3 low-severity findings",
"body": "Result: SOC 2 Type II certified. Findings: (1) session tokens not rotated on role change — fixed; "
"(2) verbose error messages in staging — fixed; (3) MFA not enforced for API keys — in progress.",
},
}
Response models
from pydantic import BaseModel


class ContextSelection(BaseModel):
    """IDs of the pooled items directly relevant to the question."""

    relevant_ids: list[str]
The fetch_document tool
fetch_document returns a ContextItem. The tool neither knows about nor interacts with the pool; the agent stores the item automatically when the turn completes.
from pygents import tool
from pygents.context import ContextItem
@tool()
async def fetch_document(doc_id: str) -> ContextItem:
    """Fetch a document. The agent stores it in the context pool."""
    doc = DOCUMENTS.get(doc_id)
    if doc is None:
        raise ValueError(f"Unknown document: {doc_id!r}")
    return ContextItem(
        id=doc_id,
        description=doc["description"],
        content=doc["body"],
    )
Streaming ingestion with async generators
If you need to fetch many documents in one turn, use an async generator. Each yielded ContextItem is stored as soon as it is produced, so the pool is populated incrementally rather than all at once.
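The incremental behaviour can be illustrated without the library. The sketch below is a stand-in, not pygents code: Item, DOCS, fetch_many, and ingest are hypothetical names, and a plain dict plays the role of the pool. In pygents the generator would presumably be decorated with @tool() and yield ContextItem objects, the way answer does later in this guide.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for pygents' ContextItem."""
    id: str
    description: str
    content: str


# Tiny stand-in document store, mirroring the shape of DOCUMENTS.
DOCS = {
    "a": {"description": "Doc A", "body": "Body A"},
    "b": {"description": "Doc B", "body": "Body B"},
}


async def fetch_many(doc_ids):
    """Yield one Item per known document id, skipping unknown ids."""
    for doc_id in doc_ids:
        doc = DOCS.get(doc_id)
        if doc is None:
            continue
        await asyncio.sleep(0)  # placeholder for real I/O
        yield Item(id=doc_id, description=doc["description"], content=doc["body"])


async def ingest(doc_ids):
    """Store each item as soon as it is yielded and record the pool size."""
    pool = {}
    sizes = []
    async for item in fetch_many(doc_ids):
        pool[item.id] = item
        sizes.append(len(pool))
    return pool, sizes


pool, sizes = asyncio.run(ingest(["a", "missing", "b"]))
```

The sizes list grows one entry per yielded item, which is the point: a consumer sees the first document before the second has been fetched.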
The think tool
think receives the pool and memory via context injection — the agent provides its own instances automatically. It checks whether the pool has anything and routes accordingly. No LLM call, no writes.
from pygents import ContextPool, ContextQueue, Turn
@tool()
async def think(pool: ContextPool, memory: ContextQueue) -> Turn:
    """Check if the pool has context; route to select_context or answer directly."""
    if not pool:
        return Turn(answer, kwargs={"relevant_ids": []})
    return Turn(select_context)


@think.before_invoke
async def log_pool_state(**kwargs) -> None:
    pool = kwargs.get("pool")
    if pool:
        logger.debug("think: pool has %d items\n%s", len(pool), pool.catalogue())
The logging concern lives in the hook, not in the tool body — think only routes. When a tool returns a Turn, the agent enqueues it and runs it next — that is how the chain think → select_context → answer forms without any external orchestration.
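To make the chaining concrete, here is a toy model of the idea in plain Python. It is not how pygents implements its agent loop: Step and run are hypothetical stand-ins for Turn and the agent, and the three functions only borrow the names used in this guide.

```python
from collections import deque


class Step:
    """Hypothetical stand-in for a Turn: a callable plus keyword arguments."""

    def __init__(self, func, kwargs=None):
        self.func = func
        self.kwargs = kwargs or {}


def answer(relevant_ids):
    return f"answering with {relevant_ids}"


def select_context():
    # A real implementation would ask the LLM; the id is hard-coded here.
    return Step(answer, {"relevant_ids": ["incident-2024-11"]})


def think(pool_size):
    if pool_size == 0:
        return Step(answer, {"relevant_ids": []})
    return Step(select_context)


def run(first):
    """Run steps until the queue is empty; record the order of calls."""
    queue = deque([first])
    trace, results = [], []
    while queue:
        step = queue.popleft()
        trace.append(step.func.__name__)
        out = step.func(**step.kwargs)
        if isinstance(out, Step):
            queue.appendleft(out)  # a returned Step is run next
        else:
            results.append(out)
    return trace, results


trace, results = run(Step(think, {"pool_size": 5}))
empty_trace, _ = run(Step(think, {"pool_size": 0}))
```

No function calls another directly; each one only names its successor, and the loop does the rest.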
The select_context tool
select_context sends only id and description lines to the LLM, never content. The LLM returns the relevant ids, which are forwarded to answer as its only explicitly passed kwarg.
@tool()
async def select_context(pool: ContextPool, memory: ContextQueue) -> Turn:
    """Send descriptions to the LLM, collect relevant ids, hand off to answer."""
    question = latest_user_message(memory)
    response = await toolkit.asend(
        response_model=ContextSelection,
        template=(
            "Question: {{ question }}\n\n"
            "Available context items (id + one-line description only):\n{{ catalogue }}\n\n"
            "Return only the IDs of items directly relevant to answering this question."
        ),
        question=question,
        catalogue=pool.catalogue(),
    )
    return Turn(answer, kwargs={"relevant_ids": response.content.relevant_ids})
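The two halves of this step, building a content-free catalogue and sanity-checking the ids that come back, can be sketched independently of the library. Everything below is an assumption for illustration: Item stands in for ContextItem, build_catalogue uses an invented line format (the actual output of pool.catalogue() is not shown in this guide), and keep_known_ids is a hypothetical helper.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for pygents' ContextItem."""
    id: str
    description: str
    content: str


def build_catalogue(items):
    """One line per item: id and description only, never content."""
    return "\n".join(f"- [{item.id}] {item.description}" for item in items)


def keep_known_ids(returned_ids, items):
    """Drop ids that are not in the pool and de-duplicate, preserving order."""
    known = {item.id for item in items}
    seen = set()
    kept = []
    for item_id in returned_ids:
        if item_id in known and item_id not in seen:
            seen.add(item_id)
            kept.append(item_id)
    return kept


items = [
    Item("q3-earnings", "Q3 earnings report", "Revenue: $42M."),
    Item("incident-2024-11", "Incident report Nov 2024", "Duration: 47 min."),
]
catalogue = build_catalogue(items)
selected = keep_known_ids(
    ["incident-2024-11", "made-up", "incident-2024-11"], items
)
```

Filtering matters because a model can return an id that was never offered; the answer tool in this guide applies the same kind of guard with its pool_ids check.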
The answer tool
pool and memory arrive via injection; relevant_ids is passed explicitly because it is computed by select_context, not provided by the agent. The tool is an async generator — it streams each chunk to the caller, then yields a ContextItem at the end. The agent sees the ContextItem (with id=None) and appends it to the context queue automatically.
@tool()
async def answer(pool: ContextPool, memory: ContextQueue, relevant_ids: list[str]):
    """Stream the final answer from selected pool content."""
    question = latest_user_message(memory)
    pool_ids = {item.id for item in pool.items}
    selected = [pool.get(id) for id in relevant_ids if id in pool_ids]
    if selected:
        context_block = "\n\n".join(
            f"[{item.id}] {item.description}\n{item.content}"
            for item in selected
        )
    else:
        context_block = "(no relevant context found)"
    full_text = ""
    async for response in toolkit.stream(
        prompt=(
            "Answer this question using only the context below. "
            "Be concise and cite the source id.\n\n"
            "Question: {{ question }}\n\n"
            "Context:\n{{ context_block }}"
        ),
        question=question,
        context_block=context_block,
    ):
        full_text += response.content
        yield response.content
    yield ContextItem(content=f"Assistant: {full_text}")  # id=None → auto-appended to context_queue
Helper
def latest_user_message(memory: ContextQueue) -> str:
    for item in reversed(memory.items):
        if isinstance(item.content, str) and item.content.startswith("User:"):
            return item.content.removeprefix("User:").strip()
    return ""
Putting it together
Pass a ContextQueue and ContextPool directly to the agent — this is what makes context injection work. When think, select_context, and answer declare pool: ContextPool or memory: ContextQueue, the agent provides these exact instances automatically.
import asyncio
from pygents import Agent, ContextPool, ContextQueue, Turn
memory = ContextQueue(limit=30)


@memory.after_append
async def log_memory(queue, appended_items, current) -> None:
    logger.debug("ContextQueue: %d items", len(current))


agent = Agent(
"researcher",
"Answers questions from a document pool",
[fetch_document, think, select_context, answer],
context_queue=memory,
context_pool=ContextPool(limit=50),
)
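For readers curious how injection keyed on type annotations can work in general, the following is a minimal sketch built on the standard inspect and typing modules. It is not pygents' implementation: Pool, Memory, and inject are hypothetical names, and the real library may resolve parameters differently.

```python
import inspect
import typing


class Pool:
    """Hypothetical stand-in for ContextPool."""


class Memory:
    """Hypothetical stand-in for ContextQueue."""


def inject(func, providers, **explicit):
    """Call func, filling parameters whose annotation matches a provider type.

    Explicitly passed keyword arguments always win over injected ones.
    """
    hints = typing.get_type_hints(func)
    kwargs = dict(explicit)
    for name in inspect.signature(func).parameters:
        if name in kwargs:
            continue
        annotation = hints.get(name)
        if annotation in providers:
            kwargs[name] = providers[annotation]
    return func(**kwargs)


def answer(pool: Pool, memory: Memory, relevant_ids: list[str]):
    return pool, memory, relevant_ids


pool, memory = Pool(), Memory()
result = inject(answer, {Pool: pool, Memory: memory}, relevant_ids=["a"])
```

The caller supplies only relevant_ids; the other two arguments are matched by annotation, which mirrors how Turn(answer, kwargs={"relevant_ids": ...}) carries a single kwarg in this guide.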
Append the user message first
Before enqueueing a think turn, append the user message to memory. Every tool reads context from there — no tool takes a raw message argument directly.
Append before enqueueing
If you call agent.put(Turn(think)) without appending a user message first, every tool in the chain will see stale or empty context.
async def ask(question: str, doc_ids: list[str]) -> None:
    await agent.context_queue.append(ContextItem(content=f"User: {question}"))
    # Pre-load documents; the agent stores each ContextItem automatically
    for doc_id in doc_ids:
        await agent.put(Turn(fetch_document, kwargs={"doc_id": doc_id}))
    # pool and memory are injected; no kwargs needed on this turn
    await agent.put(Turn(think))
    async for turn, value in agent.run():
        if isinstance(value, str):
            print(value, end="", flush=True)
    print()


asyncio.run(ask(
question="What caused the November outage and has it been fixed?",
doc_ids=list(DOCUMENTS.keys()),
))
Expected execution
1. fetch_document × 5: each returns a ContextItem; the agent stores all five in the pool
2. think: pool has 5 items; logs the catalogue; routes to select_context
3. select_context: sends 5 descriptions (5 short lines) to the LLM; receives ["incident-2024-11"]; routes to answer
4. answer: reads only the incident-2024-11 content from the pool; streams reply chunks to the caller; yields a ContextItem at the end, which the agent auto-appends to the context queue
For "What is the Q3 revenue and are there any open security findings?", step 3 returns ["q3-earnings", "security-audit"] — two documents, not five. The other three bodies never leave the pool.
Why descriptions matter
| Approach | Tokens per prompt | Scales to 200 items? |
|---|---|---|
| Dump entire pool into every prompt | N × avg_content_size | No: hits limits, high cost |
| Similarity search (vector DB) | Fixed retrieval window | Yes, but requires embedding infra |
| LLM-driven description query | N × avg_description_size for selection, then only selected content | Yes: descriptions are tiny |
Descriptions are typically 1–2 sentences. A pool of 200 items with 20-word descriptions adds about 4,000 words, roughly 5,000 tokens, to the selection prompt, which fits comfortably in the context window of most current models. The answer prompt receives only the full content of the 2–5 selected items.
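The comparison in the table can be turned into a back-of-the-envelope calculation. The sketch below rests on two labeled assumptions: a flat 1.3 tokens per English word (a common rule of thumb, not a tokenizer) and an illustrative 500 words per document body. All function names are hypothetical.

```python
# Rough heuristic for English text; an assumption, not a real tokenizer.
TOKENS_PER_WORD = 1.3


def estimate_tokens(words):
    """Convert a word count into an approximate token count."""
    return round(words * TOKENS_PER_WORD)


def selection_prompt_tokens(n_items, words_per_description):
    """Cost of listing every description in the selection prompt."""
    return estimate_tokens(n_items * words_per_description)


def answer_prompt_tokens(n_selected, words_per_document):
    """Cost of injecting only the selected documents' full content."""
    return estimate_tokens(n_selected * words_per_document)


def dump_all_tokens(n_items, words_per_document):
    """Cost of pasting the entire pool into a single prompt."""
    return estimate_tokens(n_items * words_per_document)


# 200 items, 20-word descriptions, 3 selected, 500-word bodies (assumed).
selection = selection_prompt_tokens(200, 20)
answer = answer_prompt_tokens(3, 500)
dump_all = dump_all_tokens(200, 500)
```

Under these assumptions the two-step approach costs a small fraction of dumping the whole pool, and the gap widens as document bodies grow while descriptions stay short.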
Extending ContextPool for external resources
ContextPool is designed to be subclassed. If your items live in an external store — a vector database, Redis, a relational table — you can override add, remove, clear, and get to proxy through that store while keeping the same interface that agents and tools expect. Pass your subclass instance to Agent via context_pool. branch() returns the correct subclass type automatically as long as your __init__ accepts the same limit and hooks keyword arguments; otherwise override branch() as well.
Summary
| Piece | Interaction | Role |
|---|---|---|
| fetch_document | Returns ContextItem → agent auto-stores in pool | Produces items |
| think | Reads pool via injection: checks if populated, routes | Guards / routes |
| select_context | Reads pool descriptions via injection: LLM narrows to relevant ids | Selects |
| answer | Reads pool content via injection, only the selected ids; streams chunks | Injects and streams |
| memory | Appended to before first turn; tools read via injection; answer appends reply | Conversation thread |
| Agent | Stores ContextItem outputs; injects memory and pool into tools | Owns context |