
Federated Queries Between a Graph Database and a Traditional Data Warehouse

Published: 2023-08-28 14:58:38


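  • Using Cypher to filter time-series metrics from relational databases
  • 1. Getting the report entity's unique Oracle ID from MySQL
  • 2. Filtering time-series data in Oracle
  • 3. Chaining the MySQL and Oracle queries in Cypher
  • 4. Using `apoc.case` to produce a boolean
  • 5. Wrapping the query as a function
  • 6. Using the function in a data-filtering query
  • 7. Summary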

 


Using Cypher to Filter Time-Series Metrics from Relational Databases

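    The graph data model in this article is built on research-report data. Segmenting the report text yields keyword data, and the model path is `(股票)<-[涉及]-(研报)-[包含]->(关键词)`, that is, (stock)<-[involves]-(research report)-[contains]->(keyword). Once the network of stocks, research reports, and keywords is in place, the related stocks and keywords need to be filtered by the time each report was written. The time-series data attached to the report entities is stored in MySQL and Oracle, and a report's unique ID links the data across the different stores.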

1. Getting the report entity's unique Oracle ID from MySQL

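    The report entities stored in the graph have only two properties, `code` and `name`. To join against Oracle, that `code` is first used in MySQL to look up the ID that links to Oracle, which is what the SQL statement below does. To keep data flowing through the Cypher query even when MySQL has no matching row, the SQL also outputs a fixed default value.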

-- Look up the Oracle-side ID in both the current and the old table.
SELECT zyyx_yanbao_code
FROM
  (SELECT zyyx_yanbao_code
   FROM ZYYX_YANBAO
   WHERE yanbao_hcode='HDOCec613f2d8b707b66a8edc8c1eaeb29f0'
   UNION
   SELECT zyyx_yanbao_code
   FROM ZYYX_YANBAO_old
   WHERE yanbao_hcode='HDOCec613f2d8b707b66a8edc8c1eaeb29f0'
   UNION
   -- Fixed default row (-1) so the result is never empty.
   SELECT -1 AS ZYYX_YANBAO)
AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2
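
The default-value row matters because a Cypher `CALL` that yields no rows stops the rest of the query from producing anything. The following is a minimal sketch of the same UNION pattern, using Python's built-in `sqlite3` as a stand-in for MySQL. The table contents and the `HDOC-known` / `HDOC-unknown` codes are made up for illustration, and only one source table is modeled.

```python
import sqlite3

# Hypothetical stand-in for the MySQL table; only the two columns used in the article.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ZYYX_YANBAO (yanbao_hcode TEXT, zyyx_yanbao_code INTEGER)")
conn.execute("INSERT INTO ZYYX_YANBAO VALUES ('HDOC-known', 1359506)")

# Same shape as the article's query: real matches UNION a fixed -1 row.
QUERY = """
SELECT zyyx_yanbao_code FROM (
    SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode = ?
    UNION
    SELECT -1 AS zyyx_yanbao_code
) AS a
ORDER BY zyyx_yanbao_code ASC
LIMIT 2
"""

def lookup_codes(hcode):
    """Return the matching codes plus the -1 default, so the result is never empty."""
    return [row[0] for row in conn.execute(QUERY, (hcode,))]

hit = lookup_codes("HDOC-known")     # default row plus the real code
miss = lookup_codes("HDOC-unknown")  # default row only
```

Because the default row is always present, the caller has to treat `-1` as "no match" rather than as a real ID.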

2. Filtering time-series data in Oracle

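    With the ID from section 1, the time-series metrics are filtered in Oracle. Again, to keep data flowing through the Cypher query when Oracle has no matching row, the SQL outputs a fixed default value.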

SELECT rownum rm, a.*
FROM
  (SELECT REPORT_ID
   FROM
     (
      -- Rows for this report created inside the closed date range.
      SELECT REPORT_ID
      FROM ODSZYYX.RPT_FORECAST_STK
      WHERE REPORT_ID='1359506'
        AND CREATE_DATE BETWEEN TO_DATE(20170902000000,'YYYY-MM-DD HH24:MI:SS')
                            AND TO_DATE(20210902000000,'YYYY-MM-DD HH24:MI:SS')
      UNION
      -- Fixed default row (-1), built from a single row of the same table.
      SELECT -1 AS REPORT_ID
      FROM
        (SELECT rownum rm, a.*
         FROM
           (SELECT REPORT_ID
            FROM ODSZYYX.RPT_FORECAST_STK) a
         WHERE rownum <= 1) b
      WHERE b.rm > 0) a) a
WHERE rownum <= 2
ORDER BY REPORT_ID ASC
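
The range bounds in the query are 14-digit numbers in year-month-day-hour-minute-second order, and `BETWEEN` includes both bounds. As a rough illustration of those two points outside the database, here is a small Python sketch. The helper names `parse_ts` and `in_closed_range` are hypothetical and do not come from the article.

```python
from datetime import datetime

def parse_ts(value):
    """Parse a 14-digit YYYYMMDDHHMMSS number such as 20170902000000."""
    return datetime.strptime(str(value), "%Y%m%d%H%M%S")

def in_closed_range(created, start, stop):
    """True when start <= created <= stop, both bounds inclusive like SQL BETWEEN."""
    return parse_ts(start) <= created <= parse_ts(stop)

START, STOP = 20170902000000, 20210902000000

inside = in_closed_range(datetime(2019, 5, 1, 9, 30, 0), START, STOP)
on_boundary = in_closed_range(parse_ts(STOP), START, STOP)
outside = in_closed_range(datetime(2022, 1, 1), START, STOP)
```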

3. Chaining the MySQL and Oracle queries in Cypher

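    Cypher chains the MySQL and Oracle queries and keeps the data flowing from one to the other: the ID returned by MySQL is passed as a parameter to the Oracle query.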

// Query MySQL
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/test?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC',
'SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2',
['HDOCec613f2d8b707b66a8edc8c1eaeb29f0','HDOCec613f2d8b707b66a8edc8c1eaeb29f0'])
YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code
// Query Oracle
CALL apoc.load.jdbc('jdbc:oracle:thin:ngdp/test@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL',
'SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM TEST.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') AND TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC',
[zyyx_yanbao_code,20170902000000,20210902000000])
YIELD row RETURN row

4. Using `apoc.case` to produce a boolean

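    The `apoc.case` procedure applies a logical test to the rows returned by SQL and reduces the result to a single boolean. A boolean is returned because the later graph path filter uses this query as a true/false condition.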

// Query MySQL
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/test?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC',
'SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2',
['HDOCec613f2d8b707b66a8edc8c1eaeb29f0','HDOCec613f2d8b707b66a8edc8c1eaeb29f0'])
YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code
// Query Oracle
CALL apoc.load.jdbc('jdbc:oracle:thin:ngdp/test@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL',
'SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM TEST.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') AND TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC',
[zyyx_yanbao_code,20170902000000,20210902000000])
YIELD row WITH row
WITH COLLECT(row.REPORT_ID) AS REPORT_ID_LIST
WITH [REPORT_ID IN REPORT_ID_LIST WHERE REPORT_ID<>'-1'] AS RE_REPORT_ID_LIST
CALL apoc.case(
[RE_REPORT_ID_LIST<>[],
'RETURN TRUE AS bool'],
'RETURN FALSE AS bool'
)
YIELD value
RETURN value.bool AS bool
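
The tail of the query (collect the IDs, drop the default value, test for a non-empty list) can be restated in a few lines of Python. This is only a sketch of the decision logic, not of APOC itself. It assumes the IDs arrive as strings, as the comparison against `'-1'` in the query suggests, and the function name `has_real_hit` is hypothetical.

```python
SENTINEL = "-1"  # the fixed default value added by the SQL

def has_real_hit(rows):
    """rows: list of dicts shaped like the JDBC rows, e.g. {"REPORT_ID": "1359506"}."""
    report_ids = [row["REPORT_ID"] for row in rows]               # COLLECT(row.REPORT_ID)
    real_ids = [rid for rid in report_ids if rid != SENTINEL]     # drop the default row
    return real_ids != []                                         # non-empty list means TRUE

matched = has_real_hit([{"REPORT_ID": "-1"}, {"REPORT_ID": "1359506"}])
unmatched = has_real_hit([{"REPORT_ID": "-1"}])
```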

5. Wrapping the query as a function

    To make the complex query from section 4 easy to call from later Cypher, it is wrapped as a Cypher function.

CALL apoc.custom.asFunction(
'yanbaoHcode.createDate.range.bool',
'CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2\',[$yanban_hcode,$yanban_hcode]) YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code CALL apoc.load.jdbc(\'jdbc:oracle:thin:ngdp/datalabgogo@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL\', \'SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\\\'YYYY-MM-DD HH24:MI:SS\\\') AND TO_DATE(?,\\\'YYYY-MM-DD HH24:MI:SS\\\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC\',[zyyx_yanbao_code,$create_date_start,$create_date_stop]) YIELD row WITH row WITH COLLECT(row.REPORT_ID) AS REPORT_ID_LIST WITH [REPORT_ID IN REPORT_ID_LIST WHERE REPORT_ID<>\'-1\'] AS RE_REPORT_ID_LIST CALL apoc.case([RE_REPORT_ID_LIST<>[],\'RETURN TRUE AS bool\'],\'RETURN FALSE AS bool\') YIELD value RETURN value.bool AS bool',
'BOOLEAN',
[['yanban_hcode','STRING'],['create_date_start','LONG'],['create_date_stop','LONG']],
false,
'Returns TRUE or FALSE based on the report creation date [TRUE when the result set is non-empty] [date range closed on both ends]'
);
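
The function body above nests strings two levels deep: SQL inside a Cypher string, and that Cypher inside another string. That is why quotes appear as `\'` at the first level and `\\\'` at the second. The sketch below shows how those escape sequences arise. The helper `embed_in_single_quotes` and the shortened `CALL f(...)` statement are hypothetical, used only to keep the example small.

```python
def embed_in_single_quotes(text):
    """Escape backslashes first, then single quotes, and wrap the result in quotes."""
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"

sql = "TO_DATE(?,'YYYY-MM-DD')"

# Level 1: the SQL becomes a string literal inside a Cypher statement.
level1 = embed_in_single_quotes(sql)

# Level 2: that Cypher statement becomes a string literal itself.
level2 = embed_in_single_quotes("CALL f(" + level1 + ")")

print(level1)
print(level2)
```

Generating the nested literal this way, or keeping the statement in a parameter, avoids hand-counting backslashes when the inner SQL changes.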

6. Using the function in a data-filtering query

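    Sections 1 to 5 split the work into a series of pushed-down queries and wrap the complex query in a function, so the time-series filter is easy to call. The query below filters the association network down to research reports written between `20060902000000` and `20210902000000`.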

MATCH p=(n:股票)<-[:涉及]-(c:研报)-[r:包含]->(k:关键词)
WHERE
custom.yanbaoHcode.createDate.range.bool(
c.hcode,
20060902000000,
20210902000000)
RETURN k.name AS keyword, r.weight AS weight LIMIT 10
  • Execution efficiency: querying 100 paths took 100 ms

7. Summary

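    As this article shows, the data architecture separates the time-series metric data from the association network. This saves as much disk storage as possible on the single-node graph database server, so one server can hold a larger association network, and it also makes full use of the data warehouse's storage and compute resources. The trade-off is extra network queries, which cost more time than reading from local storage. When running real models, the queries can be tuned to the scenario: storing frequently queried properties in the graph database reduces network overhead, and time-series data can be saved as a JSON string in a property and filtered with a stored procedure.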
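
To make the last suggestion concrete, here is a small Python sketch of filtering a time series stored as a JSON string. The field names `create_date` and `value` and the sample data are assumptions for illustration. In the graph database the same logic would live in a stored procedure or function.

```python
import json

def has_point_in_range(series_json, start, stop):
    """True when any entry's create_date lies in the closed range [start, stop]."""
    points = json.loads(series_json)
    # 14-digit YYYYMMDDHHMMSS integers compare in chronological order.
    return any(start <= point["create_date"] <= stop for point in points)

# Hypothetical property value as it might be stored on a report node.
prop = json.dumps([
    {"create_date": 20190501093000, "value": 1.2},
    {"create_date": 20220101000000, "value": 0.8},
])

in_window = has_point_in_range(prop, 20170902000000, 20210902000000)
out_of_window = has_point_in_range(prop, 20230101000000, 20240101000000)
```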

 
