gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档neo4j

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "neo4j"
Elasticsearch, neo4j, solr 3月 22,2022

neo4j、solr、es数据同步和增量更新

一、Neo4j的数据增量更新:

图数据除了节点的增量更新还牵扯到边的增量更新,节点其实还好说,无论是新增节点还是新增节点属性或者是节点属性更新,实际上都是在节点表可以完成,不是很消耗资源;比较复杂且影响更新效率的其实边的增量更新。

1、节点更新

这个需求其实很普遍,比如我有一个节点:

(n:Person {id: 'argan', name: 'argan', age: 32})

然后用户又传递了一个人数据过来:

{id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'} 

可以看到更新了一个属性:年龄,新增了两个属性:性别和电子邮件。我们希望最后的结果是:

(n:Person {id: 'argan', name: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'})。

需要注意的是name是没有传递的,但还是保留着的。如果要删除一个属性,需要把它的值显式的设置为空。

在Neo4j的要怎么做到呢?

Neo4j的提供了合并语句来实现这个功能。

与ON CREATE和ON MATCH合并

如果需要创建节点,请合并节点并设置属性。

MERGE (n:Person { id: 'argan' })
ON CREATE SET n.created = timestamp()
ON MATCH SET n.lastAccessed = timestamp()
RETURN n.name, n.created, n.lastAccessed

上面的例子可以这么写:

MERGE (n:Node {id: 'argan'}) SET n += {id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'} 
RETURN n 

因为这里采用了+=本身就是合并属性,所以区分不需要的英文ON CREATE还是ON MATCH。

同样关系也可以用合并保证只创建一次:

MATCH (n), (m) WHERE n.id = "argan" AND m.id = "magi" CREATE (n)-[:KNOWS]->(m)

写成这样子就可以保证唯一了:

MATCH (n:User {name: "argan"}), (m:User {name: "magi"}) MERGE (n)-[:KNOWS]->(m)

2、neo4j如何支持动态节点标签和关系类型?

上面的合并语句能够实现“存在更新,否则创建”的逻辑,但是还有一个问题没有解决,就是没有设置节点的标签。我们希望构建的节点数据完全是运行时根据用户提供的数据构造的,包括。标签比如用户提供如下数据:

:param batch: [{properties: {name: "argan", label: "Person", id: "1", age: 31}}, {properties: {name: "magi", label: "Person", id: "2", age: 28}}]

下面的暗号语句并没有设置节点的标签,虽然节点有一个叫做标签的属性:

UNWIND {batch} as row 
MERGE (n {id: row.id})
SET n += row.properties

那么我们能不能简单的指定标签呢?

UNWIND {batch} as row 
MERGE (n:row.properties.label {id: row.id})
SET n += row.properties

但是遗憾的是这个语句会报错,因为neo4j不支持动态的节点标签。把row.properties.label去掉或者改成一个固定的字符串就没有问题。

改成这样子也不行:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:row.properties.label,  n += row.properties

绑定变量也不行:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:{label},  n += row.properties

直接指定标签就可以了:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:Test,  n += row.properties

也就是说3.3.13.9。在节点上设置标签也并不支持动态标签..

笔记

neo4j的设置标签还有一个问题,就是它其实是新增标签,不是修改标签。要到更新的效果,你需要先删除掉,再新增..

MATCH (n) WHERE ID(n) = 14  REMOVE n:oldLabel SET n:newLabel

如果是单条数据更新,那其实很简单,我们只需要做字符串拼接就可以了:

String label = vertex.getLabel(); "MERGE (n:" + label + " {id: {id}} " + "SET n += {properties}"

但是关键是我们这里是在Neo4j的内部用开卷展开的服务端变量,如果它不允许动态变量,根本搞不定。难道真的要一条条的插入,那会非常慢的!Neo4j的的插入性能是众所周知的差。一种做法就是先批量插入数据,设置一个临时的标签,然后再批量的更新标签。不过需要两次操作,性能肯定至少慢两倍。

有没有什么方式呢?谷歌了很久,发现了也有人遇到这样的问题:功能请求:apoc支持MERGE节点和rels#271和是否可以使用数据驱动的节点或关系标签进行合并?。

原理跟单条数据插入一样,只是由于退绕是在服务端(Neo4j的)进行的,所以拼接也只能在服务端进行,怎么拼接的就是用?apoc.cypher.doIt拼接后让它在服务端执行:

UNWIND {batch} as row  WITH 'MERGE (n:' + row.properties.label + ' { id: row.id }) SET n += row.properties return n' AS cypher CALL apoc.cypher.doIt(cypher, {}) YIELD value return value.n

但是可惜,会报这样的异常:

org.neo4j.driver.v1.exceptions.ClientException: 
Failed to invoke procedure `apoc.cypher.doIt`: 
Caused by: org.neo4j.graphdb.QueryExecutionException: 
Variable `row` not defined (line 1, column 23 (offset: 22)) "MERGE (n:Person { id: row.id }) SET n += row.properties return n"

所以还是要分两步进行,不过可以合并在一起SET标签:传递标签名称作为参数:

UNWIND {batch} as row  MERGE (n { id: row.id }) 
SET n += row.properties  WITH n  CALL apoc.create.addLabels(id(n), [n.label]) 
YIELD node RETURN node

这样就可以正确的保存数据并且动态设置标签了。笔

本来我们是可以直接使用APOC库的apoc.merge.node状语从句:apoc.create.relationship动态的更新节点标签,关系和节点的。但是正如前面分析的,apoc.merge.node状语从句:apoc.create.relationship现在的实现其实的英文一个防重复CREATE而已,不能达到更新的目的。否则我们的实现将非常简单明了:

更新节点:

UWNIND {batch} as row CALL apoc.merge.node(row.labels, {id: row.id} , row.properties) 
yield node RETURN count(*)

更新关系:

UWNIND {batch} as row MATCH (from) WHERE id(from) = row.from 
MATCH (to:Label) where to.key = row.to CALL apoc.merge.relationship(from, row.type, {id: row.id}, row.properties, to) 
yield rel RETURN count(*)

一种做法就是叉一个分支出来,修改源码,部署自己的罐子包。

二、solr 的增量更新

1.首先要弄懂几个必要的属性,以及数据库建表事项,和dataimporter.properties 、data-config.xml里面的数据

<!–  transformer 格式转化:HTMLStripTransformer 索引中忽略HTML标签   —> 
  <!–  query:查询数据库表符合记录数据   —> 
  <!–  deltaQuery:增量索引查询主键ID    —>    注意这个只能返回ID字段 
  <!–  deltaImportQuery:增量索引查询导入数据  —> 
  <!–  deletedPkQuery:增量索引删除主键ID查询  —> 注意这个只能返回ID字段

2.数据库配置注意事项

1.如果只涉及添加,与修改业务,那么数据库里只需额外有一个timpstamp字段 
就可以了,默认值为当前系统时间,CURRENT_TIMESTAMP
2.如果还涉及删除业务,那么数据里就需额外再多添加一个字段isdelete,int类型的 
用0,1来标识,此条记录是否被删除

3.dataimporter.properties 

这个配置文件很重要,它是用来记录当前时间与上一次修改时间的,通过它能够找出,那些,新添加的,修改的,或删除的记录标识,此条记录是否被删除的记录

4.增量更新就是在全量更新的基础上加上一些配置,配置如下:

<?xml version="1.0" encoding="UTF-8" ?>
<dataConfig> 
    <!--数据源-->
    <dataSource type="JdbcDataSource"
                driver="com.mysql.jdbc.Driver"
                url="jdbc:mysql://192.168.2.10:3306/xtjkqyfw"
                user="root"
                password="Biaopu8888"/>
    <document> 

        <entity name="solrTest" 
        query="SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where flag = '0'"
        deltaImportQuery = "SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where fid = '${dataimporter.delta.fid}'"
        deltaQuery = "SELECT fid FROM solrTest where flastupdatetime > '${dataimporter.last_index_time}' and flag = '0'"
        deletedPkQuery = "SELECT fid FROM solrTest where flag = '1'"
        >
            <!--查询的数据和数据库索引意义对应column 是查询的字段name 是solr索引对应的字段-->
            <field column="fid" name="fid"/>
            <field column="ftitle" name="ftitle"/>
            <field column="fcontent" name="fcontent"/>
            <field column="flastupdatetime" name="flastupdatetime"/>
            <field column="ftime" name="ftime"/>
        </entity>
        
    </document> 
</dataConfig>

三、LOGSTASH-INPUT-JDBC 实现数据库同步ES

在数据方面碰到第一个问题是怎么将postgres中的数据库中同步到es中,在网上找了下相关文档,只有logstash-input-jdbc这个插件还在维护,而且在es中logstash高版本已经集成了这一插件,所以就省去了安装ruby和安装插件这一步了

1 安装elasticsearch logstash kibana 三件套

2 下载数据库驱动

图方便的话可以直接拷贝maven仓库里面的即可

3 添加 .conf文件

input {  
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:postgresql://ip:5432/chongqing_gis"
# 用户名和密码
jdbc_user => ""
jdbc_password => ""
# 驱动
jdbc_driver_library => "E:/ES/logstash-7.8.0/postgres/postgresql-42.2.9.jar"
# 驱动类名
jdbc_driver_class => "org.postgresql.Driver" jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "E:/ES/logstash-7.8.0/postgres/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
# schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "test_index"
# 需要关联的数据库中有有一个id字段,对应类型中的id document_id => "%{gid}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}

修改输入参数:数据库ip、端口、账号、密码等

修改输出参数:es的ip、端口、索引、document_id等

输出参数中索引如果还没有创建,在启动logstash时会自动根据默认模板创建索引,其中有些教程中出现了index_type的设置,这个类型在es的高版本中已经取消,不需要再设置,如果设置了,启动logstash会报错

statement_filepath 保存准备执行的sql文件

jdbc.sql文件:

select gid,name,address from qtpoi

在qtpoi表中有个字段是保存的空间地理信息geom字段,当我加上这个时,启动logstash一直报错,可能对空间字段需要做进一步的处理

Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject

4 启动logstash即可同步

bin/logstash -f config/postgres.conf

5 打开kibana即可查看到刚同步的数据

GET test_index/_doc/_search

6 如果设置了定时任务,logstash会定时去访问数据同步到es中,但是上面jdbc.sql文件中获取的是整张表的数据,也就是说每次同步都会对全表进行同步,但是我的需求是只需要第一次同步整张表后面只对更新的和修改的数据做同步,

在网上找了下,思路大概是给表增加一个新的字段,保存当前创建的时间或者是当前更新的时间,然后根据:sql_last_value这个函数获取大于这个函数的时间,就过滤出新增的数据和更新的数据,实现对增量数据同步到es,:sql_last_value这个函数官网也没说的太清楚,我大致认为是最后一个更新的值或者最后一次更新的时间

作者 east
neo4j 5月 27,2021

Neo4j的地理空间特性

很多数据库都有地理空间特性,例如mysql在5.0以上就有,es也有。

从Neo4j 3.0版本开始就内置了有限的空间支持功能。默认支持点和距离。假定点设置了经纬度属性值,就可计算出两点之间的距离。先让我们创建塔,并养成好习惯定义相应的约束:

CREATE CONSTRAINT ON (t:Tower) ASSERT t.name IS UNIQUE
CREATE (paris:Tower {name:"Eiffel Tower",country:"FRA",latitude:48.82322,longitude:2.29323})

CREATE (guangzhou:Tower {name:"Guangzhou Tower",country:"CN",latitude:68.82322,longitude:128.29323})

可通过下面查询语句获得埃菲尔铁塔和其他塔之间的距离:

MATCH (et:Tower {name:'Eiffle Tower'}), (other:Tower)
RETURN et.name, other.name, round(distance(point(et),
point(other))/10)/100 as dist_km

这里的关键是点(point),一个地理位置点是用图数据库中的一个带有纬度和经度属性的节点进行表示,distance函数是通过两点位置计算出来的。Round函数的作用是舍入到最近的整数,使用小技巧(/10/100),可将数值保留到小数点后两位

可通过以下方式查询APOC的空间过程列表:

CALL apoc.help("spatial")

APOC提供了地理编码的可能性,它将地址转换成空间坐标并按距离对路径进行排序。虽然它只是将地址映射到坐标,但不只是两个浮点数,地理编码并不容易,大量工程师在通过代码进行地理编码的变换。存储汽车行走时拍摄的照片需要消耗大量存储资源,识别图片中的数字需要消耗大量的CPU计算资源。因此,首选是不要自己来处理地理编码,而是依赖一个可被APOC透明调用的提供方。

作者 east
neo4j 5月 11,2021

Neo4j批量插入Java API实例

@Component
public class DatabaseBatchSaveUtil {

    private final static Logger LOGGER = LoggerFactory.getLogger(DatabaseBatchSaveUtil.class);

    private final static String NEO4j_GET_CURRENT_TIME_FUNCTION = "apoc.date.format(timestamp(),\\\"ms\\\",\\\"yyyy-MM-dd HH:mm:ss\\\",\\\"CTT\\\")";

    public static void batchSaveNodeFromDatabase(String jdbcConnectString, String sql, BaseNode baseNode, Map<String, String> columnMap) {
        try {
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("call apoc.periodic.iterate('call apoc.load.jdbc(\\\"").append(jdbcConnectString).append("\\\",\\\"").append(sql).append("\\\")', '");
            stringBuilder.append("merge (a:").append(baseNode.typeGetter()).append("{").append(baseNode.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get(baseNode.uniqueFieldNameGetter()));
            if (baseNode instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr");
            }
            stringBuilder.append("}) ").append("set ");
            columnMap.forEach((neo4jColumn, dbColumn) -> stringBuilder.append("a.").append(neo4jColumn).append(" = row.").append(dbColumn).append(","));
            if (columnMap.containsKey("updateTime"))
                stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            else stringBuilder.append("a.updateTime = ").append(NEO4j_GET_CURRENT_TIME_FUNCTION);
            stringBuilder.append("' ").append(",{batchSize:1000,iterateList:true})");

            LocalDateTime startTime = LocalDateTime.now();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(stringBuilder.toString());

            LOGGER.info("Save node: {} from database {}, cost {} second", baseNode.typeGetter(), jdbcConnectString, Duration.between(startTime, LocalDateTime.now()).getSeconds());

            Thread.sleep(20000);
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    public static void batchSaveRelationFromDatabase(String jdbcConnectString, String sql, BaseRelation baseRelation, Map<String, String> columnMap) {
        try {
            BaseNode startNode = baseRelation.getStartNode();
            BaseNode endNod = baseRelation.getEndNode();

            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("call apoc.periodic.iterate('call apoc.load.jdbc(\\\"").append(jdbcConnectString).append("\\\",\\\"").append(sql).append("\\\")', '");

            stringBuilder.append("merge (n:").append(startNode.typeGetter()).append("{").append(startNode.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get("startNode"));
            if (startNode instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr");
            }
            stringBuilder.append("}) with * ");

            stringBuilder.append("merge (m:").append(endNod.typeGetter()).append("{").append(endNod.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get("endNode"));
            if (endNod instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr2");
            }
            stringBuilder.append("}) with * ");

            stringBuilder.append("merge (n)-[r:").append(baseRelation.getType());
            if (columnMap.containsKey("startTime") || columnMap.containsKey("endTime")) {
                stringBuilder.append("{");
                if (columnMap.containsKey("startTime"))
                    stringBuilder.append("startTime:row.").append(columnMap.get("startTime")).append(",");

                if (columnMap.containsKey("endTime"))
                    stringBuilder.append("endTime:row.").append(columnMap.get("endTime"));
                else stringBuilder.deleteCharAt(stringBuilder.length() - 1);

                stringBuilder.append("}");
            }
            stringBuilder.append("]-(m)");

            columnMap.remove("startNode");
            columnMap.remove("endNode");

            if (!columnMap.isEmpty()) {
                stringBuilder.append(" set  ");
                columnMap.forEach((neo4jColumn, dbColumn) -> stringBuilder.append("r.").append(neo4jColumn).append(" = row.").append(dbColumn).append(","));
                if (columnMap.containsKey("updateTime"))
                    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
                else stringBuilder.append("r.updateTime = ").append(NEO4j_GET_CURRENT_TIME_FUNCTION);
            }

            stringBuilder.append("' ").append(",{batchSize:1000,iterateList:true})");

            LocalDateTime startTime = LocalDateTime.now();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(stringBuilder.toString());

            LOGGER.info("Save relation: {} from database {}, cost {} second", baseRelation.getType(), jdbcConnectString, Duration.between(startTime, LocalDateTime.now()).getSeconds());

            Thread.sleep(20000);
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }
}
作者 east
neo4j 5月 9,2021

Neo4j增删改查Java API实例

@Component
public class QueryDao {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatisticDao.class);

    public List<BaseNode> queryNodeByPage(BaseNode baseNode, LocalDateTime lastTime, long pageNum, int pageSize) {
        long skip = pageNum * pageSize;
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(baseNode.typeGetter());
        if (StringUtils.hasText(baseNode.uniqueValueGetter()))
            stringBuilder.append("{").append(baseNode.uniqueFieldNameGetter()).append(": '").append(baseNode.uniqueValueGetter()).append("'}");
        stringBuilder.append(") ");
        if (null != lastTime)
            stringBuilder.append("WHERE n.updateTime is null OR n.updateTime <= '").append(lastTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n SKIP ").append(skip).append(" LIMIT ").append(pageSize);
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), BaseNode.class));
    }

    public List<BaseNode> queryNodeByUniqueValue(BaseNode entity) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(entity.typeGetter()).append("{").append(entity.uniqueFieldNameGetter()).append(": '");
        if (entity instanceof CarNode) {
            String[] carArray = entity.uniqueValueGetter().split(Constant.carNoTypeSplit);
            stringBuilder.append(carArray[0]).append("'");
            if (1 < carArray.length) {
                stringBuilder.append(",vehicleCodeType:'");
                if (2 > carArray[1].length()) stringBuilder.append("0");
                stringBuilder.append(carArray[1]).append("'");
            }
        } else stringBuilder.append(entity.uniqueValueGetter()).append("'");
        stringBuilder.append("}) RETURN n");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), BaseNode.class));
    }

    public boolean isNodeExistAndFreshLastWeek(BaseNode entity) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(entity.typeGetter());
        stringBuilder.append("{").append(entity.uniqueFieldNameGetter()).append(": '").append(entity.uniqueValueGetter()).append("'");
        if (entity instanceof CarNode) {
            CarNode carNode = (CarNode) entity;
            if (StringUtils.hasText(carNode.getVehicleCodeType()))
                stringBuilder.append(",vehicleCodeType:'").append(carNode.getVehicleCodeType()).append("'");
        }
        stringBuilder.append("}) WHERE n.updateTime >= '")
                .append(LocalDateTime.now().minusDays(7L).format(Constant.NORMAL_TIME_FORMATTER)).append("' RETURN count(n)");
        Long count = CyberQueryExecuteUtil.executeCountQuery(stringBuilder.toString());
        return 1L <= count;
    }

    public List<BaseRelation> queryEndNodeByStartNodeAndRelationType(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("RETURN n,m,r");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

    public List<BaseRelation> queryEndNodeByStartNodeAndRelationType(BaseNode entity, BaseRelation relationship, BaseNode baseNode, LocalDateTime startTime, LocalDateTime endTime) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("WHERE 1=1 ");
        if (null != startTime)
            stringBuilder.append("AND r.startTime > '").append(startTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        if (null != endTime)
            stringBuilder.append("AND r.endTime < '").append(endTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n,m,r");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

    public List<BaseRelation> queryRelationByStartNode(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("WHERE r.joinCount IS NOT NULL ");
        if (null != relationship.getStartTime())
            stringBuilder.append("AND r.startTime > '").append(relationship.getStartTime().format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        if (null != relationship.getEndTime())
            stringBuilder.append("AND r.endTime < '").append(relationship.getEndTime().format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n,m,r ");
        stringBuilder.append("ORDER BY r.joinCount DESC, r.confidence DESC LIMIT 10");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

   
    public List<Map<String, Object>> statisticFollowHistory(BaseNode entity, BaseNode baseNode, boolean last3Day) {
        StringBuilder stringBuilder = createBaseCQL(entity, FollowRelation.builder().build(), baseNode);
        if (last3Day)
            stringBuilder.append("WHERE r.endTime >= '")
                    .append(LocalDate.now().minusDays(3L).atStartOfDay().format(Constant.NORMAL_TIME_FORMATTER))
                    .append("' ");
        stringBuilder.append("WITH n,m,");
        stringBuilder.append("SUM(r.joinCount)");
        //stringBuilder.append(last3Day ? "SUM(r.joinCount)" : "COUNT(r)");
        stringBuilder.append(" AS r_count RETURN n,m,r_count ORDER BY r_count DESC LIMIT 5");
        return CyberQueryExecuteUtil.executeOriginResultQuery(stringBuilder.toString());
    }

    public List<FollowRelation> queryLatestByStartNode(BaseNode entity, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, FollowRelation.builder().build(), baseNode);
        stringBuilder.append("RETURN n,m,r ORDER BY r.endTime DESC LIMIT 5");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), FollowRelation.class));
    }

    private StringBuilder createBaseCQL(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = new StringBuilder("MATCH ");
        stringBuilder.append("(n:").append(entity.typeGetter()).append("{").append(entity.uniqueFieldNameGetter()).append(": '").append(entity.uniqueValueGetter()).append("'");
        if (entity instanceof CarNode) {
            CarNode carNode = (CarNode) entity;
            if (StringUtils.hasText(carNode.getVehicleCodeType()))
                stringBuilder.append(",vehicleCodeType:'").append(carNode.getVehicleCodeType()).append("'");
        }
        stringBuilder.append("})");
        stringBuilder.append("-");
        stringBuilder.append("[r:").append(relationship.getType()).append("]");
        stringBuilder.append("-");
        stringBuilder.append("(m:").append(baseNode.typeGetter()).append(") ");
        return stringBuilder;
    }
}
/**
 * @author Ricardo.H.Wu
 * @time 2020/8/3 11:04
 */
@Data
public abstract class BaseNode {

    @Id
    @GeneratedValue
    private Long id;

    @Convert(Neo4jLocalDateTimeConvert.class)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime = LocalDateTime.now();

    public abstract String typeGetter();

    public abstract String uniqueFieldNameGetter();

    public abstract String uniqueValueGetter();

    public abstract void uniqueValueSetter(String uniqueValue);

    public DtoNeo4jNode convertToResponseEntity() {
        return DtoNeo4jNode.builder()
                .label(typeGetter())
                .id(uniqueValueGetter())
                .data(this)
                .build();
    }
}

@Data
public abstract class BaseRelation {

@Id
@GeneratedValue
private Long id;

@StartNode
private BaseNode startNode;

@EndNode
private BaseNode endNode;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime startTime;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime endTime;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime = LocalDateTime.now();

public abstract String getType();

public abstract DtoNeo4jEdge convertToResponseEntity();

}

作者 east
neo4j 5月 8,2021

Neo4j封装的基础查询Java API

import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.neo4j.ogm.model.QueryStatistics;
import org.neo4j.ogm.model.Result;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


@Component
public class CyberQueryExecuteUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(CyberQueryExecuteUtil.class);

    private static SessionFactory SESSION_FACTORY;

    @Autowired
    public void init(SessionFactory sessionFactory) {
        SESSION_FACTORY = sessionFactory;
    }

    /**
     * 执行统计方法
     *
     * @param cql cypher查询语句
     * @return 统计结果
     */
    public static Long executeCountQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Long count = session.queryForObject(Long.class, cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}], result: [{}]", cql, count);
            return count;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return 0L;
        }
    }

    /**
     * 执行实体查询方法
     *
     * @param cql       cypher查询语句
     * @param className 实体类型
     * @return 查询结果
     */
    public static <T> Iterable<T> executeEntityQuery(String cql, Class<T> className) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Iterable<T> response = session.query(className, cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}]", cql);
            return response;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return new ArrayList<>();
        }
    }

    /**
     * 执行实体查询方法
     *
     * @param cql cypher查询语句
     * @return 查询原始结果
     */
    public static List<Map<String, Object>> executeOriginResultQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result response = session.query(cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}], status: [{}]", cql, JSON.toJSONString(response.queryStatistics()));
            return Lists.newArrayList(response.queryResults());
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return new ArrayList<>();
        }
    }

    /**
     * 执行neo4j更新语句
     *
     * @param cql cypher更新语句
     */
    public static void executeUpdateQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, new HashMap<>());
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j execute cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 执行neo4j批量更新语句
     *
     * @param cql cypher批量更新语句
     */
    public static void executeBatchUpdateQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, new HashMap<>());
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j batch save cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 执行neo4j批量更新语句
     *
     * @param cql cypher批量更新语句
     */
    public static void executeBatchUpdateQuery(String cql, Map<String, Object> param) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, param);
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j batch save cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }
}
作者 east

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.