Flink Program: Joining a Stream with a Configuration Table

Scenario

Assume you have a log file that records how long netizens stayed on a shopping website over a weekend, plus a CSV table of the netizens' personal information. The business requirement is to develop a Flink application that implements the following:

  • Collect, in real time, the information of female netizens whose total online shopping time exceeds 2 hours, together with their personal details. The name field appears in both the log file and the CSV table and serves as the key for joining them.
  • In the two weekend log files, the first column is the name, the second the gender, and the third the dwell time of that visit in minutes, separated by ",". data.txt: netizen dwell logs for the two weekend days

LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
NotExist,female,200

  • configtable.csv: netizens' personal information. The columns are, in order: name, age, company, work location, education, years of work, phone number, native place, and school. Standard CSV format, i.e. separated by ","

username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university
YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university
GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university
CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university
Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university
FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university

Development Approach

Collect the information of female netizens whose total weekend online shopping time in the log file exceeds 2 hours, including their personal details.

The program breaks down into seven main parts:

  • Modify the "import.properties" and "read.properties" files to configure the CSV columns, the fields to read from Redis, and the Redis node addresses (a sample sketch of both files follows this list).
  • Import the "configtable.csv" configuration table into Redis for storage.
  • Read the text data, generate the corresponding DataStream, and parse it into OriginalRecord objects.
  • Call the async I/O function to look up each user's personal information in Redis, keyed by the OriginalRecord name field, and convert the result into a UserRecord.
  • Filter out the records of female netizens.
  • Key the stream by name and sum each woman's online time within a time window.
  • Filter for users whose total online time exceeds the threshold, and obtain the results.
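A minimal sketch of the two properties files, built from the paths, column names, and Redis addresses shown in the usage messages of the code below; adjust the host:port list to your own cluster:

import.properties:

CsvPath=config/configtable.csv
CsvHeaderExist=true
ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400

read.properties:

ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400

The first listing, RedisDataImport.java, reads the CSV configuration table and imports it into Redis: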
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.utils.ParameterTool;

import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Read data from csv file and import to redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print usage hint for running the import job
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>\n" +
                "******************************************************************************************\n" +
                "<config filePath> is the configuration file to load\n" +
                "you may write the following content into it: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // read all configures (load the properties file once)
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final ParameterTool properties = ParameterTool.fromPropertiesFile(configureFilePath);
        final String csvFilePath = properties.get("CsvPath", "config/configtable.csv");
        final boolean isHasHeaders = properties.getBoolean("CsvHeaderExist", true);
        final String csvScheme = properties.get("ColumnNames");
        final String redisIPPort = properties.get("Redis_IP_Port");

        // init redis client
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);

        // get all files under csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);

        // read each csv file and import its rows into Redis
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }

    public static ArrayList<File> getListFiles(Object obj) {
        File directory = null;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                File fileOne = fileArr[i];
                files.addAll(getListFiles(fileOne));
            }
        }
        return files;
    }

    /**
     * Sets up the cell processors used to read the csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };

        return processors;
    }

    private static void readWithCsvBeanReader(String path, String csvScheme, boolean hasHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);

            // the header elements are used to map the values to the bean (names must match);
            // if the file has a header row, consume it, otherwise use the configured column names
            final String[] header = hasHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();

            UserInfo userinfo;
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),
                        beanReader.getRowNumber(), userinfo));

                // store the user info as a Redis hash keyed by username
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        }
        finally {
            if( beanReader != null ) {
                beanReader.close();
            }
        }
    }

    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;


        public UserInfo() {

        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /**
         * @return the username
         */
        public String getUsername() {
            return username;
        }

        /**
         * @param username
         *            the username to set
         */
        public void setUsername(String username) {
            this.username = username;
        }

        /**
         * @return the age
         */
        public String getAge() {
            return age;
        }

        /**
         * @param age
         *            the age to set
         */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * @return the company
         */
        public String getCompany() {
            return company;
        }

        /**
         * @param company
         *            the company to set
         */
        public void setCompany(String company) {
            this.company = company;
        }

        /**
         * @return the workLocation
         */
        public String getWorkLocation() {
            return workLocation;
        }

        /**
         * @param workLocation
         *            the workLocation to set
         */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /**
         * @return the educational
         */
        public String getEducational() {
            return educational;
        }

        /**
         * @param educational
         *            the educational to set
         */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /**
         * @return the workYear
         */
        public String getWorkYear() {
            return workYear;
        }

        /**
         * @param workYear
         *            the workYear to set
         */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /**
         * @return the phone
         */
        public String getPhone() {
            return phone;
        }

        /**
         * @param phone
         *            the phone to set
         */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /**
         * @return the nativeLocation
         */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /**
         * @param nativeLocation
         *            the nativeLocation to set
         */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /**
         * @return the school
         */
        public String getSchool() {
            return school;
        }

        /**
         * @param school
         *            the school to set
         */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
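After the import finishes, you can spot-check the data in Redis. A sketch, assuming redis-cli is available on a cluster node and using one of the hosts from import.properties:

redis-cli -c -h SZV1000064084 -p 22400 hgetall LiuYang

This should print the nine field/value pairs written by hmset. The second listing, FlinkConfigtableJavaExample.java, reads the stream data and joins each record with the configuration table in Redis via async I/O: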
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Read stream data and join from configure table from redis.
 */
public class FlinkConfigtableJavaExample {

    public static void main(String[] args) throws Exception {
        // print usage hint for submitting the job to flink
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt\n" +
                "******************************************************************************************\n" +
                "In particular, you may write the following content into the config file, e.g. config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // get the data path, read the text data, and transform it into OriginalRecord objects
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();

        // asynchronously look up redis and join into the full user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout is set to 2 minutes and the max number of concurrent requests to 5; tune these to your workload
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);

        // data transform
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();

        // execute program
        env.execute("FlinkConfigtable java");
    }

    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // class to set watermark and timestamp
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {

        // assign a timestamp to each element; the current system time is used,
        // so event time here effectively tracks the wall-clock time at which the file is read
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // emit a watermark after each element; the watermark is what eventually triggers the windows downstream
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
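        // note: 'assert' is disabled unless the JVM runs with -ea, so a malformed line would
        // instead surface as an ArrayIndexOutOfBoundsException or NumberFormatException below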
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        public void setInput(String input_nm, String input_sx, int input_st) {
            name = input_nm;
            sexy = input_sx;
            shoppingTime = input_st;
        }

        public String toString() {
            return "UserRecord-----name: " + name + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school
                    + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord>{
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // init cache builder
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            //load from redis
                            return loadFromRedis(key);
                        }
                    });

            // get configures from config/read.properties; the config directory must be shipped with the job, e.g.:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            // create jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();

            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                System.out.println("test-------cannot find data to key:  " + key);
                return new UserRecord(
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        "null",
                        0);
            } else {
                // get some fields
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + "  get some fields:  " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        @Override
        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set the key string; if your key spans more than one column, build it from those columns
            String key = input.name;
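            // check the local cache first; on a miss the CacheLoader fetches the record from Redis via loadFromRedis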
            UserRecord info = cacheRecords.get(key);
            info.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(info));
        }
    }
}
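Given the sample data.txt above, and assuming all records land in a single 30-second window, the totals for female netizens are CaiXuyu 50 x 6 = 300 minutes and FangBo 50 + 60 + 50 + 50 + 50 + 60 = 320 minutes, both above the 120-minute threshold, while LiuYang reaches only 20 x 4 = 80. NotExist (200 minutes) also passes the filter, but since the name is absent from Redis it is printed with the "null" placeholder fields from loadFromRedis. The printed result should therefore look roughly like:

UserRecord-----name: CaiXuyu  age: 28  company: Coca Cola  workLocation: shenzheng  educational: master  workYear: 8  phone: 13512345681  nativeLocation: hefei anhui  school: beijing university  sexy: female  shoppingTime: 300
UserRecord-----name: FangBo  age: 30  company: Huawei  workLocation: qingdao  educational: doctor  workYear: 10  phone: 13512345683  nativeLocation: xiamen fujian  school: fudan university  sexy: female  shoppingTime: 320
UserRecord-----name: NotExist  age: 0  company: null  workLocation: null  educational: null  workYear: 0  phone: null  nativeLocation: null  school: null  sexy: female  shoppingTime: 200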
