Skip to main content

Overview

The Graphiti class is the main entry point for building and managing real-time, temporally-aware knowledge graphs. It provides methods for adding episodes, searching, building communities, and managing the graph structure.

Constructor

Graphiti(
    uri: str | None = None,
    user: str | None = None,
    password: str | None = None,
    llm_client: LLMClient | None = None,
    embedder: EmbedderClient | None = None,
    cross_encoder: CrossEncoderClient | None = None,
    store_raw_episode_content: bool = True,
    graph_driver: GraphDriver | None = None,
    max_coroutines: int | None = None,
    tracer: Tracer | None = None,
    trace_span_prefix: str = 'graphiti',
)
Initialize a Graphiti instance with database connection and client configurations.
uri
str | None
The URI of the Neo4j database. Required when graph_driver is None.
user
str | None
The username for authenticating with the Neo4j database.
password
str | None
The password for authenticating with the Neo4j database.
llm_client
LLMClient | None
An instance of LLMClient for natural language processing tasks. If not provided, a default OpenAIClient will be initialized.
embedder
EmbedderClient | None
An instance of EmbedderClient for embedding tasks. If not provided, a default OpenAIEmbedder will be initialized.
cross_encoder
CrossEncoderClient | None
An instance of CrossEncoderClient for reranking tasks. If not provided, a default OpenAIRerankerClient will be initialized.
store_raw_episode_content
bool
default:"True"
Whether to store the raw content of episodes.
graph_driver
GraphDriver | None
An instance of GraphDriver for database operations. If not provided, a default Neo4jDriver will be initialized.
max_coroutines
int | None
The maximum number of concurrent operations allowed. Overrides SEMAPHORE_LIMIT set in the environment. If not set, the Graphiti default is used.
tracer
Tracer | None
An OpenTelemetry tracer instance for distributed tracing. If not provided, tracing is disabled (no-op).
trace_span_prefix
str
default:"graphiti"
Prefix to prepend to all span names.

Example

from graphiti_core import Graphiti

graphiti = Graphiti(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)

Methods

add_episode

async def add_episode(
    name: str,
    episode_body: str,
    source_description: str,
    reference_time: datetime,
    source: EpisodeType = EpisodeType.message,
    group_id: str | None = None,
    uuid: str | None = None,
    update_communities: bool = False,
    entity_types: dict[str, type[BaseModel]] | None = None,
    excluded_entity_types: list[str] | None = None,
    previous_episode_uuids: list[str] | None = None,
    edge_types: dict[str, type[BaseModel]] | None = None,
    edge_type_map: dict[tuple[str, str], list[str]] | None = None,
    custom_extraction_instructions: str | None = None,
    saga: str | SagaNode | None = None,
    saga_previous_episode_uuid: str | None = None,
) -> AddEpisodeResults
Process an episode and update the graph with extracted entities and relationships.
name
str
required
The name of the episode.
episode_body
str
required
The content of the episode.
source_description
str
required
A description of the episode’s source.
reference_time
datetime
required
The reference time for the episode.
source
EpisodeType
default:"EpisodeType.message"
The type of the episode. Options: EpisodeType.message, EpisodeType.json, EpisodeType.text.
group_id
str | None
An id for the graph partition the episode is a part of.
uuid
str | None
Optional uuid of the episode.
update_communities
bool
default:"False"
Whether to update communities with new node information.
entity_types
dict[str, type[BaseModel]] | None
Dictionary mapping entity type names to their Pydantic model definitions.
excluded_entity_types
list[str] | None
List of entity type names to exclude from the graph. Entities classified into these types will not be added to the graph. Can include ‘Entity’ to exclude the default entity type.
previous_episode_uuids
list[str] | None
List of episode uuids to use as the previous episodes. If not provided, the most recent episodes by created_at date will be used.
edge_types
dict[str, type[BaseModel]] | None
Dictionary mapping edge type names to their Pydantic model definitions.
edge_type_map
dict[tuple[str, str], list[str]] | None
Mapping of (source_type, target_type) tuples to allowed edge type names.
custom_extraction_instructions
str | None
Custom extraction instructions string to be included in the extract entities and extract edges prompts. This allows for additional instructions or context to guide the extraction process.
saga
str | SagaNode | None
Either a saga name (str) or a SagaNode object to associate this episode with. If a string is provided and a saga with this name already exists in the group, the episode will be added to it. Otherwise, a new saga will be created. Sagas are connected to episodes via HAS_EPISODE edges, and consecutive episodes are linked via NEXT_EPISODE edges.
saga_previous_episode_uuid
str | None
UUID of the previous episode in the saga. If provided, skips the database query to find the most recent episode. Useful for efficiently adding multiple episodes to the same saga in sequence. The returned AddEpisodeResults.episode.uuid can be passed as this parameter for the next episode.
episode
EpisodicNode
The created or updated episodic node.
episodic_edges
list[EpisodicEdge]
List of episodic edges connecting entities to the episode.
nodes
list[EntityNode]
List of entity nodes extracted from the episode.
edges
list[EntityEdge]
List of entity edges (relationships) extracted from the episode.
communities
list[CommunityNode]
List of community nodes (only if update_communities=True).
community_edges
list[CommunityEdge]
List of community edges (only if update_communities=True).

Example

from datetime import datetime
from graphiti_core.nodes import EpisodeType

result = await graphiti.add_episode(
    name="User Conversation",
    episode_body="user: I love pizza\nassistant: Pizza is delicious!",
    source_description="Chat conversation",
    reference_time=datetime.now(),
    source=EpisodeType.message,
    group_id="user_123"
)

print(f"Extracted {len(result.nodes)} entities")
print(f"Extracted {len(result.edges)} relationships")

add_episode_bulk

async def add_episode_bulk(
    bulk_episodes: list[RawEpisode],
    group_id: str | None = None,
    entity_types: dict[str, type[BaseModel]] | None = None,
    excluded_entity_types: list[str] | None = None,
    edge_types: dict[str, type[BaseModel]] | None = None,
    edge_type_map: dict[tuple[str, str], list[str]] | None = None,
    custom_extraction_instructions: str | None = None,
    saga: str | SagaNode | None = None,
) -> AddBulkEpisodeResults
Process multiple episodes in bulk and update the graph.
bulk_episodes
list[RawEpisode]
required
A list of RawEpisode objects to be processed and added to the graph. Each RawEpisode contains: name, content, source_description, source, reference_time, and optional uuid.
group_id
str | None
An id for the graph partition the episode is a part of.
entity_types
dict[str, type[BaseModel]] | None
Dictionary mapping entity type names to Pydantic models.
excluded_entity_types
list[str] | None
List of entity type names to exclude from extraction.
edge_types
dict[str, type[BaseModel]] | None
Dictionary mapping edge type names to Pydantic models.
edge_type_map
dict[tuple[str, str], list[str]] | None
Mapping of (source_type, target_type) to allowed edge types.
custom_extraction_instructions
str | None
Custom extraction instructions string to be included in the extract entities and extract edges prompts.
saga
str | SagaNode | None
Either a saga name (str) or a SagaNode object to associate all episodes with. If a string is provided and a saga with this name already exists in the group, the episodes will be added to it. Otherwise, a new saga will be created.
episodes
list[EpisodicNode]
List of created episodic nodes.
episodic_edges
list[EpisodicEdge]
List of episodic edges.
nodes
list[EntityNode]
List of extracted entity nodes.
edges
list[EntityEdge]
List of extracted entity edges.
communities
list[CommunityNode]
List of community nodes (empty in bulk operations).
community_edges
list[CommunityEdge]
List of community edges (empty in bulk operations).

Example

from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime

episodes = [
    RawEpisode(
        name="Episode 1",
        content="user: Hello",
        source_description="Chat",
        source=EpisodeType.message,
        reference_time=datetime.now()
    ),
    RawEpisode(
        name="Episode 2",
        content="user: How are you?",
        source_description="Chat",
        source=EpisodeType.message,
        reference_time=datetime.now()
    )
]

result = await graphiti.add_episode_bulk(episodes, group_id="user_123")
print(f"Processed {len(result.episodes)} episodes")
async def search(
    query: str,
    center_node_uuid: str | None = None,
    group_ids: list[str] | None = None,
    num_results: int = 10,
    search_filter: SearchFilters | None = None,
    driver: GraphDriver | None = None,
) -> list[EntityEdge]
Perform a hybrid search on the knowledge graph.
query
str
required
The search query string.
center_node_uuid
str | None
Facts will be reranked based on proximity to this node.
group_ids
list[str] | None
The graph partitions to return data from.
num_results
int
default:"10"
The maximum number of results to return.
search_filter
SearchFilters | None
Filters to apply to the search.
driver
GraphDriver | None
The graph driver to use. If not provided, uses the default driver.
edges
list[EntityEdge]
List of EntityEdge objects that are relevant to the search query.

Example

edges = await graphiti.search(
    query="What does the user like?",
    group_ids=["user_123"],
    num_results=5
)

for edge in edges:
    print(f"{edge.source_node_uuid} -> {edge.target_node_uuid}: {edge.fact}")

search_

async def search_(
    query: str,
    config: SearchConfig = COMBINED_HYBRID_SEARCH_CROSS_ENCODER,
    group_ids: list[str] | None = None,
    center_node_uuid: str | None = None,
    bfs_origin_node_uuids: list[str] | None = None,
    search_filter: SearchFilters | None = None,
    driver: GraphDriver | None = None,
) -> SearchResults
Advanced search method that returns Graph objects (nodes and edges) with configurable search strategies and rerankers.
query
str
required
The search query string.
config
SearchConfig
default:"COMBINED_HYBRID_SEARCH_CROSS_ENCODER"
Search configuration specifying search methods and rerankers. See search_config_recipes for preset configurations.
group_ids
list[str] | None
The graph partitions to return data from.
center_node_uuid
str | None
Center node for node distance reranking.
bfs_origin_node_uuids
list[str] | None
Origin nodes for breadth-first search.
search_filter
SearchFilters | None
Filters to apply to the search.
driver
GraphDriver | None
The graph driver to use.
edges
list[EntityEdge]
List of relevant entity edges.
edge_reranker_scores
list[float]
Reranker scores for edges.
nodes
list[EntityNode]
List of relevant entity nodes.
node_reranker_scores
list[float]
Reranker scores for nodes.
episodes
list[EpisodicNode]
List of relevant episodes.
episode_reranker_scores
list[float]
Reranker scores for episodes.
communities
list[CommunityNode]
List of relevant communities.
community_reranker_scores
list[float]
Reranker scores for communities.

Example

from graphiti_core.search.search_config_recipes import COMBINED_HYBRID_SEARCH_CROSS_ENCODER

results = await graphiti.search_(
    query="What does the user like?",
    config=COMBINED_HYBRID_SEARCH_CROSS_ENCODER,
    group_ids=["user_123"]
)

print(f"Found {len(results.edges)} edges")
print(f"Found {len(results.nodes)} nodes")
print(f"Found {len(results.episodes)} episodes")

build_communities

async def build_communities(
    group_ids: list[str] | None = None,
    driver: GraphDriver | None = None,
) -> tuple[list[CommunityNode], list[CommunityEdge]]
Use a community clustering algorithm to find communities of nodes and create community nodes summarizing the content.
group_ids
list[str] | None
Create communities only for the listed group_ids. If blank, the entire graph will be used.
driver
GraphDriver | None
The graph driver to use.
community_nodes
list[CommunityNode]
List of created community nodes.
community_edges
list[CommunityEdge]
List of edges connecting entities to communities.

Example

community_nodes, community_edges = await graphiti.build_communities(
    group_ids=["user_123"]
)

print(f"Created {len(community_nodes)} communities")

retrieve_episodes

async def retrieve_episodes(
    reference_time: datetime,
    last_n: int = EPISODE_WINDOW_LEN,
    group_ids: list[str] | None = None,
    source: EpisodeType | None = None,
    driver: GraphDriver | None = None,
    saga: str | None = None,
) -> list[EpisodicNode]
Retrieve the last n episodic nodes from the graph.
reference_time
datetime
required
The reference time to retrieve episodes before.
last_n
int
default:"EPISODE_WINDOW_LEN"
The number of episodes to retrieve.
group_ids
list[str] | None
The group ids to return data from.
source
EpisodeType | None
Filter episodes by source type.
driver
GraphDriver | None
The graph driver to use.
saga
str | None
If provided, only retrieve episodes that belong to the saga with this name.
episodes
list[EpisodicNode]
List of the most recent EpisodicNode objects.

build_indices_and_constraints

async def build_indices_and_constraints(
    delete_existing: bool = False
)
Build indices and constraints in the Neo4j database to optimize query performance and ensure data integrity.
delete_existing
bool
default:"False"
Whether to clear existing indices before creating new ones.

Example

await graphiti.build_indices_and_constraints()

add_triplet

async def add_triplet(
    source_node: EntityNode,
    edge: EntityEdge,
    target_node: EntityNode
) -> AddTripletResults
Add a single triplet (source node, edge, target node) to the graph.
source_node
EntityNode
required
The source entity node.
edge
EntityEdge
required
The edge connecting the nodes.
target_node
EntityNode
required
The target entity node.
nodes
list[EntityNode]
List of saved nodes (source and target).
edges
list[EntityEdge]
List of saved edges.

remove_episode

async def remove_episode(
    episode_uuid: str
)
Remove an episode and its associated edges and nodes from the graph.
episode_uuid
str
required
The UUID of the episode to remove.

Example

await graphiti.remove_episode("episode-uuid-123")

get_nodes_and_edges_by_episode

async def get_nodes_and_edges_by_episode(
    episode_uuids: list[str]
) -> SearchResults
Retrieve all nodes and edges associated with specific episodes.
episode_uuids
list[str]
required
List of episode UUIDs to retrieve data for.
nodes
list[EntityNode]
List of entity nodes mentioned in the episodes.
edges
list[EntityEdge]
List of entity edges from the episodes.

close

async def close()
Close the connection to the Neo4j database. This should be called when the Graphiti instance is no longer needed.

Example

try:
    # Use graphiti
    await graphiti.add_episode(...)
finally:
    await graphiti.close()

Properties

token_tracker

@property
def token_tracker
Access the LLM client’s token usage tracker. Returns: TokenUsageTracker that can be used to:
  • Get token usage by prompt type: tracker.get_usage()
  • Get total token usage: tracker.get_total_usage()
  • Print a formatted summary: tracker.print_summary()
  • Reset tracking: tracker.reset()

Example

# Add some episodes
await graphiti.add_episode(...)

# Check token usage
usage = graphiti.token_tracker.get_total_usage()
print(f"Total tokens used: {usage['total_tokens']}")

# Print detailed summary
graphiti.token_tracker.print_summary()