gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面28 )
Java 3月 16,2021

空间地理算法工具类

空间地理,常常需要计算2个地址是否是同个地址,2个坐标之间的直线距离。下面把这些常用算法进行封装。 2个地址是否是同个地址 的相似度算法,采用 余弦相似度用向量空间中两个向量夹角的余弦值作为衡量两个个体间差异的大小。相比距离度量,余弦相似度更加注重两个向量在方向上的差异,而非距离或长度上。 

public final class ScoreUtil {
    private static Logger log = LoggerFactory.getLogger(ScoreUtil.class);

    /** 乡镇附属词 */
    private static final String WORD_TYPE_4 = "乡,镇,街道办事处,街道办,街道";

    /** 社区居委附属词 */
    private static final String WORD_TYPE_5 = "社区,社区居民委员会,村民委员会,社区居村委,社区居委,居村委,居委会,村委会,委员会,居委,村委,委会,村";

    /** 路附属词 */
    private static final String WORD_TYPE_6 = "道路,路,大街,街,巷";

    /** 自定义地址唯一组合 */
    private static final String[] UNION_SIMILAR = { "1&2&4&6", "1&3&6", "1&3" };

    /**
     * 
     */
    private ScoreUtil() {
    }

    /**
     * 计算两个地址的相似度
     * 
     * @param sourceAddr 源地址
     * @param standAddr 标准地址
     * @return
     * @throws Exception
     */
    public static long score(String sourceAddr, String standAddr) throws Exception {
        Map<Integer, String> sourceWords = WordSolrUtil.splitAddressWordNames(sourceAddr);
        Map<Integer, String> standWords = WordSolrUtil.splitAddressWordNames(standAddr);
        if (standWords.size() == 0 || sourceWords.size() == 0) {
            return 0L;
        }
        Integer sourceWordMaxType = Collections.max(sourceWords.keySet());
        Integer standWordMaxType = Collections.max(standWords.keySet());
        String sourceWordStr = null;
        String standWordStr = null;
        // 命中的词类型组合
        StringBuilder matchType = new StringBuilder();
        for (int i = (standWordMaxType > sourceWordMaxType ? standWordMaxType : sourceWordMaxType); i >= 1; i--) {
            sourceWordStr = sourceWords.get(i);
            standWordStr = standWords.get(i);
            if (i == 4) {
                if (StringUtils.isNotBlank(sourceWordStr)) {
                    sourceWords.put(i, sourceWordStr.replaceFirst("[" + WORD_TYPE_4 + "]$", "") + "乡镇");
                }
                if (StringUtils.isNotBlank(standWordStr)) {
                    standWords.put(i, standWordStr.replaceFirst("[" + WORD_TYPE_4 + "]$", "") + "乡镇");
                }
            } else if (i == 5) {
                if (StringUtils.isNotBlank(sourceWordStr)) {
                    sourceWords.put(i, sourceWordStr.replaceAll("[" + WORD_TYPE_5 + "]", "") + "居村委");
                }
                if (StringUtils.isNotBlank(standWordStr)) {
                    standWords.put(i, standWordStr.replaceAll("[" + WORD_TYPE_5 + "]", "") + "居村委");
                }
            } else if (i == 6) {
                if (StringUtils.isNotBlank(sourceWordStr)) {
                    sourceWords.put(i, sourceWordStr.replaceFirst("[" + WORD_TYPE_6 + "]$", "") + "大道");
                }
                if (StringUtils.isNotBlank(standWordStr)) {
                    standWords.put(i, standWordStr.replaceFirst("[" + WORD_TYPE_6 + "]$", "") + "大道");
                }
            }

            // 字符串相似度大于0.9,则标记为命中
            if (strSimilarMatch(sourceWords.get(i), standWords.get(i)) >= 0.9) {
                matchType.append(i).append("&");
            }
        }

        // 用于比较的地址字符串
        StringBuilder sourceAddrCompareStr = new StringBuilder();
        StringBuilder standAddrCompareStr = new StringBuilder();
        filterAddrToEquivalent(sourceWords, standWords, sourceWordMaxType, standWordMaxType, matchType,
                sourceAddrCompareStr, standAddrCompareStr);
        log.debug("源地址过滤后:" + sourceAddrCompareStr.toString());
        log.debug("标地址过滤后:" + standAddrCompareStr.toString());

        return Math.round(linearSpaceVectorMacth(sourceAddrCompareStr.toString(), standAddrCompareStr.toString()));
    }

    /**
     * 根据配置信息及别名过滤两个地址为等价地址
     * 
     * @param sourceWords
     * @param standWords
     * @param sourceWordMaxType
     * @param standWordMaxType
     * @param matchType
     * @param sourceAddrCompareStr
     * @param standAddrCompareStr
     */
    private static void filterAddrToEquivalent(Map<Integer, String> sourceWords, Map<Integer, String> standWords,
            Integer sourceWordMaxType, Integer standWordMaxType, StringBuilder matchType,
            StringBuilder sourceAddrCompareStr, StringBuilder standAddrCompareStr) {
        // 命中的词
        String[] matchedType = matchType.toString().contains("&") ? matchType.toString().split("&", 0) : new String[0];
        Arrays.sort(matchedType, new Comparator<String>() {

            @Override
            public int compare(String o1, String o2) {
                if (StringUtils.isBlank(o1)) {
                    return 1;
                }
                if (StringUtils.isBlank(o2)) {
                    return -1;
                }
                if (o1.equals(o2)) {
                    return 0;
                }
                return Integer.parseInt(o1) > Integer.parseInt(o2) ? 1 : -1;
            }
        });

        // 根据配置,移除可忽略的词
        for (int i = 0; i < UNION_SIMILAR.length; i++) {
            boolean isMatchedWithConfig = true;
            if (UNION_SIMILAR[i].split("&").length >= matchedType.length) {
                // 判断命中的词组是否与配置指定的一致
                for (String type : matchedType) {
                    if (StringUtils.isNotBlank(type) && Arrays.binarySearch(UNION_SIMILAR[i].split("&"), type) < 0) {
                        isMatchedWithConfig = false;
                        break;
                    }
                }

            } else {
                for (String type : UNION_SIMILAR[i].split("&")) {
                    if (StringUtils.isNotBlank(type) && Arrays.binarySearch(matchedType, type) < 0) {
                        isMatchedWithConfig = false;
                        break;
                    }
                }
                if (isMatchedWithConfig) {
                    matchedType = UNION_SIMILAR[i].split("&");
                }
            }

            if (isMatchedWithConfig && matchedType.length > 0) {
                // 补上缺省的词
                for (int j = Integer.parseInt(matchedType[0]); j <= Integer
                        .parseInt(matchedType[matchedType.length - 1]); j++) {
                    if (Arrays.binarySearch(matchedType, String.valueOf(j)) < 0) {
                        if (sourceWords.keySet().contains(j)) {
                            standWords.put(j, sourceWords.get(j));
                        } else if (standWords.keySet().contains(j)) {
                            sourceWords.put(j, standWords.get(j));
                        }
                    }
                }
            }
        }

        // 组装地址词组为字符串信息
        for (int i = 1; i <= (standWordMaxType > sourceWordMaxType ? standWordMaxType : sourceWordMaxType); i++) {
            sourceAddrCompareStr.append(StringUtils.trimToEmpty(sourceWords.get(i)));
            standAddrCompareStr.append(StringUtils.trimToEmpty(standWords.get(i)));
        }
    }

    /**
     * 线性空间几何
     * 
     * @param source
     * @param target
     * @return
     */
    private static double linearSpaceVectorMacth(String source, String target) {
        Set<Character> set = new HashSet<Character>();
        for (char c : source.toCharArray()) {
            set.add(c);
        }
        for (char c : target.toCharArray()) {
            set.add(c);
        }
        Character[] targetA = set.toArray(new Character[] {});
        int[] sourceArg = parseAddrToSpaceVector(targetA, source);
        int[] targetArg = parseAddrToSpaceVector(targetA, target);
        return cos(sourceArg, targetArg) * 100;
    }

    /**
     * 计算空间向量夹角cos值
     * 
     * @param point1
     * @param point2
     * @return
     */
    private static double cos(int[] point1, int[] point2) {
        int count = 0;
        for (int i = 0; i < point1.length; i++) {
            count += point1[i] * point2[i];
        }

        double a1 = 0.0;
        for (int i = 0; i < point1.length; i++) {
            a1 += point1[i] * point1[i];
        }
        a1 = Math.sqrt(a1);

        double a2 = 0.0;
        for (int i = 0; i < point2.length; i++) {
            a2 += point2[i] * point2[i];
        }
        a2 = Math.sqrt(a2);

        return count / (a1 * a2);
    }

    /**
     * 解析地址为空间向量坐标
     * 
     * @param tag
     * @param str
     * @return
     */
    private static int[] parseAddrToSpaceVector(Character[] tag, String str) {
        int[] rs = new int[tag.length];
        int count = 0;
        int i = 0;
        for (char t : tag) {
            count = 0;
            for (char c : str.toCharArray()) {
                if (t == c) {
                    count++;
                }
            }
            rs[i] = count;
            i++;
        }
        return rs;
    }

    /**
     * 字符串相似度匹配
     * 
     * @param compare
     * @param to
     * @return
     */
    public static double strSimilarMatch(String compare, String to) {
        if (StringUtils.isBlank(compare) || StringUtils.isBlank(to)) {
            return 0;
        }
        // 字符串相似度比较
        int len1 = compare.length();
        int len2 = to.length();

        int[][] dif = new int[len1 + 1][len2 + 1];
        for (int a = 0; a <= len1; a++) {
            dif[a][0] = a;
        }
        for (int a = 0; a <= len2; a++) {
            dif[0][a] = a;
        }

        int temp;
        for (int i = 1; i <= len1; i++) {
            for (int j = 1; j <= len2; j++) {
                if (compare.charAt(i - 1) == to.charAt(j - 1)) {
                    temp = 0;
                } else {
                    temp = 1;
                }
                dif[i][j] = min(dif[i - 1][j - 1] + temp, dif[i][j - 1] + 1, dif[i - 1][j] + 1);
            }
        }
        return 1 - (double) dif[len1][len2] / Math.max(compare.length(), to.length());
    }

    /**
     * 查找集合最小值
     * 
     * @param is
     * @return
     */
    private static int min(int... is) {
        int min = Integer.MAX_VALUE;
        for (int i : is) {
            if (min > i) {
                min = i;
            }
        }
        return min;
    }
}
作者 east
Java 3月 14,2021

Java开发最全学习资料(持续更新)

学习视频:

零基础小白真正轻松学Java|2020入门Java高薪

JAVA设计模式

Java零基础全栈就业班

java高级大互联网架构师进阶

Java分布式锁实战教程(基于Spring Boot)

Redis高并发高可用集群 整合SpringBoot百万级秒杀实战

WebSocket整合Spring、SockJS、Stomp、Rabbitmq分布式消息推送

SpringBoot2.0前后端分离开发之用户身份认证实战 (后端实现)

Spring Cloud Alibaba特训营

全新版本分布式架构教程 SpringCloud+Docker基础入门到高级实战

Spring特训营(手写篇)java高级开发 java架构师进阶课程

JVM深入浅出特训营

MySQL数据库深度讲解(设计+SQL语句)视频课程

MySQL从入门到入魔,Java高级,java进阶

深入Mybatis原理与实战

全新录制Elasticsearch7.X搜索引擎项目实战Java架构视频教程

企业级搜索引擎 ElasticSearch 7 实战

ES训练营/基于ElasticStack快速打造三位一体实时监控分析平台

700多分钟干货实战,Java多线程高并发高性能实战全集

分布式医疗云平台项目实战

Docker 网络详解

Nginx 从入门到百万并发实战

作者 east
大数据开发 3月 14,2021

大数据开发最全学习资料(持续更新)

Hadoop从0到精通详解教程,2021年最新全套

大数据开发-合适新手入门+深入

大数据开发就业教程

大数据全栈就业班 


大数据基石Hadoop由浅入深,从原理到源码全面征服

基于淘宝从架构到需求,由浅入深全面了解大数据的数据仓库

大数据金融离线数据仓库项目实战与运营分析

大数据Scala编程语言完整视频教程

大数据生态圈/Hadoop/Spark/Flink/数据仓库/实时分析/推荐系统

Flink大数据项目实战【大讲台】

大数据Spark实战训练营/3天掌握Spark京东电商实战案例

基于淘宝从理论到实战由浅入深全面了解最火的实时框架Flink

医药数据分析通识课:SAS编程与CDISC标准

作者 east
Java 3月 11,2021

使用Springboot @Value配置时遇到几个不生效的问题

在开发项目时,把一些可能变化的东西,尽量搞成配置文件。这样以后有变化时,改一下配置就可以,不用开发人员重新编译。

使用Springboot的@Value, 常规配置方法是这样:

@Compent
public class TestA{

@Value("${MY_URL}")
private String myUrl;
}

在开发当中,如果按上面方式,遇到下面情况会不生效:

1、静态变量

不能像常规那样使用,要使用set方法,例如:

@Compent
public class TestA{
private static String myUrl;

@Value("${MY_URL}")
public void setMyUrl(String url){
myUrl = url;
}
}

2、构造函数

@Compent
public class TestA{
 public TestA(@Value("${MY_URL}") String myUrl){
 }
}
作者 east
Java 3月 11,2021

Properties获取配置的数组

JDK自带的Properties类没有Springboot用Value配置参数那么方便,尤其是数组, Properties 没有相应的方法。但可以用变通的方式。就是把数组配置成字符串,用特殊符号分隔。

InputStream in = RunTest.class.getClassLoader().getResourceAsStream("application.properties");
        try {
            properties.load(in);

            String redisListString = properties.getProperty("redisList");
String[] arr = redisListString.split(",");
}catch(Exception ex){
}

application.properties的配置如下:

redisList=192.68.1.2:22409,192.68.1.3:22532

作者 east
Hbase 3月 10,2021

Hbase统计海量数据行数

业务需要统计每天hbase的数据量,而且每天增量有上百万条。

网上找了些代码,很多推荐使用Coprocessor的方式,执行效率高。但在我的大数据环境 运行出错,报“No registered coprocessor service found for name AggregateService in region xxx”。后来发现是第一次运行时需要下面这些代码来修改配置

 String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);
public void rowCount(String tablename){
    try {
        //提前创建connection和conf
        Admin admin = connection.getAdmin();
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加协处理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);

        System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));
        stopWatch.stop();
        System.out.println("统计耗时:" +stopWatch.getTotalTimeMillis());
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
作者 east
Spark 3月 3,2021

Spark Streaming调优实践

当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本章我们就来介绍一些能够提高应用性能的参数和配置。另外需要指出的是,优化本身是一个具体性很强的事情,不同的应用及落地场景会有不同的优化方式,并没有一个统一的优化标准。本章我们将一些常用的和在项目中踩过的“坑”总结一下,列举以下常见的优化方式。

数据序列化在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在Spark中,主要有如下3个方面涉及序列化:

● 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

● 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

● 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER), Spark会将RDD中的每个partition都序列化成一个大的字节数组。而Spark综合考量易用性和性能,提供了下面两种序列化库。

● Java序列化:默认情况下,Spark使用Java的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现Java.io.Serializable接口的自定义类上。我们可以通过扩展Java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

● Kryo序列化:Spark在2.0.0以上的版本可以使用Kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍),但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。

我们可以在初始化SparkConf时,调用conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)来使用Kryo。一旦进行了这个配置,Kryo序列化不仅仅会用在Shuffling操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。Spark官方解释没有将Kryo作为默认序列化方式的唯一原因是,Kryo必须用户自己注册(注意如果我们不注册自定义类,Kryo也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用Kryo。另外值得注意的是,在Spark 2.0.0之后,Spark已经默认将Kryo序列化作为简单类型(基本类型、基本类型的数组及string类型)RDD进行Shuffling操作时传输数据的对象序列化方式。Spark已经自动包含注册了绝大部分Scala的核心类,如果需要向Kryo注册自己的类别,可以使用registerKryoClasses方法。使用Kryo的代码框架如下:

如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。同样在Spark Streaming中,通过优化序列化格式可以缩减数据序列化的开销,而在Streaming中还会涉及以下两类数据的序列化。

● 输入数据:在4.4.1节中曾讲过,Spark Streaming中不同于RDD默认是以非序列化的形式存于内存当中,Streaming中由接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在Streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为Spark本身的序列化格式。

● 由Streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于Spark核心默认使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在Spark还是在Spark Streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加了GC的开销。

2.广播大变量

我们可以看出,不论Spark还是Spark Streaming的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用Driver节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。另外,Spark在Master节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过20KB,是值得去优化的。当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用Spark中的unpersist()函数,Spark通常会按照LRU(leastRecently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用unpersist()函数。

3.数据处理和接收时的并行度

作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。关于partition和parallelism。partition指的就是数据分片的数量,每一次Task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多Executor的计算能力无法充分利用;但是如果partition太大了则会导致分片太多,执行效率降低。在执行Action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及Shuffle,因此这个parallelism的参数没有影响)。由上述可得,partition和parallelism这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过Spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量,如reduceByKey和reduceByKeyAndWindow。

Spark Streaming接收Kafka数据的方式,这个过程有一个数据反序列化并存储到Spark的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入DStream会创建一个单一的接收器(receiver在worker节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入DStream用来接收源数据流的不同分支(partitions)。如果我们利用Receiver的形式接收Kafka,一个单一的Kafka输入DStream接收了两个不同topic的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个topic上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个DStreams,可以通过union操作并为一个DStream,之后便可以在这个统一的输入DStream上进行操作,代码示例如下:

如果采用Direct连接方式,前面讲过Spark中的partition和Kafka中的partition是一一对应的,但一般默认设置为Kafka中partition的数量,这样来达到足够并行度以接收Kafka数据。

4.设置合理的批处理间隔

对于一个Spark Streaming应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个batch的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过Spark监控界面进行查看,比较批处理时间必须小于批处理间隔。通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的batch的端到端延迟来观察,也可以看全局延迟来观察(可以在Spark log4j的日志里或者使用StreamingListener接口,也可以直接在UI界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的Spark监控界面来判断系统的稳定性。

5. 内存优化内存

优化是在所有应用落地中必须经历的话题,虽然Spark在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbagecollection)导致的开销。通常来说,对于Java对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

● 每个独立的Java对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个Int类型数据),这个开销是相对巨大的。

● Java的String对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Chars array)本身之外,还需要保存如字符串长度等额外信息,而且由于String内部存储字符时是按照UTF-16格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

● 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

● 熟悉Java的开发者应该知道,Java数据类型分为基本类型和包装类型,对于int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map),需要使用对基本数据类型打包(当然这是Java的一个自动过程),而打包后的基本数据类型就会产生额外的开销。针对以上内存优化的基本问题,接下来首先介绍Spark中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从Spark的缓存及Java的垃圾回收方面进行分析,另外,也会对SparkStreaming进行分析。

5.1 内存管理

Spark对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于Shuffle类操作、join操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。在Spark内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值R,也就是说R指定了M中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。Spark的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。Spark提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

● Spark Memory.Fraction表示M的大小占整个JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及Spark内部的元数据(metadata),另一方面预防某些异常数据记录造成的OOM(Out of Memory)错误。

● Spark.Memory.StorageFraction表示R的大小占整个M的比例(默认是0.5), R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

5.2 优化策略

当我们需要初步判断内存的占用情况时,可以创建一个RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出RDD占用了多少内存。而对于特殊的对象,我们可以调用SizeEstimator的estimate()方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个Executor堆上所占用的内存是非常有效的。当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的Java本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

● 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用Java或者Scala标准库当中的集合类(如HashMap),在fastutil库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容Java标准库。

● 尽可能避免在数据结构中嵌套大量的小对象和指针。

● 考虑使用数值类ID或者枚举对象来代替字符串类型作为主键(Key)。

● 如果我们的运行时内存小于32GB,可以加上JVM配置-XX:+UseCompressedOops将指针的占用空间由8个字节压缩到4个字节,我们也可以在Spark-env.sh中进行配置。

假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在RDD的持久化接口中使用序列化的存储级别,如MEMORY_ONLY_SER, Spark便会将每个RDD分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用Kryo序列化方式,其占用大小远低于Java本身的序列化方式。

5.3 垃圾回收(GC)优化

如果我们在应用中进行了频繁的RDD变动,那么JVM的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个RDD,后续所有操作都围绕这个RDD,那么垃圾回收就不存在问题)。当Java需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。所以垃圾回收的开销是和Java对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用Int数组代替LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个RDD分区下仅有一个对象(一个字节数组)。注意当GC开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。在做GC优化时,我们首先需要了解GC发生的频率以及其所消耗的时间。这可以通过在Java选项中加入-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现;之后当Spark任务运行后,便可以在Worker日志中看到GC发生时打印的信息。注意这些日志是打印在集群中的Worker节点上的(在工作目录的stdout文件中),而非Driver程序。为了进一步优化GC,首先简单介绍下Java虚拟机内部是如何进行内存管理的。

(1)Java对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

(2)年轻代的区域又被分为3个部分[Eden, Survivor1,Survivor2]。

(3)一个简单的GC流程大致是:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Survivor区域是可交换的(即来回复制),当一个对象存活周期已足够长或者Survivor2区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程。Java的这种GC机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分GC的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。基于以上原因,Spark在GC方面优化的主要目标是:只有长生命周期的RDD会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。我们可以通过以下步骤来一步步优化:

(1)通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

(2)如果触发了过多的小型GC,而完整的GC操作并没有调用很多次,那么给Eden区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmn=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到survivor区域所需空间)。(3)如果我们观察GC打印的统计信息,发现老年代接近存满,那么就需要改变spark.memory.fraction来减少存储类内存(用于caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过-Xmn来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.Memory.Fraction。

(4)通过加入-XX:+UserG1GC来使用G1GC垃圾回收器,这可以一定程度提高GC的性能。另外注意对于executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。

针对以上步骤我们举一个例子,如果我们的任务是从HDFS当中读取数据,任务需要的内存空间可以通过从HDFS当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个HDFS块大小是128MB,那么需要将Eden大小设置为4×3×128MB。改动之后,我们可以监控GC的频率和时间消耗,看看有没有达到优化的效果。对于优化GC,主要还是从降低全局GC的频率出发,executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置。

5.4 Spark Streaming内存优化

前面介绍了Spark中的优化策略和关于GC方面的调优,对于Spark Streaming的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。对于Spark Streaming应用所需要的集群内存,很大程度上取决于要使用哪种类型的transformation操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了updateStateByKey操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的Map、Filter、Store操作,那么所需空间会较小。默认情况下,接收器接收来的数据会以StorageLevel.MEMORY_AND_DISK_SER_2的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失SparkStreaming应用的性能。所以通常建议为Spark Streaming应用分配充足的内存,可以在小规模数据集上进行测试和判断。另一方面与Spark程序有显著区别的是,Spark Streaming程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。基于此,我们可以通过以下几个参数对内存使用和GC开销进行优化调整。

● DStream的持久化级别:输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少GC的开销。所以优先使用Kryo序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

● 及时清理老数据:默认情况下所有的输入数据和由DStream的Transormation操作产生的持久RDD会被自动清理,即Spark Streaming会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过stremingContext.remember进行设置。

● CMS垃圾回收器:不同于之前我们在Spark中的建议,由于需要减少GC间的停顿,所以这里建议使用并发标记清除类的GC方式。即使并发GC会降低全局系统的生产吞吐量,但是使用这种GC可以使得每个Batch的处理时间更加一致(不会因为某个Batch处理时发生了GC,而导致处理时间剧增)。我们需要确保在Driver节点(在spark-submit中使用—driver-java-options)和Executor节点(在Spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了CMS GC方式。

● 其他减少GC开销的方式有:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。

作者 east
Kafka, Spark 3月 3,2021

SparkStreaming Direct方式读取kafka优缺点及示例(Redis保存offset)

在Spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有Receiver这一层,其会周期性地获取Kafka中每个topic(主题)的每个partition(分区)中的最新offsets(偏移量),之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图所示。

这种方法相较于Receiver方式的优势在于:

● 简化的并行。Direct方式中,Kafka中的partition与Spark内部的partition是一一对应的,这点使得我们可以很容易地通过增加Kafka中的partition来提高数据整体传输的并行度,而不像Receiver方式中还需要创建多个Receiver然后利用union再合并成统一的Dstream。

● 高效。Direct方式中,我们可以自由地根据offset来从Kafka中拉取想要的数据(前提是Kafka保留了足够长时间的数据),这对错误恢复提供了很好的灵活性。然而在Receiver的方式中,还需要将数据存入Write Ahead Log中,存在数据冗余的问题。

● 一次性接收精确的数据记录Direct方式中我们直接使用了低阶Kafka的API接口,offset默认会利用Spark Steaming的checkpoints来存储,同样也可以将其存到数据库等其他地方。然而在Receiver的方式中,由于使用了Kafka的高阶API接口,其默认是从ZooKeeper中拉取offset记录(通常Kafka取数据都是这样的),但是Spark Streaming消费数据的情况和ZooKeeper记录的情况是不同步的,当程序发生中断或者错误时,可能会造成数据重复消费的情况。

不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然Zookeeper就保存了当前消费的offset值,如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,是直接从Kafka来读数据,offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到ZooKeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到ZooKeeper的通用类中。下面示范用redis保存offset

object Demo {


  val IP_RANG: Array[String] = "91,92,93,94,95".split(",")
  val PORT_RANG: Array[String] = "22420,22421,22422,22423,22424,22425,22426,22427".split(",")
  val hosts = new util.HashSet[HostAndPort]()

  val sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def main(args: Array[String]) {

      val Array(checkPointDir, topic, brokers, groupId, cf, offset, dw_all_tn, dw_track_tn, dw_unique_tn, batchIntervel) = args

      login

      val client: JedisCluster = new JedisCluster(hosts, 5000)


      var topicPartitions: Map[TopicPartition, Long] = Map()

      if (client.exists(topic)) {
        val offsetMap: util.Map[String, String] = client.hgetAll(topic)
        val iterator: util.Iterator[String] = offsetMap.keySet().iterator()
        while (iterator.hasNext) {
          val key: String = iterator.next()
          val value: String = offsetMap.get(key)
          println(key + "------" + value)
          topicPartitions += (new TopicPartition(topic, key.toInt) -> value.toLong)
        }
      }
      client.close()

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "value.deserializer" -> classOf[StringDeserializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.kerberos.service.name" -> "kafka",
        "auto.offset.reset" -> offset,
        "kerberos.domain.name" -> "hadoop.hadoop.com",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )


      def functionToCreateContext(): StreamingContext = {

        //      val topicArr = topic.split(",")
        //      val topicSet = topicArr.toSet


        val locationStrategy = LocationStrategies.PreferConsistent
        //      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

        val sparkConf: SparkConf = new SparkConf().setAppName("jingyi_xn_dw_all&track")

        val ssc = new StreamingContext(sparkConf, Seconds(batchIntervel.toInt))
        //      if (!"nocp".equals(checkPointDir)) {
        //        ssc.checkpoint(checkPointDir)
        //      }


        val config = HBaseConfiguration.create()
        val hbaseContext = new HBaseContext(ssc.sparkContext, config)

        val stream = KafkaUtils.createDirectStream[String, String](ssc,
          locationStrategy,
          //        consumerStrategy
          ConsumerStrategies.Assign[String, String](topicPartitions.keys.toList, kafkaParams, topicPartitions)
        )
    }
}

def setRedisHost: Unit ={
    for (host <- IP_RANG) {
      for (port <- PORT_RANG) {
        hosts.add(new HostAndPort("192.68.196." + host, port.toInt))
      }
    }
  }

       
作者 east
Hive 3月 2,2021

Hive开发建议

HQL编写之隐式类型转换

查询语句使用字段的值做过滤时,不建议通过Hive自身的隐式类型转换来编写HQL。因为隐式类型转换不利于代码的阅读和移植。

建议示例:

select * from default.tbl_src where id = 10001;
select * from default.tbl_src where name = 'TestName';

不建议示例:

select * from default.tbl_src where id = '10001';
select * from default.tbl_src where name = TestName;

说明:

表tbl_src的id字段为Int类型,name字段为String类型。

HQL编写之对象名称长度

HQL的对象名称,包括表名、字段名、视图名、索引名等,其长度建议不要超过30个字节。

Oracle中任何对象名称长度不允许超过30个字节,超过时会报错。PT为了兼容Oracle,对对象的名称进行了限制,不允许超过30个字节。

太长不利于阅读、维护、移植。

HQL编写之记录个数统计

统计某个表所有的记录个数,建议使用“select count(1) from table_name”。

统计某个表某个字段有效的记录个数,建议使用“select count(column_name) from table_name”。

JDBC超时限制

Hive提供的JDBC实现有超时限制,默认是5分钟,用户可以通过java.sql.DriverManager.setLoginTimeout(int seconds)设置,seconds的单位为秒。

UDF管理

建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。

Hive的UDF会有一些默认属性,比如deterministic 默认为true(同一个输入会返回同一个结果),stateful(是否有状态,默认为true)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,比如如下类

@UDFType(deterministic = false)
Public class MyGenericUDAFEvaluator implements Closeable {

表分区优化建议

  1. 当数据量较大,且经常需要按天统计时,建议使用分区表,按天存放数据。
  2. 为了避免在插入动态分区数据的过程中,产生过多的小文件,在执行插入时,在分区字段上加上distribute by。

存储文件格式优化建议

Hive支持多种存储格式,比如TextFile,RCFile,ORC,Sequence,Parquet等。为了节省存储空间,或者大部分时间只查询其中的一部分字段时,可以在建表时使用列式存储(比如ORC文件)。

作者 east
Spark 3月 2,2021

Spark开发规范

规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
// RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
// RDD操作时引入的类。
import org.apache.spark.SparkContext._
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf

Java与Scala函数有区别,在编写应用时,需要弄清楚每个函数的功能

RDD是不可改变的,也就是说,RDD的元素对象是不能更改的,因此,在用Java和Scala编写需要弄清楚每个函数的功能。下面举个例子。

场景:现有用户位置数据,按照时间排序生成用户轨迹。在Scala中,按时间排序的代码如下:

/* 函数实现的功能是得到某个用户的位置轨迹。
 * 参数trajectory:由两部分组成-用户名和位置点(时间,经度,维度)
 */
private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
// 先将用户位置点按时间排序
    val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);
    …
}

若用java实现上述功能,则需要将trajectory._2重新生成对象,而不能直接对trajectory._2进行排序操作。原因是Collections.sort(trajectory._2)这个操作会改变了trajectory._2这个对象本身,这违背了RDD元素不可更改这条规则;而Scala代码之所以能够正常运行,是因为sortBy( )这个函数生成了一个新的对象,它并不对trajectory._2直接操作。下面分别列出java实现的正确示例和错误示例。

正确示例:

// 将用户的位置点从新生成一个对象。
List<Tuple3< String, Float, Float >> list = new ArrayList<Tuple3< String, Float, Float >>( trajectory._2);
// 对新对象进行排序。
Collections.sort(list);

错误示例:

// 直接对用户位置点按照时间排序。
Collections.sort(trajectory._2);

分布式模式下,应注意Driver和Executor之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。

正确示例:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(…);
    return …; 
  }
}

错误示例:

//定义对象。
object Test
{
  // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
  private var testArg: String = null;
  // main函数
  def main(args: Array[String])
  {
    
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return …; 
  }
}

运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

说明:

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,YARN界面会显示失败。同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

作者 east
Flink 3月 1,2021

Flink Stream SQL Join程序

场景说明

假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。

基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

数据规划

  • 业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。Kafka配置参见样例数据规划章节。
  • 业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
    • 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
    • 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topic与producer一致。
  3. 从soket中读取数据,构造Table2。
  4. 使用Flink SQL对Table1和Table2进行联合查询,并进行打印。

Java样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

用户在开发前需要使用对接安全模式的FusionInsight Kafka,则需要引入FusionInsight的kafka-client-0.11.x.x.jar,该jar包可在FusionInsight client目录下获取。

下面列出producer和consumer,以及Flink Stream SQL Join使用主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.SqlJoinWithSocket

Java样例代码

  1. 每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。
//producer代码
public class WriteIntoKafka {

      public static void main(String[] args) throws Exception {

      // 打印出执行flink run的参考命令
        System.out.println("use command as: ");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("******************************************************************************************");

        System.out.println("<topic> is the kafka topic name");

        System.out.println("<bootstrap.servers> is the ip:port list of brokers");

        System.out.println("******************************************************************************************");
       
        // 构造执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并发度
        env.setParallelism(1);
        // 解析运行参数
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // 构造流图,将自定义Source生成的数据写入Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());

        FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),

           new SimpleStringSchema(),

           paraTool.getProperties()));

        messageStream.addSink(producer);

        // 调用execute触发执行
        env.execute();
     }

// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
        static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};

        static final String[] SEX = {"MALE", "FEMALE"};

        static final int COUNT = NAME.length;   

        boolean running = true;

        Random rand = new Random(47);

       @Override
        //rand随机产生名字,性别,年龄的组合信息
         public void run(SourceContext<String> ctx) throws Exception {

            while (running) {

                int i = rand.nextInt(COUNT);

                int age = rand.nextInt(70);

                String sexy = SEX[rand.nextInt(2)];

                ctx.collect(NAME[i] + "," + age + "," + sexy);

                thread.sleep(1000);

            }

    }

       @Override

       public void cancel() {

         running = false;

       }

     }

   }

2.生成Table1和Table2,并使用Join对Table1和Table2进行联合查询,打印输出结果。

public class SqlJoinWithSocket {
    public static void main(String[] args) throws Exception{

        final String hostname;

        final int port;

        System.out.println("use command as: ");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                + "--hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        try {
            final ParameterTool params = ParameterTool.fromArgs(args);

            hostname = params.has("hostname") ? params.get("hostname") : "localhost";

            port = params.getInt("port");

        } catch (Exception e) {
            System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");

            System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                    "type the input text into the command line");

            return;
        }
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        //基于EventTime进行处理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.setParallelism(1);

        ParameterTool paraTool = ParameterTool.fromArgs(args);

        //Stream1,从Kafka中读取数据
        DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                new SimpleStringSchema(),
                paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                String[] word = s.split(",");

                return new Tuple3<>(word[0], word[1], word[2]);
            }
        });

        //将Stream1注册为Table1
        tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");

        //Stream2,从Socket中读取数据
        DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] words = s.split("\\s");
                        if (words.length < 2) {
                            return new Tuple2<>();
                        }

                        return new Tuple2<>(words[0], words[1]);
                    }
                });

        //将Stream2注册为Table2
        tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");

        //执行SQL Join进行联合查询
        Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                "FROM Table1 AS t1\n" +
                "JOIN Table2 AS t2\n" +
                "ON t1.name = t2.name\n" +
                "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");

        //将查询结果转换为Stream,并打印输出
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
作者 east
Flink 3月 1,2021

Flink 配置表与流JOIN程序

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,基于某些业务要求,要求开发Flink的应用程序实现如下功能:

  • 实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息; 其中日志文本和csv格式表中的姓名字段可作为关键字,通过该值将两张表联合起来。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 data.txt:周末两天网民停留日志



LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 NotExist,female,200

  • configtable.csv:网民个人信息,第一列为姓名,第二列为年龄,第三列为公司,第四列为工作地点,第五列为学历,第六列为工作年数,第七列为手机号码,第八列为户籍所在地,第九列为毕业学校,csv标准格式,即分隔符为“,”


username,age,company,workLocation,educational,workYear,phone,nativeLocation,school LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息,包含对应的个人详细信息。

主要分为七个部分:

  • 修改“import.properties”和“read.properties”配置,配置csv文件字段、Redis读取字段以及Redis的节点信息配置
  • 导入“configtable.csv”配置表进入Redis中存储起来。
  • 读取文本数据,生成相应DataStream,解析数据生成OriginalRecord信息。
  • 调用异步IO的函数,以OriginalRecord用户姓名字段为关键字在Redis中查询对应的个人信息,并转化为UserRecord。
  • 筛选出女性网民上网时间数据信息。
  • 按照姓名进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  • 筛选连续上网时间超过阈值的用户,并获取结果。
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.utils.ParameterTool;

import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Read data from csv file and import to redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>" +
                "******************************************************************************************\n" +
                "<config filePath> is for configure file to load\n" +
                "you may write following content into config filePath: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // read all configures
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");

        // init redis client
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);

        // get all files under csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under  " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);

        // run read csv file and analyze it
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }

    public static ArrayList<File> getListFiles(Object obj) {
        File directory = null;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                File fileOne = fileArr[i];
                files.addAll(getListFiles(fileOne));
            }
        }
        return files;
    }

    /**
     * Sets up the processors used for read csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };

        return processors;
    }

    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);

            // the header elements are used to map the values to the bean (names must match)
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();

            UserInfo userinfo;
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),
                        beanReader.getRowNumber(), userinfo));

                // set redis key and value
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        }
        finally {
            if( beanReader != null ) {
                beanReader.close();
            }
        }
    }



    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;


        public UserInfo() {

        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /**
         * @return the username
         */
        public String getUsername() {
            return username;
        }

        /**
         * @param username
         *            the username to set
         */
        public void setUsername(String username) {
            this.username = username;
        }

        /**
         * @return the age
         */
        public String getAge() {
            return age;
        }

        /**
         * @param age
         *            the age to set
         */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * @return the company
         */
        public String getCompany() {
            return company;
        }

        /**
         * @param company
         *            the company to set
         */
        public void setCompany(String company) {
            this.company = company;
        }

        /**
         * @return the workLocation
         */
        public String getWorkLocation() {
            return workLocation;
        }

        /**
         * @param workLocation
         *            the workLocation to set
         */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /**
         * @return the educational
         */
        public String getEducational() {
            return educational;
        }

        /**
         * @param educational
         *            the educational to set
         */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /**
         * @return the workYear
         */
        public String getWorkYear() {
            return workYear;
        }

        /**
         * @param workYear
         *            the workYear to set
         */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /**
         * @return the phone
         */
        public String getPhone() {
            return phone;
        }

        /**
         * @param phone
         *            the phone to set
         */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /**
         * @return the nativeLocation
         */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /**
         * @param nativeLocation
         *            the nativeLocation to set
         */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /**
         * @return the school
         */
        public String getSchool() {
            return school;
        }

        /**
         * @param school
         *            the school to set
         */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Read stream data and join from configure table from redis.
 */
public class FlinkConfigtableJavaExample {

    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt" +
                "******************************************************************************************\n" +
                "Especially you may write following content into config filePath, as in config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // get configure and read data and transform to OriginalRecord
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();

        // read from redis and join to the whole user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout set to 2 minutes, max parallel request num set to 5, you can modify this to optimize
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);

        // data transform
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();

        // execute program
        env.execute("FlinkConfigtable java");
    }

    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // class to set watermark and timestamp
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {

        // add tag in the data of datastream elements
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        public void setInput(String input_nm, String input_sx, int input_st) {
            name = input_nm;
            sexy = input_sx;
            shoppingTime = input_st;
        }

        public String toString() {
            return "UserRecord-----name: " + name + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school
                    + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord>{
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // init cache builder
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            //load from redis
                            return loadFromRedis(key);
                        }
                    });

            // get configure from config/read.properties, you must put this with commands:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            // create jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();

            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                System.out.println("test-------cannot find data to key:  " + key);
                return new UserRecord(
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        "null",
                        0);
            } else {
                // get some fields
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + "  get some fields:  " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set key string, if you key is more than one column, build your key string with columns
            String key = input.name;
            UserRecord info = cacheRecords.get(key);
            info.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(info));
        }
    }
}
作者 east

上一 1 … 27 28 29 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.