gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档4月 2022

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2022   /  
  • 4月
Android, spring 4月 29,2022

生产环境app和后台都看不到日志的异常调试方法

最近调试一个项目,调试到让人亏崩溃。

由于是特殊行业,手机不开usb调试模式,这样没法看到android app的日志。由于疫情等原因,又无法到现场看后台日志。在调试app登录时,app一直报“ 网络异常,请稍后重试” 。

由于网络是经过第三方转发的,刚开始以为是第三方转发那方面没配置好。经过费劲周折,先后发现app的key、secret、url配错了,又测了后台直接调用的接口是通的,反复找第三方转发公司确认,才确认第三方转发是正确的,整个网络是通的。

全面的测试用例查找问题:

app端对网络异常处理不够完善,当后台500出错时,app没准确提示,还是提示统一的“网络异常,请稍后重试”。怀疑app访问超时导致这样问题,修改超时时间还是没返回。后来别的同事加入讨论,他测试的账号是不存在的,终于有了后台提示。我之前一直是用正确存在的账号测试,反而一直没有收到后台返回结果。看来测试还是要考虑正常和非正常等多元化测试,考虑全面的测试用例,这样可以覆盖代码多个分支。

替代排除法:

通过上面的步骤,已经确认了app能正确访问到生产环境的后台,但不确定是app和后台都有问题,还只是后台有问题。想到生产环境还有一套接口一模一样生产环境的后台,是能正常使用的。把app的url修改成那个后台的地址,发现app登录等一切正常。

通过日志分析:

没办法调试app和后台,但日志还是可以想办法拿到的。android的日志可以上传到后台,也可以保存到本地,通过文件浏览器来找到日志文件。例如在Android把调试日志保存到本地,然后在手机找到安装包包名对应的目录,就可以查找到日志。

通过上面的方法,看到app日志报的是500错误,那么是后台接口报错,后来通过查看后台日志,果然看到空指针错误了。处理好问题了,果然app能正常登录后台和正常使用了。

作者 east
Flink, Spark 4月 19,2022

大数据实时流式处理:Apache Flink vs Apache

对更快数据处理的需求一直在增加,实时流数据处理是目前的解决方案。虽然 Apache Spark 仍在许多组织中用于大数据处理,但 Apache Flink 已经迅速成为替代方案。事实上,许多人认为它有可能取代 Apache Spark,因为它能够实时处理流数据。当然,Flink 能否取代 Spark 尚无定论,因为 Flink 还没有经过广泛的测试。但实时处理和低数据延迟是其两个决定性特征。同时,这需要考虑到 Apache Spark 可能不会失宠,因为它的批处理能力仍然很重要。

流式数据处理案例

对于基于批处理的所有优点,实时流数据处理似乎是一个强有力的案例。流式数据处理使快速设置和加载数据仓库成为可能。具有低数据延迟的流处理器可以快速提供对数据的更多见解。所以,你有更多的时间来了解发生了什么。除了更快的处理之外,还有另一个显着的好处:您有更多的时间来设计对事件的适当响应。例如,在异常检测的情况下,更低的延迟和更快的检测使您能够确定最佳响应,这是防止安全网站受到欺诈攻击或工业设备损坏等情况的关键。因此,您可以防止重大损失。

什么是 Apache Flink?

Apache Flink 是一种大数据处理工具,以在大规模分布式系统上以低数据延迟和高容错性快速处理大数据而著称。它的定义特征是它能够实时处理流数据。

Apache Flink 最初是一个学术开源项目,当时它被称为 Stratosphere。后来,它成为了 Apache 软件基金会孵化器的一部分。为避免与其他项目名称冲突,将名称更改为 Flink。 Flink 这个名字很合适,因为它意味着敏捷。即使选择的标志,松鼠也是合适的,因为松鼠代表了敏捷、敏捷和速度的美德。

自从加入 Apache 软件基金会后,它作为大数据处理工具迅速崛起,并在 8 个月内开始受到更广泛受众的关注。人们对 Flink 的兴趣日益浓厚,这反映在 2015 年的多次会议的参会人数上。2015 年 5 月在伦敦举行的 Strata 会议和 2015 年 6 月在圣何塞举行的 Hadoop 峰会上,有很多人参加了关于 Flink 的会议。 2015 年 8 月,超过 60 人参加了在圣何塞 MapR 总部举办的湾区 Apache Flink 聚会。

下图给出了 Flink 的 Lambda 架构。

Spark 和 Flink 的比较

虽然 Spark 和 Flink 之间有一些相似之处,例如它们的 API 和组件,但在数据处理方面,相似之处并不重要。 下面给出了 Flink 和 Spark 之间的比较。

数据处理

Spark 以批处理模式处理数据,而 Flink 实时处理流数据。 Spark 处理数据块,称为 RDD,而 Flink 可以实时处理一行一行的数据。 因此,虽然 Spark 始终存在最小数据延迟,但 Flink 并非如此。

迭代

Spark 支持批量数据迭代,但 Flink 可以使用其流式架构原生迭代其数据。 下图显示了迭代处理是如何发生的。

内存管理

Flink 可以自动适应不同的数据集,但 Spark 需要手动优化和调整其作业以适应单个数据集。 Spark 也进行手动分区和缓存。因此,预计处理会有所延迟。

数据流

Flink 能够在需要时为其数据处理提供中间结果。 Spark 遵循过程式编程系统,而 Flink 遵循分布式数据流方法。因此,当需要中间结果时,广播变量用于将预先计算的结果分发到所有工作节点。

数据可视化

Flink 提供了一个 Web 界面来提交和执行所有作业。 Spark 和 Flink 都与 Apache Zeppelin 集成,并提供数据摄取、数据分析、发现、协作和可视化。 Apache Zeppelin 还提供了多语言后端,允许您提交和执行 Flink 程序。

处理时间

以下段落提供了 Flink 和 Spark 在不同作业中所用时间的比较。

为了公平比较,Flink 和 Spark 都以机器规格和节点配置的形式获得了相同的资源。

Flink 处理速度更快,因为它的流水线执行。 处理数据,Spark 用了 2171 秒,而 Flink 用了 1490 秒。

当执行不同数据大小的 TeraSort 时,结果如下:

对于 10 GB 的数据,Flink 需要 157 秒,而 Spark 需要 387 秒。
对于 160 GB 的数据,Flink 需要 3127 秒,而 Spark 需要 4927 秒。
基于批处理或流式数据——哪个过程更好?

这两种工艺各有优势,适用于不同的情况。 尽管许多人声称基于批处理的工具正在失宠,但它不会很快发生。 要了解它们的相对优势,请参见以下比较:

在个别情况下,Flink 和 Spark批 处理都是有用的。以每天计算滚动月销售额的用例为例。在此活动中,需要计算每日销售总额,然后进行累计。在这样的用例中,可能不需要对数据进行流式处理。数据的批处理可以根据日期处理各个批次的销售数据,然后将它们添加。在这种情况下,即使存在一些数据延迟,也可以在稍后将该潜在数据添加到以后的批次中时弥补这些延迟。

有类似的用例需要流处理。以计算每个访问者在网站上花费的每月滚动时间的用例为例。在网站的情况下,访问次数可以每小时、每分钟甚至每天更新一次。但是这种情况下的问题是定义会话。定义会话的开始和结束可能很困难。此外,难以计算或识别不活动的时间段。因此,在这种情况下,定义会话甚至不活动时间段都没有合理的界限。在这种情况下,需要实时处理流数据。

概括

虽然 Spark 在批处理数据处理方面有很多优势,而且它仍然有很多使用场景,但 Flink 似乎正在迅速获得商业方面应用的青睐。 Flink 也可以进行批处理这一事实似乎对其有利。当然,这需要考虑到 Flink 的批处理能力可能与 Spark 不在一个级别。

作者 east
Java 4月 16,2022

网格编号工具类


import java.math.BigDecimal;

/**
 * 网格编号工具类<br>
 * 一、业务数据坐标网格编号计算 <br>
 * 1)基于覆盖全国的要求,地理坐标以左下角为原点,采用经度72°及纬度 0°为网格坐标原点进行网格划分<br>
 * 2)根据不同的比例尺要求,将网格切分成不同的级别,并按照4x4进行逐级进行切割,精度保留到小数点后10位<br>
 * 3)从原点开始计算(原点处坐标为72°,0°),根据网格范围进行行列号换算,其中一次网格行号和列号采用1位或2位数据表示( 0,1,2,3,4…),其他均采用1位数表达(0,1,2,3)<br>
 * 二、计算最小网格的中心点坐标<br>
 * 1)通过入参确定需要计算的最小网格中心点坐标 <br>
 * 2)先计算出一次网格的跨度,如果有更小的网格则累加起来,并取最后一个网格的半个跨度
 * 
 */
public final class GridCodeUtil {

    /** 网格化原点经度72° */
    public static final double ORIGIN_X = 72d;

    /** 网格化原点纬度 0° */
    public static final double ORIGIN_Y = 0d;

    /** 网格化终点经度 136° */
    public static final double END_X = 136d;

    /** 网格化终点纬度 54° */
    private static final double END_Y = 54d;

    /** 网格化无效值 */
    public static final int INVALID_VALUE = -1;

    /** 一次网格大小,即经度 1度,纬度1度 */
    public static final double FIRST_GRID_SIZE = 1d;

    /** 二次网格大小,即经度0.25度,纬度0.25度 */
    public static final double SECOND_GRID_SIZE = 0.25d;

    /** 三次网格大小,即经度0.0625度,纬度0.0625度 */
    public static final double THIRD_GRID_SIZE = 0.0625d;

    /** 四次网格大小,即经度0.015625度,纬度0.015625度 */
    public static final double FOURTH_GRID_SIZE = 0.015625d;

    /** 五次网格大小,即经度0.00390625度,纬度0.00390625度 */
    public static final double FIFTH_GRID_SIZE = 0.00390625d;

    /** 六次网格大小,即经度0.0009765625度,纬度0.0009765625度 */
    public static final double SIXTH_GRID_SIZE = 0.0009765625d;

    /** 一次网格折半大小 */
    public static final double FIRST_GRID_HALF_SIZE = FIRST_GRID_SIZE / 2;

    /** 二次网格折半大小 */
    public static final double SECOND_GRID_HALF_SIZE = SECOND_GRID_SIZE / 2;

    /** 三次网格折半大小 */
    public static final double THIRD_GRID_HALF_SIZE = THIRD_GRID_SIZE / 2;

    /** 四次网格折半大小 */
    public static final double FOURTH_GRID_HALF_SIZE = FOURTH_GRID_SIZE / 2;

    /** 五次网格折半大小 */
    public static final double FIFTH_GRID_HALF_SIZE = FIFTH_GRID_SIZE / 2;

    /** 六次网格折半大小 */
    public static final double SIXTH_GRID_HALF_SIZE = SIXTH_GRID_SIZE / 2;

    /** 私有构造函数 */
    private GridCodeUtil() {
    }

    /**
     * 坐标网格编号计算
     * 
     * @param xcoord 点的x坐标值,即经度
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    public static GridCode computeGridCode(Double xcoord, Double ycoord) {
        GridCode result = new GridCode();
        boolean isNullOrOutOfRange4x = xcoord == null || xcoord < ORIGIN_X || xcoord > END_X;
        boolean isNullOrOutOfRange4y = ycoord == null || ycoord < ORIGIN_Y || ycoord > END_Y;

        // 校验是否合法的坐标
        if (isNullOrOutOfRange4x || isNullOrOutOfRange4y) {
            result.setInvalidValue(INVALID_VALUE);
        } else {
            result.setFirstGridRowCode(computeRowCodeOfFirstGrid(ycoord));
            result.setFirstGridColCode(computeColumnCodeOfFirstGrid(xcoord));
            result.setSecondGridRowCode(computeRowCodeOfSecondGrid(ycoord));
            result.setSecondGridColCode(computeColumnCodeOfSecondGrid(xcoord));
            result.setThirdGridRowCode(computeRowCodeOfThirdGrid(ycoord));
            result.setThirdGridColCode(computeColumnCodeOfThirdGrid(xcoord));
            result.setFourthGridRowCode(computeRowCodeOfFourthGrid(ycoord));
            result.setFourthGridColCode(computeColumnCodeOfFourthGrid(xcoord));
            result.setFifthGridRowCode(computeRowCodeOfFifthGrid(ycoord));
            result.setFifthGridColCode(computeColumnCodeOfFifthGrid(xcoord));
            result.setSixthGridRowCode(computeRowCodeOfSixthGrid(ycoord));
            result.setSixthGridColCode(computeColumnCodeOfSixthGrid(xcoord));
        }
        return result;
    }

    /**
     * 计算最小网格的中心点坐标
     * 
     * @param gridCode 网格编号对象
     * @return 中心点坐标,数组的第一个值是x坐标,第二个值是y坐标
     */
    public static Double[] computeCentrePointOfMinGrid(GridCode gridCode) {
        Double xcoord = null;
        Double ycoord = null;
        int minGrid = 0;

        // 先计算出一次网格的跨度,如果有更小的网格则累加起来。因网格计算时,编号是从0开始的,因此不需要减去一
        if (gridCode.getFirstGridColCode() != null && gridCode.getFirstGridRowCode() != null) {
            xcoord = gridCode.getFirstGridColCode() * FIRST_GRID_SIZE;
            ycoord = gridCode.getFirstGridRowCode() * FIRST_GRID_SIZE;
            minGrid = 1;
        } else {
            // 如果连最基本的一次网格编号都为空,直接抛出参数异常
            throw new IllegalArgumentException("计算最小网格的中心点坐标时,传入的一次网格编号不能为空!");
        }

        if (gridCode.getSecondGridColCode() != null && gridCode.getSecondGridRowCode() != null) {
            xcoord += gridCode.getSecondGridColCode() * SECOND_GRID_SIZE;
            ycoord += gridCode.getSecondGridRowCode() * SECOND_GRID_SIZE;
            minGrid = 2;
        }

        if (gridCode.getThirdGridColCode() != null && gridCode.getThirdGridRowCode() != null) {
            // 如果二次网格编号为空,直接抛出参数异常
            if (minGrid != 2) {
                throw new IllegalArgumentException("计算最小网格的中心点坐标时,传入的二次网格编号不能为空!");
            }
            xcoord += gridCode.getThirdGridColCode() * THIRD_GRID_SIZE;
            ycoord += gridCode.getThirdGridRowCode() * THIRD_GRID_SIZE;
            minGrid = 3;
        }

        if (gridCode.getFourthGridColCode() != null && gridCode.getFourthGridRowCode() != null) {
            // 如果三次网格编号为空,直接抛出参数异常
            if (minGrid != 3) {
                throw new IllegalArgumentException("计算最小网格的中心点坐标时,传入的三次网格编号不能为空!");
            }
            xcoord += gridCode.getFourthGridColCode() * FOURTH_GRID_SIZE;
            ycoord += gridCode.getFourthGridRowCode() * FOURTH_GRID_SIZE;
            minGrid = 4;
        }

        if (gridCode.getFifthGridColCode() != null && gridCode.getFifthGridRowCode() != null) {
            // 如果四次网格编号为空,直接抛出参数异常
            if (minGrid != 4) {
                throw new IllegalArgumentException("计算最小网格的中心点坐标时,传入的四次网格编号不能为空!");
            }
            xcoord += gridCode.getFifthGridColCode() * FIFTH_GRID_SIZE;
            ycoord += gridCode.getFifthGridRowCode() * FIFTH_GRID_SIZE;
            minGrid = 5;
        }

        if (gridCode.getSixthGridColCode() != null && gridCode.getSixthGridRowCode() != null) {
            // 如果五次网格编号为空,直接抛出参数异常
            if (minGrid != 5) {
                throw new IllegalArgumentException("计算最小网格的中心点坐标时,传入的五次网格编号不能为空!");
            }
            xcoord += gridCode.getSixthGridColCode() * SIXTH_GRID_SIZE;
            ycoord += gridCode.getSixthGridRowCode() * SIXTH_GRID_SIZE;
            minGrid = 6;
        }
        return buildResult(xcoord, ycoord, minGrid);
    }

    /**
     * 计算指定的网格的中心点坐标
     *
     * @param gridCode 网格编号对象
     * @param minGrid 要计算的网格层级
     * @return 中心点坐标,数组的第一个值是x坐标,第二个值是y坐标
     */
    public static Double[] computeCentrePointOfMinGrid2(GridCode gridCode,int minGrid) {
        Double xcoord = null;
        Double ycoord = null;

        if (minGrid == 1) {
            xcoord = gridCode.getFirstGridColCode() * FIRST_GRID_SIZE;
            ycoord = gridCode.getFirstGridRowCode() * FIRST_GRID_SIZE;
        }

        if (minGrid == 2) {
            xcoord += gridCode.getSecondGridColCode() * SECOND_GRID_SIZE;
            ycoord += gridCode.getSecondGridRowCode() * SECOND_GRID_SIZE;
        }

        if (minGrid == 3) {
            xcoord += gridCode.getThirdGridColCode() * THIRD_GRID_SIZE;
            ycoord += gridCode.getThirdGridRowCode() * THIRD_GRID_SIZE;
        }

        if (minGrid == 4) {
            xcoord += gridCode.getFourthGridColCode() * FOURTH_GRID_SIZE;
            ycoord += gridCode.getFourthGridRowCode() * FOURTH_GRID_SIZE;
        }

        if (minGrid == 5) {
            xcoord += gridCode.getFifthGridColCode() * FIFTH_GRID_SIZE;
            ycoord += gridCode.getFifthGridRowCode() * FIFTH_GRID_SIZE;
            minGrid = 5;
        }

        if (minGrid == 6) {
            xcoord += gridCode.getSixthGridColCode() * SIXTH_GRID_SIZE;
            ycoord += gridCode.getSixthGridRowCode() * SIXTH_GRID_SIZE;
        }
        try {
            return buildResult(xcoord, ycoord, minGrid);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 构建最后的结果值<br>
     * 添加网格化原点经纬度和最小网格的中点跨度,如果小数点大于八位,则四舍五入保留八位小数点
     * 
     * @param xcoord 网格跨度x坐标值
     * @param ycoord 网格跨度y坐标值
     * @param minGrid 需要计算的最小网格
     * @return
     */
    private static Double[] buildResult(Double xcoord, Double ycoord, int minGrid) {
        Double[] result = new Double[2];

        result[0] = ORIGIN_X + xcoord;
        result[1] = ORIGIN_Y + ycoord;

        switch (minGrid) {
        case 1:
            result[0] += FIRST_GRID_HALF_SIZE;
            result[1] += FIRST_GRID_HALF_SIZE;
            break;
        case 2:
            result[0] += SECOND_GRID_HALF_SIZE;
            result[1] += SECOND_GRID_HALF_SIZE;
            break;
        case 3:
            result[0] += THIRD_GRID_HALF_SIZE;
            result[1] += THIRD_GRID_HALF_SIZE;
            break;
        case 4:
            result[0] += FOURTH_GRID_HALF_SIZE;
            result[1] += FOURTH_GRID_HALF_SIZE;
            break;
        case 5:
            result[0] += FIFTH_GRID_HALF_SIZE;
            result[1] += FIFTH_GRID_HALF_SIZE;
            break;
        case 6:
            result[0] += SIXTH_GRID_HALF_SIZE;
            result[1] += SIXTH_GRID_HALF_SIZE;
            break;
        default:
            throw new IllegalArgumentException("计算最小网格的中心点坐标异常,当前只支持1-6次网格!");
        }
        BigDecimal bd0 = new BigDecimal(result[0]);
        BigDecimal bd1 = new BigDecimal(result[1]);
        result[0] = bd0.setScale(8, BigDecimal.ROUND_HALF_UP).doubleValue();
        result[1] = bd1.setScale(8, BigDecimal.ROUND_HALF_UP).doubleValue();
        return result;
    }

    /**
     * 计算一次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfFirstGrid(double ycoord) {
        return (int) (difOriginY(ycoord) / FIRST_GRID_SIZE);
    }

    /**
     * 计算一次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfFirstGrid(double xcoord) {
        return (int) (difOriginX(xcoord) / FIRST_GRID_SIZE);
    }

    /**
     * 获取当前一次网格最小y坐标值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double getMiniYcoodOfFirstGrid(double ycoord) {
        return computeRowCodeOfFirstGrid(ycoord) * FIRST_GRID_SIZE;
    }

    /**
     * 获取当前一次网格最小x坐标值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double getMiniXcoodOfFirstGrid(double xcoord) {
        return computeColumnCodeOfFirstGrid(xcoord) * FIRST_GRID_SIZE;
    }

    /**
     * 计算二次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfSecondGrid(double ycoord) {
        return (int) ((difOriginY(ycoord) - getMiniYcoodOfFirstGrid(ycoord)) / SECOND_GRID_SIZE);
    }

    /**
     * 计算二次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfSecondGrid(double xcoord) {
        return (int) ((difOriginX(xcoord) - getMiniXcoodOfFirstGrid(xcoord)) / SECOND_GRID_SIZE);
    }

    /**
     * 获取当前二次网格最小y坐标值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double getMiniYcoodOfSecondGrid(double ycoord) {
        return computeRowCodeOfSecondGrid(ycoord) * SECOND_GRID_SIZE;
    }

    /**
     * 获取当前二次网格最小x坐标值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double getMiniXcoodOfSecondGrid(double xcoord) {
        return computeColumnCodeOfSecondGrid(xcoord) * SECOND_GRID_SIZE;
    }

    /**
     * 计算三次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfThirdGrid(double ycoord) {
        return (int) ((difOriginY(ycoord) - getMiniYcoodOfFirstGrid(ycoord) - getMiniYcoodOfSecondGrid(ycoord))
                / THIRD_GRID_SIZE);
    }

    /**
     * 计算三次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfThirdGrid(double xcoord) {
        return (int) ((difOriginX(xcoord) - getMiniXcoodOfFirstGrid(xcoord) - getMiniXcoodOfSecondGrid(xcoord))
                / THIRD_GRID_SIZE);
    }

    /**
     * 获取当前三次网格最小y坐标值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double getMiniYcoodOfThirdGrid(double ycoord) {
        return computeRowCodeOfThirdGrid(ycoord) * THIRD_GRID_SIZE;
    }

    /**
     * 获取当前三次网格最小x坐标值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double getMiniXcoodOfThirdGrid(double xcoord) {
        return computeColumnCodeOfThirdGrid(xcoord) * THIRD_GRID_SIZE;
    }

    /**
     * 计算四次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfFourthGrid(double ycoord) {
        return (int) ((difOriginY(ycoord) - getMiniYcoodOfFirstGrid(ycoord) - getMiniYcoodOfSecondGrid(ycoord)
                - getMiniYcoodOfThirdGrid(ycoord)) / FOURTH_GRID_SIZE);
    }

    /**
     * 计算四次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfFourthGrid(double xcoord) {
        return (int) ((difOriginX(xcoord) - getMiniXcoodOfFirstGrid(xcoord) - getMiniXcoodOfSecondGrid(xcoord)
                - getMiniXcoodOfThirdGrid(xcoord)) / FOURTH_GRID_SIZE);
    }

    /**
     * 获取当前四次网格最小y坐标值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double getMiniYcoodOfFourthGrid(double ycoord) {
        return computeRowCodeOfFourthGrid(ycoord) * FOURTH_GRID_SIZE;
    }

    /**
     * 获取当前四次网格最小x坐标值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double getMiniXcoodOfFourthGrid(double xcoord) {
        return computeColumnCodeOfFourthGrid(xcoord) * FOURTH_GRID_SIZE;
    }

    /**
     * 计算五次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfFifthGrid(double ycoord) {
        return (int) ((difOriginY(ycoord) - getMiniYcoodOfFirstGrid(ycoord) - getMiniYcoodOfSecondGrid(ycoord)
                - getMiniYcoodOfThirdGrid(ycoord) - getMiniYcoodOfFourthGrid(ycoord)) / FIFTH_GRID_SIZE);
    }

    /**
     * 计算五次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfFifthGrid(double xcoord) {
        return (int) ((difOriginX(xcoord) - getMiniXcoodOfFirstGrid(xcoord) - getMiniXcoodOfSecondGrid(xcoord)
                - getMiniXcoodOfThirdGrid(xcoord) - getMiniXcoodOfFourthGrid(xcoord)) / FIFTH_GRID_SIZE);
    }

    /**
     * 获取当前五次网格最小y坐标值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double getMiniYcoodOfFifthGrid(double ycoord) {
        return computeRowCodeOfFifthGrid(ycoord) * FIFTH_GRID_SIZE;
    }

    /**
     * 获取当前五次网格最小x坐标值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double getMiniXcoodOfFifthGrid(double xcoord) {
        return computeColumnCodeOfFifthGrid(xcoord) * FIFTH_GRID_SIZE;
    }

    /**
     * 计算六次网格的行编号
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static int computeRowCodeOfSixthGrid(double ycoord) {
        return (int) ((difOriginY(ycoord) - getMiniYcoodOfFirstGrid(ycoord) - getMiniYcoodOfSecondGrid(ycoord)
                - getMiniYcoodOfThirdGrid(ycoord) - getMiniYcoodOfFourthGrid(ycoord) - getMiniYcoodOfFifthGrid(ycoord))
                / SIXTH_GRID_SIZE);
    }

    /**
     * 计算六次网格的列编号
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static int computeColumnCodeOfSixthGrid(double xcoord) {
        return (int) ((difOriginX(xcoord) - getMiniXcoodOfFirstGrid(xcoord) - getMiniXcoodOfSecondGrid(xcoord)
                - getMiniXcoodOfThirdGrid(xcoord) - getMiniXcoodOfFourthGrid(xcoord) - getMiniXcoodOfFifthGrid(xcoord))
                / SIXTH_GRID_SIZE);
    }

    /**
     * 目标点和原点的纬度差值
     * 
     * @param ycoord 点的y坐标值,即纬度
     * @return
     */
    private static double difOriginY(double ycoord) {
        return ycoord - ORIGIN_Y;
    }

    /**
     * 目标点和原点的经度差值
     * 
     * @param xcoord 点的x坐标值,即经度
     * @return
     */
    private static double difOriginX(double xcoord) {
        return xcoord - ORIGIN_X;
    }

}
public class GridCode {

    /** 一次网格行号 */
    private Integer firstGridRowCode;

    /** 一次网格列号 */
    private Integer firstGridColCode;

    /** 二次网格行号 */
    private Integer secondGridRowCode;

    /** 二次网格列号 */
    private Integer secondGridColCode;

    /** 三次网格行号 */
    private Integer thirdGridRowCode;

    /** 三次网格列号 */
    private Integer thirdGridColCode;

    /** 四次网格行号 */
    private Integer fourthGridRowCode;

    /** 四次网格列号 */
    private Integer fourthGridColCode;

    /** 五次网格行号 */
    private Integer fifthGridRowCode;

    /** 五次网格列号 */
    private Integer fifthGridColCode;

    /** 六次网格行号 */
    private Integer sixthGridRowCode;

    /** 六次网格列号 */
    private Integer sixthGridColCode;

    /**
     * 设置无效值
     */
    public void setInvalidValue(int invalidValue) {
        this.firstGridRowCode = invalidValue;
        this.firstGridColCode = invalidValue;
        this.secondGridRowCode = invalidValue;
        this.secondGridColCode = invalidValue;
        this.thirdGridRowCode = invalidValue;
        this.thirdGridColCode = invalidValue;
        this.fourthGridRowCode = invalidValue;
        this.fourthGridColCode = invalidValue;
        this.fifthGridRowCode = invalidValue;
        this.fifthGridColCode = invalidValue;
        this.sixthGridRowCode = invalidValue;
        this.sixthGridColCode = invalidValue;
    }

    public Integer getFirstGridRowCode() {
        return firstGridRowCode;
    }

    public void setFirstGridRowCode(Integer firstGridRowCode) {
        this.firstGridRowCode = firstGridRowCode;
    }

    public Integer getFirstGridColCode() {
        return firstGridColCode;
    }

    public void setFirstGridColCode(Integer firstGridColCode) {
        this.firstGridColCode = firstGridColCode;
    }

    public Integer getSecondGridRowCode() {
        return secondGridRowCode;
    }

    public void setSecondGridRowCode(Integer secondGridRowCode) {
        this.secondGridRowCode = secondGridRowCode;
    }

    public Integer getSecondGridColCode() {
        return secondGridColCode;
    }

    public void setSecondGridColCode(Integer secondGridColCode) {
        this.secondGridColCode = secondGridColCode;
    }

    public Integer getThirdGridRowCode() {
        return thirdGridRowCode;
    }

    public void setThirdGridRowCode(Integer thirdGridRowCode) {
        this.thirdGridRowCode = thirdGridRowCode;
    }

    public Integer getThirdGridColCode() {
        return thirdGridColCode;
    }

    public void setThirdGridColCode(Integer thirdGridColCode) {
        this.thirdGridColCode = thirdGridColCode;
    }

    public Integer getFourthGridRowCode() {
        return fourthGridRowCode;
    }

    public void setFourthGridRowCode(Integer fourthGridRowCode) {
        this.fourthGridRowCode = fourthGridRowCode;
    }

    public Integer getFourthGridColCode() {
        return fourthGridColCode;
    }

    public void setFourthGridColCode(Integer fourthGridColCode) {
        this.fourthGridColCode = fourthGridColCode;
    }

    public Integer getFifthGridRowCode() {
        return fifthGridRowCode;
    }

    public void setFifthGridRowCode(Integer fifthGridRowCode) {
        this.fifthGridRowCode = fifthGridRowCode;
    }

    public Integer getFifthGridColCode() {
        return fifthGridColCode;
    }

    public void setFifthGridColCode(Integer fifthGridColCode) {
        this.fifthGridColCode = fifthGridColCode;
    }

    public Integer getSixthGridRowCode() {
        return sixthGridRowCode;
    }

    public void setSixthGridRowCode(Integer sixthGridRowCode) {
        this.sixthGridRowCode = sixthGridRowCode;
    }

    public Integer getSixthGridColCode() {
        return sixthGridColCode;
    }

    public void setSixthGridColCode(Integer sixthGridColCode) {
        this.sixthGridColCode = sixthGridColCode;
    }

}
作者 east
Keras 4月 15,2022

如何在 Keras 中使用词嵌入层进行深度学习

词嵌入提供了词及其相对含义的密集表示。

它们是对更简单的词袋模型表示中使用的稀疏表示的改进。

词嵌入可以从文本数据中学习并在项目之间重用。它们也可以作为在文本数据上拟合神经网络的一部分来学习。

在本教程中,您将了解如何使用 Keras 在 Python 中使用词嵌入进行深度学习。

完成本教程后,您将了解:

关于词嵌入以及 Keras 通过嵌入层支持词嵌入。
如何在拟合神经网络的同时学习词嵌入。
如何在神经网络中使用预训练的词嵌入。
从我的新书《自然语言处理的深度学习》开始您的项目,包括分步教程和所有示例的 Python 源代码文件。

让我们开始吧。

2018 年 2 月更新:修复了由于底层 API 更改而导致的错误。
2019 年 10 月更新:针对 Keras 2.3 和 TensorFlow 2.0 进行了更新。

教程概述
本教程分为 3 个部分; 他们是:

词嵌入
Keras 嵌入层
学习嵌入的例子
使用预训练手套嵌入的示例

1.词嵌入
词嵌入是一类使用密集向量表示来表示词和文档的方法。

这是对传统的词袋模型编码方案的改进,在传统的词袋模型编码方案中,使用大的稀疏向量来表示每个单词或对向量中的每个单词进行评分以表示整个词汇表。这些表示是稀疏的,因为词汇量很大,并且给定的单词或文档将由一个主要由零值组成的大向量表示。

相反,在嵌入中,单词由密集向量表示,其中向量表示单词在连续向量空间中的投影。

向量空间中单词的位置是从文本中学习的,并且基于使用该单词时围绕该单词的单词。

单词在学习向量空间中的位置称为它的嵌入。

从文本中学习词嵌入的两个流行示例包括:

Word2Vec。
GloVe。
除了这些精心设计的方法之外,还可以将词嵌入作为深度学习模型的一部分进行学习。这可能是一种较慢的方法,但会根据特定的训练数据集定制模型。

2、Keras嵌入层
Keras 提供了一个嵌入层,可用于文本数据上的神经网络。

它要求对输入数据进行整数编码,以便每个单词都由一个唯一的整数表示。这个数据准备步骤可以使用 Keras 提供的 Tokenizer API 来执行。

Embedding 层使用随机权重初始化,并将学习训练数据集中所有单词的嵌入。

它是一个灵活的层,可以以多种方式使用,例如:

它可以单独用来学习一个词嵌入,以后可以保存并在另一个模型中使用。
它可以用作深度学习模型的一部分,其中嵌入与模型本身一起学习。
它可用于加载预训练的词嵌入模型,一种迁移学习。
Embedding 层被定义为网络的第一个隐藏层。它必须指定 3 个参数:

它必须指定 3 个参数:

input_dim:这是文本数据中词汇的大小。例如,如果您的数据被整数编码为 0-10 之间的值,那么词汇表的大小将为 11 个单词。
output_dim:这是嵌入单词的向量空间的大小。它为每个单词定义了该层的输出向量的大小。例如,它可以是 32 或 100 甚至更大。为您的问题测试不同的值。
input_length:这是输入序列的长度,就像您为 Keras 模型的任何输入层定义的一样。例如,如果您的所有输入文档都包含 1000 个单词,那么这将是 1000。
例如,下面我们定义了一个包含 200 个词汇表的 Embedding 层(例如,从 0 到 199 的整数编码词,包括 0 到 199),一个 32 维的向量空间,其中将嵌入词,输入文档每个有 50 个词。


e = Embedding(200, 32, input_length=50)

Embedding 层具有学习的权重。 如果您将模型保存到文件中,这将包括嵌入层的权重。

Embedding 层的输出是一个 2D 向量,对于输入的单词序列(输入文档)中的每个单词都有一个嵌入。

如果您希望将 Dense 层直接连接到 Embedding 层,则必须首先使用 Flatten 层将 2D 输出矩阵展平为 1D 矢量。

现在,让我们看看如何在实践中使用嵌入层。

3、学习嵌入的例子
在本节中,我们将研究如何在将神经网络拟合到文本分类问题时学习词嵌入。

我们将定义一个小问题,我们有 10 个文本文档,每个文档都有一个关于学生提交的作品的评论。 每个文本文档被分类为正“1”或负“0”。 这是一个简单的情感分析问题。

首先,我们将定义文档及其类标签。

# define documents
docs = ['Well done!',
		'Good work',
		'Great effort',
		'nice work',
		'Excellent!',
		'Weak',
		'Poor effort!',
		'not good',
		'poor work',
		'Could have done better.']
# define class labels
labels = array([1,1,1,1,1,0,0,0,0,0])

接下来,我们可以对每个文档进行整数编码。 这意味着作为输入,嵌入层将具有整数序列。 我们可以尝试其他更复杂的词模型编码包,如计数或 TF-IDF。

Keras 提供了 one_hot() 函数,该函数将每个单词的散列创建为有效的整数编码。 我们将估计词汇量为 50,这比减少散列函数冲突概率所需的要大得多。

# integer encode the documents
vocab_size = 50
encoded_docs = [one_hot(d, vocab_size) for d in docs]
print(encoded_docs)

序列具有不同的长度,Keras 更喜欢对输入进行矢量化,并且所有输入都具有相同的长度。 我们将填充所有输入序列的长度为 4。同样,我们可以使用内置的 Keras 函数来完成此操作,在本例中为 pad_sequences() 函数。

# pad documents to a max length of 4 words
max_length = 4
padded_docs = pad_sequences(encoded_docs, maxlen=max_length, padding='post')
print(padded_docs)

我们现在准备将嵌入层定义为神经网络模型的一部分。

Embedding 的词汇量为 50,输入长度为 4。我们将选择一个 8 维的小嵌入空间。

该模型是一个简单的二元分类模型。 重要的是,嵌入层的输出将是 4 个 8 维向量,每个词一个。 我们将其展平为一个 32 元素的向量,以传递给 Dense 输出层。

# define the model
model = Sequential()
model.add(Embedding(vocab_size, 8, input_length=max_length))
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
# compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# summarize the model
print(model.summary())

最后,我们可以拟合和评估分类模型。

# fit the model
model.fit(padded_docs, labels, epochs=50, verbose=0)
# evaluate the model
loss, accuracy = model.evaluate(padded_docs, labels, verbose=0)
print('Accuracy: %f' % (accuracy*100))

下面提供了完整的代码清单。

from numpy import array
from keras.preprocessing.text import one_hot
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.embeddings import Embedding
# define documents
docs = ['Well done!',
		'Good work',
		'Great effort',
		'nice work',
		'Excellent!',
		'Weak',
		'Poor effort!',
		'not good',
		'poor work',
		'Could have done better.']
# define class labels
labels = array([1,1,1,1,1,0,0,0,0,0])
# integer encode the documents
vocab_size = 50
encoded_docs = [one_hot(d, vocab_size) for d in docs]
print(encoded_docs)
# pad documents to a max length of 4 words
max_length = 4
padded_docs = pad_sequences(encoded_docs, maxlen=max_length, padding='post')
print(padded_docs)
# define the model
model = Sequential()
model.add(Embedding(vocab_size, 8, input_length=max_length))
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
# compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# summarize the model
print(model.summary())
# fit the model
model.fit(padded_docs, labels, epochs=50, verbose=0)
# evaluate the model
loss, accuracy = model.evaluate(padded_docs, labels, verbose=0)
print('Accuracy: %f' % (accuracy*100))

运行示例首先打印整数编码的文档。

 
[[6, 16], [42, 24], [2, 17], [42, 24], [18], [17], [22, 17], [27, 42], [22, 24], [49, 46, 16, 34]]

然后打印每个文档的填充版本,使它们都具有统一的长度。

 
[[ 6 16  0  0]
[42 24  0  0]
[ 2 17  0  0]
[42 24  0  0]
[18  0  0  0]
[17  0  0  0]
[22 17  0  0]
[27 42  0  0]
[22 24  0  0]
[49 46 16 34]]

定义网络后,将打印层的摘要。 我们可以看到,正如预期的那样,Embedding 层的输出是一个 4×8 矩阵,它被 Flatten 层压缩为一个 32 元素的向量。

 
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
embedding_1 (Embedding)      (None, 4, 8)              400
_________________________________________________________________
flatten_1 (Flatten)          (None, 32)                0
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 33
=================================================================
Total params: 433
Trainable params: 433
Non-trainable params: 0
_________________________________________________________________

注意:您的结果可能会因算法或评估过程的随机性或数值精度的差异而有所不同。考虑运行该示例几次并比较平均结果。

最后,打印出训练模型的准确率,表明它完美地学习了训练数据集(这并不奇怪)。

精度:100.000000


您可以将学习到的权重从嵌入层保存到文件中,以供以后在其他模型中使用。

您通常也可以使用此模型对测试数据集中具有相同类型词汇的其他文档进行分类。

接下来,让我们看看在 Keras 中加载预训练的词嵌入。

4、使用预训练手套嵌入的示例
Keras Embedding 层也可以使用在其他地方学习的词嵌入。

在自然语言处理领域,学习、保存和免费提供词嵌入是很常见的。

例如,GloVe 方法背后的研究人员在他们的网站上提供了一套预训练的词嵌入,并在公共领域许可下发布。看:

GloVe:单词表示的全局向量
嵌入的最小包为 822Mb,称为“glove.6B.zip”。它在 10 亿个标记(单词)的数据集上进行了训练,词汇量为 40 万个单词。有几种不同的嵌入向量大小,包括 50、100、200 和 300 维。

您可以下载此嵌入集合,我们可以使用来自训练数据集中单词的预训练嵌入的权重为 Keras 嵌入层播种。

这个例子的灵感来自 Keras 项目中的一个例子:pretrained_word_embeddings.py。

下载解压后会看到几个文件,其中一个是“glove.6B.100d.txt”,里面包含了一个100维版本的embedding。

如果您查看文件内部,您将在每行看到一个标记(单词),然后是权重(100 个数字)。例如,下面是嵌入 ASCII 文本文件的第一行,显示了“the”的嵌入。

 
the -0.038194 -0.24487 0.72812 -0.39961 0.083172 0.043953 -0.39141 0.3344 -0.57545 0.087459 0.28787 -0.06731 0.30906 -0.26384 -0.13231 -0.20757 0.33395 -0.33848 -0.31743 -0.48336 0.1464 -0.37304 0.34577 0.052041 0.44946 -0.46971 0.02628 -0.54155 -0.15518 -0.14107 -0.039722 0.28277 0.14393 0.23464 -0.31021 0.086173 0.20397 0.52624 0.17164 -0.082378 -0.71787 -0.41531 0.20335 -0.12763 0.41367 0.55187 0.57908 -0.33477 -0.36559 -0.54857 -0.062892 0.26584 0.30205 0.99775 -0.80481 -3.0243 0.01254 -0.36942 2.2167 0.72201 -0.24978 0.92136 0.034514 0.46745 1.1079 -0.19358 -0.074575 0.23353 -0.052062 -0.22044 0.057162 -0.15806 -0.30798 -0.41625 0.37972 0.15006 -0.53212 -0.2055 -1.2526 0.071624 0.70565 0.49744 -0.42063 0.26148 -1.538 -0.30223 -0.073438 -0.28312 0.37104 -0.25217 0.016215 -0.017099 -0.38984 0.87424 -0.72569 -0.51058 -0.52028 -0.1459 0.8278 0.27062

与上一节一样,第一步是定义示例,将它们编码为整数,然后将序列填充为相同的长度。

在这种情况下,我们需要能够将单词映射到整数以及将整数映射到单词。

Keras 提供了一个适合训练数据的 Tokenizer 类,可以通过调用 Tokenizer 类的 texts_to_sequences() 方法将文本一致地转换为序列,并提供对 word_index 属性中单词到整数的字典映射的访问。

# define documents
docs = ['Well done!',
		'Good work',
		'Great effort',
		'nice work',
		'Excellent!',
		'Weak',
		'Poor effort!',
		'not good',
		'poor work',
		'Could have done better.']
# define class labels
labels = array([1,1,1,1,1,0,0,0,0,0])
# prepare tokenizer
t = Tokenizer()
t.fit_on_texts(docs)
vocab_size = len(t.word_index) + 1
# integer encode the documents
encoded_docs = t.texts_to_sequences(docs)
print(encoded_docs)
# pad documents to a max length of 4 words
max_length = 4
padded_docs = pad_sequences(encoded_docs, maxlen=max_length, padding='post')
print(padded_docs)

接下来,我们需要将整个 GloVe 词嵌入文件加载到内存中,作为词到嵌入数组的字典。

# load the whole embedding into memory
embeddings_index = dict()
f = open('glove.6B.100d.txt')
for line in f:
	values = line.split()
	word = values[0]
	coefs = asarray(values[1:], dtype='float32')
	embeddings_index[word] = coefs
f.close()
print('Loaded %s word vectors.' % len(embeddings_index))

这很慢。 过滤训练数据中唯一单词的嵌入可能会更好。

接下来,我们需要为训练数据集中的每个单词创建一个嵌入矩阵。 我们可以通过枚举 Tokenizer.word_index 中的所有唯一词并从加载的 GloVe 嵌入中定位嵌入权重向量来做到这一点。

结果是一个权重矩阵,仅适用于我们将在训练期间看到的单词。

# create a weight matrix for words in training docs
embedding_matrix = zeros((vocab_size, 100))
for word, i in t.word_index.items():
	embedding_vector = embeddings_index.get(word)
	if embedding_vector is not None:
		embedding_matrix[i] = embedding_vector

现在我们可以像以前一样定义、拟合和评估模型了。

关键区别在于嵌入层可以使用 GloVe 词嵌入权重作为种子。 我们选择了 100 维的版本,因此 Embedding 层必须定义 output_dim 设置为 100。最后,我们不想更新该模型中的学习词权重,因此我们将模型的 trainable 属性设置为 False .

e = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=4, trainable=False)

下面列出了完整的代码示例。

from numpy import array
from numpy import asarray
from numpy import zeros
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Embedding
# define documents
docs = ['Well done!',
		'Good work',
		'Great effort',
		'nice work',
		'Excellent!',
		'Weak',
		'Poor effort!',
		'not good',
		'poor work',
		'Could have done better.']
# define class labels
labels = array([1,1,1,1,1,0,0,0,0,0])
# prepare tokenizer
t = Tokenizer()
t.fit_on_texts(docs)
vocab_size = len(t.word_index) + 1
# integer encode the documents
encoded_docs = t.texts_to_sequences(docs)
print(encoded_docs)
# pad documents to a max length of 4 words
max_length = 4
padded_docs = pad_sequences(encoded_docs, maxlen=max_length, padding='post')
print(padded_docs)
# load the whole embedding into memory
embeddings_index = dict()
f = open('../glove_data/glove.6B/glove.6B.100d.txt')
for line in f:
	values = line.split()
	word = values[0]
	coefs = asarray(values[1:], dtype='float32')
	embeddings_index[word] = coefs
f.close()
print('Loaded %s word vectors.' % len(embeddings_index))
# create a weight matrix for words in training docs
embedding_matrix = zeros((vocab_size, 100))
for word, i in t.word_index.items():
	embedding_vector = embeddings_index.get(word)
	if embedding_vector is not None:
		embedding_matrix[i] = embedding_vector
# define model
model = Sequential()
e = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=4, trainable=False)
model.add(e)
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
# compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# summarize the model
print(model.summary())
# fit the model
model.fit(padded_docs, labels, epochs=50, verbose=0)
# evaluate the model
loss, accuracy = model.evaluate(padded_docs, labels, verbose=0)
print('Accuracy: %f' % (accuracy*100))

注意:您的结果可能会因算法或评估过程的随机性或数值精度的差异而有所不同。 考虑运行该示例几次并比较平均结果。

运行该示例可能需要更长的时间,但随后证明它同样能够解决这个简单的问题。

[[6, 2], [3, 1], [7, 4], [8, 1], [9], [10], [5, 4], [11, 3], [5, 1], [12, 13, 2, 14]]
 
[[ 6  2  0  0]
 [ 3  1  0  0]
 [ 7  4  0  0]
 [ 8  1  0  0]
 [ 9  0  0  0]
 [10  0  0  0]
 [ 5  4  0  0]
 [11  3  0  0]
 [ 5  1  0  0]
 [12 13  2 14]]
 
Loaded 400000 word vectors.
 
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
embedding_1 (Embedding)      (None, 4, 100)            1500
_________________________________________________________________
flatten_1 (Flatten)          (None, 400)               0
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 401
=================================================================
Total params: 1,901
Trainable params: 401
Non-trainable params: 1,500
_________________________________________________________________
 
 
Accuracy: 100.000000

在实践中,我鼓励您尝试使用固定的预训练嵌入来学习词嵌入,并尝试在预训练嵌入之上执行学习。

看看什么最适合您的特定问题。

总结
在本教程中,您了解了如何通过 Keras 在 Python 中使用词嵌入进行深度学习。

具体来说,您了解到:

关于词嵌入以及 Keras 通过嵌入层支持词嵌入。
如何在拟合神经网络的同时学习词嵌入。
如何在神经网络中使用预训练的词嵌入。

作者 east
Flink, Spark 4月 13,2022

Flink和Spark的Transformation不同地方对比

1、合并输入流:

在spark有Union
返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。 具体例子可以参考Spark Streaming多个输入流

在Flink中更高级,除了有union合并多个输入流(
union()所连接的两个或多个数据流的数据类型必须一致 ),还有connect()。

①connect()只能连接两个数据流,union()可以连接多个数据流。

②connect()所连接的两个数据流的数据类型可以不一致,union()所连接的两个或多个数据流的数据类型必须一致。

③两个DataStream经过connect()之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且两个流之间可以共享状态。

2、求最大最小的操作

Flink有 max()、 maxBy() 对该字段求最大值 。
min()、minBy对某字段求最小值

作者 east
深度学习 4月 12,2022

Jaccard 相似度 – NLP 中的文本相似度度量

Jaccard 相似度也称为 Jaccard 指数和联合交集。用于确定两个文本文档之间的相似性的 Jaccard 相似度矩阵是指两个文本文档在上下文方面彼此接近的程度,即在总词中存在多少常用词。

在自然语言处理中,我们经常需要估计文本文档之间的文本相似度。存在许多文本相似度矩阵,例如余弦相似度、Jaccard 相似度和欧几里得距离测量。所有这些文本相似度指标都有不同的行为。

在本教程中,您将通过示例详细了解 Jaccard 相似度矩阵。您还可以参考本教程来探索余弦相似度指标。

Jaccard 相似度定义为两个文档的交集除以这两个文档的并集,这两个文档指的是总单词数中的常用单词数。在这里,我们将使用单词集来查找文档的交集和并集。

Jaccard 相似度的数学表示为:

Jaccard Similarity mathematical equation

Jaccard Similarity 得分在 0 到 1 的范围内。如果两个文档相同,则 Jaccard Similarity 为 1。如果两个文档之间没有共同词,则 Jaccard 相似度得分为 0。

让我们看一下 Jaccard Similarity 如何工作的示例?

doc_1 = "Data is the new oil of the digital economy"
doc_2 = "Data is a new oil"

让我们为每个文档获取一组唯一的单词。

words_doc1 = {'data', 'is', 'the', 'new', 'oil', 'of', 'digital', 'economy'}
words_doc2 = {'data', 'is', 'a', 'new', 'oil'}

现在,我们将计算这两组词的交集和并集,并测量 doc_1 和 doc_2 之间的 Jaccard 相似度。

Calculate the Jaccard similarity example
Jaccard similarity visual example

查找 Jaccard 相似性的 Python 代码

让我们为 Jaccard Similarity 编写 Python 代码。

def Jaccard_Similarity(doc1, doc2): 
    
    # List the unique words in a document
    words_doc1 = set(doc1.lower().split()) 
    words_doc2 = set(doc2.lower().split())
    
    # Find the intersection of words list of doc1 & doc2
    intersection = words_doc1.intersection(words_doc2)

    # Find the union of words list of doc1 & doc2
    union = words_doc1.union(words_doc2)
        
    # Calculate Jaccard similarity score 
    # using length of intersection set divided by length of union set
    return float(len(intersection)) / len(union)
doc_1 = "Data is the new oil of the digital economy" doc_2 = "Data is a new oil" Jaccard_Similarity(doc_1,doc_2)
0.44444

doc_1 和 doc_2 之间的 Jaccard 相似度为 0.444

作者 east
python, Tensorflow 4月 11,2022

tensorflow的tf.matmul()用法详解

matmul的用法是2个矩阵相乘, 将前面矩阵的每一行分别与后面矩阵的列相乘,作为结果矩阵的行列

import tensorflow as tf
import math
import matplotlib.pyplot as plt
A = [[1,2,3],[0,1,1],[2,0,0]]
B = [[0,1,2,0],[4,3,0,0],[0,1,0,0]]
tf.compat.v1.disable_eager_execution()
sess = tf.compat.v1.InteractiveSession()
print(sess.run(tf.matmul(A,B)))

运行结果:

[[ 8 10  2  0]  
[ 4 4 0 0]
[ 0 2 4 0]]
作者 east
Hive 4月 10,2022

生产环境选型:Hive对比Pig

Apache Hadoop 包括一个不断增长的软件库,可帮助用户管理数据。对于需要管理大量信息的组织而言,Hive 和 Pig 是两个最关键的 Hadoop 项目。以下 Hive 与 Pig 比较将帮助您确定哪个 Hadoop 组件更符合您的需求。您还将有机会了解替代 ETL 解决方案的优势,这些解决方案使数据管理和丰富变得更加容易。

Hive vs Pig:最关键的区别
显然,Hive 和 Pig 为用户提供了很多优势。您使用的工具可能取决于您的数据需求。你是数据分析师还是程序员?您使用结构化数据还是半结构化数据?

了解这些问题的答案将帮助您确定更适合您的选项。通过了解 Hive 与 Pig 最关键的区别,您可以专注于适合您和您的组织的工具。

Hive 具有将数据转换为报告的可靠功能,而 Pig 为您提供了一种编程语言,可帮助您从一个或多个数据库中提取所需的信息。
Hive 在服务器端工作,而 Pig 在集群的客户端工作。
Hive 可以访问原始数据,而 Pig Latin 脚本不能。
HiveQL 遵循数据分析师可以轻松掌握的声明性 SQL 语言,而 Pig 依赖于具有更大学习曲线的 SQL 变体。
Hive 处理结构化数据,而 Pig 可以处理结构化和半结构化数据。

什么是 Hive Hadoop?
Apache 的 Hadoop Hive 组件执行多种功能,帮助数据分析专业人员通过类似于 SQL 的操作界面定位和丰富数据。 如果您的团队成员已经了解 SQL,那么他们很容易开始使用 Hive。

数据分析师经常使用 Hive 来:

分析数据。
查询大量非结构化数据。
生成数据摘要。
Hive 为您提供了一种可靠的方式来定位和分析非结构化数据。 显然,Hive 并不是每个组织的完美工具,但它具有出色的功能,使其成为需要有效方式处理非结构化数据的团体的有用工具。

什么是Pig Hadoop?
Apache Pig 使用脚本语言 Pig Latin 从 Hadoop 中查找、提取和丰富数据结构化和半结构化数据。许多人发现 Pig Latin 有点难学。但是,克服学习曲线可以让用户更好地控制他们的 Hadoop 数据。

选择Pig的人经常指出它:

快速加载数据。
隐式定义表模式。
支持同组。
像所有数据工具一样,Pig 也有其优点和缺点。您可以深入了解以下优点和缺点,以帮助您确定是否要将 Pig 作为 Hadoop 策略的一部分。

Apache Hadoop 在 ETL 中的作用
有些人错误地认为 Apache Hadoop 是一种 ETL 工具,它为他们提供了提取、转换和加载数据所需的所有工具。 Hadoop 提供了一些出色的优势,但它不属于 ETL 类别。但是,如果使用得当,它可以改进 ETL 策略和项目。

许多人使用 Apache Hadoop 之类的数据,因为它可以:

提高性能并防止硬件出现故障。
在将流行类型的数据移动到 ETL 管道之前集成它们。
提高处理和传输大数据的速度。
在用户将受损数据转移到其他工具之前识别安全漏洞并警告用户。
注意可能擦除或损坏数据的风险,让您有机会在丢失项目的关键信息之前解决问题。
虽然将 Hadoop 称为 ETL 解决方案是不正确的,但您可以将其称为 ETL 助手。该解决方案有几个很棒的功能,可以提高 ETL 项目的速度和准确性。

Hive:优点和缺点
要了解有关 Hive 优缺点的更多信息,直接从经常使用 Hadoop 组件的人那里获取信息是有意义的。 TrustRadius 评论家给 Apache Hive 打了 7.8 分(满分 10 分)。

用户从 Hive 获得的一些优势包括:

对已经熟悉 SQL 的任何人都适用的简单查询。
可在需要时从多个服务器寻求增援的可扩展性。
为数据分析生成临时查询的选项。
它处理长时间运行的查询的能力如何。
它能够连接各种关系数据库,包括 Postgres 和 MySQL。
使用 Java 和 Python 编写自定义函数的选项。
简化 Hadoop 体验,尤其是当没有技术背景的人参与数据项目时。
这就是潜在用户在选择 Hive 时应该考虑的一系列积极特征。不过,用户也有很多批评。例如,Hive 的许多批评包括:

缺乏对在线处理数据的支持。
无法支持子查询,
更新数据的复杂方法。
即席查询速度慢。
缺乏让管理员为用户分配特定角色的安全控制。
将易用性置于处理速度之上,尤其是在批处理方面。
尽管许多用户赞赏 Hive 的查询语言是基于 SQL 构建的,但他们指出 Hive 遗漏了一些非常有用的 SQL 命令。这种缺陷迫使用户浪费时间重写应该自动附带 Hadoop 组件的命令。

Pig:优点和缺点
Apache Pig 的数值审查略胜 Apache Hive。 TrustRadius 用户给 Pig 打了 7.9 分(满分 10 分)。

Apache Pig 用户提到的一些优点包括:

与 MapReduce、Spark 和 Tez 一起使用的快速执行。
它能够处理几乎任何数量的数据,无论大小。
使其与其他工具(如 Hive 和 DBMS)结合以改进其功能的功能。
一个强大的文档流程,可帮助新用户学习 Pig Latin。
本地和远程互操作性,让专业人员可以在任何地方通过可靠的连接工作。
尽管很多人喜欢 Apache Pig,但它确实存在给用户带来问题的问题。针对 Pig 的一些投诉集中在:

无法解决复杂的数学问题。
难以实施顺序检查。
很少有用于循环数据的选项,这会增加用户的工作量。
有些人难以掌握的特定领域语言(猪拉丁语)。
显然,Apache 可以对 Pig 进行一些改进。然而,它确实填补了一个吸引某些用户的利基空间。

作者 east
Flink 4月 8,2022

Flink面试题汇总

1、Flink如何保证精确一次性消费

Flink 保证精确一次性消费主要依赖于两种Flink机制

1、Checkpoint机制

2、二阶段提交机制

Checkpoint机制

主要是当Flink开启Checkpoint的时候,会往Source端插入一条barrir,然后这个barrir随着数据流向一直流动,当流入到一个算子的时候,这个算子就开始制作checkpoint,制作的是从barrir来到之前的时候当前算子的状态,将状态写入状态后端当中。然后将barrir往下流动,当流动到keyby 或者shuffle算子的时候,例如当一个算子的数据,依赖于多个流的时候,这个时候会有barrir对齐,也就是当所有的barrir都来到这个算子的时候进行制作checkpoint,依次进行流动,当流动到sink算子的时候,并且sink算子也制作完成checkpoint会向jobmanager 报告 checkpoint n 制作完成。

二阶段提交机制

Flink 提供了CheckpointedFunction与CheckpointListener这样两个接口,CheckpointedFunction中有snapshotState方法,每次checkpoint触发执行方法,通常会将缓存数据放入状态中,可以理解为一个hook,这个方法里面可以实现预提交,CheckpointListyener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,这里可以做一些额外的操作。例如FLinkKafkaConumerBase使用这个来完成Kafka offset的提交,在这个方法里面可以实现提交操作。在2PC中提到如果对应流程例如某个checkpoint失败的话,那么checkpoint就会回滚,不会影响数据一致性,那么如果在通知checkpoint成功的之后失败了,那么就会在initalizeSate方法中完成事务的提交,这样可以保证数据的一致性。最主要是根据checkpoint的状态文件来判断的。

2、flink和spark区别

flink是一个类似spark的“开源技术栈”,因为它也提供了批处理,流式计算,图计算,交互式查询,机器学习等。flink也是内存计算,比较类似spark,但是不一样的是,spark的计算模型基于RDD,将流式计算看成是特殊的批处理,他的DStream其实还是RDD。而flink吧批处理当成是特殊的流式计算,但是批处理和流式计算的层的引擎是两个,抽象了DataSet和DataStream。flink在性能上也表现的很好,流式计算延迟比spark少,能做到真正的流式计算,而spark只能是准流式计算。而且在批处理上,当迭代次数变多,flink的速度比spark还要快,所以如果flink早一点出来,或许比现在的Spark更火。

3、Flink的状态可以用来做什么?

Flink状态主要有两种使用方式:

  1. checkpoint的数据恢复
  2. 逻辑计算

4、Flink的waterMark机制,Flink watermark传递机制

Flink 中的watermark机制是用来处理乱序的,flink的时间必须是event time ,有一个简单的例子就是,假如窗口是5秒,watermark是2秒,那么 总共就是7秒,这个时候什么时候会触发计算呢,假设数据初始时间是1000,那么等到6999的时候会触发5999窗口的计算,那么下一个就是13999的时候触发10999的窗口

其实这个就是watermark的机制,在多并行度中,例如在kafka中会所有的分区都达到才会触发窗口

5、Flink的时间语义

Event Time 事件产生的时间

Ingestion time 事件进入Flink的时间

processing time 事件进入算子的时间

6、Flink window join

1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join

2、是coGoup 其实就是left join 和 right join,

3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。

7、flink窗口函数有哪些

Tumbing window

Silding window

Session window

Count winodw

8、keyedProcessFunction 是如何工作的。假如是event time的话

keyedProcessFunction 是有一个ontime 操作的,假如是 event时间的时候 那么 调用的时间就是查看,event的watermark 是否大于 trigger time 的时间,如果大于则进行计算,不大于就等着,如果是kafka的话,那么默认是分区键最小的时间来进行触发。

9、flink是怎么处理离线数据的例如和离线数据的关联?

1、async io

2、broadcast

3、async io + cache

4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存

10、flink支持的数据类型

DataSet Api 和 DataStream Api、Table Api

11、Flink出现数据倾斜怎么办

Flink数据倾斜如何查看:

在flink的web ui中可以看到数据倾斜的情况,就是每个subtask处理的数据量差距很大,例如有的只有一M 有的100M 这就是严重的数据倾斜了。

KafkaSource端发生的数据倾斜

例如上游kafka发送的时候指定的key出现了数据热点问题,那么就在接入之后,做一个负载均衡(前提下游不是keyby)。

聚合类算子数据倾斜

预聚合加全局聚合

12、flink 维表关联怎么做的

1、async io

2、broadcast

3、async io + cache

4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存

13、Flink checkpoint的超时问题 如何解决。

1、是否网络问题

2、是否是barrir问题

3、查看webui,是否有数据倾斜

4、有数据倾斜的话,那么解决数据倾斜后,会有改善,

14、flinkTopN与离线的TopN的区别

topn 无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的topn,实时数据是持续不断的,这样就给topn的计算带来很大的困难,因为要持续在内存中维持一个topn的数据结构,当有新数据来的时候,更新这个数据结构

15、sparkstreaming 和flink 里checkpoint的区别

sparkstreaming 的checkpoint会导致数据重复消费

但是flink的 checkpoint可以 保证精确一次性,同时可以进行增量,快速的checkpoint的,有三个状态后端,memery、rocksdb、hdfs

16、简单介绍一下cep状态编程

Complex Event Processing(CEP):

FLink Cep 是在FLink中实现的复杂时间处理库,CEP允许在无休止的时间流中检测事件模式,让我们有机会掌握数据中重要的部分,一个或多个由简单事件构成的时间流通过一定的规则匹配,然后输出用户想得到的数据,也就是满足规则的复杂事件。

17、 Flink cep连续事件的可选项有什么

18、如何通过flink的CEP来实现支付延迟提醒

19、Flink cep 你用过哪些业务场景

20、cep底层如何工作

21、cep怎么老化

22、cep性能调优

23、Flink的背压,介绍一下Flink的反压,你们是如何监控和发现的呢。

Flink 没有使用任何复杂的机制来解决反压问题,Flink 在数据传输过程中使用了分布式阻塞队列。我们知道在一个阻塞队列中,当队列满了以后发送者会被天然阻塞住,这种阻塞功能相当于给这个阻塞队列提供了反压的能力。

当你的任务出现反压时,如果你的上游是类似 Kafka 的消息系统,很明显的表现就是消费速度变慢,Kafka 消息出现堆积。

如果你的业务对数据延迟要求并不高,那么反压其实并没有很大的影响。但是对于规模很大的集群中的大作业,反压会造成严重的“并发症”。首先任务状态会变得很大,因为数据大规模堆积在系统中,这些暂时不被处理的数据同样会被放到“状态”中。另外,Flink 会因为数据堆积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不一致发生。

Flink Web UI

Flink 的后台页面是我们发现反压问题的第一选择。Flink 的后台页面可以直观、清晰地看到当前作业的运行状态。

Web UI,需要注意的是,只有用户在访问点击某一个作业时,才会触发反压状态的计算。在默认的设置下,Flink的TaskManager会每隔50ms触发一次反压状态监测,共监测100次,并将计算结果反馈给JobManager,最后由JobManager进行反压比例的计算,然后进行展示。

在生产环境中Flink任务有反压有三种OK、LOW、HIGH

OK正常

LOW一般

HIGH高负载

24、Flink的CBO,逻辑执行计划和物理执行计划

Flink的优化执行其实是借鉴的数据库的优化器来生成的执行计划。

CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Flink 的成本优化器也一样。Flink 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。

// TODO

25、Flink中数据聚合,不使用窗口怎么实现聚合

  • valueState 用于保存单个值
  • ListState 用于保存list元素
  • MapState 用于保存一组键值对
  • ReducingState 提供了和ListState相同的方法,返回一个ReducingFunction聚合后的值。
  • AggregatingState和 ReducingState类似,返回一个AggregatingState内部聚合后的值

26、Flink中state有哪几种存储方式

Memery、RocksDB、HDFS

27、Flink 异常数据怎么处理

异常数据在我们的场景中,一般分为缺失字段和异常值数据。

异常值: 例如宝宝的年龄的数据,例如对于母婴行业来讲,一个宝宝的年龄是一个至关重要的数据,可以说是最重要的,因为宝宝大于3岁几乎就不会在母婴上面购买物品。像我们的有当日、未知、以及很久的时间。这样都属于异常字段,这些数据我们会展示出来给店长和区域经理看,让他们知道多少个年龄是不准的。如果要处理的话,可以根据他购买的时间来进行实时矫正,例如孕妇服装、奶粉的段位、纸尿裤的大小,以及奶嘴啊一些能够区分年龄段的来进行处理。我们并没有实时处理这些数据,我们会有一个底层的策略任务夜维去跑,一个星期跑一次。

缺失字段: 例如有的字段真的缺失的很厉害,能修补就修补。不能修补就放弃,就像上家公司中的新闻推荐过滤器。

28、Flink 监控你们怎么做的

1、我们监控了Flink的任务是否停止

2、我们监控了Flink的Kafka的LAG

3、我们会进行实时数据对账,例如销售额。

29、Flink 有数据丢失的可能吗

Flink有三种数据消费语义:

  1. At Most Once 最多消费一次 发生故障有可能丢失
  2. At Least Once 最少一次 发生故障有可能重复
  3. Exactly-Once 精确一次 如果产生故障,也能保证数据不丢失不重复。

flink 新版本已经不提供 At-Most-Once 语义。

30、Flink interval join 你能简单的写一写吗

DataStream<T> keyed1 = ds1.keyBy(o -> o.getString("key"))
DataStream<T> keyed2 = ds2.keyBy(o -> o.getString("key"))
//右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
keyed1.intervalJoin(keyed2).between(Time.milliseconds(-5), Time.milliseconds(5))

31、Flink 提交的时候 并行度如何制定,以及资源如何配置

并行度根据kafka topic的并行度,一个并行度3个G

32、Flink的boardcast join 的原理是什么

利用 broadcast State 将维度数据流广播到下游所有 task 中。这个 broadcast 的流可以与我们的事件流进行 connect,然后在后续的 process 算子中进行关联操作即可。

33、flink的source端断了,比如kafka出故障,没有数据发过来,怎么处理?

会有报警,监控的kafka偏移量也就是LAG。

34、flink有什么常用的流的API?

window join 啊 cogroup 啊 map flatmap,async io 等

35、flink的水位线,你了解吗,能简单介绍一下吗

Flink 的watermark是一种延迟触发的机制。

一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。

36、Flink怎么维护Checkpoint?在HDFS上存储的话会有小文件吗

默认情况下,如果设置了Checkpoint选项,Flink只保留最近成功生成的1个Checkpoint。当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活。Flink支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置指定最多需要保存Checkpoint的个数。

关于小文件问题可以参考代达罗斯之殇-大数据领域小文件问题解决攻略。

37、Spark和Flink的序列化,有什么区别吗?

Spark 默认使用的是 Java序列化机制,同时还有优化的机制,也就是kryo

Flink是自己实现的序列化机制,也就是TypeInformation

38、Flink是怎么处理迟到数据的?但是实际开发中不能有数据迟到,怎么做?

Flink 的watermark是一种延迟触发的机制。

一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。

39、画出flink执行时的流程图。

40、Flink分区分配策略

41、Flink关闭后状态端数据恢复得慢怎么办?

42、了解flink的savepoint吗?讲一下savepoint和checkpoint的不同和各有什么优势

43、flink的状态后端机制

Flink的状态后端是Flink在做checkpoint的时候将状态快照持久化,有三种状态后端 Memery、HDFS、RocksDB

44、flink中滑动窗口和滚动窗口的区别,实际应用的窗口是哪种?用的是窗口长度和滑动步长是多少?

45、用flink能替代spark的批处理功能吗

Flink 未来的目标是批处理和流处理一体化,因为批处理的数据集你可以理解为是一个有限的数据流。Flink 在批出理方面,尤其是在今年 Flink 1.9 Release 之后,合入大量在 Hive 方面的功能,你可以使用 Flink SQL 来读取 Hive 中的元数据和数据集,并且使用 Flink SQL 对其进行逻辑加工,不过目前 Flink 在批处理方面的性能,还是干不过 Spark的。

目前看来,Flink 在批处理方面还有很多内容要做,当然,如果是实时计算引擎的引入,Flink 当然是首选。

46、flink计算的UV你们是如何设置状态后端保存数据

可以使用布隆过滤器。

47、sparkstreaming和flink在执行任务上有啥区别,不是简单的流处理和微批,sparkstreaming提交任务是分解成stage,flink是转换graph,有啥区别?

48、flink把streamgraph转化成jobGraph是在哪个阶段?

49、Flink中的watermark除了处理乱序数据还有其他作用吗?

还有kafka数据顺序消费的处理。

50、flink你一般设置水位线设置多少

我们之前设置的水位线是6s

52、Flink任务提交流程

Flink任务提交后,Client向HDFS上传Flink的jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动
ApplicationMaster,ApplicationMaster启动后加载Flink的jar包和配置构建环境,然后启动JobManager;之后Application Master向ResourceManager申请资源启动TaskManager
,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在的节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动向JobManager发送心跳,并等待JobManager向其分配任务。

53、Flink技术架构图

54、flink如何实现在指定时间进行计算。

55、手写Flink topN

57、Flink的Join算子有哪些

一般join是发生在window上面的:

1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join

2、是coGoup 其实就是left join 和 right join,

3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。

58、Flink1.10 有什么新特性吗?

内存管理及配置优化

Flink 目前的 TaskExecutor 内存模型存在着一些缺陷,导致优化资源利用率比较困难,例如:

  • 流和批处理内存占用的配置模型不同
  • 流处理中的 RocksDB state backend 需要依赖用户进行复杂的配置

为了让内存配置变的对于用户更加清晰、直观,Flink 1.10 对 TaskExecutor 的内存模型和配置逻辑进行了较大的改动 (FLIP-49 [7])。这些改动使得 Flink 能够更好地适配所有部署环境(例如 Kubernetes, Yarn, Mesos),让用户能够更加严格的控制其内存开销。

Managed 内存扩展

Managed 内存的范围有所扩展,还涵盖了 RocksDB state backend 使用的内存。尽管批处理作业既可以使用堆内内存也可以使用堆外内存,使用 RocksDB state backend 的流处理作业却只能利用堆外内存。因此为了让用户执行流和批处理作业时无需更改集群的配置,我们规定从现在起 managed 内存只能在堆外。

简化 RocksDB 配置

此前,配置像 RocksDB 这样的堆外 state backend 需要进行大量的手动调试,例如减小 JVM 堆空间、设置 Flink 使用堆外内存等。现在,Flink 的开箱配置即可支持这一切,且只需要简单地改变 managed 内存的大小即可调整 RocksDB state backend 的内存预算。

另一个重要的优化是,Flink 现在可以限制 RocksDB 的 native 内存占用,以避免超过总的内存预算—这对于 Kubernetes 等容器化部署环境尤为重要。

统一的作业提交逻辑
在此之前,提交作业是由执行环境负责的,且与不同的部署目标(例如 Yarn, Kubernetes, Mesos)紧密相关。这导致用户需要针对不同环境保留多套配置,增加了管理的成本。

在 Flink 1.10 中,作业提交逻辑被抽象到了通用的 Executor 接口。新增加的 ExecutorCLI (引入了为任意执行目标指定配置参数的统一方法。此外,随着引入 JobClient负责获取 JobExecutionResult,获取作业执行结果的逻辑也得以与作业提交解耦。

原生 Kubernetes 集成(Beta)

对于想要在容器化环境中尝试 Flink 的用户来说,想要在 Kubernetes 上部署和管理一个 Flink standalone 集群,首先需要对容器、算子及像 kubectl 这样的环境工具有所了解。

在 Flink 1.10 中,我们推出了初步的支持 session 模式的主动 Kubernetes 集成(FLINK-9953)。其中,“主动”指 Flink ResourceManager (K8sResMngr) 原生地与 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一样按需申请 pod。用户可以利用 namespace,在多租户环境中以较少的资源开销启动 Flink。这需要用户提前配置好 RBAC 角色和有足够权限的服务账号。

Table API/SQL: 生产可用的 Hive 集成

Flink 1.9 推出了预览版的 Hive 集成。该版本允许用户使用 SQL DDL 将 Flink 特有的元数据持久化到 Hive Metastore、调用 Hive 中定义的 UDF 以及读、写 Hive 中的表。Flink 1.10 进一步开发和完善了这一特性,带来了全面兼容 Hive 主要版本的生产可用的 Hive 集成。

Batch SQL 原生分区支持

此前,Flink 只支持写入未分区的 Hive 表。在 Flink 1.10 中,Flink SQL 扩展支持了 INSERT OVERWRITE 和 PARTITION 的语法(FLIP-63 ),允许用户写入 Hive 中的静态和动态分区。

  • 写入静态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
  • 写入动态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

对分区表的全面支持,使得用户在读取数据时能够受益于分区剪枝,减少了需要扫描的数据量,从而大幅提升了这些操作的性能。

另外,除了分区剪枝,Flink 1.10 的 Hive 集成还引入了许多数据读取方面的优化,例如:

  • 投影下推:Flink 采用了投影下推技术,通过在扫描表时忽略不必要的域,最小化 Flink 和 Hive 表之间的数据传输量。这一优化在表的列数较多时尤为有效。
  • LIMIT 下推:对于包含 LIMIT 语句的查询,Flink 在所有可能的地方限制返回的数据条数,以降低通过网络传输的数据量。
  • 读取数据时的 ORC 向量化: 为了提高读取 ORC 文件的性能,对于 Hive 2.0.0 及以上版本以及非复合数据类型的列,Flink 现在默认使用原生的 ORC 向量化读取器。

59、Flink的重启策略

固定延迟重启策略

固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数,则作业失败。在两次连续重启尝试之间,会有一个固定的延迟等待时间。

故障率重启策略

故障率重启策略在故障后重新作业,当设置的故障率(failure rate)超过每个时间间隔的故障时,作业最终失败。在两次连续重启尝试之间,重启策略延迟等待一段时间。

无重启策略

作业直接失败,不尝试重启。

后备重启策略

使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

60、Flink什么时候用aggregate()或者process()

aggregate: 增量聚合

process: 全量聚合

当计算累加操作时候可以使用aggregate操作。

当计算窗口内全量数据的时候使用process,例如排序等操作。

61、Flink优化 你了解多少

62、Flink内存溢出怎么办

63、说说Flink中的keyState包含哪些数据结构

64、Flink shardGroup的概念

作者 east
mongodb, redis 4月 8,2022

MongoDB vs. Redis——你应该使用哪个数据库?

比较 MongoDB 和 Redis 时的 5 个重要区别:

1、速度:Redis 比 MongoDB 快,因为它是一个内存数据库。
2、 RAM:Redis 使用比 MongoDB 更多的 RAM 来存储重要数据集。
3、 可扩展性:MongoDB 的可扩展性优于 Redis。
4、 存储:企业(主要)使用 Redis 进行键值存储。 MongoDB 将所有内容存储在文档中。
5、 评论:MongoDB 和 Redis 的评论普遍是正面的。 (两者在 G2 上都有 4.5/5 星。)

在当今的数字世界中,可靠的数据库具有多种功能。 所有类型的企业都需要在数据库中存储、管理和访问信息,以用于销售、库存、客户服务等。 一些世界上最大的品牌将 MongoDB 和 Redis 用于各种抽象数据结构。 但是哪一个更好呢? 有什么区别?

下面我们在几个因素上比较 MongoDB 和 Redis,以便您选择正确的一个。

MongoDB 与 Redis:特性和优势
MongoDB 与 Redis:技术细节
在比较特性之前,我们先来看看 MongoDB 和 Redis 的主要区别:

MongoDB 是一个 NoSQL 数据库。 MongoDB 于 2009 年发布,使用类似 JSON 的文档,允许用户存储无模式数据集。它将自己描述为“现代应用程序最流行的数据库”。有关 Integrate.io 的本机 MongoDB 连接器的更多信息,请访问我们的集成页面。
Redis 是一种内存数据结构存储,可兼作数据库。 Redis 同样于 2009 年发布,支持列表、地图、位图、流、字符串和空间索引等各种数据结构。
虽然 MongoDB 和 Redis 有很多不同,但它们有一些共同点:

两者都有开源许可证。
两者都启用二级索引。
两个数据库最终都成为计算密集型的。
MongoDB vs. Redis:数据库结构
MongoDB 和 Redis 具有不同的数据库结构设置,MongoDB 的性能类似于关系数据库。 (它使用表达性查询语言。)不过,关键的区别在于 MongoDB 是无模式的,因此用户不必创建文档结构。这意味着该平台是两者中更容易使用的。

另一方面,Redis 使用键值存储,用最简单的术语来说,将数据分配给键和关联值。这意味着设置与 MongoDB 完全不同,MongoDB 使用传统上与关系数据库相关的行和列。键值存储使一些用户受益,但 Redis 一开始对一些人来说可能看起来很陌生。正如我们之前提到的,Redis 使用其他数据结构——位图、集合、字符串,等等——尽管它优先考虑键值存储。

MongoDB 与 Redis:可扩展性
MongoDB 和 Redis 都在可扩展性方面得分,这使得它们对于任何成长中的业务都值得添加。不过还是有区别的:

MongoDB 是用 C++ 编写的,可用于 Windows、OS X、Linux 和 Solaris。
Redis 是用 C 编写的,可用于 Windows、OS X、Linux 和 BSD。
MongoDB 和 Redis 都支持广泛的编程语言,包括 C、C~、C++、Java、Python 和 Scala。

在考虑扩展数据管理时,用户应该考虑到所有这些。

MongoDB 与 Redis:性能
Redis 比 MongoDB 快,因为它是一个内存数据库。这使它成为快速构建复杂数据结构的绝佳选择。然而,MongoDB 适合大多数需要可靠数据库的中型企业。它相对简单易用,而且正如我们前面提到的,它非常可扩展。

速度带来了一些缺点。 Redis,也许并不奇怪,比 MongoDB 使用更多的 RAM,这对于非平凡的数据集是显而易见的。

MongoDB vs. Redis:流行度
这两个开源数据库都有一个忠实的用户社区:

MongoDB 定期在其社区页面上举办网络研讨会和其他活动。
Redis 拥有一个蓬勃发展的社区,用户在这里讨论新功能,甚至亲自见面。
许多著名公司使用这些工具:

Uber、Lyft 和 Stack 使用 MongoDB。
Twitter、GitHub 和微博都使用 Redis。

集成您的数据仓库

MongoDB 与 Redis:定价
MongoDB 和 Redis 都有开源许可证,可以免费使用。但是,因此这两个平台都存在局限性。尽管 MongoDB 和 Redis 允许用户存储和管理各种数据,但用户需要了解编程语言。

在数据管理方面,一些企业将受益于支付不需要任何代码的 ETL 平台。这使得无需学习查询语言即可轻松简化数据处理和优化工作流程。

MongoDB 与 Redis:评论
人们如何看待 MongoDB 和 Redis?

MongoDB 评论
根据 356 条客户评论(截至 2020 年 9 月),MongoDB 在流行的技术评论网站 G2 上的平均评分为 4.5/5 星。

一位评论员是一家大公司的工程师,他说:

“MongoDB 可以非常正确、轻松、实时地读取数据。在那里编写查询也非常容易。通过 API 链接到各种资源也非常容易。该应用程序还提供了出色的结果。”

大多数 MongoDB 评论都是正面的。然而,对数据库的批评包括:

用户需要了解一门编程语言。
很难识别不同的特征。
文档的大小限制。
一位担任数据分析师的审稿人指出:

“我很伤心,因为它需要大量内存,因为它存储每个值对的键名,因此意味着存在高数据冗余。它还限制了嵌套,也不支持连接。”

Redis 评论
根据 82 条用户评论(截至 2020 年 9 月),Redis 在 G2 上的平均评分也为 4.55。

一位评论员是一家大公司的数据架构师,他说:

“Redis 数据库非常易于安装和使用。配置也适合单个文件。性能非常好,此工具可让您轻松扩展应用程序。”

这位审稿人将 Redis 用于各种目的,例如“缓存文件、存储临时数据、排队、发布/子系统”等等。

对 Redis 的批评包括:

缺乏用户支持文档。
脚本语言。
内存限制。


作者 east
Java 4月 7,2022

地图不同坐标系的转换工具类

提供以下的坐标系互相转换:

GCJ-02(火星坐标系)转换为WGS84坐标系

BD-09(百度坐标系)转换为WGS84坐标系

BD-09(百度坐标系)转换为GCJ-02(火星坐标)

sogou转换为WGS84


/**
 * @author jy 地图转换测试/
 */
public final class MapUtil {

    /**
     * 辅助类禁止共有构造函数
     */
    private MapUtil() {

    }

    /**
     * 常量:pi
     */
    public static final double CONSTANT_PI = 3.1415926535897932384626;

    /**
     * 常量:a
     */
    public static final double CONSTANT_A = 6378245.0;

    /**
     * 常量:e
     */
    public static final double CONSTANT_E = 0.00669342162296594323;

    /**
     * 常量:ccc
     */
    private static final double[] CCC = new double[]{12890594.86, 8362377.87, 5591021, 3481989.83, 1678043.12, 0};

    /**
     * 常量:ddd
     */
    private static final double[][] DDD = {
            new double[]{1.410526172116255e-8, 0.00000898305509648872, -1.9939833816331, 200.9824383106796,
                    -187.2403703815547, 91.6087516669843, -23.38765649603339, 2.57121317296198, -0.03801003308653,
                    17337981.2},
            new double[]{-7.435856389565537e-9, 0.000008983055097726239, -0.78625201886289, 96.32687599759846,
                    -1.85204757529826, -59.36935905485877, 47.40033549296737, -16.50741931063887, 2.28786674699375,
                    10260144.86},
            new double[]{-3.030883460898826e-8, 0.00000898305509983578, 0.30071316287616, 59.74293618442277,
                    7.357984074871, -25.38371002664745, 13.45380521110908, -3.29883767235584, 0.32710905363475,
                    6856817.37},
            new double[]{-1.981981304930552e-8, 0.000008983055099779535, 0.03278182852591, 40.31678527705744,
                    0.65659298677277, -4.44255534477492, 0.85341911805263, 0.12923347998204, -0.04625736007561,
                    4482777.06},
            new double[]{3.09191371068437e-9, 0.000008983055096812155, 0.00006995724062, 23.10934304144901,
                    -0.00023663490511, -0.6321817810242, -0.00663494467273, 0.03430082397953, -0.00466043876332,
                    2555164.4},
            new double[]{2.890871144776878e-9, 0.000008983055095805407, -3.068298e-8, 7.47137025468032,
                    -0.00000353937994, -0.02145144861037, -0.00001234426596, 0.00010322952773, -0.00000323890364,
                    826088.5}};

    /**
     * GCJ-02(火星坐标系)转换为WGS84坐标系<br>
     * GCJ-02:谷歌中国地图、搜搜中国地图、高德地图<br>
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    public static Coord gcj02ToWgs84(double lat, double lon) {

        Coord coord = transform(lat, lon);

        double finalLon = lon * 2 - coord.getLon();
        double finalLat = lat * 2 - coord.getLat();

        return new Coord(finalLat, finalLon);

    }

    /**
     * BD-09(百度坐标系)转换为WGS84坐标系<br>
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    public static Coord bd09ToWgs84(double lat, double lon) {

        Coord gcj02 = MapUtil.bd09ToGcj02(lat, lon);
        Coord wgs84 = MapUtil.gcj02ToWgs84(gcj02.getLat(), gcj02.getLon());

        return wgs84;
    }

    /**
     * BD-09(百度坐标系)转换为GCJ-02(火星坐标)<br>
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    public static Coord bd09ToGcj02(double lat, double lon) {

        double x = lon - 0.0065;
        double y = lat - 0.006;
        double z = Math.sqrt(x * x + y * y) - 0.00002 * Math.sin(y * CONSTANT_PI);

        double theta = Math.atan2(y, x) - 0.000003 * Math.cos(x * CONSTANT_PI);

        double finalLon = z * Math.cos(theta);
        double finalLat = z * Math.sin(theta);

        return new Coord(finalLat, finalLon);

    }

    /**
     * sogou转换为WGS84
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    public static Coord sogouToWgs84(double lat, double lon) {

        Coord coord = new Coord(lat, lon);

        Coord tmp = new Coord();
        tmp.setLon(Math.abs(coord.getLon()));
        tmp.setLat(Math.abs(coord.getLat()));

        double[] obj = new double[9];
        for (int i = 0; i < CCC.length; i++) {
            if (tmp.getLat() > CCC[i]) {
                obj = DDD[i];
                break;
            }
        }

        return MapUtil.convert(coord, obj);
    }

    /**
     * sogou内部转换函数
     *
     * @param coord
     * @param varAttr
     * @return
     */
    private static Coord convert(Coord coord, double[] varAttr) {
        Coord tmp = new Coord();
        tmp.setLon(varAttr[0] + varAttr[1] * Math.abs(coord.getLon()));
        double f = Math.abs(coord.getLat()) / varAttr[9];
        double tmpLat = varAttr[2] + varAttr[3] * f + varAttr[4] * f * f + varAttr[5] * f * f * f
                + varAttr[6] * f * f * f * f + varAttr[7] * f * f * f * f * f + varAttr[8] * f * f * f * f * f * f;
        tmp.setLat(tmpLat);
        tmp.setLon(tmp.getLon() * (coord.getLon() < 0 ? -1 : 1));
        tmp.setLat(tmp.getLat() * (coord.getLat() < 0 ? -1 : 1));

        return tmp;
    }

    /**
     * 坐标点转换函数
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    private static Coord transform(double lat, double lon) {

        if (outOfChina(lat, lon)) {
            return new Coord(lat, lon);
        }

        double dLat = transformLat(lat - 35.0, lon - 105.0);
        double dLon = transformLon(lat - 35.0, lon - 105.0);

        double radLat = lat / 180.0 * CONSTANT_PI;

        double magic = Math.sin(radLat);
        magic = 1 - CONSTANT_E * magic * magic;
        double sqrtMagic = Math.sqrt(magic);
        dLat = (dLat * 180.0) / ((CONSTANT_A * (1 - CONSTANT_E)) / (magic * sqrtMagic) * CONSTANT_PI);
        dLon = (dLon * 180.0) / (CONSTANT_A / sqrtMagic * Math.cos(radLat) * CONSTANT_PI);
        double mgLat = lat + dLat;
        double mgLon = lon + dLon;

        return new Coord(mgLat, mgLon);

    }

    /**
     * 纬度转换函数
     *
     * @param lat 维度
     * @param lon 经度
     * @return
     */
    private static double transformLat(double lat, double lon) {

        double ret = -100.0 + 2.0 * lon + 3.0 * lat + 0.2 * lat * lat + 0.1 * lon * lat
                + 0.2 * Math.sqrt(Math.abs(lon));
        ret += (20.0 * Math.sin(6.0 * lon * CONSTANT_PI) + 20.0 * Math.sin(2.0 * lon * CONSTANT_PI)) * 2.0 / 3.0;
        ret += (20.0 * Math.sin(lat * CONSTANT_PI) + 40.0 * Math.sin(lat / 3.0 * CONSTANT_PI)) * 2.0 / 3.0;
        ret += (160.0 * Math.sin(lat / 12.0 * CONSTANT_PI) + 320 * Math.sin(lat * CONSTANT_PI / 30.0)) * 2.0 / 3.0;

        return ret;
    }

    /**
     * 经度转换函数
     *
     * @param lat 纬度
     * @param lon 经度
     * @return
     */
    private static double transformLon(double lat, double lon) {

        double ret = 300.0 + lon + 2.0 * lat + 0.1 * lon * lon + 0.1 * lon * lat + 0.1 * Math.sqrt(Math.abs(lon));

        ret += (20.0 * Math.sin(6.0 * lon * CONSTANT_PI) + 20.0 * Math.sin(2.0 * lon * CONSTANT_PI)) * 2.0 / 3.0;
        ret += (20.0 * Math.sin(lon * CONSTANT_PI) + 40.0 * Math.sin(lon / 3.0 * CONSTANT_PI)) * 2.0 / 3.0;
        ret += (150.0 * Math.sin(lon / 12.0 * CONSTANT_PI) + 300.0 * Math.sin(lon / 30.0 * CONSTANT_PI)) * 2.0 / 3.0;

        return ret;
    }

    /**
     * 判断坐标点是否在中国范围内
     *
     * @param lat
     * @param lon
     * @return
     */
    private static boolean outOfChina(double lat, double lon) {

        if (lon < 72.004 || lon > 137.8347) {
            return true;
        }

        if (lat < 0.8293 || lat > 55.8271) {
            return true;
        }

        return false;
    }

}
public class Coord implements Serializable {

    /**
     *
     */
    private static final long serialVersionUID = -229841606158709053L;
    /**
     * 纬度
     */
    private double lat;
    /**
     * 经度
     */
    private double lon;

    /**
     * 默认构造参数
     */
    public Coord() {

    }

    /**
     * 重载构造方法
     *
     * @param lat 纬度
     * @param lon 经度
     */
    public Coord(double lat, double lon) {
        this.lat = lat;
        this.lon = lon;
    }

    public double getLat() {
        return lat;
    }

    public void setLat(double lat) {
        this.lat = lat;
    }

    public double getLon() {
        return lon;
    }

    public void setLon(double lon) {
        this.lon = lon;
    }

}
作者 east
数据库 4月 6,2022

MongoDB vs PostgreSQL:数据库结构的详细比较

任何公司功能中最重要的部分之一是安全数据库。 随着网络钓鱼攻击、恶意软件和其他威胁的增加,您必须做出正确的选择,以确保您的数据安全并有效处理数据。 然而,在当今市场上种类繁多的数据库解决方案中进行选择可能非常困难。 两个常用的选项是 Mongodb 和 Postgresql。

关于 MongoDB 与 PostgreSQL,您需要了解什么? 本文将带您比较每个的主要特性、功能和性能。

什么是 MongoDB?
MongoDB 是一个无模式文档数据库,提供免费和付费计划。作为文档数据库,MongoDB 的结构和语法与传统的 RDMS(关系数据库管理系统)不同。它是一个 NoSQL(不仅仅是 SQL)数据库。

NoSQL 数据库通常本质上更简单,因此对于具有任何编程经验的人来说,MongoDB 相对容易学习。文档数据库将数据处理为 JSON 类型的文档。这些数据是半结构化的,而不是完全结构化的。这意味着它可以比许多其他解决方案更快地处理大量数据。这也意味着用户可以在处理的所有不同阶段以及各种格式和结构中搜索和处理数据,从而使数据库比传统的关系数据库具有更高的灵活性。用户可以根据需要访问数据并对模式进行更改或更新,这与 SQL 数据库模型不同,在 SQL 数据库模型中,用户只能在数据经过处理和正确格式化后才能访问数据。

MongoDB 使用 BSON(二进制 JSON)。 BSON 允许某些不与常规 JSON 一起使用的数据类型,例如 long、floating-point 和 date。 MongoDB 还提供了一种替代 SQL 的查询语言,称为 MQL。 MQL 带来了许多与 SQL 相同的功能以及对各种编程语言的额外支持。该数据库具有分布式架构,这意味着组件在多个平台上相互协作。这也意味着 MongoDB 具有几乎无限的可扩展性,因为它可以根据需要跨多个平台进行扩展。这是将 MongoDB 与关系数据库区分开来的众多因素之一,因为关系数据库只能垂直扩展并且价格昂贵,因为在某些时候它们需要多个服务器才能扩展数据库的多个副本。 MongoDB 可以水平扩展。

MongoDB 是用 C、C++ 和 JS 编写的。

MongoDB 通过 TLS 和 SSL(传输层安全和安全套接字层)提供客户端、字段级加密。 TLS 和 SSL 都是使 HTTP(超文本传输​​协议)变成 HTTPS(安全超文本传输​​协议)的互联网加密协议。事实上,TLS 只是一种升级的 SSL,旨在减少安全漏洞。用户因此可以加密所有 MongoDB 网络流量。此外,MongoDB 有各种保护措施来确保正确验证用户身份。

MongoDB 提供社区支持,并在支持工程师的监督下提供有偿的全面培训和升级。

MongoDB 的任务完成速度非常快,特别是由于数据只是半结构化的。根据各种评论,即使在定期处理大量数据时,它也是市场上速度更快的解决方案之一。这使其非常适合需要实时或接近实时数据的情况,从而使公司能够立即全面了解其业务。

MongoDB 可以托管在 Google Cloud Platform、Amazon Web Services (AWS) 和 Microsoft Azure 等云平台上。

MongoDB 的常见用例包括客户分析、内容管理、业务交易和产品数据。该数据库还非常适合需要扩展到数百万用户的移动解决方案,这要归功于它的扩展能力。另一个主要用例是提供数据即服务的平台。 MongoDB 可以实时更新数据,允许用户在新信息进入时查看它。最后,MongoDB 提供的 IDP(智能数据平台)将数据库与其他互补技术相结合,成为一个完整的 IoT(物联网)支持物联网应用的平台。

什么是 PostgreSQL?
PostgreSQL 是一个 100% 免费和开源的 ORD(对象关系数据库)。数据库不是像文档那样存储数据,而是将其存储为结构化对象。它遵循 SQL 数据库的传统语法和模式。 Schema 实际上是一个模板或结构,您可以使用一组词汇将其应用于数据库。模式包含各种模式对象,包括任何表、列、键等。您必须在将数据加载到此类数据库之前对其进行结构化。虽然这往​​往需要更多时间,但它也可以将数据转换为更易于管理和可读的格式。

PostgreSQL 具有单体架构,这意味着组件是完全统一的。这也意味着数据库只能与运行它的机器一样扩展。它是用 C 语言编写的。代码是开源的,可供开发人员访问。 PostgreSQL 提供社区支持,并且仅通过某些其他公司提供额外的付费支持选项。

Postgres 包括基本文件保护、通过 IP 地址限制客户端连接的能力,以及与旨在使其更加安全的各种其他外部包的兼容性。

PostgreSQL 的用例包括银行系统、风险评估、多应用程序数据存储库、BI(商业智能)、制造和支持各种业务应用程序。它是事务性工作流程的理想选择。此外,PostgreSQL 具有故障保险和冗余,使其存储特别可靠。这意味着它非常适合医疗保健和制造业等重要行业。

两个数据库都使用不同的语法和术语来执行许多相同的任务。 PostgreSQL 使用表的地方,MongoDB 使用集合。 PostgreSQL 使用行来记录数据,MongoDB 使用文档等。它们还具有许多将它们彼此区分开来的特性。

MongoDB 与 PostgreSQL 的主要特性
MongoDB 具有 ACID 合规性的潜力,而 Postgres 具有内置的 ACID 合规性。 ACID(原子性、一致性、隔离性、持久性)是致力于数据有效性的原则或组件,尤其是在用于事务工作流的数据库中。


MongoDB 使用集合的目的与 Postgres 使用表的目的相同。这些集合包括用于设置验证规则和设置最大大小的选项。 Postgres 用一种非常特定的语言描述表,并以数据库或 ETL 工具可以处理它的方式构造数据。


两者之间术语和语法差异的另一个示例是 MongoDB 使用文档来获取数据,而 Postgres 使用行来实现相同目的。


虽然 MongoDB 不支持 FOREIGN KEY 约束,但 PostgreSQL 支持。外键可以是一个列或一组列,您可以使用它们同时在来自多个表的数据中创建链接。由于这些约束不允许破坏从一个表到另一个表的链接的任何操作,并且可以阻止将无效数据插入外键列,因此这可能是某些用户的必要功能。


MongoDB 聚合管道由多个阶段组成,用于转换数据。 Postgres 使用 GROUP_BY 运行查询,而 MongoDB 使用聚合管道。


MongoDB 使用冗余副本集,Postgres 执行同步或 2-safe 复制来维护数据集。您可以根据需要使用副本集来记录和“重播”过程。同步复制涉及同时更新多个存储库或系统。由于 PostgreSQL 数据库同时更新两条记录,以同样的方式,您可以减少错误,从而使用户拥有完整准确的备份。


像 Postgres 这样的 SQL 数据库使用连接将来自多个表的数据组合到一个表中。您可以使用四种不同类型的连接:全连接、左连接、右连接和内连接。假设您有两个表要连接一些数据,但不是全部,您将使用 left、right 或 inner 将两个表合并到您的第一个表或第二个表中,或者将两个表的一部分合并到第三张桌子。如果要将所有数据同时连接到一个表中,则可以使用完全连接。 MongoDB 使用索引,它只是连接的一个组成部分。该数据库不是为执行常规连接而设计的。索引是一种数据结构,可以以易于阅读的形式存储非常少量的数据。它们通过使数据更简单从而更易于扫描来帮助您更高效地解决查询。

MongoDB PostgreSQL
Schema-free  SQL-based but supports various NoSQL features
Document database Relational database
Uses BSON Uses SQL
Distributed architecture Monolithic architecture
Potential for ACID compliance ACID-compliant
Uses collections Uses tables
Uses documents to obtain data Uses rows to obtain data
Does not support foreign key constraints Supports foreign key constraints
Uses the aggregation pipeline for running queries Uses GROUP_BY
Redundant replica sets 2-safe replication
Uses indexes Uses joins

综上所述,MongoDB 和 PostgreSQL 的主要区别在于它们的系统、架构和语法:MongoDB 是文档数据库,而 Postgres 是关系数据库管理系统; MongoDB 是分布式架构,而 PostgreSQL 是单体架构; Postgres 使用 SQL,而 MongoDB 使用 BSON。

对于已经初步了解 JavaScript 的人来说,MongoDB 的学习曲线更短,而那些在 SQL 数据库方面有长期经验的人可能会发现更容易适应 Postgres。两者都作为各种行业的综合数据库解决方案越来越受欢迎。然而,公司在处理来自任一数据库的数据时遇到的最大问题之一是所涉及的时间和复杂性。

ETL(提取、传输和加载)数据到 MongoDB 或 PostgreSQL 数据库中通常涉及大量编码和复杂、耗时的过程。此外,由于 MongoDB 具有不寻常的语法和 NoSQL 支持,许多 ETL 提供商可能没有优化他们的解决方案来应对其特定挑战。

作者 east

1 2 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.