LangGraph

Updated on 03 December 2024 | First published on 19 November 2024

Resources

LangGraph Architecture

When a graph is compiled, the compiled graph (a CompiledStateGraph) implements the LangChain Runnable interface, so the standard Runnable methods (invoke, stream, astream, astream_events, …) are available on it.
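
As a minimal sketch (the State schema, answer_node and builder names here are illustrative, not from the docs): building and compiling a small graph yields an object that supports the usual Runnable methods such as .invoke() and .stream().

from typing import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    query: str
    answer: str


def answer_node(state: State) -> dict:
    # Placeholder node; a real node would call an LLM here.
    return {"answer": f"You asked: {state['query']}"}


builder = StateGraph(State)
builder.add_node("answer", answer_node)
builder.add_edge(START, "answer")
builder.add_edge("answer", END)

g = builder.compile()  # a CompiledStateGraph, which is a Runnable

# Standard Runnable methods are therefore available:
print(g.invoke({"query": "hi"}))
for chunk in g.stream({"query": "hi"}):
    print(chunk)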

Streaming Outputs

📖 Documentation: Streaming

📖 API reference: Graphs

📖 LangChain’s streaming documentation: Streaming | 🦜️🔗 LangChain

Streaming outputs helps make your application feel responsive and contributes to a better user experience (UX).

With a LangGraph graph, you can stream:

  • Graph outputs
  • LLM tokens

Streaming graph outputs

Use .stream() or .astream()

Any object that implements the LangChain Runnable interface exposes .stream() and .astream() methods.

Determine what to stream with the stream_mode parameter. Multiple modes can be combined by passing a list (see the sketch after the list of modes below).

g.astream(stream_mode=[...])

Available modes:

  • values: full value of state after each step in the graph
  • updates: updates to the state after each step in the graph
  • custom: custom data from inside nodes
  • messages: streams LLM tokens in the node that invokes an LLM
  • debug: stream as much info as possible
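
A small sketch of combining modes, reusing the compiled graph g from the sketch above: when stream_mode is a list, each yielded item is a (mode, chunk) tuple, so you can tell which mode produced it.

for mode, chunk in g.stream({"query": "hi"}, stream_mode=["updates", "values"]):
    # mode is the stream mode name, chunk is that mode's payload for this step
    print(f"{mode}: {chunk}")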

Streaming only LLM messages (adapted from this example)

from langchain_core.messages import AIMessageChunk, HumanMessage
from langgraph.graph.state import CompiledStateGraph

g: CompiledStateGraph = get_graph()  # get_graph() builds and compiles the graph elsewhere
thread_id = "example-thread"         # example conversation thread id (supply your own)


def run_batch(q: str):
    """How a batch (i.e. non-streaming) invocation is usually executed."""
    response = g.invoke(
        {"query": q},
        {"configurable": {"thread_id": thread_id}},
        debug=True,
    )
    return response


async def run_streaming(g: CompiledStateGraph, q: str):
    """Stream LLM tokens as they are generated."""
    async for msg, metadata in g.astream(
        stream_mode="messages",
        input={"query": q},
        config={"configurable": {"thread_id": thread_id}},
    ):
        # Skip echoed human messages and empty chunks; end="" keeps tokens on one line.
        if msg.content and not isinstance(msg, HumanMessage):
            print(msg.content, end="", flush=True)

Watch out for: streaming and LangGraph State

Using the above method, if there are previous messages in the state, those messages also get yielded at the end, so the streamed output ends up being the new response plus the previous responses. This is confusing for the user.

To overcome this, use event-based filtering: iterate over .astream_events() and react only to on_chat_model_stream events.

async def run_streaming(g: CompiledStateGraph, input: dict, config: dict, debug: bool = False):
    """Yield only the tokens produced by the chat model during this run."""
    async for event in g.astream_events(
        input=input, config=config, version="v2", debug=debug
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            data = event["data"]
            # Only yield non-empty token chunks
            if content := data["chunk"].content:
                yield content

Streaming LLM tokens & events

Use .astream_events()

Example event types:

  • on_llm_start
  • on_llm_stream
  • on_llm_end

Each event contains at least the following keys:

{
    "event": ...,  # event type, e.g. "on_chat_model_stream"
    "name": ...,   # name of the runnable that emitted the event
    "data": ...,   # event payload
}

# Print every event type together with the name of the runnable that emitted it
async for event in g.astream_events({"messages": inputs}, version="v2"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")
#langgraph
#langchain
#langsmith
#genai
#llm