基本概述

本项目是在完成对于基于 NebulaGraph 在 LlamaIndex 中实现 Graph RAG 的 Demo 复现的基础上,将其基于 HugeGraph 进行移植,旨在为 Apache 开源社区 incubator-hugegraph-ai 项目贡献 PR。

前期准备

流程概括

以 Graph RAG for Existing KG + keyword 检索模式为例,原作者先根据 ”用户的问题/要求” 由 LLM 提取出关键词,然后根据关键词在图数据库中检索,得到子图(用 flat_rel_map 的形式表示),最后再根据查询结果由 LLM 整合进行回答。

其中,对 flat_rel_map 的解释如下:

1
2
3
4
5
6
7
8
9
# The flat means for multi-hop relation path, we could get
# knowledge like: subj -rel-> obj -rel-> obj <-rel- obj.
# This type of knowledge is useful for some tasks.
# +---------------------+---------------------------------------------...-----+
# | subj | flattened_rels ... |
# +---------------------+---------------------------------------------...-----+
# | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...ili}"|
# | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...r}" |
# ...

步骤分解

  1. 通过 LLM 获得关键词,样例提示词如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    TEMPLATE_TMPL = (
    "A question is provided below. Given the question, "
    "extract up to {max_keywords} keywords from the text. "
    "Focus on extracting the keywords that we can use "
    "to best lookup answers to the question. "
    "Avoid stopwords.\n"
    "---------------------\n"
    "{question}\n"
    "---------------------\n"
    "Provide keywords in the following comma-separated format: 'KEYWORDS: <keywords>'"
    )
  2. 解析 LLM 的回答得到其中列举出的关键词,对于包含空格(含有多个单词的关键词),在对其进行保留的基础上,将构成其的所有单词均视为关键词

  3. 对解析得到的关键词进行扩充(增加同义词或其他可能形式,如复数、时态等),样例提示词如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    TEMPLATE_TMPL =(
    "Generate synonyms or possible form of keywords up to {max_keywords} in total,\n"
    "considering possible cases of capitalization, pluralization, common expressions, etc.\n"
    "Provide all synonyms of keywords in comma-separated format: 'SYNONYMS: <keywords>'\n"
    "Note, result should be in one-line with only one 'SYNONYMS: ' prefix\n"
    "----\n"
    "KEYWORDS: {question}\n"
    "----"

  4. 重复执行步骤 2,并与步骤 2 得到的关键词去并集

  5. 根据关键词对图数据库进行查询(NebulaGraph 使用 Cypher 作为查询语言)

    以下是原作者给出的一个查询样例,其中涉及到图数据库 Schema 相关信息的均为手动在构建 NebulaGraphStore 对象时声明:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    WITH map{`true`: "-[", `false`: "<-["} AS arrow_l,
    map{`true`: "]->", `false`: "]-"} AS arrow_r,
    map{`follow`: "degree", `serve`: "start_year,end_year"} AS edge_type_map
    MATCH p=(start)-[e:follow|serve*..2]-()
    WHERE id(start) IN ["player100", "player101"]
    WITH start, id(start) AS vid, nodes(p) AS nodes, e AS rels,
    length(p) AS rel_count, arrow_l, arrow_r, edge_type_map
    WITH
    REDUCE(s = vid + '{', key IN [key_ in ["name"]
    WHERE properties(start)[key_] IS NOT NULL] | s + key + ': ' +
    COALESCE(TOSTRING(properties(start)[key]), 'null') + ', ')
    + '}'
    AS subj,
    [item in [i IN RANGE(0, rel_count - 1) | [nodes[i], nodes[i + 1],
    rels[i], typeid(rels[i]) > 0, type(rels[i]) ]] | [
    arrow_l[tostring(item[3])] +
    item[4] + ':' +
    REDUCE(s = '{', key IN SPLIT(edge_type_map[item[4]], ',') |
    s + key + ': ' + COALESCE(TOSTRING(properties(item[2])[key]),
    'null') + ', ') + '}'
    +
    arrow_r[tostring(item[3])],
    REDUCE(s = id(item[1]) + '{', key IN [key_ in ["name"]
    WHERE properties(item[1])[key_] IS NOT NULL] | s + key + ': ' +
    COALESCE(TOSTRING(properties(item[1])[key]), 'null') + ', ') + '}'
    ]
    ] AS rels
    WITH
    REPLACE(subj, ', }', '}') AS subj,
    REDUCE(acc = collect(NULL), l in rels | acc + l) AS flattened_rels
    RETURN
    subj,
    REPLACE(REDUCE(acc = subj,l in flattened_rels|acc + ' ' + l),
    ', }', '}')
    AS flattened_rels
    LIMIT 30
  6. 根据查询结果和图数据库元数据,构造一个 NodeWithScore 对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    rel_map = {
    'Guardians of the Galaxy{name: Guardians of the Galaxy}': [
    'Guardians of the Galaxy{name: Guardians of the Galaxy} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: was fired from}]-> Marvel{name: Marvel}',
    ...
    ],
    'Marvel{name: Marvel}': [
    'Marvel{name: Marvel} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: take responsibility for}]-> tweets{name: tweets}',
    ...
    ],
    ...
    }
    knowledge_sequence = [
    'Guardians of the Galaxy{name: Guardians of the Galaxy} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: was fired from}]-> Marvel{name: Marvel}',
    ...
    'Marvel{name: Marvel} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: take responsibility for}]-> tweets{name: tweets}',
    ...
    ]

    # rel_map 为查询结果,knowledge_sequence 为查询结果中所有 flattened_rel 的集合

    _new_line_char = "\n"
    context_string = (
    f"The following are knowledge sequence in max depth"
    f" {self._graph_traversal_depth} "
    f"in the form of directed graph like:\n"
    f"`subject -[predicate]->, object, <-[predicate_next_hop]-,"
    f" object_next_hop ...`"
    f" extracted based on key entities as subject:\n"
    f"{_new_line_char.join(knowledge_sequence)}"
    )

    rel_node_info = {
    "kg_rel_map": rel_map,
    "kg_rel_text": knowledge_sequence,
    }

    if self._graph_schema != "":
    rel_node_info["kg_schema"] = {"schema": self._graph_schema}

    node = NodeWithScore(
    node=TextNode(
    text=context_string,
    score=1.0,
    metadata=rel_node_info,
    excluded_embed_metadata_keys=["kg_rel_map", "kg_rel_text"],
    excluded_llm_metadata_keys=["kg_rel_map", "kg_rel_text"],
    )
    )

    return [node]
  7. 使用 LLM 进行答案整合,样例提示词如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    TEMPLATE_TMPL =(
    'Context information is below.\n'
    '---------------------\n'
    '{context_str}\n'
    '---------------------\n'
    'Given the context information and not prior knowledge, answer the query.\n'
    'Query: {query_str}\n'
    'Answer: '


    # context_str 由 NodeWithScore 对象中包含的 metadata 等数据构成
    context_str = (
    "kg_schema: {'schema': \"Node properties: [{'tag': 'entity', 'properties': [('name', 'string')]}]\nEdge properties: [{'edge': 'relationship', 'properties': [('relationship', 'string')]}]\nRelationships: ['(:entity)-[:relationship]->(:entity)']\n\"}\n"
    "\n"
    "The following are knowledge sequence in max depth 2 in the form of directed graph like:\n"
    "`subject -[predicate]->, object, <-[predicate_next_hop]-, object_next_hop ...` extracted based on key entities as subject:\n"
    "Guardians of the Galaxy{name: Guardians of the Galaxy} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: joined}]-> The Suicide Squad{name: The Suicide Squad}\n"
    ...
    "Marvel{name: Marvel} <-[relationship:{relationship: was fired from}]- James Gunn{name: James Gunn} -[relationship:{relationship: take responsibility for}]-> tweets{name: tweets}\n"
    ...
    )

初步实现

总体要求

  • 将 Graph RAG for Existing KG on NebulaGraph + keyword 检索模式,移植到 HugeGraph 上
  • 支持链式调用,将逻辑分解到不同的算子中去处理,方便使用和调试,预留定制化和可扩展的空间
  • 基于 HugeGraph 目前对 Gremlin 查询的支持,复现 NebulaGraph 中的查询效果
  • 可根据需要指定用于关键词匹配的图数据库字段(NebulaGraph 中固定为节点 ID)

前置任务

快速上手部署 HugeGraph-server

1
$ sudo docker pull hugegraph/hugegraph
  • 启动一个空白的样例图数据库
1
$ sudo docker run -itd --name=graph -p 18080:8080 hugegraph/hugegraph
1
2
3
4
5
6
version: '3'
services:
graph:
image: hugegraph/hugegraph
ports:
- 18080:8080
1
$ sudo docker-compose up -d
  • 启动一个加载了预设定数据的样例图数据库
1
$ sudo docker run -itd --name=graph -p 18080:8080 -e PRELOAD=true hugegraph/hugegraph
1
2
3
4
5
6
7
8
version: '3'
services:
graph:
image: hugegraph/hugegraph
environment:
- PRELOAD=true
ports:
- 18080:8080
1
$ sudo docker-compose up -d
  • 启动一个加载了自定义数据的样例图数据库
1
$ sudo docker run -itd --name=graph -p 18080:8080 -e PRELOAD=true -v /path/to/your_script:/hugegraph/scripts/example.groovy hugegraph/hugegraph
1
2
3
4
5
6
7
8
9
10
version: '3'
services:
graph:
image: hugegraph/hugegraph
environment:
- PRELOAD=true
volumes:
- /path/to/your_script:/hugegraph/scripts/example.groovy
ports:
- 18080:8080
1
$ sudo docker-compose up -d

补充说明

在设置 pre-loadtrue 时,启动容器时会执行容器内的 /hugegraph/scripts/example.groovy 脚本对图数据库进行初始化,此时可以在容器外手动编辑 groovy 脚本,设置 volumes 映射将其映射到容器内并覆盖原有的 groovy 脚本 /hugegraph/scripts/example.groovy

groovy 脚本中可以完成图数据库的 schema 定义、索引构建和数据注入等操作

以下是系统内默认的 example.groovy 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.hugegraph.HugeFactory
import org.apache.hugegraph.backend.id.IdGenerator
import org.apache.hugegraph.dist.RegisterUtil
import org.apache.hugegraph.type.define.NodeRole
import org.apache.tinkerpop.gremlin.structure.T

RegisterUtil.registerRocksDB()

conf = "conf/graphs/hugegraph.properties"
graph = HugeFactory.open(conf)
graph.serverStarted(IdGenerator.of("server-tinkerpop"), NodeRole.MASTER)
schema = graph.schema()

schema.propertyKey("name").asText().ifNotExist().create()
schema.propertyKey("age").asInt().ifNotExist().create()
schema.propertyKey("city").asText().ifNotExist().create()
schema.propertyKey("weight").asDouble().ifNotExist().create()
schema.propertyKey("lang").asText().ifNotExist().create()
schema.propertyKey("date").asText().ifNotExist().create()
schema.propertyKey("price").asInt().ifNotExist().create()

schema.vertexLabel("person").properties("name", "age", "city").primaryKeys("name").ifNotExist().create()
schema.vertexLabel("software").properties("name", "lang", "price").primaryKeys("name").ifNotExist().create()
schema.indexLabel("personByCity").onV("person").by("city").secondary().ifNotExist().create()
schema.indexLabel("personByAgeAndCity").onV("person").by("age", "city").secondary().ifNotExist().create()
schema.indexLabel("softwareByPrice").onV("software").by("price").range().ifNotExist().create()
schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date", "weight").ifNotExist().create()
schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date", "weight").ifNotExist().create()
schema.indexLabel("createdByDate").onE("created").by("date").secondary().ifNotExist().create()
schema.indexLabel("createdByWeight").onE("created").by("weight").range().ifNotExist().create()
schema.indexLabel("knowsByWeight").onE("knows").by("weight").range().ifNotExist().create()

marko = graph.addVertex(T.label, "person", "name", "marko", "age", 29, "city", "Beijing")
vadas = graph.addVertex(T.label, "person", "name", "vadas", "age", 27, "city", "Hongkong")
lop = graph.addVertex(T.label, "software", "name", "lop", "lang", "java", "price", 328)
josh = graph.addVertex(T.label, "person", "name", "josh", "age", 32, "city", "Beijing")
ripple = graph.addVertex(T.label, "software", "name", "ripple", "lang", "java", "price", 199)
peter = graph.addVertex(T.label, "person", "name", "peter", "age", 35, "city", "Shanghai")

marko.addEdge("knows", vadas, "date", "20160110", "weight", 0.5)
marko.addEdge("knows", josh, "date", "20130220", "weight", 1.0)
marko.addEdge("created", lop, "date", "20171210", "weight", 0.4)
josh.addEdge("created", lop, "date", "20091111", "weight", 0.4)
josh.addEdge("created", ripple, "date", "20171210", "weight", 1.0)
peter.addEdge("created", lop, "date", "20170324", "weight", 0.2)

graph.tx().commit()

g = graph.traversal()

System.out.println(">>>> query all vertices: size=" + g.V().toList().size())
System.out.println(">>>> query all edges: size=" + g.E().toList().size())

对于需要使用自定义 ID 而非主键 ID 的场景,可以参考以下这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.hugegraph.HugeFactory
import org.apache.hugegraph.backend.id.IdGenerator
import org.apache.hugegraph.dist.RegisterUtil
import org.apache.hugegraph.type.define.NodeRole
import org.apache.tinkerpop.gremlin.structure.T

RegisterUtil.registerRocksDB()

conf = "conf/graphs/hugegraph.properties"
graph = HugeFactory.open(conf)
graph.serverStarted(IdGenerator.of("server-tinkerpop"), NodeRole.MASTER)
schema = graph.schema()

schema.propertyKey("name").asText().ifNotExist().create()
schema.propertyKey("birthDate").asText().ifNotExist().create()

schema.vertexLabel("Person").useCustomizeStringId().properties("name", "birthDate").ifNotExist().create()
schema.vertexLabel("Movie").useCustomizeStringId().properties("name").ifNotExist().create()

schema.indexLabel("PersonByName").onV("Person").by("name").secondary().ifNotExist().create()
schema.indexLabel("MovieByName").onV("Movie").by("name").secondary().ifNotExist().create()

schema.edgeLabel("ActedIn").sourceLabel("Person").targetLabel("Movie").ifNotExist().create()

p1 = graph.addVertex(T.label, "Person", T.id, "Al Pacino", "name", "Al Pacino", "birthDate", "1940-04-25")
p2 = graph.addVertex(T.label, "Person", T.id, "Robert De Niro", "name", "Robert De Niro", "birthDate", "1943-08-17")
m1 = graph.addVertex(T.label, "Movie", T.id, "The Godfather", "name", "The Godfather")
m2 = graph.addVertex(T.label, "Movie", T.id, "The Godfather Part II", "name", "The Godfather Part II")
m3 = graph.addVertex(T.label, "Movie", T.id, "The Godfather Coda The Death of Michael Corleone", "name", "The Godfather Coda The Death of Michael Corleone")

p1.addEdge("ActedIn", m1)
p1.addEdge("ActedIn", m2)
p1.addEdge("ActedIn", m3)
p2.addEdge("ActedIn", m2)

graph.tx().commit()

g = graph.traversal()

System.out.println(">>>> query all vertices: size=" + g.V().toList().size())
System.out.println(">>>> query all edges: size=" + g.E().toList().size())

具体细节

算子定义

1
2
3
4
5
# 定义一个含有 `run(**kwargs)` 方法的 `Runnable` 接口,所有算子都要实现该接口
class Runnable(ABC):
@abstractmethod
def run(self, **kwargs):
...
  • 将整个 Graph RAG 过程定义为一个算子,该算子包含三个步骤,即三个子算子

  • 整个 Graph RAG 过程需要用一个 Python 字典 context 来维护其上下文信息,子算子的执行结果均保存在 context 中,并向下一个子算子传递

  • 子算子工作所需的参数可以在构造子算子时传入或运行子算子时通过上下文信息传入

算子描述

  1. 关键词提取:由 LLM 提取 query 中的关键词并进行扩展,得到关键词集 keywords,并加入上下文中
  2. 图数据库查询:根据 keywords 中的关键词匹配图数据库中的节点,以匹配到的节点提取子图并格式化为字符串,得到知识集合 knowledge,并加入上下文中
  3. 答案整合:根据原始 query 和 knowledge 中的知识构造 Prompt,由 LLM 给出答案 answer,并加入上下文中

代码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class GraphRAG(Runnable):
def __init__(self, name):
self.name: str = name
self.operators: List[Runnable] = []

def extract_keyword(self, text: Optional[str] = None):
self.operators.append(KeywordExtract(text=text))
return self

def query_graph_for_rag(self, graph_store: Optional[HugeGraphStore] = None):
self.operators.append(RAGGraphQuery(graph_store=graph_store))
return self

def synthesize_answer(self):
self.operators.append(AnswerSynthesize())
return self

def run(self, **kwargs) -> Dict[str, Any]:
if len(self.operators) == 0:
self.extract_keyword().query_graph_for_rag().synthesize_answer()

context = kwargs
for op in self.operators:
context = op.run(context=context)
return context


class KeywordExtract(Runnable):
def __init__(
self,
text: Optional[str] = None,
llm: Optional[BaseLLM] = None,
max_keywords: Optional[int] = None,
extract_template: Optional[str] = None,
expand_template: Optional[str] = None,
):
self._llm = llm
self._query = text
self._max_keywords = max_keywords or 5
self._extract_template = extract_template or DEFAULT_KEYWORDS_EXTRACT_TEMPLATE_TMPL
self._expand_template = expand_template or DEFAULT_KEYWORDS_EXPAND_TEMPLATE_TMPL

def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
...
return context


class RAGGraphQuery(Runnable):
ID_RAG_GREMLIN_QUERY_TEMPL = (
"g.V().hasId({keywords}).as('subj')"
".repeat("
" bothE({edge_labels}).as('rel').otherV().as('obj')"
").times({max_deep})"
".path()"
".by(project('label', 'id', 'props')"
" .by(label())"
" .by(id())"
" .by(valueMap().by(unfold()))"
")"
".by(project('label', 'inV', 'outV', 'props')"
" .by(label())"
" .by(inV().id())"
" .by(outV().id())"
" .by(valueMap().by(unfold()))"
")"
".limit({max_items})"
".toList()"
)

PROP_RAG_GREMLIN_QUERY_TEMPL = (
"g.V().has('{prop}', within({keywords})).as('subj')"
".repeat("
" bothE({edge_labels}).as('rel').otherV().as('obj')"
").times({max_deep})"
".path()"
".by(project('label', 'props')"
" .by(label())"
" .by(valueMap().by(unfold()))"
")"
".by(project('label', 'inV', 'outV', 'props')"
" .by(label())"
" .by(inV().values('{prop}'))"
" .by(outV().values('{prop}'))"
" .by(valueMap().by(unfold()))"
")"
".limit({max_items})"
".toList()"
)

def __init__(
self,
graph_store: Optional[HugeGraphStore] = None,
max_deep: int = 2,
max_items: int = 30,
prop_to_match: Optional[str] = 'name',
):
self._graph = graph_store
self._max_deep = max_deep
self._max_items = max_items
self._prop_to_match = prop_to_match

def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
...
return context


class AnswerSynthesize(Runnable):
def __init__(
self,
llm: Optional[BaseLLM] = None,
template: Optional[str] = None,
):
self._llm = llm
self._prompt_template = template or DEFAULT_ANSWER_SYNTHESIZE_TEMPLATE_TMPL

def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
...
return context

代码解释

  1. 关键词提取:
    1. 与原 Demo 的实现方式基本一致
    2. 搬运了原 Demo 中的提示词模板
    3. 搬运了 LlamaIndex 中的工具函数 get_cache_dir()get_stopwords()(需要 import nltk
  2. 图数据库查询:
    1. 与原 Demo 的实现思路基本一致,方式上根据 HugeGraph 做出了调整
    2. 查询语言由 Cypher 改为 Gremlin
    3. 关键词匹配字段由 id 改为可自定义,设置了进行关键词匹配的字段,默认为 None 即 id
    4. 增加了对用于关键词匹配的属性自动创建索引
    5. 在查询结果的基础上进行 flattened_rels 字符串的构造(原 Demo 在查询过程中便完成了该步骤)
    6. 额外增加了删除环形路径的处理逻辑,即将 A=>B<=A 变为 A=>B
  3. 答案整合:
    1. 与原 Demo 的实现方式基本一致
    2. 搬运了原 Demo 中的提示词模板