Graph Reasoning Pipeline
2026-04-15
1. Overview
The Graph Reasoning Pipeline is designed for multi-hop reasoning between target entity pairs in a knowledge graph. It first searches for paths connecting the target entities, evaluates and filters those paths by length, and then uses an LLM to infer latent relations from the retained evidence chains.
This pipeline is suitable for:
- multi-hop reasoning and relation completion
- constructing interpretable evidence chains for entity pairs
- controlling reasoning complexity with path length constraints
- generating relation candidates before KG QA
The main stages are:
- Multi-hop path search: search for paths between the target entities over the triples in triplet.
- Path length evaluation: compute the hop length of each path.
- Path length filtering: keep only paths within the target range.
- Relation generation: infer new relations from filtered paths.
2. Quick Start
Step 1: Create a new DataFlow workspace
```bash
mkdir run_dataflow_kg
cd run_dataflow_kg
```
Step 2: Initialize pipeline code and default data
```bash
dfkg init
```
The initialization creates:
- Pipeline script: api_pipelines/graph_reasoning_pipeline.py
- Default data: example_data/GraphReasoningPipeline/input.json
Step 3: Configure the API key and optional model settings
```bash
export DF_API_KEY=sk-xxxx
```
The default model is gpt-4o-mini. You can override the defaults by setting the environment variables DF_API_URL (API endpoint), DF_LLM_MODEL (model name), and DF_GRAPH_REASONING_INPUT_FILE (input data path).
Step 4: Run the pipeline
```bash
python api_pipelines/graph_reasoning_pipeline.py
```
3. Data Flow and Pipeline Logic
3.1 Input Data
This pipeline requires at least the following fields:
- triplet: a list of graph triples, each formatted as "<subj> ... <obj> ... <rel> ...".
- target_entity: a list of target entity pairs. The current implementation works best with the nested format [["Henry, Berlin"], ["Henry, Rome"]], where each pair is a single comma-separated string wrapped in its own list.
One implementation detail matters here: KGReasoningPathSearch validates the input DataFrame against the column name triplet, so the pipeline explicitly passes input_key="triplet" rather than triple.
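To make the two input formats concrete, here is a minimal sketch of how a "<subj> ... <obj> ... <rel> ..." string and a nested target_entity list could be split into plain Python values. The helpers parse_triple and parse_target_pairs are hypothetical illustrations; the real operators may parse their inputs differently.

```python
import re
from typing import List, Tuple

# Hypothetical helper pattern; the real operators may use their own parser.
TRIPLE_RE = re.compile(r"^<subj>\s*(.+?)\s*<obj>\s*(.+?)\s*<rel>\s*(.+?)\s*$")


def parse_triple(triple: str) -> Tuple[str, str, str]:
    """Split '<subj> S <obj> O <rel> R' into (S, R, O)."""
    match = TRIPLE_RE.match(triple)
    if match is None:
        raise ValueError(f"Malformed triple: {triple!r}")
    subj, obj, rel = match.groups()
    return subj, rel, obj


def parse_target_pairs(target_entity: List[List[str]]) -> List[Tuple[str, str]]:
    """Turn [["Henry, Berlin"], ...] into [("Henry", "Berlin"), ...]."""
    pairs = []
    for item in target_entity:
        # Each pair is one comma-separated string wrapped in a list.
        head, tail = (part.strip() for part in item[0].split(",", 1))
        pairs.append((head, tail))
    return pairs
```

The non-greedy groups let entity names contain spaces, so "Maria Rodriguez" is captured whole as the object.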
A sample input record is:
```json
[
  {
    "triplet": [
      "<subj> Henry <obj> Maria Rodriguez <rel> is_trained_by",
      "<subj> Henry <obj> Maple Leaves <rel> forms",
      "<subj> Lucy <obj> University Toronto <rel> studies_at",
      "<subj> Henry <obj> Lucy <rel> is_inspired_by",
      "<subj> Maple Leaves <obj> Berlin <rel> performs_in",
      "<subj> Maple Leaves <obj> Rome <rel> performs_in"
    ],
    "target_entity": [
      ["Henry, Berlin"],
      ["Henry, Rome"]
    ]
  }
]
```
3.2 Graph Reasoning Pipeline Logic (GraphReasoningPipeline)
Step 1: Multi-hop Path Search (KGReasoningPathSearch)
Functionality:
- search all simple paths between target entity pairs in an undirected graph view
- output mpath, grouped by entity pair
Input: triplet, target_entity
Output: mpath
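The path search described above can be sketched as a depth-first search over an undirected view of the triples. This is an illustration under assumptions, not the operator's code: search_simple_paths is a hypothetical name, and treating max_hop as the maximum number of edges per path is an assumed reading of the max_hop parameter.

```python
import re
from collections import defaultdict
from typing import Dict, List, Tuple

TRIPLE_RE = re.compile(r"^<subj>\s*(.+?)\s*<obj>\s*(.+?)\s*<rel>\s*(.+?)\s*$")


def search_simple_paths(
    triples: List[str], source: str, target: str, max_hop: int = 4
) -> List[List[str]]:
    """Return every simple path (no repeated node) from source to target.

    Each triple is treated as an undirected edge; paths are capped at
    max_hop edges (assumed semantics for this sketch)."""
    # Undirected adjacency list: node -> [(neighbor, original triple string)]
    adjacency: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for triple in triples:
        match = TRIPLE_RE.match(triple)
        if match is None:
            continue  # skip malformed triples in this sketch
        subj, obj, _rel = match.groups()
        adjacency[subj].append((obj, triple))
        adjacency[obj].append((subj, triple))

    paths: List[List[str]] = []

    def dfs(node: str, visited: set, edges: List[str]) -> None:
        if node == target and edges:
            paths.append(list(edges))  # a path stops as soon as it reaches the target
            return
        if len(edges) == max_hop:
            return
        for neighbor, triple in adjacency[node]:
            if neighbor not in visited:
                visited.add(neighbor)
                edges.append(triple)
                dfs(neighbor, visited, edges)
                edges.pop()
                visited.remove(neighbor)

    dfs(source, {source}, [])
    return paths


SAMPLE_TRIPLES = [
    "<subj> Henry <obj> Maria Rodriguez <rel> is_trained_by",
    "<subj> Henry <obj> Maple Leaves <rel> forms",
    "<subj> Lucy <obj> University Toronto <rel> studies_at",
    "<subj> Henry <obj> Lucy <rel> is_inspired_by",
    "<subj> Maple Leaves <obj> Berlin <rel> performs_in",
    "<subj> Maple Leaves <obj> Rome <rel> performs_in",
]
TARGET_PAIRS = [("Henry", "Berlin"), ("Henry", "Rome")]

# One list of paths per entity pair, mirroring the mpath nesting.
mpath = [search_simple_paths(SAMPLE_TRIPLES, h, t) for h, t in TARGET_PAIRS]
```

On the sample input this yields one two-hop path per pair (Henry -> Maple Leaves -> Berlin, and Henry -> Maple Leaves -> Rome), the same structure shown in the example output below.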
Step 2: Path Length Evaluation (KGReasoningPathLengthEvaluator)
Functionality:
- compute hop lengths for each path
- keep the same nested structure as mpath
Input: mpath
Output: mpath_length
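Since each path is stored as a list of triples, its hop length is simply the number of triples. The sketch below (evaluate_path_lengths is a hypothetical name) shows how the nested mpath structure can be preserved in mpath_length.

```python
from typing import List


def evaluate_path_lengths(mpath: List[List[List[str]]]) -> List[List[int]]:
    """Mirror the nesting of mpath: one list of hop counts per entity pair."""
    # A path is a list of triples, so its hop length is len(path).
    return [[len(path) for path in pair_paths] for pair_paths in mpath]
```

Applied to the example mpath in section 3.3, this returns [[2], [2]], which matches the mpath_length shown there.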
Step 3: Path Length Filtering (KGReasoningPathLengthFilter)
Functionality:
- remove paths that are too short or too long
- restrict later inference to evidence chains in the target range
Input: mpath, mpath_length
Output: filtered_mpath
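A minimal sketch of the filtering step, assuming the bounds are inclusive (min_length <= length <= max_length). The parameter names follow the pipeline's constructor, but the inclusive comparison and the helper name filter_paths_by_length are assumptions.

```python
from typing import List


def filter_paths_by_length(
    mpath: List[List[List[str]]],
    mpath_length: List[List[int]],
    min_length: int = 2,
    max_length: int = 4,
) -> List[List[List[str]]]:
    """Keep, per entity pair, only paths whose length lies in [min_length, max_length]."""
    filtered = []
    # mpath and mpath_length share the same nesting, so zip them pair by pair.
    for pair_paths, pair_lengths in zip(mpath, mpath_length):
        filtered.append(
            [
                path
                for path, length in zip(pair_paths, pair_lengths)
                if min_length <= length <= max_length
            ]
        )
    return filtered
```

A pair can end up with an empty list when none of its paths fall in range; the relation generation step then has no evidence for that pair.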
Step 4: Relation Generation (KGReasoningRelationGeneration)
Functionality:
- infer latent relations from target entity pairs and filtered paths
- optionally restrict candidate relations to those already seen on the path
Input: target_entity, filtered_mpath
Output: inferred_triplets
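To illustrate what restricting candidates to path relations can mean, here is a hypothetical prompt builder for one entity pair. It is not the prompt used by KGReasoningRelationGeneration; the wording and the function build_relation_prompt are assumptions, and no LLM is called.

```python
import re
from typing import List

TRIPLE_RE = re.compile(r"^<subj>\s*(.+?)\s*<obj>\s*(.+?)\s*<rel>\s*(.+?)\s*$")


def build_relation_prompt(
    head: str, tail: str, paths: List[List[str]], restrict_to_path_rel: bool = True
) -> str:
    """Hypothetical prompt for one entity pair (not the operator's real prompt)."""
    evidence = "\n".join(
        f"Path {i}: " + " ; ".join(path) for i, path in enumerate(paths, start=1)
    )
    prompt = (
        f"Infer the relation between '{head}' and '{tail}' "
        f"from the evidence paths below.\n{evidence}\n"
    )
    if restrict_to_path_rel:
        # Collect relation names already seen on the paths, preserving first-seen order.
        relations: List[str] = []
        for path in paths:
            for triple in path:
                match = TRIPLE_RE.match(triple)
                if match and match.group(3) not in relations:
                    relations.append(match.group(3))
        prompt += "Choose the relation from: " + ", ".join(relations) + "\n"
    prompt += "Answer with one triple in the form '<subj> ... <obj> ... <rel> ...'."
    return prompt
```

With restrict_to_path_rel=True the model is steered toward relations that already appear in the evidence chain, which keeps its output closer to the graph's existing vocabulary.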
3.3 Output Data
Typical output fields include:
- mpath: multi-hop paths for each target pair
- mpath_length: path lengths
- filtered_mpath: filtered paths
- inferred_triplets: inferred relations generated by the model
An example output is:
```json
{
  "mpath": [
    [
      [
        "<subj> Henry <obj> Maple Leaves <rel> forms",
        "<subj> Maple Leaves <obj> Berlin <rel> performs_in"
      ]
    ],
    [
      [
        "<subj> Henry <obj> Maple Leaves <rel> forms",
        "<subj> Maple Leaves <obj> Rome <rel> performs_in"
      ]
    ]
  ],
  "mpath_length": [
    [2],
    [2]
  ],
  "filtered_mpath": [
    [
      [
        "<subj> Henry <obj> Maple Leaves <rel> forms",
        "<subj> Maple Leaves <obj> Berlin <rel> performs_in"
      ]
    ],
    [
      [
        "<subj> Henry <obj> Maple Leaves <rel> forms",
        "<subj> Maple Leaves <obj> Rome <rel> performs_in"
      ]
    ]
  ],
  "inferred_triplets": [
    [
      [
        "<subj> Henry <obj> Berlin <rel> performs_inferred_via_band"
      ]
    ],
    [
      [
        "<subj> Henry <obj> Rome <rel> performs_inferred_via_band"
      ]
    ]
  ]
}
```
The relation names in inferred_triplets are generated by the LLM, so real outputs may vary. The example above only illustrates the structure.
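Downstream code usually wants inferred_triplets as flat (subject, relation, object) tuples. The sketch below assumes you have already loaded one output record into a dict; the name of the cache file written by FileStorage is not covered here, and collect_inferred is a hypothetical helper.

```python
import re
from typing import Dict, List, Tuple

TRIPLE_RE = re.compile(r"^<subj>\s*(.+?)\s*<obj>\s*(.+?)\s*<rel>\s*(.+?)\s*$")


def collect_inferred(record: Dict) -> List[Tuple[str, str, str]]:
    """Flatten inferred_triplets (pair -> group -> triple) into (subj, rel, obj) tuples."""
    results = []
    for pair_groups in record.get("inferred_triplets", []):
        for group in pair_groups:
            for triple in group:
                match = TRIPLE_RE.match(triple)
                if match is None:
                    continue  # LLM output may not follow the format; skip it
                subj, obj, rel = match.groups()
                results.append((subj, rel, obj))
    return results
```

Skipping malformed strings, rather than raising, reflects that the relation text comes from the model and is not guaranteed to follow the triple format.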
4. Pipeline Example
The following is a reference view of the GraphReasoningPipeline code generated by dfkg init. For execution, use the generated api_pipelines/graph_reasoning_pipeline.py file.
```python
import os

from dataflow.core import LLMServingABC
from dataflow.operators.graph_reasoning import (
    KGReasoningPathLengthEvaluator,
    KGReasoningPathLengthFilter,
    KGReasoningPathSearch,
    KGReasoningRelationGeneration,
)
from dataflow.pipeline import PipelineABC
from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import FileStorage


class GraphReasoningPipeline(PipelineABC):
    """Graph reasoning pipeline: target pairs -> multi-hop paths -> inferred relations."""

    def __init__(
        self,
        first_entry_file_name: str,
        llm_serving: LLMServingABC,
        cache_path: str = "./cache_local",
        file_name_prefix: str = "graph_reasoning_pipeline_step",
        cache_type: str = "json",
        lang: str = "en",
        max_hop: int = 4,
        min_length: int = 2,
        max_length: int = 4,
        restrict_to_path_rel: bool = True,
    ):
        super().__init__()
        if llm_serving is None:
            raise ValueError("llm_serving is required for GraphReasoningPipeline")

        self.storage = FileStorage(
            first_entry_file_name=first_entry_file_name,
            cache_path=cache_path,
            file_name_prefix=file_name_prefix,
            cache_type=cache_type,
        )

        self.path_search_step1 = KGReasoningPathSearch(max_hop=max_hop)
        self.path_length_step2 = KGReasoningPathLengthEvaluator()
        self.path_filter_step3 = KGReasoningPathLengthFilter(
            min_length=min_length,
            max_length=max_length,
        )
        self.relation_generation_step4 = KGReasoningRelationGeneration(
            llm_serving=llm_serving,
            restrict_to_path_rel=restrict_to_path_rel,
            lang=lang,
        )

    def forward(self):
        # Step 1: search multi-hop paths between target entity pairs
        self.path_search_step1.run(
            storage=self.storage.step(),
            input_key="triplet",
            output_key="mpath",
        )
        # Step 2: compute the hop length of each path
        self.path_length_step2.run(
            storage=self.storage.step(),
            input_key="mpath",
            output_key="mpath_length",
        )
        # Step 3: keep only paths within [min_length, max_length]
        self.path_filter_step3.run(
            storage=self.storage.step(),
            mpath_key="mpath",
            length_key="mpath_length",
            output_path_key="filtered_mpath",
        )
        # Step 4: infer latent relations with the LLM
        self.relation_generation_step4.run(
            storage=self.storage.step(),
            target_key="target_entity",
            path_key="filtered_mpath",
            output_key="inferred_triplets",
        )


if __name__ == "__main__":
    input_file = os.environ.get(
        "DF_GRAPH_REASONING_INPUT_FILE",
        os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..",
                "example_data",
                "GraphReasoningPipeline",
                "input.json",
            )
        ),
    )

    llm_serving = APILLMServing_request(
        api_url=os.environ.get(
            "DF_API_URL",
            "https://api.openai.com/v1/chat/completions",
        ),
        key_name_of_api_key="DF_API_KEY",
        model_name=os.environ.get("DF_LLM_MODEL", "gpt-4o-mini"),
        max_workers=8,
        temperature=0.0,
    )

    pipeline = GraphReasoningPipeline(
        first_entry_file_name=input_file,
        llm_serving=llm_serving,
        cache_path="./cache_graph_reasoning",
        lang="en",
        max_hop=4,
        min_length=2,
        max_length=3,
    )
    pipeline.forward()
```
