Flink Program for Joining a Configuration Table with a Stream
Scenario
Assume that a user has a log file recording how long netizens of a website stayed in online shopping over a weekend, plus a CSV table holding the netizens' personal information. Based on certain service requirements, a Flink application needs to be developed to implement the following functions:
- Collect statistics in real time on female netizens whose total online shopping time exceeds 2 hours, together with their personal details. The name field exists in both the log file and the CSV table and serves as the key for joining the two.
- In the log files for the two weekend days, the first column is the name, the second the gender, and the third the dwell time of that visit in minutes; the separator is ",". data.txt contains the netizens' dwell logs for the two weekend days:

LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
NotExist,female,200
- configtable.csv contains the netizens' personal information. The first column is the name, the second the age, the third the company, the fourth the work location, the fifth the education background, the sixth the number of working years, the seventh the phone number, the eighth the place of household registration, and the ninth the graduation school. The file is in standard CSV format, i.e. the separator is ",":

username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university
YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university
GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university
CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university
Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university
FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university
Development Approach
Collect statistics on the female netizens in the log file whose total online shopping dwell time over the weekend exceeds 2 hours, together with their personal details.
The implementation consists of seven parts:
- Modify the "import.properties" and "read.properties" configuration to set the CSV file fields, the fields to read back from Redis, and the Redis node information (a minimal sketch of both files follows this list).
- Import the "configtable.csv" configuration table into Redis for storage.
- Read the text data, generate the corresponding DataStream, and parse each line into an OriginalRecord.
- Call the async I/O function to query Redis with the OriginalRecord username field as the key, and enrich each record into a UserRecord.
- Filter out the online-time records of female netizens.
- Key the stream by name (keyBy) and sum each woman's online time within a time window.
- Filter the users whose accumulated online time exceeds the threshold, and obtain the results.
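The two property files are plain Java properties files. Below is a minimal sketch of their contents, assuming the paths and Redis nodes shown in the usage strings printed by the two programs; adjust the Redis_IP_Port list to your own cluster nodes:

config/import.properties:
    CsvPath=config/configtable.csv
    CsvHeaderExist=true
    ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
    Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400

config/read.properties:
    ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
    Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400

RedisDataImport.java below reads the CSV configuration table and writes every row into Redis as a hash keyed by username.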
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.java.utils.ParameterTool;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.io.File;
import java.io.FileReader;
import java.util.*;
/**
 * Reads data from the csv file and imports it into redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print usage so the command to run the import is self-documenting
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>\n" +
                "******************************************************************************************\n" +
                "<config filePath> is the configure file to load\n" +
                "you may write the following content into the config file: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");
        // read all configures
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final ParameterTool properties = ParameterTool.fromPropertiesFile(configureFilePath);
        final String csvFilePath = properties.get("CsvPath", "config/configtable.csv");
        final boolean hasHeaders = properties.getBoolean("CsvHeaderExist", true);
        final String csvScheme = properties.get("ColumnNames");
        final String redisIPPort = properties.get("Redis_IP_Port");
        // init redis cluster client from the configured host:port list
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);
        // get all files under the csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);
        // read each csv file and import its rows into redis
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, hasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }
    public static ArrayList<File> getListFiles(Object obj) {
        File directory;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            // recurse into sub-directories and collect every regular file
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                files.addAll(getListFiles(fileArr[i]));
            }
        }
        return files;
    }
    /**
     * Sets up the processors used to read the csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };
        return processors;
    }
    private static void readWithCsvBeanReader(String path, String csvScheme, boolean hasHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);
            // the header elements are used to map the values to the bean (names must match);
            // if the file has a header line, read (and thereby skip) it, otherwise fall back to the configured column names
            final String[] header = hasHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();
            UserInfo userinfo;
            while ((userinfo = beanReader.read(UserInfo.class, header, processors)) != null) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),
                        beanReader.getRowNumber(), userinfo));
                // store the record as a redis hash, keyed by username
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        } finally {
            if (beanReader != null) {
                beanReader.close();
            }
        }
    }
    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;

        public UserInfo() {
        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + " age: " + age + " company: " + company
                    + " workLocation: " + workLocation + " educational: " + educational
                    + " workYear: " + workYear + " phone: " + phone + " nativeLocation: " + nativeLocation + " school: " + school + "]";
        }

        // the username serves as the redis key
        public String getKeyValue() {
            return username;
        }

        // pack all columns into a map, used as the redis hash value
        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /** @return the username */
        public String getUsername() {
            return username;
        }

        /** @param username the username to set */
        public void setUsername(String username) {
            this.username = username;
        }

        /** @return the age */
        public String getAge() {
            return age;
        }

        /** @param age the age to set */
        public void setAge(String age) {
            this.age = age;
        }

        /** @return the company */
        public String getCompany() {
            return company;
        }

        /** @param company the company to set */
        public void setCompany(String company) {
            this.company = company;
        }

        /** @return the workLocation */
        public String getWorkLocation() {
            return workLocation;
        }

        /** @param workLocation the workLocation to set */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /** @return the educational */
        public String getEducational() {
            return educational;
        }

        /** @param educational the educational to set */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /** @return the workYear */
        public String getWorkYear() {
            return workYear;
        }

        /** @param workYear the workYear to set */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /** @return the phone */
        public String getPhone() {
            return phone;
        }

        /** @param phone the phone to set */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /** @return the nativeLocation */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /** @param nativeLocation the nativeLocation to set */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /** @return the school */
        public String getSchool() {
            return school;
        }

        /** @param school the school to set */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
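After the import finishes, you can spot-check one record straight from a Redis node; this is a hypothetical verification step, so substitute the host and port of one of your own cluster nodes:

    redis-cli -c -h SZV1000064084 -p 22400 hgetall LiuYang

Each field of the returned hash should match one column of configtable.csv. The next class, FlinkConfigtableJavaExample.java, reads the stream data, enriches each record from Redis via asynchronous I/O, and runs the windowed aggregation.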
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Reads the stream data and joins it with the configuration table stored in redis.
 */
public class FlinkConfigtableJavaExample {
    public static void main(String[] args) throws Exception {
        // print usage so the command to submit the flink job is self-documenting
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt\n" +
                "******************************************************************************************\n" +
                "Especially you may write the following content into the config file, as in config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // read the text data and transform each line into an OriginalRecord
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();
        // look up redis asynchronously and join to get the whole user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout set to 2 minutes, max parallel request num set to 5; tune these to optimize
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);
        // keep female users, sum shopping time per name in a tumbling window, then keep totals above the threshold
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();
        // execute program
        env.execute("FlinkConfigtable java");
    }
    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // assigns timestamps and watermarks to the stream elements
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {
        // tag each element with the current processing time as its event timestamp
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // emit a watermark per element to trigger the window; the watermark trails the latest timestamp by 1 ms
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }
    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        // overwrite the fields that come from the stream record
        public void setInput(String inputName, String inputSexy, int inputShoppingTime) {
            name = inputName;
            sexy = inputSexy;
            shoppingTime = inputShoppingTime;
        }

        public String toString() {
            return "UserRecord-----name: " + name + " age: " + age + " company: " + company
                    + " workLocation: " + workLocation + " educational: " + educational
                    + " workYear: " + workYear + " phone: " + phone + " nativeLocation: " + nativeLocation + " school: " + school
                    + " sexy: " + sexy + " shoppingTime: " + shoppingTime;
        }
    }
    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord> {
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // init the local cache so each user is looked up in redis at most once per expiry period
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            // on a cache miss, load the record from redis
                            return loadFromRedis(key);
                        }
                    });
            // get the configuration from config/read.properties; you must ship it with commands like:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            // create the jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                // unknown user: return a placeholder record with empty detail fields
                System.out.println("test-------cannot find data to key: " + key);
                return new UserRecord(
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        "null",
                        0);
            } else {
                // fetch the configured fields of the user hash
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + " get some fields: " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        @Override
        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set the key string; if your key is more than one column, build the key string from those columns
            String key = input.name;
            UserRecord cached = cacheRecords.get(key);
            // copy the cached record before filling in the stream fields, so the cached
            // entry is never mutated by concurrent async requests
            UserRecord output = new UserRecord(cached.name, cached.age, cached.company, cached.workLocation,
                    cached.educational, cached.workYear, cached.phone, cached.nativeLocation, cached.school,
                    cached.sexy, cached.shoppingTime);
            output.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(output));
        }
    }
}
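As a rough sanity check on the sample data above, and assuming the whole of data.txt falls into a single 30-second window: the accumulated shopping time for each female netizen is LiuYang 80, CaiXuyu 300, FangBo 320, and NotExist 200 minutes, so CaiXuyu, FangBo, and NotExist pass the 120-minute filter; NotExist deliberately exercises the cache-miss branch, so its detail fields print as "null". Note also the Guava LoadingCache in AsyncRedisRequest: entries live for up to 7 days after the last access, so repeated records for the same user hit Redis only once rather than once per event.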