gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面27 )
bug清单, flume 6月 13,2021

Flume pollDelay设置不正确停止采集

使用FusionInsight HD Flume从本地采集静态日志( Spooling Source )保存到Kafka,由于采集堆积太多了,flume配置参数做了一些修改。后来发现一个诡异问题:每次重启flume采集,只采集1、2个文件就停止采集了,也没报什么错误。

采用对比法排查问题,对比正常运行的flume配置,看到pollDelay跟之前的不同。才想起之前一顿三百五的操作:想加快速度。pollDelay的设置值从5000改成500。

采集方案采用的Spooling Source + Memory Channel + kfaka

Spooling Source常用配置 :

Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:

Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

参考配置如下:

a1.channels = c1
a1.sources = s1
a1.sinks = sink1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /home/ftp (填写实际的路径)
a1.sources.s1.bufferMaxLineLength = 1073741824
a1.sources.s1.pollDelay = 5000
a1.sources.s1.consumeOrder = random

a1.channels.c1.type = memory
a1.channels.c1.capacity = 30000
a1.channels.c1.tansactionCapacity = 30000

a1.sinks.sink1.channel = c1
a1.sinks.sink1.type = org.apache.kafka.kafkaSink
a1.sinks.sink1.bootstrap.servers=192.168.1.1:210007  (根据实际填写)
a1.sinks.sink1.topic = mytopic (根据实际填写)
a1.sinks.sink1.batchSize = 200
a1.sinks.sink1.producer.requiredAcks = 1



作者 east
bug清单, Kafka, Spark 6月 10,2021

运行 Spark Streaming出现”Could not find KafkaClient entry in the JAAS configuration”

在使用FusionInsight HD大数据平台,用Spark Streaming来处理数据接入,kafka作为消费者,运行程序时出现”Could not find KafkaClient entry in the JAAS configuration”,当时怀疑是FusionInsight HD 的客户端相关配置有问题。

采用替换法思维,在另一台已经验证 FusionInsight HD 的客户端没问题的服务上运行,果然这个问题没做出现,仔细对比了这2台服务FusionInsight HD 的客户端 的配置,发现在spark配置文件(hd安装目录/Spark2x/spark/conf/Jaas.conf)要修改为下面的配置:

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
principal="大数据平台账号"
useTicketCache=false
keyTab="user.keytab的路径"
storeKey=true;
};

作者 east
Hbase 6月 4,2021

使用Hbase出现KeeperErrorCode = Session expired for /hbase/hbaseid

在使用FusionInsight HD大数据平台安全认证模式时,用hbase Shell或代码调用hbase时,有时出现”KeeperErrorCode = Session expired for /hbase/hbaseid”等错误。这是因为安全认证session过时了。

可以切换到hd,使用HBase客户端安装目录 ,输入下面命令

source bigdata_env
kinit 组件业务用户
按提示输入密码

再使用hbase shell就不会上面的问题。
如果是在Spark上调用的话,可以在调用的shell脚本上kinit 指定机机认证的安全认证文件。或者干脆写个定时执行的脚本配置在crontab上。

作者 east
neo4j 5月 27,2021

Neo4j的地理空间特性

很多数据库都有地理空间特性,例如mysql在5.0以上就有,es也有。

从Neo4j 3.0版本开始就内置了有限的空间支持功能。默认支持点和距离。假定点设置了经纬度属性值,就可计算出两点之间的距离。先让我们创建塔,并养成好习惯定义相应的约束:

CREATE CONSTRAINT ON (t:Tower) ASSERT t.name IS UNIQUE
CREATE (paris:Tower {name:"Eiffel Tower",country:"FRA",latitude:48.82322,longitude:2.29323})

CREATE (guangzhou:Tower {name:"Guangzhou Tower",country:"CN",latitude:68.82322,longitude:128.29323})

可通过下面查询语句获得埃菲尔铁塔和其他塔之间的距离:

MATCH (et:Tower {name:'Eiffle Tower'}), (other:Tower)
RETURN et.name, other.name, round(distance(point(et),
point(other))/10)/100 as dist_km

这里的关键是点(point),一个地理位置点是用图数据库中的一个带有纬度和经度属性的节点进行表示,distance函数是通过两点位置计算出来的。Round函数的作用是舍入到最近的整数,使用小技巧(/10/100),可将数值保留到小数点后两位

可通过以下方式查询APOC的空间过程列表:

CALL apoc.help("spatial")

APOC提供了地理编码的可能性,它将地址转换成空间坐标并按距离对路径进行排序。虽然它只是将地址映射到坐标,但不只是两个浮点数,地理编码并不容易,大量工程师在通过代码进行地理编码的变换。存储汽车行走时拍摄的照片需要消耗大量存储资源,识别图片中的数字需要消耗大量的CPU计算资源。因此,首选是不要自己来处理地理编码,而是依赖一个可被APOC透明调用的提供方。

作者 east
Java 5月 13,2021

Java远程访问工具类

我们都知道,xshell、xftp等是远程访问的利器,但在实际工作中,有时要用java代码实现上面这2个工具的基础功能,远程访问和下载等等:

public class RemoteShellUtil {

    private final static Logger LOGGER = LoggerFactory.getLogger(RemoteShellUtil.class);

    private JSch jsch;

    private Session session = null;

    private String user = null;

    private String password = null;

    private String host = null;

    public void setUser(String user) {
        this.user = user;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public void exec(String command) {
        if (this.session == null || !this.session.isConnected()) {
            if (!this.Connect(this.user, this.password, this.host)) {
                LOGGER.error("connect failed");
                return;
            }
        }

        BufferedReader reader = null;
        Channel channel = null;

        try {
            channel = this.session.openChannel("exec");
            ((ChannelExec) channel).setCommand(command);
            channel.setInputStream(null);
            ((ChannelExec) channel).setErrStream(System.err);
            channel.setInputStream(null);
            int exitStatus = channel.getExitStatus();
            channel.connect();
            InputStream in = channel.getInputStream();
            reader = new BufferedReader(new InputStreamReader(in));
            String buf;

            while ((buf = reader.readLine()) != null) {
                LOGGER.info(buf);
            }
        } catch (JSchException | IOException e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        } finally {
            try {
                if (null != reader) reader.close();
                if (null != channel) channel.disconnect();
                this.session.disconnect();
            } catch (IOException e) {
                LOGGER.error(Throwables.getStackTraceAsString(e));
            }

        }

    }

    public Boolean download(String serverPath, String localpath) {
        if (!checkConnect()) return false;

        ChannelSftp channel = null;

        boolean status = false;
        try {
            channel = (ChannelSftp) this.session.openChannel("sftp");
            channel.connect();
            FileOutputStream out = new FileOutputStream(new File(localpath));
            BufferedOutputStream writer = new BufferedOutputStream(out);
            SftpProgressMonitor var10000 = new SftpProgressMonitor() {
                private long current = 0L;

                @Override
                public void init(int i, String s, String s1, long l) {
                }

                public boolean count(long arg0) {
                    this.current += arg0;
                    return true;
                }

                @Override
                public void end() {
                }
            };
            channel.get(serverPath, writer);
            writer.close();
            status = true;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        } finally {
            if (null != channel) channel.disconnect();
            this.session.disconnect();
        }

        return status;
    }

    public Boolean upload(String serverPath, String localpath) {
        if (!checkConnect()) return false;

        ChannelSftp channel = null;

        boolean status = false;
        try {
            channel = (ChannelSftp) this.session.openChannel("sftp");
            channel.connect();
            FileInputStream in = new FileInputStream(new File(localpath));
            BufferedInputStream reader = new BufferedInputStream(in);
            SftpProgressMonitor var10000 = new SftpProgressMonitor() {
                private long current = 0L;

                public boolean count(long arg0) {
                    this.current += arg0;
                    return true;
                }

                public void end() {
                }

                public void init(int arg0, String arg1, String arg2, long arg3) {
                }
            };
            channel.put(reader, serverPath);
            reader.close();
            status = true;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        } finally {
            if (null != channel) channel.disconnect();
            this.session.disconnect();
        }

        return status;
    }

    private Boolean Connect(String user, String password, String host) {
        try {
            if (null == this.jsch) this.jsch = new JSch();
            this.session = this.jsch.getSession(user, host, 22);
            this.session.setPassword(password);
            Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            this.session.setConfig(config);
            this.session.connect();
            return true;
        } catch (Exception e) {
            LOGGER.error("connect failed, reason{}", Throwables.getStackTraceAsString(e));
            return false;
        }
    }

    private Boolean checkConnect() {
        if (this.session == null || !this.session.isConnected()) {
            return this.Connect(this.user, this.password, this.host);
        }
        return true;
    }
}
作者 east
neo4j 5月 11,2021

Neo4j批量插入Java API实例

@Component
public class DatabaseBatchSaveUtil {

    private final static Logger LOGGER = LoggerFactory.getLogger(DatabaseBatchSaveUtil.class);

    private final static String NEO4j_GET_CURRENT_TIME_FUNCTION = "apoc.date.format(timestamp(),\\\"ms\\\",\\\"yyyy-MM-dd HH:mm:ss\\\",\\\"CTT\\\")";

    public static void batchSaveNodeFromDatabase(String jdbcConnectString, String sql, BaseNode baseNode, Map<String, String> columnMap) {
        try {
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("call apoc.periodic.iterate('call apoc.load.jdbc(\\\"").append(jdbcConnectString).append("\\\",\\\"").append(sql).append("\\\")', '");
            stringBuilder.append("merge (a:").append(baseNode.typeGetter()).append("{").append(baseNode.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get(baseNode.uniqueFieldNameGetter()));
            if (baseNode instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr");
            }
            stringBuilder.append("}) ").append("set ");
            columnMap.forEach((neo4jColumn, dbColumn) -> stringBuilder.append("a.").append(neo4jColumn).append(" = row.").append(dbColumn).append(","));
            if (columnMap.containsKey("updateTime"))
                stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            else stringBuilder.append("a.updateTime = ").append(NEO4j_GET_CURRENT_TIME_FUNCTION);
            stringBuilder.append("' ").append(",{batchSize:1000,iterateList:true})");

            LocalDateTime startTime = LocalDateTime.now();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(stringBuilder.toString());

            LOGGER.info("Save node: {} from database {}, cost {} second", baseNode.typeGetter(), jdbcConnectString, Duration.between(startTime, LocalDateTime.now()).getSeconds());

            Thread.sleep(20000);
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    public static void batchSaveRelationFromDatabase(String jdbcConnectString, String sql, BaseRelation baseRelation, Map<String, String> columnMap) {
        try {
            BaseNode startNode = baseRelation.getStartNode();
            BaseNode endNod = baseRelation.getEndNode();

            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("call apoc.periodic.iterate('call apoc.load.jdbc(\\\"").append(jdbcConnectString).append("\\\",\\\"").append(sql).append("\\\")', '");

            stringBuilder.append("merge (n:").append(startNode.typeGetter()).append("{").append(startNode.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get("startNode"));
            if (startNode instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr");
            }
            stringBuilder.append("}) with * ");

            stringBuilder.append("merge (m:").append(endNod.typeGetter()).append("{").append(endNod.uniqueFieldNameGetter()).append(":").append("row.").append(columnMap.get("endNode"));
            if (endNod instanceof CarNode) {
                stringBuilder.append(",vehicleCodeType:row.vehicleCodeTypeStr2");
            }
            stringBuilder.append("}) with * ");

            stringBuilder.append("merge (n)-[r:").append(baseRelation.getType());
            if (columnMap.containsKey("startTime") || columnMap.containsKey("endTime")) {
                stringBuilder.append("{");
                if (columnMap.containsKey("startTime"))
                    stringBuilder.append("startTime:row.").append(columnMap.get("startTime")).append(",");

                if (columnMap.containsKey("endTime"))
                    stringBuilder.append("endTime:row.").append(columnMap.get("endTime"));
                else stringBuilder.deleteCharAt(stringBuilder.length() - 1);

                stringBuilder.append("}");
            }
            stringBuilder.append("]-(m)");

            columnMap.remove("startNode");
            columnMap.remove("endNode");

            if (!columnMap.isEmpty()) {
                stringBuilder.append(" set  ");
                columnMap.forEach((neo4jColumn, dbColumn) -> stringBuilder.append("r.").append(neo4jColumn).append(" = row.").append(dbColumn).append(","));
                if (columnMap.containsKey("updateTime"))
                    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
                else stringBuilder.append("r.updateTime = ").append(NEO4j_GET_CURRENT_TIME_FUNCTION);
            }

            stringBuilder.append("' ").append(",{batchSize:1000,iterateList:true})");

            LocalDateTime startTime = LocalDateTime.now();
            CyberQueryExecuteUtil.executeBatchUpdateQuery(stringBuilder.toString());

            LOGGER.info("Save relation: {} from database {}, cost {} second", baseRelation.getType(), jdbcConnectString, Duration.between(startTime, LocalDateTime.now()).getSeconds());

            Thread.sleep(20000);
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }
}
作者 east
neo4j 5月 9,2021

Neo4j增删改查Java API实例

@Component
public class QueryDao {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatisticDao.class);

    public List<BaseNode> queryNodeByPage(BaseNode baseNode, LocalDateTime lastTime, long pageNum, int pageSize) {
        long skip = pageNum * pageSize;
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(baseNode.typeGetter());
        if (StringUtils.hasText(baseNode.uniqueValueGetter()))
            stringBuilder.append("{").append(baseNode.uniqueFieldNameGetter()).append(": '").append(baseNode.uniqueValueGetter()).append("'}");
        stringBuilder.append(") ");
        if (null != lastTime)
            stringBuilder.append("WHERE n.updateTime is null OR n.updateTime <= '").append(lastTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n SKIP ").append(skip).append(" LIMIT ").append(pageSize);
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), BaseNode.class));
    }

    public List<BaseNode> queryNodeByUniqueValue(BaseNode entity) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(entity.typeGetter()).append("{").append(entity.uniqueFieldNameGetter()).append(": '");
        if (entity instanceof CarNode) {
            String[] carArray = entity.uniqueValueGetter().split(Constant.carNoTypeSplit);
            stringBuilder.append(carArray[0]).append("'");
            if (1 < carArray.length) {
                stringBuilder.append(",vehicleCodeType:'");
                if (2 > carArray[1].length()) stringBuilder.append("0");
                stringBuilder.append(carArray[1]).append("'");
            }
        } else stringBuilder.append(entity.uniqueValueGetter()).append("'");
        stringBuilder.append("}) RETURN n");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), BaseNode.class));
    }

    public boolean isNodeExistAndFreshLastWeek(BaseNode entity) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("MATCH (n:").append(entity.typeGetter());
        stringBuilder.append("{").append(entity.uniqueFieldNameGetter()).append(": '").append(entity.uniqueValueGetter()).append("'");
        if (entity instanceof CarNode) {
            CarNode carNode = (CarNode) entity;
            if (StringUtils.hasText(carNode.getVehicleCodeType()))
                stringBuilder.append(",vehicleCodeType:'").append(carNode.getVehicleCodeType()).append("'");
        }
        stringBuilder.append("}) WHERE n.updateTime >= '")
                .append(LocalDateTime.now().minusDays(7L).format(Constant.NORMAL_TIME_FORMATTER)).append("' RETURN count(n)");
        Long count = CyberQueryExecuteUtil.executeCountQuery(stringBuilder.toString());
        return 1L <= count;
    }

    public List<BaseRelation> queryEndNodeByStartNodeAndRelationType(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("RETURN n,m,r");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

    public List<BaseRelation> queryEndNodeByStartNodeAndRelationType(BaseNode entity, BaseRelation relationship, BaseNode baseNode, LocalDateTime startTime, LocalDateTime endTime) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("WHERE 1=1 ");
        if (null != startTime)
            stringBuilder.append("AND r.startTime > '").append(startTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        if (null != endTime)
            stringBuilder.append("AND r.endTime < '").append(endTime.format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n,m,r");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

    public List<BaseRelation> queryRelationByStartNode(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, relationship, baseNode);
        stringBuilder.append("WHERE r.joinCount IS NOT NULL ");
        if (null != relationship.getStartTime())
            stringBuilder.append("AND r.startTime > '").append(relationship.getStartTime().format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        if (null != relationship.getEndTime())
            stringBuilder.append("AND r.endTime < '").append(relationship.getEndTime().format(Constant.NORMAL_TIME_FORMATTER)).append("' ");
        stringBuilder.append("RETURN n,m,r ");
        stringBuilder.append("ORDER BY r.joinCount DESC, r.confidence DESC LIMIT 10");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), relationship.getClass()));
    }

   
    public List<Map<String, Object>> statisticFollowHistory(BaseNode entity, BaseNode baseNode, boolean last3Day) {
        StringBuilder stringBuilder = createBaseCQL(entity, FollowRelation.builder().build(), baseNode);
        if (last3Day)
            stringBuilder.append("WHERE r.endTime >= '")
                    .append(LocalDate.now().minusDays(3L).atStartOfDay().format(Constant.NORMAL_TIME_FORMATTER))
                    .append("' ");
        stringBuilder.append("WITH n,m,");
        stringBuilder.append("SUM(r.joinCount)");
        //stringBuilder.append(last3Day ? "SUM(r.joinCount)" : "COUNT(r)");
        stringBuilder.append(" AS r_count RETURN n,m,r_count ORDER BY r_count DESC LIMIT 5");
        return CyberQueryExecuteUtil.executeOriginResultQuery(stringBuilder.toString());
    }

    public List<FollowRelation> queryLatestByStartNode(BaseNode entity, BaseNode baseNode) {
        StringBuilder stringBuilder = createBaseCQL(entity, FollowRelation.builder().build(), baseNode);
        stringBuilder.append("RETURN n,m,r ORDER BY r.endTime DESC LIMIT 5");
        return Lists.newArrayList(CyberQueryExecuteUtil.executeEntityQuery(stringBuilder.toString(), FollowRelation.class));
    }

    private StringBuilder createBaseCQL(BaseNode entity, BaseRelation relationship, BaseNode baseNode) {
        StringBuilder stringBuilder = new StringBuilder("MATCH ");
        stringBuilder.append("(n:").append(entity.typeGetter()).append("{").append(entity.uniqueFieldNameGetter()).append(": '").append(entity.uniqueValueGetter()).append("'");
        if (entity instanceof CarNode) {
            CarNode carNode = (CarNode) entity;
            if (StringUtils.hasText(carNode.getVehicleCodeType()))
                stringBuilder.append(",vehicleCodeType:'").append(carNode.getVehicleCodeType()).append("'");
        }
        stringBuilder.append("})");
        stringBuilder.append("-");
        stringBuilder.append("[r:").append(relationship.getType()).append("]");
        stringBuilder.append("-");
        stringBuilder.append("(m:").append(baseNode.typeGetter()).append(") ");
        return stringBuilder;
    }
}
/**
 * @author Ricardo.H.Wu
 * @time 2020/8/3 11:04
 */
@Data
public abstract class BaseNode {

    @Id
    @GeneratedValue
    private Long id;

    @Convert(Neo4jLocalDateTimeConvert.class)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime = LocalDateTime.now();

    public abstract String typeGetter();

    public abstract String uniqueFieldNameGetter();

    public abstract String uniqueValueGetter();

    public abstract void uniqueValueSetter(String uniqueValue);

    public DtoNeo4jNode convertToResponseEntity() {
        return DtoNeo4jNode.builder()
                .label(typeGetter())
                .id(uniqueValueGetter())
                .data(this)
                .build();
    }
}

@Data
public abstract class BaseRelation {

@Id
@GeneratedValue
private Long id;

@StartNode
private BaseNode startNode;

@EndNode
private BaseNode endNode;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime startTime;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime endTime;

@Convert(Neo4jLocalDateTimeConvert.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime = LocalDateTime.now();

public abstract String getType();

public abstract DtoNeo4jEdge convertToResponseEntity();

}

作者 east
neo4j 5月 8,2021

Neo4j封装的基础查询Java API

import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.neo4j.ogm.model.QueryStatistics;
import org.neo4j.ogm.model.Result;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


@Component
public class CyberQueryExecuteUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(CyberQueryExecuteUtil.class);

    private static SessionFactory SESSION_FACTORY;

    @Autowired
    public void init(SessionFactory sessionFactory) {
        SESSION_FACTORY = sessionFactory;
    }

    /**
     * 执行统计方法
     *
     * @param cql cypher查询语句
     * @return 统计结果
     */
    public static Long executeCountQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Long count = session.queryForObject(Long.class, cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}], result: [{}]", cql, count);
            return count;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return 0L;
        }
    }

    /**
     * 执行实体查询方法
     *
     * @param cql       cypher查询语句
     * @param className 实体类型
     * @return 查询结果
     */
    public static <T> Iterable<T> executeEntityQuery(String cql, Class<T> className) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Iterable<T> response = session.query(className, cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}]", cql);
            return response;
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return new ArrayList<>();
        }
    }

    /**
     * 执行实体查询方法
     *
     * @param cql cypher查询语句
     * @return 查询原始结果
     */
    public static List<Map<String, Object>> executeOriginResultQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result response = session.query(cql, new HashMap<>());
            session.clear();
            LOGGER.debug("Neo4j cypher query: [{}], status: [{}]", cql, JSON.toJSONString(response.queryStatistics()));
            return Lists.newArrayList(response.queryResults());
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
            return new ArrayList<>();
        }
    }

    /**
     * 执行neo4j更新语句
     *
     * @param cql cypher更新语句
     */
    public static void executeUpdateQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, new HashMap<>());
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j execute cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 执行neo4j批量更新语句
     *
     * @param cql cypher批量更新语句
     */
    public static void executeBatchUpdateQuery(String cql) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, new HashMap<>());
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j batch save cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 执行neo4j批量更新语句
     *
     * @param cql cypher批量更新语句
     */
    public static void executeBatchUpdateQuery(String cql, Map<String, Object> param) {
        try {
            Session session = SESSION_FACTORY.openSession();
            Result result = session.query(cql, param);
            QueryStatistics queryStatistics = result.queryStatistics();
            session.clear();
            LOGGER.debug("Neo4j batch save cql: [{}] contain update:[{}], status: [{}]", cql, queryStatistics.containsUpdates(), JSON.toJSON(queryStatistics));
        } catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }
}
作者 east
大数据开发 5月 4,2021

大数据即席查询框架技术选型

目前应用比较广泛的几种即席查询框架有Druid、Kylin、Presto、Impala、Spark SQL和Elasticsearch,针对响应时间、数据支持、技术特点等方面的对比如表

针对上表的对比情况,分析汇总如下。

● Druid:实时处理时序数据的OLAP数据库,因为它的索引首先按照时间进行分片,查询的时候也是按照时间线去路由索引的。

● Kylin:核心是Cube,Cube是一种预计算技术,基本思路是预先对数据进行多维索引,查询时只扫描索引而不访问原始数据,从而提高查询速度。

● Presto:它没有使用MapReduce,大部分场景下比Hive快一个数量级,其中的关键是所有的处理都在内存中完成。

● Impala:基于内存运算,速度快,支持的数据源没有Presto多。

● Spark SQL:基于Spark平台上的一个OLAP框架,基本思路是增加机器以实现并行计算,从而提高查询速度。

● Elasticsearch:最大的特点是使用倒排索引解决了索引存在的问题。根据研究,Elasticsearch在数据获取和聚集时用的资源比在Druid中高。框架选型如下。

● 从超大数据的查询效率来看:Druid > Kylin > Presto >Spark SQL。

● 从支持的数据源种类来看:Presto > Spark SQL > Kylin >Druid。

作者 east
数据仓库 5月 2,2021

数据仓库分层及命名规则

数据仓库中的数据要想真正发挥最大的作用,必须对数据仓库进行分层,数据仓库分层的优点如下。

● 把复杂问题简单化。可以将一个复杂的任务分解成多个步骤来完成,每层只处理单一的步骤。

● 减少重复开发。规范数据分层,通过使用中间层数据,可以大大减少重复计算量,增加计算结果的复用性。

● 隔离原始数据。使真实数据与最终统计数据解耦。数据仓库具体如何分层取决于设计者对数据仓库的整体规划,不过大部分的思路是相似的。

本数据仓库分为五层,如下所述。

● ODS层:原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。

● DWD层:明细数据层,对ODS层数据进行清洗(去除空值、脏数据、超过极限范围的数据)、维度退化、脱敏等。

● DWS层:服务数据层,以DWD层的数据为基础,按天进行轻度汇总。

● DWT层:主题数据层,以DWS层的数据为基础,按主题进行汇总,获得每个主题的全量数据表。

● ADS层:数据应用层,面向实际的数据需求,为各种统计报表提供数据。数据仓库分层后要遵守一定的数据仓库命名规范,本项目中的规范如下。

1.表命名ODS层命名为ods_表名。

DWD层命名为dwd_dim/fact_表名。DWS层命名为dws_表名。

DWT层命名为dwt_购物车。ADS层命名为ads_表名。临时表命名为tmp_×××。用户行为表以.log为后缀。

2.脚本命名脚本命名格式为数据源to目标_db/log.sh。用户行为需求相关脚本以.log为后缀;业务数据需求相关脚本以.db为后缀。

作者 east
数据仓库 4月 28,2021

关系型数据库(关系模型)转变为数据仓库(维度模型)示例

关系模型示意如图1所示,严格遵循第三范式(3NF)。从图1中可以看出,模型较为松散、零碎,物理表数量多,但数据冗余程度低。由于数据分布于众多的表中,因此这些数据可以更为灵活地被应用,功能性较强。关系模型主要应用于OLTP中,为了保证数据的一致性及避免冗余,大部分业务系统的表都是遵循第三范式的。

维度模型示意如图2所示,其主要应用于OLAP中,通常以某一张事实表为中心进行表的组织,主要面向业务,其特征是可能存在数据的冗余,但用户能方便地得到数据。关系模型虽然数据冗余程度低,但在大规模数据中进行跨表分析、统计、查询时,会造成多表关联,这会大大降低执行效率。所以通常我们采用维度模型建模,把各种相关表整理成事实表和维度表两种。所有的维度表围绕事实表进行解释。

图1 关系模型示意
图2 维度模型示意
作者 east
大数据开发 3月 29,2021

FusionInsight HD客户端使用kerberos安全认证遇到坑

之前成功安装过一个华为FusionInsight HD的客户端,Hbase、Spark等都使用正常。由于项目需要,又在新的服务器安装新的 FusionInsight HD 客户端,没想到这次遇到很多坑。

重新找了一个新版本的 FusionInsight HD的客户端 。安装成功后,进行安装认证

kinit myAccount

提示:Client myAccount@hadoop.com not found in Kerberos database while getting inital credentials。

但如果 kinit admin,就不会出现上面的问题。

经过各种折腾,和服务同步时间,关闭防火墙,查找hosts是否修改正确,jdk版本是否兼容,以及配置是否正确。卸载HD客户端和重新安装。上面的错误消失了,但又出现各种奇奇怪怪问题,例如下面的:

Not attempting to re-login since the last re-login was attempted less than 600 seconds before

GSS inititate failed

运行Spark程序又遇到种种问题

Couldn’t bind no a random free port

Connection refused

Conn’t setup connection for hdfs/myAccount@hadoop.com

刚开始没有怀疑这个HD客户端的问题,总是以为自己没配置好,或者是HD集群的服务器需要把这个ip或账号加入名单,反复折腾了好久。跟之前安装正常的HD集群各种对比,除了版本不一样,别的都找不到有什么区别。甚至在这台安装正常的服务器重新装新的HD客户端,也是安全认证不了。

后来觉得,有可能是客户端问题,要重新找回之前版本或新版本HD客户端,果然装了一个最新版本HD客户端安全认证成功了,怀疑之前安全认证不成功的,可能是装了测试集群或测试版本的。没有myAccount这个账户,但是测试集群也有admin这个账户。

由于对原理等理解不透彻,网上遇到同类问题的人也少,出现问题先入为主,以为自己没配置好,好几次看到HD客户端和之前版本不一样,也没有怀疑到这方面上去。

作者 east

上一 1 … 26 27 28 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.