Running GraphScope on minikube


1. Environment setup

  • Install Docker.

  • Install minikube: https://minikube.sigs.k8s.io/docs/start/

  • Upgrade Python to a version between 3.7 and 3.9.

  • Install the GraphScope client:

    # Requires the latest pip
    pip3 install --upgrade pip

    # Current stable release
    pip3 install --upgrade graphscope-client
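Before installing the client, it can help to confirm that the interpreter falls inside the 3.7 to 3.9 range listed above. The following is a minimal sketch; the helper name `is_supported_python` is made up for this post and is not part of GraphScope.

```python
import sys

# Supported range taken from the list above (3.7 to 3.9, inclusive).
MIN_VERSION = (3, 7)
MAX_VERSION = (3, 9)


def is_supported_python(version=None):
    """Return True if (major, minor) lies within the supported range."""
    if version is None:
        version = sys.version_info
    major_minor = (version[0], version[1])
    return MIN_VERSION <= major_minor <= MAX_VERSION


if __name__ == "__main__":
    current = sys.version.split()[0]
    print("Python", current, "supported:", is_supported_python())
```

Only the major and minor numbers are compared, so any patch release of 3.7, 3.8, or 3.9 passes the check.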

 

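2. Startup

2.1 minikube

  • Start minikube:

    minikube start

  • Mount a host directory into minikube:

    minikube mount $HOME:/host

    This example maps the local home directory to /host inside minikube.

    Reference: https://minikube.sigs.k8s.io/docs/handbook/mount/

    The mount command has to stay running in a terminal, so starting it inside tmux is recommended.

    The mount feature needs a reasonably recent Linux kernel: it failed for me on a 3.x kernel and worked after I upgraded to a 5.x kernel.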

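To change the time zone used by minikube, run the following on the host (minikube copies files placed under ~/.minikube/files into the node the next time it starts):

mkdir -p ~/.minikube/files/etc
cp /etc/localtime ~/.minikube/files/etc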

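2.2 Starting GraphScope

```python
import graphscope

if __name__ == '__main__':
    # Print the session logs to the console.
    graphscope.set_option(show_log=True)
    # Launch a GraphScope session on the current Kubernetes cluster.
    sess = graphscope.session()
    print(sess)
```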

The first startup pulls the images, which can take quite a while.

For example, the log will show lines like this:

2023-05-29 02:27:33,087 [INFO][kubernetes_launcher:553]: [gs-engine-cltlyf-0]: Pulling image "registry.cn-hongkong.aliyuncs.com/graphscope/interactive-executor:0.21.0"

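Alternatively, prepare the image on the local machine in advance and then load it into minikube.

Taking the image above as the example:

# Prepare the image
## Pull the image
docker pull registry.cn-hongkong.aliyuncs.com/graphscope/interactive-executor:0.21.0
## Confirm the image is available locally
docker images | grep registry.cn-hongkong.aliyuncs.com/graphscope/interactive-executor
## Write the image to a tar file (saving by name:tag rather than by image ID keeps the tag in the archive)
docker save -o interactive-executor.tar registry.cn-hongkong.aliyuncs.com/graphscope/interactive-executor:0.21.0
# Load it into minikube
minikube image load interactive-executor.tar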
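When several images have to be preloaded, the same three steps can be generated from Python. This is only a sketch: the function name `preload_commands` is hypothetical, and it saves the image by name and tag rather than by image ID so that the tag travels with the archive. The commands are printed, not executed.

```python
import shlex

IMAGE = "registry.cn-hongkong.aliyuncs.com/graphscope/interactive-executor:0.21.0"


def preload_commands(image, tar_path):
    """Build the docker/minikube command lines that preload one image."""
    return [
        ["docker", "pull", image],
        # Saving by name:tag keeps the tag inside the archive.
        ["docker", "save", "-o", tar_path, image],
        ["minikube", "image", "load", tar_path],
    ]


if __name__ == "__main__":
    for argv in preload_commands(IMAGE, "interactive-executor.tar"):
        # Print only; pass argv to subprocess.run(argv, check=True) to execute.
        print(" ".join(shlex.quote(arg) for arg in argv))
```

Keeping each command as an argument list avoids shell quoting problems if it is later handed to `subprocess.run`.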

After a successful start, kubectl get pods shows that GraphScope has started four pods:

[miku@centos7 gsremote]$ kubectl get pods
NAME                                              READY   STATUS    RESTARTS   AGE
coordinator-ysagwr-65c5d7f6f9-ppcb7               1/1     Running   0          23s
gs-engine-ysagwr-0                                4/4     Running   0          20s
gs-engine-ysagwr-1                                4/4     Running   0          13s
gs-interactive-frontend-ysagwr-7c696bdc7d-4vct5   1/1     Running   0          19s

For how to obtain sess, see https://graphscope.io/docs/v0.20.0/reference/session#session

If the pods keep running after the script has finished, they can be deleted with the following commands:

# List the current deployments
kubectl get deployments
# Delete the GraphScope-related deployments
kubectl delete deployments deployments_name
# If every deployment belongs to GraphScope, all of them can be deleted at once
kubectl delete deployments --all

# List the current statefulsets
kubectl get statefulsets
# Delete the GraphScope-related statefulsets
kubectl delete statefulset statefulset_name
# If every statefulset belongs to GraphScope, all of them can be deleted at once
kubectl delete statefulset --all
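If other workloads share the cluster, deleting everything with --all is too broad. The sketch below builds targeted delete commands for one session. It assumes that the resource names follow the pattern visible in the pod list earlier (coordinator-<id>, gs-interactive-frontend-<id>, gs-engine-<id>); that pattern is inferred from the pod names, not taken from the GraphScope documentation, so check it against kubectl get deployments and kubectl get statefulsets first. The function name `cleanup_commands` is hypothetical.

```python
def cleanup_commands(instance_id):
    """Return kubectl commands that delete the resources of one session.

    Assumption: names follow coordinator-<id>, gs-interactive-frontend-<id>
    and gs-engine-<id>, as inferred from the pod names shown earlier.
    """
    return [
        f"kubectl delete deployment coordinator-{instance_id}",
        f"kubectl delete deployment gs-interactive-frontend-{instance_id}",
        f"kubectl delete statefulset gs-engine-{instance_id}",
    ]


if __name__ == "__main__":
    # "ysagwr" is the suffix that appears in the pod list earlier in this post.
    for command in cleanup_commands("ysagwr"):
        print(command)
```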

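3. Loading a graph

The p2p-network dataset is used as the example.

Reference:

3.1 Loading directly over the network

Preparing the mount

The local home directory ultimately has to be mapped to the same home directory path inside k8s (presumably because the dataset loader stores the downloaded files under the home directory, and the engine pods then read them from the same path). For example, if the local user is miku, the home directory is /home/miku.

/home/miku therefore has to be mapped to /home/miku inside k8s.

This can be done as follows.

Run the mount command to map the local /home/miku to /home/miku on minikube:

minikube mount /home/miku:/home/miku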

Configure the k8s mount in the Python code

The /home/miku directory on minikube then has to be mapped to /home/miku inside the k8s pods:

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}
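The same dictionary shape is repeated in every script in this post, so a small builder can cut down on copy-paste mistakes. This is a sketch; `host_path_volume` is a made-up helper name, and it only reproduces the structure shown above.

```python
def host_path_volume(name, host_path, mount_path):
    """Build a k8s_volumes entry that mounts a node directory into the pods."""
    return {
        name: {
            "type": "hostPath",
            # Directory on the minikube node.
            "field": {"path": host_path, "type": "Directory"},
            # Where that directory appears inside the GraphScope pods.
            "mounts": {"mountPath": mount_path},
        }
    }


if __name__ == "__main__":
    k8s_volumes = host_path_volume("data", "/home/miku", "/home/miku")
    print(k8s_volumes)
```

The result can be passed as the k8s_volumes argument of graphscope.session in the same way as the hand-written dictionary.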

Loading the graph (complete code)

import graphscope
from graphscope.dataset import load_p2p_network

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}

if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    # Start a session whose pods can see /home/miku.
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    print(sess)
    # Download the p2p-network dataset and load it as a directed graph.
    g = load_p2p_network(sess, directed=True)
    print(g.schema)

Finally, the graph schema is printed:

oid_type: LONG
vid_type: ULONG
type: VERTEX
Label: host
Properties: Property(0, weight, LONG), Property(1, id, LONG)

type: EDGE
Label: connect
Properties: Property(0, eid, LONG), Property(1, src_label_id, LONG), Property(2, dst_label_id, LONG), Property(3, dist, LONG)
Relations: [Relation(source='host', destination='host')]

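3.2 Loading offline from local files

Preparing the data

The vertex and edge files can be prepared in advance.

Download the GraphScope p2p-network data: https://github.com/MikuSugar/data/blob/master/p2p_network.tar.gz

Extract it into a directory; I put it in /home/miku/test_data/p2p_network.

The directory contains:

[miku@centos7 p2p_network]$ ll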
total 3144
-rwxrwxrwx. 1 miku miku 2723852 Nov 16 2021 p2p-31_property_e_0
-rwxrwxrwx. 1 miku miku 489592 Nov 16 2021 p2p-31_property_v_0

Preparing the mount

The goal is to make the data directory visible inside k8s so that GraphScope can read the data.

This can be done as follows.

Map the local directory into minikube:

minikube mount /home/miku:/host

Configure the k8s mount in the Python code:

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/host/test_data/",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/testingdata"
        }
    }
}
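With two mappings chained together (host to minikube, then minikube to pod), it is easy to lose track of which path the loading code has to use. The sketch below walks a local path through both hops using the prefixes from this section; the names `remap` and `pod_path` are hypothetical helpers, not GraphScope or minikube APIs.

```python
from pathlib import PurePosixPath

# (source prefix, destination prefix) for each hop, taken from this section.
HOST_TO_MINIKUBE = ("/home/miku", "/host")            # minikube mount
MINIKUBE_TO_POD = ("/host/test_data", "/testingdata")  # k8s_volumes


def remap(path, mapping):
    """Rewrite path from the source prefix to the destination prefix."""
    src, dst = mapping
    # relative_to raises ValueError when path is not below src.
    relative = PurePosixPath(path).relative_to(src)
    return str(PurePosixPath(dst) / relative)


def pod_path(local_path):
    """Path inside the pod for a file that lives under the local home."""
    return remap(remap(local_path, HOST_TO_MINIKUBE), MINIKUBE_TO_POD)


if __name__ == "__main__":
    print(pod_path("/home/miku/test_data/p2p_network/p2p-31_property_v_0"))
```

A path that is not covered by the mounts raises ValueError, which is a quick way to notice that a file would not be visible to the pods.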

Complete code

import graphscope
from graphscope.dataset import load_p2p_network

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/host/test_data/",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/testingdata"
        }
    }
}

if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    print(sess)
    # Load from the directory as it is seen inside the pods.
    g = load_p2p_network(sess, "/testingdata/p2p_network/")
    print(g.schema)

    # Load by naming the vertex and edge files explicitly
    g1 = sess.g(directed=False)
    g1 = g1.add_vertices(
        "/testingdata/p2p_network/p2p-31_property_v_0", label="host"
    ).add_edges(
        "/testingdata/p2p_network/p2p-31_property_e_0",
        label="connect",
        src_label="host",
        dst_label="host",
    )
    print(g1.schema)

The schema of the graph loaded this way matches the one in 3.1.

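4. Running algorithms

For convenience, the examples below all load the graph with the method from 3.1 and use the SSSP algorithm as the example.

Reference: https://graphscope.io/docs/v0.20.0/zh/analytics_engine

The documentation has a few problems. They have been reported to the GraphScope team and are due to be fixed in the next release (the current version is 0.21.0):

https://github.com/alibaba/GraphScope/discussions/2735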

4.1 Running a built-in algorithm

import graphscope
from graphscope.dataset import load_p2p_network
from pandas import DataFrame

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}

if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    print(sess)
    g = load_p2p_network(sess, directed=True)
    print(g.schema)
    # Keep only the "id" vertex property and the "dist" edge property.
    g2 = g.project(vertices={"host": ["id"]}, edges={"connect": ["dist"]})
    # Single-source shortest paths starting from vertex 6.
    res = graphscope.sssp(g2, src=6)
    print(res.schema)
    r_df: DataFrame = res.to_dataframe({"node": "v.id", "result": "r"}).sort_values(by=["node"])
    print(r_df.head(20))
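To sanity-check what SSSP returns on a graph small enough to verify by hand, a plain-Python reference is handy. The sketch below uses Dijkstra's algorithm on a toy edge list; it is not GraphScope's implementation, the edge data is invented, and unreachable vertices are simply left out of the result.

```python
import heapq

# Toy directed edges (source, target, dist); invented for illustration.
EDGES = [(6, 1, 4), (6, 2, 1), (2, 1, 2), (1, 3, 5), (4, 6, 1)]


def sssp(edges, src):
    """Dijkstra over directed (u, v, dist) edges; returns {vertex: distance}."""
    adjacency = {}
    for u, v, dist in edges:
        adjacency.setdefault(u, []).append((v, dist))
        adjacency.setdefault(v, [])
    best = {src: 0}
    heap = [(0, src)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > best.get(u, float("inf")):
            continue  # stale heap entry, a shorter path was already found
        for v, weight in adjacency.get(u, []):
            candidate = d + weight
            if candidate < best.get(v, float("inf")):
                best[v] = candidate
                heapq.heappush(heap, (candidate, v))
    return best


if __name__ == "__main__":
    for node, dist in sorted(sssp(EDGES, 6).items()):
        print(node, dist)
```

Vertex 4 has no incoming path from vertex 6 in the toy data, so it does not appear in the output.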

4.2 Running a custom Pregel algorithm

import graphscope
from graphscope.analytical.udf.decorators import pregel
from graphscope.dataset import load_p2p_network
from graphscope.framework.app import AppAssets
from graphscope.framework.context import VertexDataContextDAGNode

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}


# SSSP in the Pregel model: every vertex keeps its best known distance and
# forwards improved distances along its outgoing edges.
@pregel(vd_type="int", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(3))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret


if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    g = load_p2p_network(sess)
    print(g.schema)
    my_app = SSSP_Pregel()
    # Run the custom app with vertex 6 as the source.
    ret: graphscope.framework.context.Context = my_app(g, src=6)
    r = ret.to_dataframe({"node": "v:host.id", "result": "r:host"}).sort_values(by=["node"])
    print(r.head(20))
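The superstep logic of the Pregel version is easier to follow when it runs on one machine without any cluster. The sketch below imitates the Init, Compute, and Combine steps in plain Python on a toy edge list. It is a simplified simulation under my own assumptions (one process, messages delivered at the next superstep, a vertex wakes up only when it has mail), not how GraphScope executes the app.

```python
INF = 1_000_000_000  # same "unreached" sentinel as the Pregel code above

# Toy directed edges (source, target, dist); invented for illustration.
EDGES = [(6, 1, 4), (6, 2, 1), (2, 1, 2), (1, 3, 5), (4, 6, 1)]


def pregel_sssp(edges, src):
    """Simulate Init / Compute / Combine supersteps on a single machine."""
    out_edges = {src: []}
    for u, v, dist in edges:
        out_edges.setdefault(u, []).append((v, dist))
        out_edges.setdefault(v, [])

    value = {v: INF for v in out_edges}  # Init
    inbox = {v: [] for v in out_edges}
    first_superstep = True
    while True:
        outbox = {v: [] for v in out_edges}
        for v in out_edges:
            if not first_superstep and not inbox[v]:
                continue  # halted, and no message arrived to wake it up
            new_dist = 0 if v == src else INF
            if inbox[v]:
                new_dist = min(new_dist, min(inbox[v]))  # Combine
            if new_dist < value[v]:  # Compute
                value[v] = new_dist
                for dst, dist in out_edges[v]:
                    outbox[dst].append(new_dist + dist)
        if not any(outbox.values()):
            return value  # nobody sent anything, so every vertex has halted
        inbox = outbox
        first_superstep = False


if __name__ == "__main__":
    for node, dist in sorted(pregel_sssp(EDGES, 6).items()):
        print(node, dist)
```

Vertices that are never reached keep the sentinel value, which mirrors what Init sets in the app above.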

4.3 Running a custom PIE algorithm

import graphscope
from graphscope.dataset import load_p2p_network
from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}


# SSSP in the PIE model: PEval handles the source vertex on the fragment that
# owns it, IncEval keeps relaxing edges in the following rounds.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        src = context.get_config(b"src")
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(3)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        print(context.superstep(), flush=True)
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(3)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)


if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    g = load_p2p_network(sess)
    print(g.schema)
    my_app = SSSP_PIE()
    # Run the custom app with vertex 6 as the source.
    ret = my_app(g, src=6)
    r = ret.to_dataframe({"node": "v:host.id", "result": "r:host"}).sort_values(by=["node"])
    print(r.head(20))
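The split between PEval and IncEval can be tried out without a cluster. The sketch below is a single-fragment simplification in plain Python: `p_eval` settles the source and relaxes its direct edges, and `inc_eval` repeats one relaxation pass until nothing changes. A real PIE run evaluates each fragment separately and exchanges messages across fragment borders, which this sketch leaves out; all names and the toy data are my own.

```python
INF = 1_000_000_000.0  # same initial value as the PIE code above

# Toy directed edges (source, target, dist); invented for illustration.
EDGES = [(6, 1, 4), (6, 2, 1), (2, 1, 2), (1, 3, 5), (4, 6, 1)]


def p_eval(out_edges, value, src):
    """Partial evaluation: settle the source and relax its direct edges."""
    value[src] = 0.0
    for dst, dist in out_edges[src]:
        if value[dst] > dist:
            value[dst] = dist


def inc_eval(out_edges, value):
    """Incremental evaluation: one relaxation pass; True if a value changed."""
    changed = False
    for v, edges in out_edges.items():
        for dst, dist in edges:
            candidate = value[v] + dist
            if value[dst] > candidate:
                value[dst] = candidate
                changed = True
    return changed


def pie_sssp(edges, src):
    """Run p_eval once, then inc_eval until the values stop changing."""
    out_edges = {src: []}
    for u, v, dist in edges:
        out_edges.setdefault(u, []).append((v, dist))
        out_edges.setdefault(v, [])
    value = {v: INF for v in out_edges}  # Init
    p_eval(out_edges, value, src)
    while inc_eval(out_edges, value):
        pass
    return value


if __name__ == "__main__":
    for node, dist in sorted(pie_sssp(EDGES, 6).items()):
        print(node, dist)
```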

4.4 Running a FLASH algorithm

The K-core algorithm is used as the example here. Reference: https://github.com/alibaba/GraphScope/tree/main/python/graphscope/analytical/app/flash

Complete code

import graphscope
from graphscope.dataset import load_p2p_network

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}


if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    g = load_p2p_network(sess)
    print(g.schema)
    # Project to a simple graph without vertex or edge properties.
    pg = g.project(vertices={"host": []}, edges={"connect": []})
    print(pg.schema)
    c = graphscope.flash.kcore_searching(pg, k=5)
    print(c.schema)
    print(c.to_numpy("r")[0])
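For readers who have not met k-core before: the k-core of a graph is the largest subgraph in which every vertex has at least k neighbors, and it can be found by repeatedly removing vertices whose degree is below k. The sketch below shows that peeling idea in plain Python on an invented toy graph. It illustrates the concept only and says nothing about how the FLASH app is implemented or what format its result has.

```python
# Toy undirected edges: a 4-clique (1, 2, 3, 4) with a tail 4 - 5 - 6.
EDGES = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4), (4, 5), (5, 6)]


def k_core(edges, k):
    """Return the set of vertices in the k-core of an undirected graph."""
    neighbors = {}
    for u, v in edges:
        if u == v:
            continue  # ignore self-loops
        neighbors.setdefault(u, set()).add(v)
        neighbors.setdefault(v, set()).add(u)

    alive = set(neighbors)
    queue = [v for v in alive if len(neighbors[v]) < k]
    while queue:
        v = queue.pop()
        if v not in alive:
            continue  # already removed through another queue entry
        alive.discard(v)
        for n in neighbors[v]:
            if n in alive:
                # n loses one neighbor; remove it later if it drops below k.
                neighbors[n].discard(v)
                if len(neighbors[n]) < k:
                    queue.append(n)
    return alive


if __name__ == "__main__":
    print(sorted(k_core(EDGES, 3)))
```

In the toy graph the tail vertices 5 and 6 are peeled off for k=3, leaving the clique.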

For writing FLASH algorithms, see https://github.com/alibaba/GraphScope/tree/main/analytical_engine/apps/flash

5. Interactive queries

Reference:

Complete code

import graphscope
from graphscope.dataset import load_ogbn_mag

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
            "path": "/home/miku",
            "type": "Directory"
        },
        "mounts": {
            "mountPath": "/home/miku"
        }
    }
}

if __name__ == '__main__':
    graphscope.set_option(show_log=True)
    sess = graphscope.session(k8s_volumes=k8s_volumes)
    graph = load_ogbn_mag(sess)
    print(graph.schema)
    # Get the entrypoint for submitting Gremlin queries on the graph.
    interactive = graphscope.gremlin(graph)

    # Count the number of papers two authors (with id 2 and 4307) have co-authored.
    papers = interactive.execute(
        "g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
    ).one()
    print("result", papers)
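The Gremlin traversal above starts at author 2, follows the outgoing writes edges to that author's papers, and keeps only the papers that also have an incoming writes edge from author 4307. The same logic on an in-memory edge list looks roughly like the sketch below. The toy data and the function name `coauthored_count` are invented, and the sketch assumes at most one writes edge per author and paper pair.

```python
# Toy (author_id, paper_id) pairs standing in for 'writes' edges.
WRITES = [
    (2, "p1"), (2, "p2"), (2, "p3"),
    (4307, "p2"), (4307, "p3"), (4307, "p4"),
    (7, "p1"),
]


def coauthored_count(writes, author_a, author_b):
    """Count the papers that both authors have a 'writes' edge to."""
    papers_by_author = {}
    for author, paper in writes:
        papers_by_author.setdefault(author, set()).add(paper)
    # Corresponds to g.V().has('author', 'id', a).out('writes')
    papers_a = papers_by_author.get(author_a, set())
    # Corresponds to where(__.in('writes').has('id', b))
    papers_b = papers_by_author.get(author_b, set())
    return len(papers_a & papers_b)


if __name__ == "__main__":
    print("result", coauthored_count(WRITES, 2, 4307))
```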