gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆
    • json格式化及任意折叠展开
    • PDF常用工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面30 )
Flink 3月 1,2021

Flink 配置表与流JOIN程序

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,基于某些业务要求,要求开发Flink的应用程序实现如下功能:

  • 实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息; 其中日志文本和csv格式表中的姓名字段可作为关键字,通过该值将两张表联合起来。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 data.txt:周末两天网民停留日志



LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 NotExist,female,200

  • configtable.csv:网民个人信息,第一列为姓名,第二列为年龄,第三列为公司,第四列为工作地点,第五列为学历,第六列为工作年数,第七列为手机号码,第八列为户籍所在地,第九列为毕业学校,csv标准格式,即分隔符为“,”


username,age,company,workLocation,educational,workYear,phone,nativeLocation,school LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息,包含对应的个人详细信息。

主要分为七个部分:

  • 修改“import.properties”和“read.properties”配置,配置csv文件字段、Redis读取字段以及Redis的节点信息配置
  • 导入“configtable.csv”配置表进入Redis中存储起来。
  • 读取文本数据,生成相应DataStream,解析数据生成OriginalRecord信息。
  • 调用异步IO的函数,以OriginalRecord用户姓名字段为关键字在Redis中查询对应的个人信息,并转化为UserRecord。
  • 筛选出女性网民上网时间数据信息。
  • 按照姓名进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  • 筛选连续上网时间超过阈值的用户,并获取结果。
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.utils.ParameterTool;

import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Read data from csv file and import to redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>" +
                "******************************************************************************************\n" +
                "<config filePath> is for configure file to load\n" +
                "you may write following content into config filePath: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // read all configures
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");

        // init redis client
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);

        // get all files under csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under  " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);

        // run read csv file and analyze it
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }

    public static ArrayList<File> getListFiles(Object obj) {
        File directory = null;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                File fileOne = fileArr[i];
                files.addAll(getListFiles(fileOne));
            }
        }
        return files;
    }

    /**
     * Sets up the processors used for read csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };

        return processors;
    }

    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);

            // the header elements are used to map the values to the bean (names must match)
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();

            UserInfo userinfo;
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),
                        beanReader.getRowNumber(), userinfo));

                // set redis key and value
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        }
        finally {
            if( beanReader != null ) {
                beanReader.close();
            }
        }
    }



    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;


        public UserInfo() {

        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /**
         * @return the username
         */
        public String getUsername() {
            return username;
        }

        /**
         * @param username
         *            the username to set
         */
        public void setUsername(String username) {
            this.username = username;
        }

        /**
         * @return the age
         */
        public String getAge() {
            return age;
        }

        /**
         * @param age
         *            the age to set
         */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * @return the company
         */
        public String getCompany() {
            return company;
        }

        /**
         * @param company
         *            the company to set
         */
        public void setCompany(String company) {
            this.company = company;
        }

        /**
         * @return the workLocation
         */
        public String getWorkLocation() {
            return workLocation;
        }

        /**
         * @param workLocation
         *            the workLocation to set
         */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /**
         * @return the educational
         */
        public String getEducational() {
            return educational;
        }

        /**
         * @param educational
         *            the educational to set
         */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /**
         * @return the workYear
         */
        public String getWorkYear() {
            return workYear;
        }

        /**
         * @param workYear
         *            the workYear to set
         */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /**
         * @return the phone
         */
        public String getPhone() {
            return phone;
        }

        /**
         * @param phone
         *            the phone to set
         */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /**
         * @return the nativeLocation
         */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /**
         * @param nativeLocation
         *            the nativeLocation to set
         */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /**
         * @return the school
         */
        public String getSchool() {
            return school;
        }

        /**
         * @param school
         *            the school to set
         */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Read stream data and join from configure table from redis.
 */
public class FlinkConfigtableJavaExample {

    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt" +
                "******************************************************************************************\n" +
                "Especially you may write following content into config filePath, as in config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // get configure and read data and transform to OriginalRecord
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();

        // read from redis and join to the whole user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout set to 2 minutes, max parallel request num set to 5, you can modify this to optimize
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);

        // data transform
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();

        // execute program
        env.execute("FlinkConfigtable java");
    }

    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // class to set watermark and timestamp
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {

        // add tag in the data of datastream elements
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        public void setInput(String input_nm, String input_sx, int input_st) {
            name = input_nm;
            sexy = input_sx;
            shoppingTime = input_st;
        }

        public String toString() {
            return "UserRecord-----name: " + name + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school
                    + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord>{
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // init cache builder
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            //load from redis
                            return loadFromRedis(key);
                        }
                    });

            // get configure from config/read.properties, you must put this with commands:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            // create jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();

            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                System.out.println("test-------cannot find data to key:  " + key);
                return new UserRecord(
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        "null",
                        0);
            } else {
                // get some fields
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + "  get some fields:  " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set key string, if you key is more than one column, build your key string with columns
            String key = input.name;
            UserRecord info = cacheRecords.get(key);
            info.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(info));
        }
    }
}
作者 east
Flink 3月 1,2021

Flink Job Pipeline程序

场景说明

场景说明

本样例中发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据。

数据规划

  1. 发布者Job使用自定义算子每秒钟产生10000条数据
  2. 数据包含两个属性:分别是Int和String类型
  3. 配置文件
    • nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.sinkserver.subnet:设置网络所属域,例如:nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. 接口说明
    • 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查寻信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。
    • NettySink算子Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
      • name:为本NettySink的名称。
      • topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
      • registerServerHandler: 为注册服务器的句柄。
      • numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
    • NettySource算子Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
      • name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
      • topic:订阅的NettySink的topic。
      • registerServerHandler:为注册服务器的句柄。

说明:

NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。

开发思路

1. 一个Job作为发布者Job,其余两个作为订阅者Job

2. 发布者Job自己产生数据将其转化成byte[],分别向订阅者发送

3. 订阅者收到byte[]之后将其转化成String类型,并抽样打印输出

Java版代码:

  1. 发布Job自定义Source算子产生数据
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
import java.io.Serializable;
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
 
    private boolean isRunning = true;
 
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
 
    }
/**
    * 数据产生函数,每秒钟产生10000条数据
   */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
 
        while(isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }
 
    public void close() {
        isRunning = false;
    }
 
    public void cancel() {
        isRunning = false;
    }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySink {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置job的并发度为2
        env.setBufferTimeout(2);
 
// 创建Zookeeper的注册服务器handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
// 添加自定义Source算子
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer,String>, byte[]>() {
                    //将发送信息转化成字节数组
@Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//通过NettySink发送出去。
 
        env.execute();
 
    }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource1 {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置job的并发度为2        
env.setParallelism(2);
 
// 创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的消息
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                  // 将接收到的字节流转化成字符串  
    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource2 {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置作业的并发度为2       
 env.setParallelism(2);
 
//创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的数据
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
          //将接收到的字节数组转化成字符串
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}

Scala样例代码

  1. 发送消息
package com.huawei.bigdata.flink.examples
 
case class Inforamtion(index: Int, content: String) {
 
  def this() = this(0, "")
}
  1. 发布者job自定义source算子产生数据
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
 
  var isRunning = true
 
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
   
  }
 
// 每秒钟产生10000条数据
  override def run(sourceContext: SourceContext[Inforamtion]) = {
 
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Inforamtion(i, "hello-" + i));
 
      }
      Thread.sleep(1000)
    }
  }
 
  override def close(): Unit = super.close()
 
  override def cancel() = {
    isRunning = false
  }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
object TestPipeline_NettySink {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置job的并发度为2    
env.setParallelism(2)
//设置Zookeeper为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加用户自定义算子产生数据    
env.addSource(new UserSource)
      .keyBy(0).map(x=>x.content.getBytes)//将发送数据转化成字节数组
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//添加NettySink算子发送数据
 
    env.execute()
  }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource1 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置Job的并发度为2  
  env.setParallelism(2)
//设置Zookeeper作为注册服务器
val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收来自发布者的数据
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (1, new String(x)))//将接收到的字节流转化成字符串
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print
 
    env.execute()
  }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource2 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置job的并发度为2   
 env.setParallelism(2)
//创建Zookeeper作为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收数据    
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      .map(x=>(2, new String(x)))//将接收到的字节数组转化成字符串
      .filter(x=>{
        Random.nextInt(50000) == 10
      })
      .print()
 
    env.execute()
  }
}
作者 east
Flink 3月 1,2021

Flink异步Checkpoint机制程序

场景说明

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。

数据规划

  1. 使用自定义算子每秒钟产生大约10000条数据。
  2. 产生的数据为一个四元组(Long,String,String,Integer)。
  3. 数据经统计后,统计结果打印到终端输出。
  4. 打印输出的结果为Long类型的数据。

开发思路

  1. source算子每隔1秒钟发送10000条数据,并注入到Window算子中。
  2. window算子每隔1秒钟统计一次最近4秒钟内数据数量。
  3. 每隔1秒钟将统计结果打印到终端。具体查看方式请参考查看调测结果。
  4. 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

Java样例代码

功能介绍

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。

代码样例

  1. 快照数据 该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。import java.io.Seriablizale; // 该类作为快照的一部分,保存用户自定义状态 public class UDFState implements Serializable { private long count; // 初始化用户自定义状态 public UDFState() { count = 0L; } // 设置用户自定义状态 public void setState(long count) { this.count = count; } // 获取用户自定义状态 public long geState() { return this.count; } }
  2. 带checkpoint的数据源 source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.List; import java.util.Random; // 该类是带checkpoint的source算子 public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> { private Long count = 0L; private boolean isRunning = true; private String alphabet = “abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321”; // 算子的主要逻辑,每秒钟向流图中注入10000个元组 public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception { Random random = new Random(); while(isRunning) { for (int i = 0; i < 10000; i++) { ctx.collect(Tuple4.of(random.nextLong(), “hello-” + count, alphabet, 1)) count++; } Thread.sleep(1000); } } // 任务取消时调用 public void cancel() { isRunning = false; } // 制作自定义快照 public List<UDFState> snapshotState(long l, long ll) throws Exception { UDFState udfState = new UDFState(); List<UDFState> listState = new ArrayList<UDFState>(); udfState.setState(count); listState.add(udfState); return listState; } // 从自定义快照中恢复数据 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); count = udfState.getState(); } }
  3. 带checkpoint的窗口定义 该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; // 该类是带checkpoint的window算子 public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> { private Long total = 0L; // window算子实现逻辑,统计window中元组的个数 void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input, Collector<Long> out) throws Exception { long count = 0L; for (Tuple4<Long, String, String, Integer> event : input) { count++; } total += count; out.collect(count); } // 制作自定义快照 public List<UDFState> snapshotState(Long l, Long ll) { List<UDFState> listState = new ArrayList<UDFState>(); UDFState udfState = new UDFState(); udfState.setState(total); listState.add(udfState); return listState; } // 从自定义快照中恢复状态 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); total = udfState.getState(); } }
  4. 应用代码 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了processing time。import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkProcessingTimeAPIChkMain { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置相关配置,并开启checkpoint功能 env.setStateBackend(new FsStateBackend(“hdfs://hacluster/flink-checkpoint/checkpoint/”)); env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig.setCheckpointInterval(6000); // 应用逻辑 env.addSource(new SEventSourceWithChk()) .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk()) .print() env.execute(); } }

Scala样例代码

功能介绍

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。

代码样例

  1. 发送数据形式case class SEvent(id: Long, name: String, info: String, count: Int)
  2. 快照数据 该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。// 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }
  3. 带checkpoint的数据源 source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。import java.util import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext // 该类是带有checkpoint的source算子 class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{ private var count = 0L private var isRunning = true private val alphabet = “abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321” // source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, “hello-“+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } // 任务取消时调用 override def cancel(): Unit = { isRunning = false; } override def close(): Unit = super.close() // 制作快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(count) udfList.add(udfState) udfList } // 从快照中获取状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) count = udfState.getState } }
  4. 带checkpoint的窗口定义 该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。import java.util import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector // 该类是带checkpoint的window算子 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 制作自定义状态快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } // 从自定义快照中恢复状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) total = udfState.getState } }
  5. 应用代码 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了event time。import com.hauwei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk} import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend(“hdfs://hacluster/flink-checkpoint/checkpoint/”)) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(2000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) // 应用逻辑 env.addSource(new SEventSourceWithChk) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }
作者 east
Flink 3月 1,2021

Flink向Kafka生产并消费数据程序

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。

  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh –create –zookeeper {zkQuorum}/kafka –partitions {partitionNum} –replication-factor {replicationNum} –topic {Topic}

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。

java版代码:

Java样例代码





//producer代码 public class WriteIntoKafka { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); // 构造执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并发度 env.setParallelism(1); // 解析运行参数 ParameterTool paraTool = ParameterTool.fromArgs(args); // 构造流图,将自定义Source生成的数据写入Kafka DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); // 调用execute触发执行 env.execute(); } // 自定义Source,每隔1s持续产生消息 public static class SimpleStringGenerator implements SourceFunction<String> { private static final long serialVersionUID = 2174904787118597072L; boolean running = true; long i = 0; @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { ctx.collect("element-" + (i++)); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } } //consumer代码 public class ReadFromKafka { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); // 构造执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并发度 env.setParallelism(1); // 解析运行参数 ParameterTool paraTool = ParameterTool.fromArgs(args); // 构造流图,从Kafka读取数据并换行打印 DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); messageStream.rebalance().map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { return "Flink says " + s + System.getProperty("line.separator"); } }).print(); // 调用execute触发执行 env.execute(); } }

scala版本代码:

Scala样例代码





//producer代码 object WriteIntoKafka { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005") System.out.println("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println("******************************************************************************************") // 构造执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并发度 env.setParallelism(1) // 解析运行参数 val paraTool = ParameterTool.fromArgs(args) // 构造流图,将自定义Source生成的数据写入Kafka val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator) messageStream.addSink(new FlinkKafkaProducer010( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) // 调用execute触发执行 env.execute } } // 自定义Source,每隔1s持续产生消息 class SimpleStringGenerator extends SourceFunction[String] { var running = true var i = 0 override def run(ctx: SourceContext[String]) { while (running) { ctx.collect("element-" + i) i += 1 Thread.sleep(1000) } } override def cancel() { running = false } } //consumer代码 object ReadFromKafka { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005") System.out.println("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println("******************************************************************************************") // 构造执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并发度 env.setParallelism(1) // 解析运行参数 val paraTool = ParameterTool.fromArgs(args) // 构造流图,从Kafka读取数据并换行打印 val messageStream = env.addSource(new FlinkKafkaConsumer010( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) messageStream .map(s => "Flink says " + s + System.getProperty("line.separator")).print() // 调用execute触发执行 env.execute() } }
作者 east
Flink 3月 1,2021

Flink统计连续网购时间超过2个小时的女性网民信息例子

Java样例代码

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能:

说明:

DataStream应用程序可以在Windows环境和Linux环境中运行。

  • 实时统计总计网购时间超过2个小时的女性网民信息。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • log2.txt:周日网民停留日志。LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

数据规划

DataStream样例工程的数据存储在文本中。

将log1.txt和log2.txt放置在某路径下,例如”/opt/log1.txt”和”/opt/log2.txt”。

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为四个部分:

  1. 读取文本数据,生成相应DataStream,解析数据生成UserRecord信息。
  2. 筛选女性网民上网时间数据信息。
  3. 按照姓名、性别进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  4. 筛选连续上网时间超过阈值的用户,并获取结果。

功能介绍

统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印。

java版代码:

Java样例代码











// 参数解析: // <filePath>为文本读取路径,用逗号分隔。 // <windowTime>为统计数据的窗口跨度,时间单位都是分。 public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // 读取文本路径信息,并使用逗号分隔 final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // 构造执行环境,使用eventTime处理窗口数据 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 读取文本数据流 DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint java"); } // 构造keyBy的关键字作为分组依据 private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // 解析文本行数据,构造UserRecord数据结构 private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // UserRecord数据结构的定义,并重写了toString打印方法 public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }

scala版本:

Scala样例代码











// 参数解析: // filePath为文本读取路径,用逗号分隔。 // windowTime;为统计数据的窗口跨度,时间单位都是分。 object FlinkStreamScalaExample { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2") System.out.println("******************************************************************************************") System.out.println("<filePath> is for text file to read data, use comma to separate") System.out.println("<windowTime> is the width of the window, time as minutes") System.out.println("******************************************************************************************") // 读取文本路径信息,并使用逗号分隔 val filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim) assert(filePaths.length > 0) // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2) // 构造执行环境,使用eventTime处理窗口数据 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 读取文本数据流 val unionStream = if (filePaths.length > 1) { val firstStream = env.readTextFile(filePaths.apply(0)) firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*) } else { env.readTextFile(filePaths.apply(0)) } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(getRecord(_)) .assignTimestampsAndWatermarks(new Record2TimestampExtractor) .filter(_.sexy == "female") .keyBy("name", "sexy") .window(TumblingEventTimeWindows.of(Time.minutes(windowTime))) .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime)) .filter(_.shoppingTime > 120).print() // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint scala") } // 解析文本行数据,构造UserRecord数据结构 def getRecord(line: String): UserRecord = { val elems = line.split(",") assert(elems.length == 3) val name = elems(0) val sexy = elems(1) val time = elems(2).toInt UserRecord(name, sexy, time) } // UserRecord数据结构的定义 case class UserRecord(name: String, sexy: String, shoppingTime: Int) // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] { // add tag in the data of datastream elements override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = { System.currentTimeMillis() } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp - 1) } } }
作者 east
solr 3月 1,2021

Solr增删改查例子

Solr初始化

Solr初始化

功能简介

Solr初始化是指在使用Solr提供的API之前,需要做的必要工作。目的是取得与SolrCoud的连接。

说明:

在进行完Solr操作后,需要调用cloudSolrClient.close()关闭所申请的资源。

Solr初始化





/** *初始化CloudSolrClient实例,连接SolrCloud private CloudSolrClient getCloudSolrClient(String zkHost) throws SolrException { Builder builder = new CloudSolrClient.Builder(); builder.withZkHost(zkHost); CloudSolrClient cloudSolrClient = builder.build(); cloudSolrClient.setZkClientTimeout(zkClientTimeout); cloudSolrClient.setZkConnectTimeout(zkConnectTimeout); cloudSolrClient.connect(); LOG.info("The cloud Server has been connected !!!!"); ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState cloudState = zkStateReader.getClusterState(); LOG.info("The zookeeper state is : {}", cloudState); return cloudSolrClient; }

查询collection

查询collection

功能简介

通过调用CollectionAdminRequest.List的process(cloudSolrClient)并调用返回的response来获取所有collection的名字。

代码样例

private List<String> queryAllCollections(CloudSolrClient 
                             cloudSolrClient) throws SolrException {
        CollectionAdminRequest.List list = 
        new CollectionAdminRequest.List();
        CollectionAdminResponse listRes = null;
        try {
            listRes = list.process(cloudSolrClient);
        } catch (SolrServerException | IOException e) {
            LOG.error("Failed to list collection", e);
            throw new SolrException("Failed to list collection");
        } catch (Exception e) {
            LOG.error("Failed to list collection", e);
            throw new SolrException("unknown exception");
        }

        List<String> collectionNames = (List<String>) 
        listRes.getResponse().get("collections");
        LOG.info("All existed collections : {}", collectionNames);
        return collectionNames;
    }

删除collection

删除collection

功能简介

通过调用CollectionAdminRequest.Delete的process(cloudSolrClient)并调用返回的response来判断是否执行删除collection操作成功。

代码样例

private void deleteCollection(CloudSolrClient cloudSolrClient) 
                                               throws SolrException {
        CollectionAdminRequest.Delete delete = 
        new CollectionAdminRequest.Delete();
        delete.setCollectionName(COLLECTION_NAME);
        CollectionAdminResponse response = null;
        try {
            response = delete.process(cloudSolrClient);
        } catch (SolrServerException | IOException e) {
            LOG.error("Failed to delete collection", e);
            throw new SolrException("Failed to create collection");
        } catch (Exception e) {
            LOG.error("Failed to delete collection", e);
            throw new SolrException("unknown exception");
        }
        if (response.isSuccess()) {
            LOG.info("Success to delete collection[{}]", 
            COLLECTION_NAME);
        } else {
            LOG.error("Failed to delete collection[{}], cause : {}",             COLLECTION_NAME, response.getErrorMessages());
            throw new SolrException("Failed to delete collection");
        }
    }

创建collection

创建collection

功能简介

通过调用CollectionAdminRequest.Create的process(cloudSolrClient)并调用返回的response来判断是否执行创建collection操作成功。

代码样例

  private void createCollection(CloudSolrClient cloudSolrClient) throws SolrException {
        CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION_NAME, DEFAULT_CONFIG_NAME, shardNum, replicaNum);
        CollectionAdminResponse response = null;
        try {
            response = create.process(cloudSolrClient);
        } catch (SolrServerException e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("Failed to create collection");
        } catch (IOException e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("Failed to create collection");
        } catch (Exception e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("unknown exception");
        }
        if (response.isSuccess()) {
            LOG.info("Success to create collection[{}]", COLLECTION_NAME);
        } else {
            LOG.error("Failed to create collection[{}], cause : {}", COLLECTION_NAME, response.getErrorMessages());
            throw new SolrException("Failed to create collection");
        }
    }

添加Doc

添加Doc

功能简介

通过调用cloudSolrClient的add方法或者构造UpdateRequest调用cloudSolrClient的request方法来添加索引数据。

代码样例1

private void addDocs(CloudSolrClient cloudSolrClient) throws SolrException {
        Collection<SolrInputDocument> documents = new ArrayList<SolrInputDocument>();
        for (Integer i = 0; i < 5; i++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", i.toString());
            doc.addField("name", "Luna_" + i);
            doc.addField("features", "test" + i);
            doc.addField("price", (float) i * 1.01);
            documents.add(doc);
        }
        try {
            cloudSolrClient.add(documents);
            LOG.info("success to add index");
        } catch (SolrServerException e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("Failed to add document to collection");
        } catch (IOException e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("Failed to add document to collection");
        } catch (Exception e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("unknown exception");
        }
    }

代码样例2

private void addDocs2(CloudSolrClient cloudSolrClient) throws 
SolrException{
    UpdateRequest request = new UpdateRequest();
    Collection<SolrInputDocument> documents = new ArrayList<>();
    for (Integer i = 5; i < 10; i++) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", i.toString());
        doc.addField("name", "张三" + i);
        doc.addField("features", "test" + i);
        doc.addField("price", (float) i * 1.01);
        documents.add(doc);
     }
     request.add(documents);
    try {
        cloudSolrClient.request(request);
        cloudSolrClient.commit();
    } catch (SolrServerException | IOException e) {
        LOG.error("Failed to add document to collection", e);
        throw new SolrException("Failed to add document to 
        collection");
    }
 }

查询Doc

查询Doc

功能简介

通过构造SolrQuery实例,并调用cloudSolrClient.query接口来查询索引数据。

样例代码

    private void queryIndex(CloudSolrClient cloudSolrClient) throws SolrException {
        SolrQuery query = new SolrQuery();
        query.setQuery("name:Luna*");

        try {
            QueryResponse response = cloudSolrClient.query(query);
            SolrDocumentList docs = response.getResults();
            LOG.info("Query wasted time : {}ms", response.getQTime());

            LOG.info("Total doc num : {}", docs.getNumFound());
            for (SolrDocument doc : docs) {
                LOG.info("doc detail : " + doc.getFieldValueMap());
            }
        } catch (SolrServerException e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("Failed to query document");
        } catch (IOException e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("Failed to query document");
        } catch (Exception e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("unknown exception");
        }
    }

删除Doc

删除Doc

功能简介

通过调用cloudSolrClient.deleteByQuery方法删除指定匹配的索引数据。

代码样例

private void removeIndex(CloudSolrClient cloudSolrClient) throws 
SolrException {
        try {
            cloudSolrClient.deleteByQuery("*:*");
            cloudSolrClient.commit();
            LOG.info("Success to delete index");
        } catch (SolrServerException | IOException e){
            LOG.error("Failed to remove document", e);
            throw new SolrException("Failed to remove document");
        }
    }
作者 east
Hive 3月 1,2021

Hive数据查询

数据查询

功能介绍

本小节介绍了如何使用HQL对数据进行查询分析。从本节中可以掌握如下查询分析方法:

  • SELECT查询的常用特性,如JOIN等。
  • 加载数据进指定分区。
  • 如何使用Hive自带函数。
  • 如何使用自定义函数进行查询分析,如何创建、定义自定义函数请见用户自定义函数。

样例代码

-- 查看薪水支付币种为美元的雇员联系方式. 
SELECT  
a.name,  
b.tel_phone,  
b.email  
FROM employees_info a JOIN employees_contact b  ON(a.id = b.id) WHERE usd_flag='D'; 
 
-- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. 
INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014')  
SELECT  
a.id,  
a.name,  
a.usd_flag,  
a.salary,  
a.deductions,  
a.address, 
b.tel_phone, 
b.email  
FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; 
 
-- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. 
SELECT COUNT(*) FROM employees_info; 
 
-- 查询使用以“cn”结尾的邮箱的员工信息. 
SELECT a.name, b.tel_phone FROM  employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn'; 

扩展使用

  • 配置Hive中间过程的数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec;
作者 east
Hive 3月 1,2021

Hive创建表示例

创建表

功能介绍

本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式:

  • 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。
    • 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。
    • 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。
  • 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。
  • 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。

样例代码

-- 创建外部表employees_info. 
CREATE EXTERNAL TABLE IF NOT EXISTS employees_info 
( 
id INT, 
name STRING, 
usd_flag STRING, 
salary DOUBLE, 
deductions MAP<STRING, DOUBLE>, 
address STRING, 
entrytime STRING 
) 
-- 指定行中各字段分隔符. 
-- "delimited fields terminated by"指定列与列之间的分隔符为',',"MAP KEYS TERMINATED BY"指定MAP中键值的分隔符为'&'. 
ROW FORMAT delimited fields terminated by ',' MAP KEYS TERMINATED BY '&'  
-- 指定表的存储格式为TEXTFILE. 
STORED AS TEXTFILE;  
 
-- 使用CREATE Like创建表. 
CREATE TABLE employees_like LIKE employees_info; 
 
-- 使用DESCRIBE查看employees_info、employees_like、 employees_as_select表结构. 
DESCRIBE employees_info; 
DESCRIBE employees_like; 

扩展应用

  • 创建分区表 一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。对分区内数据进行查询,可缩小查询范围,加快数据的检索速度和可对数据按照一定的条件进行管理。 分区是在创建表的时候用PARTITIONED BY子句定义的。 CREATE EXTERNAL TABLE IF NOT EXISTS employees_info_extended ( id INT, name STRING, usd_flag STRING, salary DOUBLE, deductions MAP<STRING, DOUBLE>, address STRING ) — 使用关键字PARTITIONED BY指定分区列名及数据类型 . PARTITIONED BY (entrytime STRING) STORED AS TEXTFILE;
  • 更新表的结构 一个表在创建完成后,还可以使用ALTER TABLE执行增、删字段,修改表属性,添加分区等操作 — 为表employees_info_extended增加tel_phone、email字段. ALTER TABLE employees_info_extended ADD COLUMNS (tel_phone STRING, email STRING);
  • 建表时配置Hive数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; create table seq_Codec (key string, value string) stored as RCFile;
作者 east
Hbase 3月 1,2021

Hbase 基于二级索引的查询

基于二级索引的查询

功能介绍

针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。

二级索引的使用规则如下:

  • 针对某一列或者多列创建了单索引的场景下:
    • 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)
  • 针对多个列创建的联合索引场景下:
    • 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1))
    • 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。
  • 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。

代码样例

下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中:

样例:使用二级索引查找数据

  public void testScanDataByIndex() {
    LOG.info("Entering testScanDataByIndex.");
    Table table = null;
    ResultScanner scanner = null;
    try {
      table = conn.getTable(tableName);
      
      // Create a filter for indexed column.
      Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"),
          CompareOp.EQUAL, "Li Gang".getBytes());
      Scan scan = new Scan();
      scan.setFilter(filter);
      scanner = table.getScanner(scan);
      LOG.info("Scan indexed data.");
      
      for (Result result : scanner) {
        for (Cell cell : result.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("Scan data by index successfully.");
    } catch (IOException e) {
      LOG.error("Scan data by index failed.");
    } finally {
      if (scanner != null) {
        // Close the scanner object.
        scanner.close();
      }
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Close table failed.");
      }
    }
    
    LOG.info("Exiting testScanDataByIndex.");
  }

注意事项

需要预先对字段name创建二级索引。

相关操作

基于二级索引表查询。

查询样例如下:

用户在hbase_sample_table的info列族的name列添加一个索引,在客户端执行,

hbase org.apache.hadoop.hbase.hindex.mapreduce.TableIndexer -Dtablename.to.index=hbase_sample_table -Dindexspecs.to.add='IDX1=>info:[name->String]'

然后用户需要查询“info:name”,在hbase shell执行如下命令:

>scan ‘hbase_sample_table’,{FILTER=>”SingleColumnValueFilter(family,qualifier,compareOp,comparator,filterIfMissing,latestVersionOnly)”}

说明:

hbase shell下面做复杂的查询请使用API进行处理。

参数说明:

  • family:需要查询的列所在的列族,例如info;
  • qualifier:需要查询的列,例如name;
  • compareOp:比较符,例如=、>等;
  • comparator:需要查找的目标值,例如binary:Zhang San;
  • filterIfMissing:如果某一行不存在该列,是否过滤,默认值为false;
  • latestVersionOnly:是否仅查询最新版本的值,默认值为false。

例如:

>scan 'hbase_sample_table',{FILTER=>"SingleColumnValueFilter('info','name',=,'binary:Zhang San',true,true)"}
作者 east
Hbase 3月 1,2021

Hbase创建二级索引

创建二级索引

功能简介

一般都通过调用org.apache.hadoop.hbase.hindex.client.HIndexAdmin中方法进行HBase二级索引的管理,该类中提供了创建索引的方法。

说明:

二级索引不支持修改,如果需要修改,请先删除旧的然后重新创建。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的createIndex方法中。

public void createIndex() {     
LOG.info("Entering createIndex.");
String indexName = "index_name";
// Create index instance
TableIndices tableIndices = new TableIndices();
IndexSpecification iSpec = new IndexSpecification(indexName); iSpec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String);//注[1]
tableIndices.addIndex(iSpec);
HIndexAdmin iAdmin = null;
Admin admin = null;
try {
admin = conn.getAdmin();
iAdmin = new IndexAdmin(conf);
// add index to the table
iAdmin.addIndices(tableName, tableIndices);
LOG.info("Create index successfully.");
} catch (IOException e) {
LOG.error("Create index failed " ,e);
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
if (iAdmin != null) {
try {
// Close IndexAdmin Object
iAdmin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
}
LOG.info("Exiting createIndex.");
}

新创建的二级索引默认是不启用的,如果需要启用指定的二级索引,可以参考如下代码片段。该代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的enableIndex方法中。

  public void enableIndex() {
    LOG.info("Entering createIndex.");

    // Name of the index to be enabled
    String indexName = "index_name";

    List<String> indexNameList = new ArrayList<String>();
    indexNameList.add(indexName);
    HIndexAdmin iAdmin = null;
    try {
      iAdmin = HIndexClient.newHIndexAdmin(conn.getAdmin());
      // Alternately, enable the specified indices
      iAdmin.enableIndices(tableName, indexNameList);
      System.out.println("Successfully enable indices " + indexNameList + " of the table " + tableName);
    } catch (IOException e) {
      System.out.println("Failed to enable indices " + indexNameList + " of the table " + tableName + "." + e);
    } finally {
      if (iAdmin != null) {
        try {
          iAdmin.close();
        } catch (IOException e) {
          LOG.error("Close admin failed ", e);
        }
      }
    }
  }

注意事项

注[1]:创建联合索引

HBase支持在多个字段上创建二级索引,例如在列name和age上。

HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);

相关操作

使用命令创建索引表。

您还可以通过TableIndexer工具在已有用户表中创建索引。

说明:

<table_name>用户表必须存在。

hbase org.apache.hadoop.hbase.index.mapreduce.TableIndexer -Dindexspecs.to.add=<table_name> -Dtable.columns.index='IDX1=>cf1:[q1->datatype&length];cf2:[q1->datatype],[q2->datatype],[q3->datatype]#IDX2=>cf1:[q5->datatype&length]

“#”用于区分不同的索引,“;”用于区分不同的列族,“,”用于区分不同的列。

tablename.to.index:创建索引的用户表表名。

indexspecs.to.add:创建索引对应的用户表列。

其中命令中各参数的含义如下:

  • IDX1:索引名称
  • cf1:列族名称。
  • q1:列名。
  • datatype:数据类型。数据类型仅支持Integer、String、Double、Float、Long、Short、Byte、Char类型。
作者 east
Hbase 3月 1,2021

HBase支持全文索引

HBase支持全文索引

功能简介

通过org.apache.luna.client.LunaAdmin对象的createTable方法来创建表和索引,并指定表名、列族名、索引创建请求,mapping文件所在目录路径。也可通过addCollection往已有表中添加索引。查询时通过org.apache.luna.client.LunaAdmin对象的getTable方法来获取Table对象进行scan操作。

说明:

表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。

带有全文索引的HBase表限制:

1、不支持多实例;

2、不支持容灾备份恢复;

3、不支持删除行/列族操作;

4、Solr侧查询不支持强一致性;

代码样例片段

以下代码片段在com.huawei.bigdata.hbase.examples包的“LunaSample”类的testFullTextScan方法中。

  public static void testFullTextScan() throws Exception {
    /**
     * Create create request of Solr. Specify collection name, confset name,
     * number of shards, and number of replication factor.
     */
    Create create = new Create();
    create.setCollectionName(COLLECTION_NAME);
    create.setConfigName(CONFSET_NAME);
    create.setNumShards(NUM_OF_SHARDS);
    create.setReplicationFactor(NUM_OF_REPLICATIONFACTOR);
    /**
     * Create mapping. Specify index fields(mandatory) and non-index
     * fields(optional).
     */
    List<ColumnField> indexedFields = new ArrayList<ColumnField>();
    indexedFields.add(new ColumnField("name", "f:n"));
    indexedFields.add(new ColumnField("cat", "f:t"));
    indexedFields.add(new ColumnField("features", "f:d"));
    Mapping mapping = new Mapping(indexedFields);
    /**
     * Create table descriptor of HBase.
     */
    HTableDescriptor desc = new HTableDescriptor(HBASE_TABLE);
    desc.addFamily(new HColumnDescriptor(TABLE_FAMILY));
    /**
     * Create table and collection at the same time.
     */
    LunaAdmin admin = null;
    try {
      admin = new AdminSingleton().getAdmin();
      admin.deleteTable(HBASE_TABLE);
      if (!admin.tableExists(HBASE_TABLE)) {
        admin.createTable(desc, Bytes.toByteArrays(new String[] { "0", "1", "2", "3", "4" }),
            create, mapping);
      }
      /**
       * Put data.
       */
      Table table = admin.getTable(HBASE_TABLE);
      int i = 0;
      while (i < 5) {
        byte[] row = Bytes.toBytes(i + "+sohrowkey");
        Put put = new Put(row);
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("n"), Bytes.toBytes("ZhangSan" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("t"), Bytes.toBytes("CO" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("d"), Bytes.toBytes("Male, Leader of M.O" + i));
        table.put(put);
        i++;
      }

      /**
       * Scan table.
       */
      Scan scan = new Scan();
      SolrQuery query = new SolrQuery();
      query.setQuery("name:ZhangSan1 AND cat:CO1");
      Filter filter = new FullTextFilter(query, COLLECTION_NAME);
      scan.setFilter(filter);
      ResultScanner scanner = table.getScanner(scan);
      LOG.info("-----------------records----------------");
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        for (Cell cell : r.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("-------------------end------------------");
      /**
       * Delete collection.
       */
      admin.deleteCollection(HBASE_TABLE, COLLECTION_NAME);

      /**
       * Delete table.
       */
      admin.deleteTable(HBASE_TABLE);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      /**
       * When everything done, close LunaAdmin.
       */
      admin.close();
    }
  }

解释

(1)创建索引请求

(2)创建表描述符

(3)获取LunaAdmin对象,LunaAdmin提供了建表和索引、添加索引、检查表是否存在、检查索引是否存在、删除索引和删除表等功能。

(4)调用LunaAdmin的建表方法。

(5)往表中插入数据。

(6)构造全文索引条件,设置FullTextFilter,进行查询。

(7)删除索引。

(8)删除表。

(9)关闭admin资源。

注意事项

  • 创建表和索引都必须不存在。
  • 必须使用LunaAdmin获取Table对象进行scan操作。
作者 east
大数据开发 2月 21,2021

国外大公司Pig常见面试题

1)区分Hadoop MapReduce和Pig

Hadoop MapReduce是 编译语言 , 抽象级别低 , 代码需要更多行代码 ,
代码效率代码效率很高。

Pig是脚本语言,抽象级别高。pig与Hadoop MapReduce相比,代码行更少。
代码效率相对较低。

2)比较Apache Pig和SQL。

Apache Pig与SQL的区别在于ETL的用法,惰性评估,在管道中任何给定时间点存储数据,支持管道拆分和显式声明执行计划。 SQL围绕查询产生单个结果。 SQL没有用于拆分数据处理流并将不同的运算符应用于每个子流的内置机制。
Apache Pig允许将用户代码包括在管道的任何位置,而如果要在SQL中使用的数据首先需要导入到数据库中,然后开始清理和转换过程。

3)说明在Apache Pig中进行编程时对MapReduce的需求。

Apache Pig程序使用称为Pig Latin的查询语言编写,与SQL查询语言相似。为了执行查询,需要执行引擎。 Pig引擎将查询转换为MapReduce作业,因此MapReduce充当执行引擎,并且是运行程序所必需的。

4)说明BloomMapFile。

BloomMapFile是一个类,它扩展了MapFile类。它以HBase表格式使用,以使用动态Bloom筛选器为密钥提供快速的成员资格测试。

5) bag in Pig 是什么意思?

元组的集合在Apache Pig中称为包

6)Pig脚本中的foreach操作的用途是什么?

Apache Pig中的FOREACH操作用于将转换应用于数据包中的每个元素,以便执行相应的操作以生成新的数据项。

语法-FOREACH data_bagname GENERATE exp1,exp2

7)解释Pig中不同的复杂数据类型。

Apache Pig支持3种复杂的数据类型-

映射-这些是使用#连接在一起的键值存储。
元组-类似于表格中的行,其中不同的项目之间用逗号分隔。元组可以具有多个属性。
袋-无序的元组集合。包允许多个重复的元组。
8)Flatten在Pig中做什么?

有时,在元组或包中有数据,如果我们想从该数据中删除嵌套级别,则可以使用Pig中的Flatten修饰符。展平未套袋和元组。对于元组,Flatten运算符将用元组的字段代替元组,而取消嵌套的包有点复杂,因为它需要创建新的元组。

通过研究有趣的Pig实时示例来掌握Hadoop

9)用户如何与Apache Pig中的shell交互?

使用Grunt即Apache Pig的交互式外壳,用户可以与HDFS或本地文件系统进行交互。要启动Grunt,用户应该不使用任何命令来调用Apache Pig –

执行“ pig –x local”命令将出现提示-


grunt >

通过在PIG_CLASSPATH中设置配置,可以在本地模式或集群模式下运行PigLatin脚本。

要退出grunt shell,请按CTRL + D或直接键入exit。

10)Apache Pig脚本使用哪些调试工具?

描述和解释是Apache Pig中重要的调试实用程序。

当尝试调试错误或优化PigLatin脚本时,explain实用程序对Hadoop开发人员很有帮助。 describe可以应用于脚本中的特定别名,也可以应用于grunt交互式shell中的整个脚本。说明实用程序会生成几个文本格式的图形,可以将其打印到文件中。
describe调试实用程序在编写Pig脚本时对开发人员很有帮助,因为它显示了脚本中的关系模式。对于尝试学习Apache Pig的初学者,可以使用describe实用程序来了解每个操作员如何更改数据。


11)在Apache Pig中用于说明什么?

在大型数据集上执行猪脚本通常需要很长时间。为解决此问题,开发人员在示例数据上运行了Pig脚本,但是选择的示例数据有可能无法正确执行您的Pig脚本。例如,如果脚本具有联接运算符,则示例数据中至少应有一些记录具有相同的键,否则联接操作将不返回任何结果。为了解决这类问题,使用了说明。说明从数据中获取样本,并且每当遇到诸如删除数据的联接或过滤器之类的运算符时,它都会通过对记录进行修改以使它们满足

他条件。说明仅显示每个阶段的输出,但不运行任何MapReduce任务。

12)解释Pig脚本的执行计划

或者

区分Apache Pig脚本的逻辑和物理计划

在执行pig脚本期间创建逻辑和物理计划。 Pig脚本基于解释器检查。逻辑计划是在语义检查和基本解析之后生成的,在逻辑计划的创建过程中不会进行任何数据处理。对于Pig脚本中的每一行,都会对运算符执行语法检查,并创建一个逻辑计划。每当脚本中遇到错误时,都会引发异常并结束程序执行,否则脚本中的每个语句都有自己的逻辑计划。

逻辑计划在脚本中包含运算符的集合,但不包含运算符之间的边缘。

生成逻辑计划后,脚本执行将移至物理计划,其中有关于Apache Pig将用来执行Pig脚本的物理运算符的描述。物理计划或多或少类似于一系列MapReduce作业,但是该计划没有任何关于如何在MapReduce中执行的参考。在创建物理计划时,将协同逻辑运算符转换为3个物理运算符,即–本地重排,全局重排和打包。加载和存储功能通常在物理计划中得到解决。

13)您对Apache Pig的区分大小写了解多少?

很难说Apache Pig是区分大小写还是不区分大小写。例如,pig中用户定义的函数,关系和字段名称区分大小写,即函数COUNT与函数计数不相同,或者X = load’foo’与x = load’foo’不相同。另一方面,Apache Pig中的关键字不区分大小写,即LOAD与load相同。

14)您能想到哪些Apache Pig用例?

Apache Pig大数据工具特别用于迭代处理,原始数据研究和传统ETL数据管道。由于Pig可以在模式未知,不一致或不完整的情况下运行,因此它被研究人员广泛使用,他们希望在清理数据并将其加载到数据仓库之前利用这些数据。

例如,要建立行为预测模型,网站可以使用它来跟踪访客对各种类型的广告,图像,文章等的响应。

15)区分PigLatin和HiveQL

必须在HiveQL中指定架构,而在PigLatin中是可选的。
HiveQL是一种声明性语言,而PigLatin是程序性语言。
HiveQL遵循平坦的关系数据模型,而PigLatin具有嵌套的关系数据模型。
阅读有关Pig vs.Hive的更多信息

16)PigLatin是一种强类型语言吗?如果是,那么您是如何得出结论的?

在强类型语言中,用户必须预先声明所有变量的类型。在Apache Pig中,当您描述数据的模式时,它期望数据以您提到的相同格式出现。但是,当模式未知时,脚本将在运行时适应实际的数据类型。因此,可以说PigLatin在大多数情况下是强类型的,但在极少数情况下是轻度键入的,即它继续处理不符合其期望的数据。

17)您对Pig的内包和外包有什么了解?

包内部的关系称为内包,而外包只是Pig中的关系

18)区分GROUP和COGROUP运算符。

GROUP和COGROUP运算符是相同的,并且可以使用一个或多个关系。 GROUP运算符通常用于按单个关系对数据进行分组以提高可读性,而COGROUP可以用于按2个或更多关系对数据进行分组。 COGROUP更像是GROUP和JOIN的组合,即它基于列对表进行分组,然后将它们联接到分组的列上。一次最多可以组合127个关系。

19)解释一下Apache Pig中COUNT_STAR和COUNT函数之间的区别吗?

在计算袋中元素数时,COUNT函数不包括NULL值,而COUNT_STAR(0函数在计数时包括NULL值。

20)Apache Pig提供了哪些各种诊断运算符?

转储运算符-用于在屏幕上显示Pig Latin语句的输出,以便开发人员可以调试代码。
描述操作员-在Apache Pig面试问题10中解释
解释操作员-在apache Pig面试中解释问题-10号
说明操作员-在apache pig面试问题-11中解释
21)您将如何合并两个或多个关系的内容,并将单个关系分为两个或多个关系?

这可以使用UNION和SPLIT运算符来完成。

22)我有一个关系R。如何从关系R中获得前10个元组?

20)Apache Pig提供了哪些各种诊断运算符?

转储运算符-用于在屏幕上显示Pig Latin语句的输出,以便开发人员可以调试代码。
描述操作员-在Apache Pig面试问题10中解释
解释操作员-在apache Pig面试中解释问题-10号
说明操作员-在apache pig面试问题-11中解释
21)您将如何合并两个或多个关系的内容,并将单个关系分为两个或多个关系?

这可以使用UNION和SPLIT运算符来完成。

22)我有一个关系R。如何从关系R中获得前10个元组?

TOP()函数从一包元组或一个关系中返回前N个元组。 N与要比较其值的列以及关系R一起作为参数传递给函数top()。

23)Pig和Hive之间有什么共同点?

HiveQL和PigLatin都将命令转换为MapReduce作业。
它们不能用于OLAP事务,因为很难执行低延迟查询。
24)Apache Pig支持哪些Java UDF类型?

代数,评估和过滤器功能是Pig中支持的各种UDF类型。

25)您在HDFS目录中有一个名为employee.txt的文件,其中包含100条记录。您只想查看employee.txt文件中的前10条记录。您将如何做?

第一步是将文件Employee.txt加载到关系名称为Employee的文件中。

员工数据的前10条记录可以使用limit运算符获取-

结果=限制员工10。

26)解释Apache Pig中的标量数据类型。

integer,float,double,long,bytearray和char数组是Apache Pig中可用的标量数据类型。

27)用户如何与Apache Pig中的HDFS交互?

使用grunt外壳。

28)在Apache Pig中使用过滤器有什么用?

就像SQL中的where子句一样,Apache Pig具有用于根据给定条件或谓词提取记录的过滤器。如果谓词或条件变为true,则记录将通过管道传递。谓词包含各种运算符,例如==,<=,!=,> =。

例子 –

X =将“输入”加载为(名称,地址)

Y =通过符号匹配“ Mr. *”的X;

29)什么是pig的UDF?

如果内置运算符不提供某些功能,则程序员可以通过使用其他编程语言(例如Java,Python,Ruby等)编写用户定义的函数来实现这些功能。然后可以将这些用户定义的函数(UDF)嵌入到Pig Latin中脚本。

30)您可以在Apache Pig脚本中加入多个字段吗?

是的,可以在PIG脚本中联接多个字段,因为联接操作从一个输入获取记录,然后将它们与另一输入联接。这可以通过为每个输入指定键来实现,当键相等时,两行将连接在一起。

31)Pig是否支持多行命令?

是的

作者 east

上一 1 … 29 30 31 … 42 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年12月
  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (497)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (39)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.