/**
 * Builds {@code apoc.periodic.iterate} statements that read rows through
 * {@code apoc.load.jdbc} and merge them into the graph as nodes or relationships,
 * then hands the statement text to {@code CyberQueryExecuteUtil.executeBatchUpdateQuery}.
 *
 * <p>NOTE(review): the connection string, SQL text, labels and column names are
 * concatenated into the statement without any escaping, so they must come from a
 * trusted source and must not contain quote characters that would terminate the
 * enclosing string literals. How the executor treats the backslash-escaped double
 * quotes emitted here is not visible in this class.
 */
@Component
public class DatabaseBatchSaveUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseBatchSaveUtil.class);

    /** Expression text used as the {@code updateTime} value when no column is mapped to it. */
    private static final String NEO4J_GET_CURRENT_TIME_FUNCTION = "apoc.date.format(timestamp(),\\\"ms\\\",\\\"yyyy-MM-dd HH:mm:ss\\\",\\\"CTT\\\")";

    /** Closes the per-row statement literal and supplies the iterate configuration. */
    private static final String ITERATE_SUFFIX = "' ,{batchSize:1000,iterateList:true})";

    /**
     * Pause applied after every successful batch, in milliseconds.
     * NOTE(review): the reason for the pause is not visible here - confirm before changing it.
     */
    private static final long POST_BATCH_PAUSE_MILLIS = 20_000L;

    private static final String START_NODE_KEY = "startNode";
    private static final String END_NODE_KEY = "endNode";
    private static final String START_TIME_KEY = "startTime";
    private static final String END_TIME_KEY = "endTime";
    private static final String UPDATE_TIME_KEY = "updateTime";

    /**
     * Merges one node per row returned by {@code sql} and sets its properties.
     *
     * <p>The node is merged on {@code baseNode.uniqueFieldNameGetter()}, whose source column is
     * looked up in {@code columnMap}. For a {@code CarNode} the merge additionally matches
     * {@code vehicleCodeType} against the row column {@code vehicleCodeTypeStr}. When
     * {@code columnMap} has no {@code updateTime} entry, {@code updateTime} is set from
     * {@link #NEO4J_GET_CURRENT_TIME_FUNCTION}.
     *
     * <p>Failures are logged and never propagated to the caller. After a successful run the
     * calling thread sleeps for {@link #POST_BATCH_PAUSE_MILLIS}.
     *
     * @param jdbcConnectString connection string passed to {@code apoc.load.jdbc}
     * @param sql               query passed to {@code apoc.load.jdbc}
     * @param baseNode          supplies the node label and the name of its unique field
     * @param columnMap         node property name mapped to row column name; must contain the
     *                          unique field name as a key
     */
    public static void batchSaveNodeFromDatabase(String jdbcConnectString, String sql, BaseNode baseNode, Map<String, String> columnMap) {
        try {
            String keyColumn = requireColumn(columnMap, baseNode.uniqueFieldNameGetter());

            StringBuilder query = beginIterate(jdbcConnectString, sql);
            appendMergeNode(query, "a", baseNode, keyColumn, "vehicleCodeTypeStr");
            query.append(" set ").append(buildAssignments("a", columnMap, false));
            query.append(ITERATE_SUFFIX);

            // nanoTime is monotonic, so the reported duration is immune to wall-clock adjustments.
            long startedAt = System.nanoTime();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(query.toString());
            LOGGER.info("Save node: {} from database {}, cost {} second", baseNode.typeGetter(), jdbcConnectString, Duration.ofNanos(System.nanoTime() - startedAt).getSeconds());
            pauseAfterBatch();
        } catch (Exception e) {
            // Best-effort by design: callers are never exposed to import failures.
            LOGGER.error("Batch save of nodes from database failed", e);
        }
    }

    /**
     * Merges the start node, the end node and the relationship between them for every row
     * returned by {@code sql}.
     *
     * <p>{@code columnMap} must map the keys {@code startNode} and {@code endNode} to the row
     * columns holding the unique field values of the two nodes. {@code startTime} and
     * {@code endTime}, when present, become part of the relationship merge pattern. Every entry
     * other than {@code startNode}/{@code endNode} is also written as a relationship property;
     * if at least one such entry exists and none of them is {@code updateTime}, {@code updateTime}
     * is set from {@link #NEO4J_GET_CURRENT_TIME_FUNCTION}. The relationship pattern is written
     * without a direction arrow. For {@code CarNode} endpoints the merge additionally matches
     * {@code vehicleCodeType} against {@code vehicleCodeTypeStr} (start) or
     * {@code vehicleCodeTypeStr2} (end).
     *
     * <p>{@code columnMap} is only read; it is not modified. Failures are logged and never
     * propagated to the caller. After a successful run the calling thread sleeps for
     * {@link #POST_BATCH_PAUSE_MILLIS}.
     *
     * @param jdbcConnectString connection string passed to {@code apoc.load.jdbc}
     * @param sql               query passed to {@code apoc.load.jdbc}
     * @param baseRelation      supplies the relationship type and its start and end nodes
     * @param columnMap         property name (plus {@code startNode}/{@code endNode}) mapped to
     *                          row column name
     */
    public static void batchSaveRelationFromDatabase(String jdbcConnectString, String sql, BaseRelation baseRelation, Map<String, String> columnMap) {
        try {
            BaseNode startNode = baseRelation.getStartNode();
            BaseNode endNode = baseRelation.getEndNode();
            String startColumn = requireColumn(columnMap, START_NODE_KEY);
            String endColumn = requireColumn(columnMap, END_NODE_KEY);

            StringBuilder query = beginIterate(jdbcConnectString, sql);
            appendMergeNode(query, "n", startNode, startColumn, "vehicleCodeTypeStr");
            query.append(" with * ");
            appendMergeNode(query, "m", endNode, endColumn, "vehicleCodeTypeStr2");
            query.append(" with * ");

            query.append("merge (n)-[r:").append(baseRelation.getType());
            boolean hasStartTime = columnMap.containsKey(START_TIME_KEY);
            boolean hasEndTime = columnMap.containsKey(END_TIME_KEY);
            if (hasStartTime || hasEndTime) {
                query.append("{");
                if (hasStartTime) {
                    query.append("startTime:row.").append(columnMap.get(START_TIME_KEY));
                }
                if (hasStartTime && hasEndTime) {
                    query.append(",");
                }
                if (hasEndTime) {
                    query.append("endTime:row.").append(columnMap.get(END_TIME_KEY));
                }
                query.append("}");
            }
            query.append("]-(m)");

            // The endpoint keys are skipped rather than removed so the caller's map stays intact
            // (and immutable maps are accepted).
            String assignments = buildAssignments("r", columnMap, true);
            if (!assignments.isEmpty()) {
                query.append(" set ").append(assignments);
            }
            query.append(ITERATE_SUFFIX);

            // nanoTime is monotonic, so the reported duration is immune to wall-clock adjustments.
            long startedAt = System.nanoTime();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(query.toString());
            LOGGER.info("Save relation: {} from database {}, cost {} second", baseRelation.getType(), jdbcConnectString, Duration.ofNanos(System.nanoTime() - startedAt).getSeconds());
            pauseAfterBatch();
        } catch (Exception e) {
            // Best-effort by design: callers are never exposed to import failures.
            LOGGER.error("Batch save of relations from database failed", e);
        }
    }

    /** Starts the iterate call with the {@code apoc.load.jdbc} statement and opens the per-row statement literal. */
    private static StringBuilder beginIterate(String jdbcConnectString, String sql) {
        return new StringBuilder()
                .append("call apoc.periodic.iterate('call apoc.load.jdbc(\\\"")
                .append(jdbcConnectString)
                .append("\\\",\\\"")
                .append(sql)
                .append("\\\")', '");
    }

    /** Appends {@code merge (alias:Label{uniqueField:row.keyColumn[,vehicleCodeType:row.carTypeColumn]})}. */
    private static void appendMergeNode(StringBuilder query, String alias, BaseNode node, String keyColumn, String carTypeColumn) {
        query.append("merge (").append(alias).append(":").append(node.typeGetter())
                .append("{").append(node.uniqueFieldNameGetter()).append(":row.").append(keyColumn);
        if (node instanceof CarNode) {
            query.append(",vehicleCodeType:row.").append(carTypeColumn);
        }
        query.append("})");
    }

    /**
     * Builds the comma-separated {@code alias.property = row.column} list for a {@code set} clause.
     *
     * @param skipEndpointKeys when {@code true} the {@code startNode}/{@code endNode} entries are
     *                         ignored and an empty string is returned if nothing else is mapped;
     *                         when {@code false} every entry is used and the result always
     *                         contains an {@code updateTime} assignment
     */
    private static String buildAssignments(String alias, Map<String, String> columnMap, boolean skipEndpointKeys) {
        StringBuilder assignments = new StringBuilder();
        boolean updateTimeMapped = false;
        for (Map.Entry<String, String> entry : columnMap.entrySet()) {
            String property = entry.getKey();
            if (skipEndpointKeys && (START_NODE_KEY.equals(property) || END_NODE_KEY.equals(property))) {
                continue;
            }
            if (assignments.length() > 0) {
                assignments.append(",");
            }
            assignments.append(alias).append(".").append(property).append(" = row.").append(entry.getValue());
            if (UPDATE_TIME_KEY.equals(property)) {
                updateTimeMapped = true;
            }
        }
        if (skipEndpointKeys && assignments.length() == 0) {
            return "";
        }
        if (!updateTimeMapped) {
            if (assignments.length() > 0) {
                assignments.append(",");
            }
            assignments.append(alias).append(".updateTime = ").append(NEO4J_GET_CURRENT_TIME_FUNCTION);
        }
        return assignments.toString();
    }

    /** Returns the column mapped to {@code key}, rejecting a missing mapping instead of emitting {@code row.null}. */
    private static String requireColumn(Map<String, String> columnMap, Object key) {
        String column = columnMap.get(key);
        if (column == null) {
            throw new IllegalArgumentException("columnMap has no column mapped for key: " + key);
        }
        return column;
    }

    /** Sleeps for the post-batch pause and restores the interrupt flag if the sleep is interrupted. */
    private static void pauseAfterBatch() {
        try {
            Thread.sleep(POST_BATCH_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while pausing after batch save", e);
        }
    }
}