LangGraph
Resources
LangGraph Architecture
When a graph is compiled, that graph implements the LangChain Runnable interface.
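Because the compiled graph is a Runnable, the familiar invoke / stream / astream methods are available on it. A minimal sketch, assuming a trivial one-node graph (the State schema and answer_node are made up for illustration):

```python
from typing import TypedDict

from langgraph.graph import END, START, StateGraph


class State(TypedDict):
    query: str
    answer: str


def answer_node(state: State) -> dict:
    # A real node would usually call an LLM here.
    return {"answer": f"You asked: {state['query']}"}


builder = StateGraph(State)
builder.add_node("answer", answer_node)
builder.add_edge(START, "answer")
builder.add_edge("answer", END)

g = builder.compile()  # the compiled graph is a Runnable

result = g.invoke({"query": "hello"})       # batch invocation
for chunk in g.stream({"query": "hello"}):  # streaming invocation
    print(chunk)
```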
Streaming Outputs
📖 Documentation: Streaming
📖 API reference: Graphs
📖 LangChain’s streaming documentation: Streaming | 🦜️🔗 LangChain
Streaming outputs helps make your application feel responsive and contributes to a better user experience (UX).
With a LangGraph graph, you can stream:
- Graph outputs
- LLM tokens
Streaming graph outputs
Use .stream() or .astream(). Any object that implements the LangChain Runnable interface has .stream() and .astream() methods.
Determine what to stream with the stream_mode parameter. Multiple modes can be passed at once as a list (see the sketch after the list of modes below):
g.astream(..., stream_mode=[...])
Available modes:
- values: the full value of the state after each step in the graph
- updates: the updates to the state after each step in the graph
- custom: custom data emitted from inside nodes
- messages: LLM tokens streamed from the node that invokes an LLM
- debug: as much information as possible
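When stream_mode is a list, each item yielded by .stream() / .astream() is a (mode, chunk) tuple, so the consumer can tell which kind of output arrived. A minimal sketch, assuming the compiled graph g and the query state key from the sketch above:

```python
async def stream_multiple_modes(g, q: str):
    # With a list of modes, each yielded item is a (mode, chunk) tuple.
    async for mode, chunk in g.astream(
        {"query": q},
        stream_mode=["updates", "messages"],
    ):
        if mode == "updates":
            print("state update:", chunk)
        elif mode == "messages":
            msg, metadata = chunk  # "messages" chunks are (message, metadata) pairs
            print("token:", msg.content)
```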
Streaming only LLM messages (adapted from this example)
```python
from langchain_core.messages import AIMessageChunk, HumanMessage
from langgraph.graph.state import CompiledStateGraph

g: CompiledStateGraph = get_graph()  # get_graph() returns the compiled graph
thread_id = "example-thread"  # checkpointer thread id used by both helpers


def run_batch(q: str):
    """How a batch (i.e. non-streaming) invocation is usually executed."""
    response = g.invoke(
        {"query": q},
        {"configurable": {"thread_id": thread_id}},
        debug=True,
    )
    return response


async def run_streaming(g: CompiledStateGraph, q: str):
    """Stream only the LLM tokens produced while the graph runs."""
    async for msg, metadata in g.astream(
        stream_mode="messages",
        input={"query": q},
        config={"configurable": {"thread_id": thread_id}},
    ):
        # Skip the echoed user input; only print model output tokens.
        if msg.content and not isinstance(msg, HumanMessage):
            print(msg.content, flush=True)
```
Watch out for: streaming and LangGraph State
Using the above method, if there are previous messages in the state, those messages will also get yielded at the end, so the output becomes the new response plus the previous responses. This is confusing for the user.
To overcome this, use event-based filtering and focus on the on_chat_model_stream event.
```python
async def run_streaming(g: CompiledStateGraph, input: dict, config: dict, debug: bool = False):
    """Yield only the tokens emitted by chat model calls inside the graph."""
    async for event in g.astream_events(
        input=input, config=config, version="v2", debug=debug
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            data = event["data"]
            if content := data["chunk"].content:
                yield content
```
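Consuming the generator could look like this (a minimal sketch; the query and thread_id values are illustrative):

```python
import asyncio


async def main():
    config = {"configurable": {"thread_id": "demo-thread"}}
    async for token in run_streaming(g, input={"query": "What is LangGraph?"}, config=config):
        print(token, end="", flush=True)


asyncio.run(main())
```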
Streaming LLM tokens & events
Use .astream_events()
astream_events() comes from LangChain; most Runnable classes implement the astream_events() method.
- List of possible events: How to stream runnables | 🦜️🔗 LangChain
Examples:
- on_llm_start
- on_llm_stream
- on_llm_end
Each event consists of at least:

```python
{
    "event": ...,  # event type
    "name": ...,   # name of the event
    "data": ...,   # event data
}
```
```python
async for event in g.astream_events({"messages": inputs}, version="v2"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")
```