1.1.1 Case 5: Configuring the JOIN Application Between Tables and Streams
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
Assume that there is a log file recording the time netizens spend shopping online and a CSV-formatted table containing netizen personal information. Develop a Flink application that implements the following functions:
l Collect statistics, in real time, on female netizens who spend more than 2 hours in total shopping online during the weekend.
The name fields in the log file and the CSV-formatted table serve as keys, on which the two data sets are joined.
l In the log file collected during the weekend, the first, second, and third columns record the name, gender, and dwell duration (in minutes), respectively. The columns are separated by commas (,).
− data.txt: logs collected during the weekend
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
NotExist,female,200
− configtable.csv: The CSV-formatted table listing personal information about netizens. The first through ninth columns are the name, age, company name, work place, academic degree, years of working, mobile number, registered residence, and school of graduation. The columns are separated by commas (,).
username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university
YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university
GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university
CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university
Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university
FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university
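Each log record can be parsed by splitting on the comma delimiter, mirroring what the example code does internally. A minimal standalone sketch (class and method names here are illustrative, not part of the example project):

```java
public class LogRecordParseSketch {
    // Parse one data.txt line of the form name,gender,minutes into its three columns.
    static String[] parse(String line) {
        String[] elems = line.split(",");
        if (elems.length != 3) {
            throw new IllegalArgumentException("expect 3 columns: " + line);
        }
        return elems;
    }

    public static void main(String[] args) {
        String[] r = parse("LiuYang,female,20");
        System.out.println(r[0] + " " + r[1] + " " + Integer.parseInt(r[2]));
    }
}
```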
Data Planning
The stream data of the example project is saved in a TXT file, and the configuration table is in CSV format.
1. Ensure that the clusters, including HDFS, Yarn, secure Redis, and Flink have been installed.
l In the security cluster mode, common (non-secure) Redis can also be installed. During installation, the REDIS_SECURITY_ENABLED parameter specifies whether to install secure or common Redis: true installs secure Redis, and false installs common Redis.
l If secure Redis is installed in security mode, use the example code in this section.
l For the example code for common Redis in security mode, see Application Development Guide > Developing an Application > JOIN Operation between Configuration Tables and Streams in the FusionInsight HD Product Documentation (Common Mode).
2. Create a Redis cluster, add a Redis user, and set permissions for the user. Then download the user.keytab and krb5.conf files. For details about Redis, see Service Operation Guide > Redis in the FusionInsight HD Product Documentation.
3. Modify the import.properties and read.properties files, which are saved in the config directory specified in the example code.
− The following is an example of configurations in the import.properties file:
#path to read csv files, it can be file or directory
CsvPath=config/configtable.csv
#csv file headers exist in file first line or not
CsvHeaderExist=true
#csv file headers, also the redis field names
#Notice: if CsvHeaderExist false, you must set it, if CsvHeaderExist true, it read from csv file
ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
#redis security mode open or not
Redis_Security=true
#redis hostname/ip and port when you need to connect to redis
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400
#redis user principal
Redis_Principal=test11@HADOOP.COM
#redis keytab file path
Redis_KeytabFile=config/user.keytab
#redis krb5 file path
Redis_Krb5File=config/krb5.conf
− The following is an example of configurations in the read.properties file:
#the redis field names, configure which fields you need to read, Notice you need English field names
ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
#redis security mode open or not
Redis_Security=true
#redis hostname/ip and port when you need to connect to redis
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400
#redis user principal
Redis_Principal=test11@HADOOP.COM
#redis keytab file path
Redis_KeytabFile=config/user.keytab
#redis krb5 file path
Redis_Krb5File=config/krb5.conf
4. On the Flink client, create a config directory, and copy the user.keytab, krb5.conf, data.txt, configtable.csv, import.properties, and read.properties files to the config directory, for example, /opt/FI-Client/Flink/flink/config/configtable.csv.
The data.txt and configtable.csv files are in the data directory specified in the example code.
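Before submitting the job, the properties files above can be sanity-checked with the standard java.util.Properties loader. The sketch below is illustrative only (the class and helper names are hypothetical, not part of the example project); it checks that all keytab-related keys are present when Redis_Security is true:

```java
import java.io.StringReader;
import java.util.Properties;

public class PropertiesCheckSketch {
    // Return true when the security-related keys needed for secure Redis are all set.
    static boolean secureConfigComplete(Properties p) {
        boolean secure = Boolean.parseBoolean(p.getProperty("Redis_Security", "true"));
        if (!secure) {
            return true; // common Redis needs no principal/keytab/krb5
        }
        return p.getProperty("Redis_Principal") != null
                && p.getProperty("Redis_KeytabFile") != null
                && p.getProperty("Redis_Krb5File") != null;
    }

    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.load(new StringReader(
                "Redis_Security=true\n"
                + "Redis_Principal=test11@HADOOP.COM\n"
                + "Redis_KeytabFile=config/user.keytab\n"
                + "Redis_Krb5File=config/krb5.conf\n"));
        System.out.println("secure config complete: " + secureConfigComplete(p));
    }
}
```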
1.1.1.2 Development Idea
Collect statistics on female netizens who spend more than 2 hours in total shopping online during the weekend, together with their personal information.
The procedure for obtaining the information is as follows:
l Modify the import.properties and read.properties files to configure the fields in the CSV file, the fields to read from Redis, and the Redis security settings.
l Import the configtable.csv table into Redis.
l Read the text data, create DataStreams, and parse the data into OriginalRecord objects.
l Invoke asynchronous I/O, using the OriginalRecord username field as the key to query personal information from Redis, and convert the result into UserRecord objects.
l Filter the data to keep the online time of female netizens.
l Perform a keyBy operation on the name and accumulate the online time of each female netizen within a time window.
l Filter the netizens whose total online time exceeds the threshold and obtain the results.
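Outside Flink, the join-and-aggregate logic described above can be illustrated with a plain map lookup. This is a simplified, hypothetical sketch (the real example performs the lookup asynchronously against Redis and aggregates per window):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class JoinSketch {
    // Sum shopping minutes per female netizen, joining each log record to the
    // configuration table by name, and keep totals above the threshold.
    static Map<String, Integer> femalesOverThreshold(String[] logLines,
                                                     Map<String, String> configTable,
                                                     int thresholdMinutes) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : logLines) {
            String[] e = line.split(",");
            if (!"female".equals(e[1])) continue;          // keep female records only
            if (!configTable.containsKey(e[0])) continue;  // join: drop names absent from the table
            totals.merge(e[0], Integer.parseInt(e[2]), Integer::sum);
        }
        totals.values().removeIf(t -> t <= thresholdMinutes); // threshold filter
        return totals;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("CaiXuyu", "28,Coca Cola");
        table.put("FangBo", "30,Huawei");
        String[] logs = {"CaiXuyu,female,50", "CaiXuyu,female,80", "FangBo,female,60", "YuanJing,male,10"};
        System.out.println(femalesOverThreshold(logs, table, 120));
    }
}
```

Note how a record for a name missing from the table (such as NotExist in data.txt) is dropped by the join, just as the Flink example returns a placeholder record for unknown keys.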
1.1.1.3 Example Code Description
Java Example Code
l Function description
Collect statistics from the log file on female netizens who spend more than 2 hours in total shopping online during the weekend, read personal information from the CSV-formatted table, and join the two data sets using the netizen names as keys.
l Example code
The following code snippet shows how to import the configtable.csv file into a secure Redis cluster. For details, see com.huawei.bigdata.flink.examples.RedisDataImport.
package com.huawei.bigdata.flink.examples;

import com.huawei.bigdata.security.LoginUtil;
import org.apache.flink.api.java.utils.ParameterTool;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;

/**
 * Read data from csv file and import to redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>" +
                "******************************************************************************************\n" +
                "<config filePath> is for configure file to load\n" +
                "you may write following content into config filePath: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_Security=true\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "Redis_Principal=test11@HADOOP.COM\n" +
                "Redis_KeytabFile=config/user.keytab\n" +
                "Redis_Krb5File=config/krb5.conf\n" +
                "******************************************************************************************");

        // read all configures
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");
        final boolean isSecurity = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("Redis_Security", true);
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");
        final String principal = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Principal");
        final String keytab = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_KeytabFile");
        final String krb5 = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Krb5File");

        // init redis client
        initRedis(isSecurity, principal, keytab, krb5);
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);

        // get all files under csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);

        // run read csv file and analyze it
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }

    public static void initRedis(boolean isRedisSecurity, String userPrincipal, String keytabPath, String krb5Path) throws IOException {
        // redis security
        System.setProperty("redis.authentication.jaas", isRedisSecurity ? "true" : "false");
        // check and set
        if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {
            LoginUtil.setJaasFile(userPrincipal, keytabPath);
            LoginUtil.setKrb5Config(krb5Path);
        }
    }

    public static ArrayList<File> getListFiles(Object obj) {
        File directory = null;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                File fileOne = fileArr[i];
                files.addAll(getListFiles(fileOne));
            }
        }
        return files;
    }

    /**
     * Sets up the processors used for read csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };
        return processors;
    }

    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);
            // the header elements are used to map the values to the bean (names must match)
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();
            UserInfo userinfo;
            while ((userinfo = beanReader.read(UserInfo.class, header, processors)) != null) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s",
                        beanReader.getLineNumber(), beanReader.getRowNumber(), userinfo));
                // set redis key and value
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        } finally {
            if (beanReader != null) {
                beanReader.close();
            }
        }
    }

    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;

        public UserInfo() {
        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + " age: " + age + " company: " + company
                    + " workLocation: " + workLocation + " educational: " + educational
                    + " workYear: " + workYear + " phone: " + phone
                    + " nativeLocation: " + nativeLocation + " school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /** @return the username */
        public String getUsername() {
            return username;
        }

        /** @param username the username to set */
        public void setUsername(String username) {
            this.username = username;
        }

        /** @return the age */
        public String getAge() {
            return age;
        }

        /** @param age the age to set */
        public void setAge(String age) {
            this.age = age;
        }

        /** @return the company */
        public String getCompany() {
            return company;
        }

        /** @param company the company to set */
        public void setCompany(String company) {
            this.company = company;
        }

        /** @return the workLocation */
        public String getWorkLocation() {
            return workLocation;
        }

        /** @param workLocation the workLocation to set */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /** @return the educational */
        public String getEducational() {
            return educational;
        }

        /** @param educational the educational to set */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /** @return the workYear */
        public String getWorkYear() {
            return workYear;
        }

        /** @param workYear the workYear to set */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /** @return the phone */
        public String getPhone() {
            return phone;
        }

        /** @param phone the phone to set */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /** @return the nativeLocation */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /** @param nativeLocation the nativeLocation to set */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /** @return the school */
        public String getSchool() {
            return school;
        }

        /** @param school the school to set */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
The following code snippet shows how to read stream data from the data.txt file, query the secure Redis cluster using netizen names as keys, join the log data with the CSV-formatted table, and print the results. For details, see com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample.
package com.huawei.bigdata.flink.examples;

import com.huawei.bigdata.security.LoginUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Read stream data and join from configure table from redis.
 */
public class FlinkConfigtableJavaExample {
    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt" +
                "******************************************************************************************\n" +
                "Especially you may write following content into config filePath, as in config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_Security=true\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "Redis_Principal=test11@HADOOP.COM\n" +
                "Redis_KeytabFile=config/user.keytab\n" +
                "Redis_Krb5File=config/krb5.conf\n" +
                "******************************************************************************************");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // get configure and read data and transform to OriginalRecord
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();

        // read from redis and join to the whole user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout set to 2 minutes, max parallel request num set to 5, you can modify this to optimize
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);

        // data transform
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();

        // execute program
        env.execute("FlinkConfigtable java");
    }

    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // class to set watermark and timestamp
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {
        // add tag in the data of datastream elements
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        public void setInput(String input_nm, String input_sx, int input_st) {
            name = input_nm;
            sexy = input_sx;
            shoppingTime = input_st;
        }

        public String toString() {
            return "UserRecord-----name: " + name + " age: " + age + " company: " + company
                    + " workLocation: " + workLocation + " educational: " + educational
                    + " workYear: " + workYear + " phone: " + phone
                    + " nativeLocation: " + nativeLocation + " school: " + school
                    + " sexy: " + sexy + " shoppingTime: " + shoppingTime;
        }
    }

    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord> {
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // init cache builder
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            // load from redis
                            return loadFromRedis(key);
                        }
                    });
            // get configure from config/read.properties, you must put this with commands:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final boolean isSecurity = ParameterTool.fromPropertiesFile(configPath).getBoolean("Redis_Security", true);
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            final String principal = ParameterTool.fromPropertiesFile(configPath).get("Redis_Principal");
            final String keytab = ParameterTool.fromPropertiesFile(configPath).get("Redis_KeytabFile");
            final String krb5 = ParameterTool.fromPropertiesFile(configPath).get("Redis_Krb5File");
            // init redis security mode
            System.setProperty("redis.authentication.jaas", isSecurity ? "true" : "false");
            if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {
                LoginUtil.setJaasFile(principal, keytab);
                LoginUtil.setKrb5Config(krb5);
            }
            // create jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                System.out.println("test-------cannot find data to key: " + key);
                return new UserRecord("null", 0, "null", "null", "null", 0, "null", "null", "null", "null", 0);
            } else {
                // get some fields
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + " get some fields: " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set key string, if you key is more than one column, build your key string with columns
            String key = input.name;
            UserRecord info = cacheRecords.get(key);
            info.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(info));
        }
    }
}
Scala Example Code
l Function description
Collect statistics from the log file on female netizens who spend more than 2 hours in total shopping online during the weekend, read personal information from the CSV-formatted table, and join the two data sets using the netizen names as keys.
l Example code
The following code snippet shows how to import the configtable.csv file into a secure Redis cluster. For details, see com.huawei.bigdata.flink.examples.RedisDataImport.
package
com.huawei.bigdata.flink.examples;
import com.huawei.bigdata.security.LoginUtil;
import org.apache.flink.api.java.utils.ParameterTool;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
/**
* Read data from csv file and import to redis.
*/
public class RedisDataImport {
public static void main(String[] args) throws Exception
{
// print comment for command to use
run flink
System.out.println("use command
as: \n" +
"java -cp
/opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar"
+
" com.huawei.bigdata.flink.examples.RedisDataImport --configPath
<config filePath>" +
"******************************************************************************************\n"
+
"<config filePath> is for configure file to load\n"
+
"you may write following content into config filePath: \n"
+
"CsvPath=config/configtable.csv\n" +
"CsvHeaderExist=true\n" +
"ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n"
+
"Redis_Security=true\n" +
"Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n"
+
"Redis_Principal=test11@HADOOP.COM\n" +
"Redis_KeytabFile=config/user.keytab\n" +
"Redis_Krb5File=config/krb5.conf\n" +
"******************************************************************************************");
// read all configures
final String configureFilePath =
ParameterTool.fromArgs(args).get("configPath",
"config/import.properties");
final String csvFilePath =
ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath",
"config/configtable.csv");
final boolean isHasHeaders =
ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist",
true);
final String csvScheme =
ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");
final boolean isSecurity =
ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("Redis_Security",
true);
final String redisIPPort =
ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");
final String principal =
ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Principal");
final String keytab =
ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_KeytabFile");
final String krb5 =
ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Krb5File");
// init redis client
initRedis(isSecurity, principal,
keytab, krb5);
Set<HostAndPort> hosts = new
HashSet<HostAndPort>();
for (String hostAndPort :
redisIPPort.split(",")) {
hosts.add(new
HostAndPort(hostAndPort.split(":")[0],
Integer.parseInt(hostAndPort.split(":")[1])));
}
final JedisCluster client = new
JedisCluster(hosts, 15000);
// get all files under csv file
path
ArrayList<File> files =
getListFiles(csvFilePath);
System.out.println("Read file
or directory under " + csvFilePath
+ ", total file num: " + files.size() + ", columns: " +
csvScheme);
// run read csv file and analyze
it
for (int index = 0; index <
files.size(); index++) {
readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme,
isHasHeaders, client);
}
client.close();
System.out.println("Data import
finish!!!");
}
public static void initRedis(boolean isRedisSecurity, String
userPrincipal, String keytabPath, String krb5Path) throws IOException
{
// redis security
System.setProperty("redis.authentication.jaas", isRedisSecurity ?
"true" : "false");
// check and set
if
(System.getProperty("redis.authentication.jaas",
"false").equals("true")) {
LoginUtil.setJaasFile(userPrincipal, keytabPath);
LoginUtil.setKrb5Config(krb5Path);
}
}
public static ArrayList<File> getListFiles(Object obj)
{
File directory = null;
if (obj instanceof File)
{
directory =
(File) obj;
} else {
directory =
new File(obj.toString());
}
ArrayList<File> files = new
ArrayList<File>();
if (directory.isFile())
{
files.add(directory);
return
files;
} else if (directory.isDirectory())
{
File[]
fileArr = directory.listFiles();
for (int i =
0; i < fileArr.length; i++) {
File fileOne = fileArr[i];
files.addAll(getListFiles(fileOne));
}
}
return files;
}
/**
* Sets up the processors used for read csv. There are
9 CSV columns. Empty
* columns are read as null (hence the NotNull() for
mandatory columns).
*
* @return the cell processors
*/
private static CellProcessor[] getProcessors() {
final CellProcessor[] processors =
new CellProcessor[] {
new NotNull(), // username
new NotNull(), // age
new NotNull(), // company
new NotNull(), // workLocation
new NotNull(), // educational
new NotNull(), // workYear
new NotNull(), // phone
new NotNull(), // nativeLocation
new
NotNull(), // school
};
return processors;
}
    private static void readWithCsvBeanReader(String path, String csvScheme,
            boolean isSkipHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);
            // the header elements are used to map the values to the bean (names must match)
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();
            UserInfo userinfo;
            while ((userinfo = beanReader.read(UserInfo.class, header, processors)) != null) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s",
                        beanReader.getLineNumber(), beanReader.getRowNumber(), userinfo));
                // set redis key and value
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        } finally {
            if (beanReader != null) {
                beanReader.close();
            }
        }
    }
    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;

        public UserInfo() {
        }

        public UserInfo(String nm, String a, String c, String w, String e,
                String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }
        public String toString() {
            return "UserInfo-----[username: " + username + " age: " + age
                    + " company: " + company
                    + " workLocation: " + workLocation + " educational: " + educational
                    + " workYear: " + workYear + " phone: " + phone
                    + " nativeLocation: " + nativeLocation + " school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }
        /**
         * @return the username
         */
        public String getUsername() {
            return username;
        }

        /**
         * @param username the username to set
         */
        public void setUsername(String username) {
            this.username = username;
        }

        /**
         * @return the age
         */
        public String getAge() {
            return age;
        }

        /**
         * @param age the age to set
         */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * @return the company
         */
        public String getCompany() {
            return company;
        }

        /**
         * @param company the company to set
         */
        public void setCompany(String company) {
            this.company = company;
        }

        /**
         * @return the workLocation
         */
        public String getWorkLocation() {
            return workLocation;
        }

        /**
         * @param workLocation the workLocation to set
         */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /**
         * @return the educational
         */
        public String getEducational() {
            return educational;
        }

        /**
         * @param educational the educational to set
         */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /**
         * @return the workYear
         */
        public String getWorkYear() {
            return workYear;
        }

        /**
         * @param workYear the workYear to set
         */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /**
         * @return the phone
         */
        public String getPhone() {
            return phone;
        }

        /**
         * @param phone the phone to set
         */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /**
         * @return the nativeLocation
         */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /**
         * @param nativeLocation the nativeLocation to set
         */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /**
         * @return the school
         */
        public String getSchool() {
            return school;
        }

        /**
         * @param school the school to set
         */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
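In the UserInfo class above, getKeyValue and getMapInfo determine how each CSV row is stored in Redis: the username column becomes the key, and the remaining columns become hash fields written with client.hmset. The mapping can be sketched with the JDK alone (no Redis client needed; the class name RowToHashSketch is invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class RowToHashSketch {
    // Mirrors UserInfo.getMapInfo(): split one configtable.csv row into the
    // nine named fields that the example writes with client.hmset(key, map).
    static Map<String, String> toHash(String csvRow) {
        String[] names = {"username", "age", "company", "workLocation",
                "educational", "workYear", "phone", "nativeLocation", "school"};
        String[] values = csvRow.split(",");
        Map<String, String> info = new HashMap<>();
        for (int i = 0; i < names.length; i++) {
            info.put(names[i], values[i]);
        }
        return info;
    }

    public static void main(String[] args) {
        String row = "LiuYang,25,Microsoft,hangzhou,college,5,13512345678,"
                + "hangzhou zhejiang,wuhan university";
        Map<String, String> hash = toHash(row);
        // The Redis key would be the username; the hash holds the other columns.
        System.out.println(hash.get("username") + " -> " + hash);
    }
}
```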
The following code snippet is an example. It reads stream data from the data.txt file, uses netizen names as keys to query the Redis secure cluster, joins the log data with the CSV-formatted table, and prints the results. For details, see com.huawei.bigdata.flink.examples.FlinkConfigtableScalaExample.
package com.huawei.bigdata.flink.examples

import java.util.HashSet
import java.util.Set
import java.util.concurrent.TimeUnit

import com.huawei.bigdata.security.LoginUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncCollector
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

import scala.concurrent.{ExecutionContext, Future}

/**
 * Read stream data and join it with the configuration table from Redis.
 */
object FlinkConfigtableScalaExample {
  def main(args: Array[String]) {
    // print comment for command to use run flink
    System.out.println("use command as: \n" +
      "./bin/flink run -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
      "/opt/FlinkConfigtableScalaExample.jar --dataPath config/data.txt" +
      "******************************************************************************************\n" +
      "Especially you may write following content into config filePath, as in config/read.properties: \n" +
      "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
      "Redis_Security=true\n" +
      "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
      "Redis_Principal=test11@HADOOP.COM\n" +
      "Redis_KeytabFile=config/user.keytab\n" +
      "Redis_Krb5File=config/krb5.conf\n" +
      "******************************************************************************************")
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // get configure and read data and transform to OriginalRecord
    val dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt")
    val originalStream = env.readTextFile(dataPath)
      .map(it => getRecord(it))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .disableChaining()
    // read from redis and join to the whole user information
    val resultStream = AsyncDataStream.unorderedWait(
      originalStream,
      2,
      TimeUnit.MINUTES,
      5) {
      (input, collector: AsyncCollector[UserRecord]) =>
        Future {
          // get configure from config/read.properties; you must put this with commands:
          // ./bin/yarn-session.sh -t /opt/config -n 3 -jm 1024 -tm 1024 or
          // ./bin/flink run -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
          val configPath = "config/read.properties"
          val fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields")
          val isSecurity = ParameterTool.fromPropertiesFile(configPath).getBoolean("Redis_Security", true)
          val hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port")
          val principal = ParameterTool.fromPropertiesFile(configPath).get("Redis_Principal")
          val keytab = ParameterTool.fromPropertiesFile(configPath).get("Redis_KeytabFile")
          val krb5 = ParameterTool.fromPropertiesFile(configPath).get("Redis_Krb5File")

          // init redis security mode
          System.setProperty("redis.authentication.jaas", if (isSecurity) "true" else "false")
          if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {
            LoginUtil.setJaasFile(principal, keytab)
            LoginUtil.setKrb5Config(krb5)
          }

          // create jedisCluster client
          val hosts: Set[HostAndPort] = new HashSet[HostAndPort]()
          hostPort.split(",").foreach(it => hosts.add(new HostAndPort(it.split(":").apply(0),
            Integer.parseInt(it.split(":").apply(1)))))
          val client = new JedisCluster(hosts, 60000)
          System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes.size())
          if (client.getClusterNodes.size() <= 0) {
            System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes.size())
          }

          // set key string; if your key is more than one column, build your key string with columns
          val key = input.name
          if (!client.exists(key)) {
            System.out.println("test-------cannot find data to key: " + key)
            collector.collect(Seq(new UserRecord(
              input.name,
              0,
              "null",
              "null",
              "null",
              0,
              "null",
              "null",
              "null",
              input.sexy,
              input.shoppingTime)))
          } else {
            val values = client.hmget(key, fields.split(","): _*)
            System.out.println("test-------key: " + key + " get some fields: " + values.toString)
            collector.collect(Seq(new UserRecord(
              values.get(0),
              Integer.parseInt(values.get(1)),
              values.get(2),
              values.get(3),
              values.get(4),
              Integer.parseInt(values.get(5)),
              values.get(6),
              values.get(7),
              values.get(8),
              input.sexy,
              input.shoppingTime)))
          }
          client.close()
        }(ExecutionContext.global)
    }
    // data transform
    resultStream.filter(_.sexy == "female")
      .keyBy("name")
      .window(TumblingEventTimeWindows.of(Time.seconds(30)))
      .reduce((e1, e2) => UserRecord(e1.name, e2.age, e2.company, e2.workLocation,
        e2.educational, e2.workYear, e2.phone, e2.nativeLocation, e2.school,
        e2.sexy, e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > 120)
      .print()

    // execute program
    env.execute("FlinkConfigtable scala")
  }
  // get enums of record
  def getRecord(line: String): OriginalRecord = {
    val elems = line.split(",")
    assert(elems.length == 3)
    val name = elems(0)
    val sexy = elems(1)
    val time = elems(2).toInt
    OriginalRecord(name, sexy, time)
  }

  // the scheme of record read from txt
  case class OriginalRecord(name: String, sexy: String, shoppingTime: Int)

  case class UserRecord(name: String, age: Int, company: String,
    workLocation: String, educational: String, workYear: Int,
    phone: String, nativeLocation: String, school: String,
    sexy: String, shoppingTime: Int)

  // class to set watermark and timestamp
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[OriginalRecord] {

    // add tag in the data of datastream elements
    override def extractTimestamp(element: OriginalRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

    // give the watermark to trigger the window to execute, and use the value
    // to check whether the window elements are ready
    def checkAndGetNextWatermark(lastElement: OriginalRecord,
        extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }
}
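Ignoring the 30-second event-time windows, the filter, keyBy, reduce, and filter chain above amounts to summing the shopping time per female netizen and keeping totals above 120 minutes. A minimal Flink-free sketch of that aggregation in plain Java (the class name JoinResultSketch is invented for illustration; real runs aggregate per window, not over the whole file):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JoinResultSketch {
    // Sum shopping time per female netizen and keep only totals above the
    // threshold, mirroring filter/keyBy/reduce/filter in the Flink job
    // (windowing is ignored: the whole input is treated as one batch).
    static Map<String, Integer> femaleTotalsOver(String[] lines, int threshold) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");             // name,sexy,shoppingTime
            if (!"female".equals(f[1])) {
                continue;                             // filter(_.sexy == "female")
            }
            totals.merge(f[0], Integer.parseInt(f[2]), Integer::sum); // keyBy + reduce
        }
        totals.values().removeIf(t -> t <= threshold); // filter(_.shoppingTime > 120)
        return totals;
    }

    public static void main(String[] args) {
        String[] sample = {"LiuYang,female,20", "YuanJing,male,10",
                "CaiXuyu,female,50", "CaiXuyu,female,50", "CaiXuyu,female,50"};
        System.out.println(femaleTotalsOver(sample, 120)); // prints {CaiXuyu=150}
    }
}
```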
1.1.1.4 Obtaining Example Code
Using the FusionInsight Client
Decompress the FusionInsight client installation package and obtain the examples file, which is saved in the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.
l In security mode, obtain FlinkConfigtableJavaExample and FlinkConfigtableScalaExample from the flink-examples-security directory.
l In non-security mode, obtain FlinkConfigtableJavaExample and FlinkConfigtableScalaExample from the flink-examples-normal directory.
Using the Maven Project
Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
l Security mode
− components/flink/flink-examples-maven-security/FlinkConfigtableJavaExample
− components/flink/flink-examples-maven-security/FlinkConfigtableScalaExample
l Non-security mode
− components/flink/flink-examples-maven/FlinkConfigtableJavaExample
− components/flink/flink-examples-maven/FlinkConfigtableScalaExample
1.1.1.5 Debugging the Application
1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result
Scenarios
After a Flink application finishes running, you can view the running result, or use the Apache Flink Dashboard to check the application running status.
Procedure
l View the running result of the Flink application.
If you want to check the execution result, view the Stdout log of TaskManager on the Apache Flink Dashboard.
If the execution result is exported to a file or a location specified by Flink, view the result from the exported file or the location. The checkpoint, pipeline, and join between configuration tables and streams are used as examples.
− View checkpoint results and files
The checkpoint results are stored in the taskmanager.out file of Flink. To view them, log in to the Apache Flink Dashboard, click the task manager label, and click out.
Either of the following methods can be used to view the checkpoint files:
l If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.
l If the checkpoint snapshot information is saved to a local file, log in to each node to view the checkpoint files.
− View pipeline results
The pipeline results are stored in the taskmanager.out file of Flink. If you want to view pipeline results, log in to the Apache Flink Dashboard, click the task manager label, and click out.
− View the JOIN result of configuration tables and streams
The JOIN results are stored in the taskmanager.out file of Flink. To view them, log in to the Flink web UI, click the task manager label, and click out.
− View the Stream SQL Join result
The result is saved in the taskmanager.out file of Flink. You can view the result by clicking the out button under the task manager label on the web UI of Flink.
l Use Apache Flink Dashboard to view the running status of the Flink application.
The Apache Flink Dashboard mainly includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.
In the YARN web UI, find the desired Flink application. Click ApplicationMaster in the last column of the application entry to switch to the Apache Flink Dashboard.
To view the print results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tab.
l View Flink logs.
Both of the following methods can be used to obtain Flink logs:
− Log in to the Apache Flink Dashboard and view logs of TaskManagers and JobManager.
− Log in to the YARN web UI to view logs about JobManager and GC.
In the YARN web UI window, find the desired Flink application. Click the ID of the application. On the displayed page, click Logs in the Logs column.