neo4j、solr、es数据同步和增量更新
一、Neo4j的数据增量更新:
图数据除了节点的增量更新还牵扯到边的增量更新,节点其实还好说,无论是新增节点还是新增节点属性或者是节点属性更新,实际上都是在节点表可以完成,不是很消耗资源;比较复杂且影响更新效率的其实边的增量更新。
1、节点更新
这个需求其实很普遍,比如我有一个节点:
(n:Person {id: 'argan', name: 'argan', age: 32})
然后用户又传递了一个人数据过来:
{id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'}
可以看到更新了一个属性:年龄,新增了两个属性:性别和电子邮件。我们希望最后的结果是:
(n:Person {id: 'argan', name: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'})。
需要注意的是name是没有传递的,但还是保留着的。如果要删除一个属性,需要把它的值显式的设置为空。
在Neo4j的要怎么做到呢?
Neo4j的提供了合并语句来实现这个功能。
如果需要创建节点,请合并节点并设置属性。
MERGE (n:Person { id: 'argan' })
ON CREATE SET n.created = timestamp()
ON MATCH SET n.lastAccessed = timestamp()
RETURN n.name, n.created, n.lastAccessed
上面的例子可以这么写:
MERGE (n:Node {id: 'argan'}) SET n += {id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'}
RETURN n
因为这里采用了+=
本身就是合并属性,所以区分不需要的英文ON CREATE
还是ON MATCH
。
同样关系也可以用合并保证只创建一次:
MATCH (n), (m) WHERE n.id = "argan" AND m.id = "magi" CREATE (n)-[:KNOWS]->(m)
写成这样子就可以保证唯一了:
MATCH (n:User {name: "argan"}), (m:User {name: "magi"}) MERGE (n)-[:KNOWS]->(m)
2、neo4j如何支持动态节点标签和关系类型?
上面的合并语句能够实现“存在更新,否则创建”的逻辑,但是还有一个问题没有解决,就是没有设置节点的标签。我们希望构建的节点数据完全是运行时根据用户提供的数据构造的,包括。标签比如用户提供如下数据:
:param batch: [{properties: {name: "argan", label: "Person", id: "1", age: 31}}, {properties: {name: "magi", label: "Person", id: "2", age: 28}}]
下面的暗号语句并没有设置节点的标签,虽然节点有一个叫做标签的属性:
UNWIND {batch} as row
MERGE (n {id: row.id})
SET n += row.properties
那么我们能不能简单的指定标签呢?
UNWIND {batch} as row
MERGE (n:row.properties.label {id: row.id})
SET n += row.properties
但是遗憾的是这个语句会报错,因为neo4j不支持动态的节点标签。把row.properties.label
去掉或者改成一个固定的字符串就没有问题。
改成这样子也不行:
UNWIND {batch} as row MERGE (n {id: row.id} ) SET n:row.properties.label, n += row.properties
绑定变量也不行:
UNWIND {batch} as row MERGE (n {id: row.id} ) SET n:{label}, n += row.properties
直接指定标签就可以了:
UNWIND {batch} as row MERGE (n {id: row.id} ) SET n:Test, n += row.properties
也就是说3.3.13.9。在节点上设置标签也并不支持动态标签..
笔记
neo4j的设置标签还有一个问题,就是它其实是新增标签,不是修改标签。要到更新的效果,你需要先删除掉,再新增..
MATCH (n) WHERE ID(n) = 14 REMOVE n:oldLabel SET n:newLabel
如果是单条数据更新,那其实很简单,我们只需要做字符串拼接就可以了:
String label = vertex.getLabel(); "MERGE (n:" + label + " {id: {id}} " + "SET n += {properties}"
但是关键是我们这里是在Neo4j的内部用开卷展开的服务端变量,如果它不允许动态变量,根本搞不定。难道真的要一条条的插入,那会非常慢的!Neo4j的的插入性能是众所周知的差。一种做法就是先批量插入数据,设置一个临时的标签,然后再批量的更新标签。不过需要两次操作,性能肯定至少慢两倍。
有没有什么方式呢?谷歌了很久,发现了也有人遇到这样的问题:功能请求:apoc支持MERGE节点和rels#271和是否可以使用数据驱动的节点或关系标签进行合并?。
原理跟单条数据插入一样,只是由于退绕是在服务端(Neo4j的)进行的,所以拼接也只能在服务端进行,怎么拼接的就是用?apoc.cypher.doIt
拼接后让它在服务端执行:
UNWIND {batch} as row WITH 'MERGE (n:' + row.properties.label + ' { id: row.id }) SET n += row.properties return n' AS cypher CALL apoc.cypher.doIt(cypher, {}) YIELD value return value.n
但是可惜,会报这样的异常:
org.neo4j.driver.v1.exceptions.ClientException:
Failed to invoke procedure `apoc.cypher.doIt`:
Caused by: org.neo4j.graphdb.QueryExecutionException:
Variable `row` not defined (line 1, column 23 (offset: 22)) "MERGE (n:Person { id: row.id }) SET n += row.properties return n"
所以还是要分两步进行,不过可以合并在一起SET标签:传递标签名称作为参数:
UNWIND {batch} as row MERGE (n { id: row.id })
SET n += row.properties WITH n CALL apoc.create.addLabels(id(n), [n.label])
YIELD node RETURN node
这样就可以正确的保存数据并且动态设置标签了。笔
本来我们是可以直接使用APOC库的apoc.merge.node
状语从句:apoc.create.relationship
动态的更新节点标签,关系和节点的。但是正如前面分析的,apoc.merge.node
状语从句:apoc.create.relationship
现在的实现其实的英文一个防重复CREATE而已,不能达到更新的目的。否则我们的实现将非常简单明了:
更新节点:
UWNIND {batch} as row CALL apoc.merge.node(row.labels, {id: row.id} , row.properties)
yield node RETURN count(*)
更新关系:
UWNIND {batch} as row MATCH (from) WHERE id(from) = row.from
MATCH (to:Label) where to.key = row.to CALL apoc.merge.relationship(from, row.type, {id: row.id}, row.properties, to)
yield rel RETURN count(*)
一种做法就是叉一个分支出来,修改源码,部署自己的罐子包。
二、solr 的增量更新
1.首先要弄懂几个必要的属性,以及数据库建表事项,和dataimporter.properties 、data-config.xml里面的数据
<!– transformer 格式转化:HTMLStripTransformer 索引中忽略HTML标签 —>
<!– query:查询数据库表符合记录数据 —>
<!– deltaQuery:增量索引查询主键ID —> 注意这个只能返回ID字段
<!– deltaImportQuery:增量索引查询导入数据 —>
<!– deletedPkQuery:增量索引删除主键ID查询 —> 注意这个只能返回ID字段
2.数据库配置注意事项
1.如果只涉及添加,与修改业务,那么数据库里只需额外有一个timpstamp字段
就可以了,默认值为当前系统时间,CURRENT_TIMESTAMP
2.如果还涉及删除业务,那么数据里就需额外再多添加一个字段isdelete,int类型的
用0,1来标识,此条记录是否被删除
3.dataimporter.properties
这个配置文件很重要,它是用来记录当前时间与上一次修改时间的,通过它能够找出,那些,新添加的,修改的,或删除的记录标识,此条记录是否被删除的记录
4.增量更新就是在全量更新的基础上加上一些配置,配置如下:
<?xml version="1.0" encoding="UTF-8" ?>
<dataConfig>
<!--数据源-->
<dataSource type="JdbcDataSource"
driver="com.mysql.jdbc.Driver"
url="jdbc:mysql://192.168.2.10:3306/xtjkqyfw"
user="root"
password="Biaopu8888"/>
<document>
<entity name="solrTest"
query="SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where flag = '0'"
deltaImportQuery = "SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where fid = '${dataimporter.delta.fid}'"
deltaQuery = "SELECT fid FROM solrTest where flastupdatetime > '${dataimporter.last_index_time}' and flag = '0'"
deletedPkQuery = "SELECT fid FROM solrTest where flag = '1'"
>
<!--查询的数据和数据库索引意义对应column 是查询的字段name 是solr索引对应的字段-->
<field column="fid" name="fid"/>
<field column="ftitle" name="ftitle"/>
<field column="fcontent" name="fcontent"/>
<field column="flastupdatetime" name="flastupdatetime"/>
<field column="ftime" name="ftime"/>
</entity>
</document>
</dataConfig>
三、LOGSTASH-INPUT-JDBC 实现数据库同步ES
在数据方面碰到第一个问题是怎么将postgres中的数据库中同步到es中,在网上找了下相关文档,只有logstash-input-jdbc这个插件还在维护,而且在es中logstash高版本已经集成了这一插件,所以就省去了安装ruby和安装插件这一步了
1 安装elasticsearch logstash kibana 三件套
2 下载数据库驱动
图方便的话可以直接拷贝maven仓库里面的即可
3 添加 .conf文件

input {
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:postgresql://ip:5432/chongqing_gis"
# 用户名和密码
jdbc_user => ""
jdbc_password => ""
# 驱动
jdbc_driver_library => "E:/ES/logstash-7.8.0/postgres/postgresql-42.2.9.jar"
# 驱动类名
jdbc_driver_class => "org.postgresql.Driver" jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "E:/ES/logstash-7.8.0/postgres/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
# schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "test_index"
# 需要关联的数据库中有有一个id字段,对应类型中的id document_id => "%{gid}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
修改输入参数:数据库ip、端口、账号、密码等
修改输出参数:es的ip、端口、索引、document_id等
输出参数中索引如果还没有创建,在启动logstash时会自动根据默认模板创建索引,其中有些教程中出现了index_type的设置,这个类型在es的高版本中已经取消,不需要再设置,如果设置了,启动logstash会报错
statement_filepath 保存准备执行的sql文件
jdbc.sql文件:
select gid,name,address from qtpoi
在qtpoi表中有个字段是保存的空间地理信息geom字段,当我加上这个时,启动logstash一直报错,可能对空间字段需要做进一步的处理
Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject
4 启动logstash即可同步
bin/logstash -f config/postgres.conf
5 打开kibana即可查看到刚同步的数据
GET test_index/_doc/_search
6 如果设置了定时任务,logstash会定时去访问数据同步到es中,但是上面jdbc.sql文件中获取的是整张表的数据,也就是说每次同步都会对全表进行同步,但是我的需求是只需要第一次同步整张表后面只对更新的和修改的数据做同步,
在网上找了下,思路大概是给表增加一个新的字段,保存当前创建的时间或者是当前更新的时间,然后根据:sql_last_value这个函数获取大于这个函数的时间,就过滤出新增的数据和更新的数据,实现对增量数据同步到es,:sql_last_value这个函数官网也没说的太清楚,我大致认为是最后一个更新的值或者最后一次更新的时间