
Flink: Case 5: Configuring the JOIN Application Between Tables and Streams

Latest reply: Aug 24, 2018 01:42:52

1.1.1 Case 5: Configuring the JOIN Application Between Tables and Streams

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

Assume that there is a log text file recording the time netizens spend shopping online, and a CSV-formatted table of netizen information. Develop a Flink application that can achieve the following functions:

- Collect statistics, in real time, on female netizens who spend more than 2 hours in total shopping online during the weekend.

The name field, present in both the log file and the CSV-formatted table, serves as the key on which the two files are joined.

- In the log file collected during the weekend, the first, second, and third columns record the name, gender, and dwell duration (in minutes), respectively. The three columns are separated by commas (,).

           data.txt: logs collected during the weekend

LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
NotExist,female,200

           configtable.csv: the CSV-formatted table listing personal information about netizens. The first through ninth columns are the name, age, company name, work place, academic degree, years of working, mobile number, registered residence, and school of graduation. The columns are separated by commas (,).

username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university
YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university
GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university
CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university
Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university
FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university
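Each record in both files is a simple comma-separated line; a minimal parsing sketch (the class and method names are ours, not part of the example project):

```java
// Hypothetical helper showing how one data.txt record, e.g.
// "LiuYang,female,20", splits into name, gender, and dwell duration.
class LogRecordParser {
    static String[] parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected name,gender,minutes: " + line);
        }
        return fields;
    }

    // dwell duration of the record, in minutes
    static int minutes(String line) {
        return Integer.parseInt(parse(line)[2]);
    }
}
```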

Data Planning

The stream data of the example project is saved in a TXT file, and the configuration table is in CSV format.

1.         Ensure that the cluster services, including HDFS, Yarn, secure Redis, and Flink, have been installed.

note

- In security cluster mode, common Redis can also be installed. During the installation, the REDIS_SECURITY_ENABLED parameter specifies which variant to install: true installs secure Redis, and false installs common Redis.

- If secure Redis is installed in security mode, use this example code.

- For the example code for common Redis in security mode, see Application Development Guide > Developing an Application > JOIN Operation between Configuration Tables and Streams in the FusionInsight HD Product Documentation (Common Mode).

2.         Create a Redis cluster, add a Redis user, and set permissions for the user. Then download the user.keytab and krb5.conf files. For details about Redis, see Service Operation Guide > Redis in the FusionInsight HD Product Documentation.

3.         Modify the import.properties and read.properties files, which are saved in the config directory specified in the example code.

           The following is an example of configurations in the import.properties file:

#path of the csv file(s) to read; can be a file or a directory
CsvPath=config/configtable.csv

#whether the csv headers exist in the first line of the file
CsvHeaderExist=true
#csv file headers, which are also the redis field names
#Notice: if CsvHeaderExist is false, you must set ColumnNames; if true, the headers are read from the csv file
ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school

#whether redis security mode is enabled
Redis_Security=true
#redis hostname/ip and port used to connect to redis
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400
#redis user principal
Redis_Principal=test11@HADOOP.COM
#redis keytab file path
Redis_KeytabFile=config/user.keytab
#redis krb5 file path
Redis_Krb5File=config/krb5.conf

           The following is an example of configurations in the read.properties file:

#the redis field names to read; configure the fields you need, using the English field names
ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
#whether redis security mode is enabled
Redis_Security=true
#redis hostname/ip and port used to connect to redis
Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400
#redis user principal
Redis_Principal=test11@HADOOP.COM
#redis keytab file path
Redis_KeytabFile=config/user.keytab
#redis krb5 file path
Redis_Krb5File=config/krb5.conf

4.         On the Flink client, create a config directory, and copy the user.keytab, krb5.conf, data.txt, configtable.csv, import.properties, and read.properties files to the config directory, for example, /opt/FI-Client/Flink/flink/config/configtable.csv.

note

The data.txt and configtable.csv files are in the data directory specified in the example code.
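The import.properties and read.properties files above are plain Java properties files, so their keys can be inspected with java.util.Properties; this is only a sketch (the example project itself loads them with Flink's ParameterTool.fromPropertiesFile, and the class name here is ours):

```java
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Load properties-file content and expose the configured keys.
class ConfigSketch {
    static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (java.io.IOException e) { // cannot happen for a StringReader
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```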

1.1.1.2 Development Idea

Collect statistics on female netizens who spend more than 2 hours in total shopping online during the weekend, along with their personal information.

The procedure for obtaining the information is as follows:

- Modify the import.properties and read.properties files to configure the fields in the CSV file, the fields to read from Redis, and the Redis security settings.

- Import the configtable.csv table into Redis.

- Read the text data, create DataStreams, and parse the data into OriginalRecord objects.

- Using asynchronous I/O, query personal information in Redis with the OriginalRecord username field as the key, and convert the result into UserRecord objects.

- Filter the data down to the time female netizens spend online.

- Perform keyBy operations by name, and sum the time each female netizen spends online within a time window.

- Retain the netizens whose total online duration exceeds the threshold, and output the results.
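The filter/keyBy/sum/threshold steps above can be sketched without Flink as a plain aggregation (class and method names are ours; the real implementation with windows follows below in the example code):

```java
import java.util.*;

// Flink-free sketch: keep female records, sum shopping minutes per name,
// and retain only totals above the 2-hour (120-minute) threshold.
class ShoppingTimeSketch {
    static Map<String, Integer> femaleTotalsOver(List<String> lines, int thresholdMinutes) {
        Map<String, Integer> totals = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");            // name,gender,minutes
            if (!"female".equals(f[1])) continue;    // keep female records only
            totals.merge(f[0], Integer.parseInt(f[2]), Integer::sum);
        }
        totals.values().removeIf(total -> total <= thresholdMinutes);
        return totals;
    }
}
```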

1.1.1.3 Example Code Description

Java Example Code

- Function description

Collect statistics from the log file on female netizens who spend more than 2 hours in total shopping online during the weekend, read personal information from the CSV-formatted table, and join the two using the netizen names as keys.

- Example code

The following code snippet is an example; it imports the configtable.csv file into a secure Redis cluster. For details, see com.huawei.bigdata.flink.examples.RedisDataImport.

package com.huawei.bigdata.flink.examples;  
  
import com.huawei.bigdata.security.LoginUtil;  
import org.apache.flink.api.java.utils.ParameterTool;  
  
import org.supercsv.cellprocessor.constraint.NotNull;  
import org.supercsv.cellprocessor.ift.CellProcessor;  
import org.supercsv.io.CsvBeanReader;  
import org.supercsv.io.ICsvBeanReader;  
import org.supercsv.prefs.CsvPreference;  
import redis.clients.jedis.HostAndPort;  
import redis.clients.jedis.JedisCluster;  
  
import java.io.File;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.*;  
  
/**  
 * Read data from csv file and import to redis.  
 */  
public class RedisDataImport {  
    public static void main(String[] args) throws Exception {  
        // print a usage hint for running the example
        System.out.println("use command as: \n" +  
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +  
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>\n" +  
                "******************************************************************************************\n" +  
                "<config filePath> is for configure file to load\n" +  
                "you may write following content into config filePath: \n" +  
                "CsvPath=config/configtable.csv\n" +  
                "CsvHeaderExist=true\n" +  
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +  
                "Redis_Security=true\n" +  
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +  
                "Redis_Principal=test11@HADOOP.COM\n" +  
                "Redis_KeytabFile=config/user.keytab\n" +  
                "Redis_Krb5File=config/krb5.conf\n" +  
                "******************************************************************************************");  
  
        // read all configures  
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");  
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");  
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);  
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");  
        final boolean isSecurity = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("Redis_Security", true);  
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");  
        final String principal = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Principal");  
        final String keytab = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_KeytabFile");  
        final String krb5 = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Krb5File");  
  
        // init redis client  
        initRedis(isSecurity, principal, keytab, krb5);  
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();  
        for (String hostAndPort : redisIPPort.split(",")) {  
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));  
        }  
        final JedisCluster client = new JedisCluster(hosts, 15000);  
  
        // get all files under csv file path  
        ArrayList<File> files = getListFiles(csvFilePath);  
        System.out.println("Read file or directory under  " + csvFilePath  
                + ", total file num: " + files.size() + ", columns: " + csvScheme);  
  
        // run read csv file and analyze it  
        for (int index = 0; index < files.size(); index++) {  
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);  
        }  
        client.close();  
        System.out.println("Data import finish!!!");  
    }  
  
    public static void initRedis(boolean isRedisSecurity, String userPrincipal, String keytabPath, String krb5Path) throws IOException {  
        // redis security  
        System.setProperty("redis.authentication.jaas", isRedisSecurity ? "true" : "false");  
  
        // check and set  
        if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {  
            LoginUtil.setJaasFile(userPrincipal, keytabPath);  
            LoginUtil.setKrb5Config(krb5Path);  
        }  
    }  
  
    public static ArrayList<File> getListFiles(Object obj) {  
        File directory = null;  
        if (obj instanceof File) {  
            directory = (File) obj;  
        } else {  
            directory = new File(obj.toString());  
        }  
        ArrayList<File> files = new ArrayList<File>();  
        if (directory.isFile()) {  
            files.add(directory);  
            return files;  
        } else if (directory.isDirectory()) {  
            File[] fileArr = directory.listFiles();  
            for (int i = 0; i < fileArr.length; i++) {  
                File fileOne = fileArr[i];  
                files.addAll(getListFiles(fileOne));  
            }  
        }  
        return files;  
    }  
  
    /**  
     * Sets up the processors used for read csv. There are 9 CSV columns. Empty  
     * columns are read as null (hence the NotNull() for mandatory columns).  
     *  
     * @return the cell processors  
     */  
    private static CellProcessor[] getProcessors() {  
        final CellProcessor[] processors = new CellProcessor[] {  
                new NotNull(), // username  
                new NotNull(), // age  
                new NotNull(), // company  
                new NotNull(), // workLocation  
                new NotNull(), // educational  
                new NotNull(), // workYear  
                new NotNull(), // phone  
                new NotNull(), // nativeLocation  
                new NotNull(), // school  
        };  
  
        return processors;  
    }  
  
    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {  
        ICsvBeanReader beanReader = null;  
        try {  
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);  
  
            // the header elements are used to map the values to the bean (names must match)  
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");  
            final CellProcessor[] processors = getProcessors();  
  
            UserInfo userinfo;  
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {  
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),  
                        beanReader.getRowNumber(), userinfo));  
  
                // set redis key and value  
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());  
            }  
        }  
        finally {  
            if( beanReader != null ) {  
                beanReader.close();  
            }  
        }  
    }  
  
  
  
    // define the UserInfo structure  
    public static class UserInfo {  
        private String username;  
        private String age;  
        private String company;  
        private String workLocation;  
        private String educational;  
        private String workYear;  
        private String phone;  
        private String nativeLocation;  
        private String school;  
  
  
        public UserInfo() {  
  
        }  
  
        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {  
            username = nm;  
            age = a;  
            company = c;  
            workLocation = w;  
            educational = e;  
            workYear = wy;  
            phone = p;  
            nativeLocation = nl;  
            school = sc;  
        }  
  
        public String toString() {  
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company  
                    + "  workLocation: " + workLocation + "  educational: " + educational  
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";  
        }  
  
        // get key  
        public String getKeyValue() {  
            return username;  
        }  
  
        public Map<String, String> getMapInfo() {  
            Map<String, String> info = new HashMap<String, String>();  
            info.put("username", username);  
            info.put("age", age);  
            info.put("company", company);  
            info.put("workLocation", workLocation);  
            info.put("educational", educational);  
            info.put("workYear", workYear);  
            info.put("phone", phone);  
            info.put("nativeLocation", nativeLocation);  
            info.put("school", school);  
            return info;  
        }  
  
        /**  
         * @return the username   
         */  
        public String getUsername() {  
            return username;  
        }  
  
        /**  
         * @param username  
         *            the username to set  
         */  
        public void setUsername(String username) {  
            this.username = username;  
        }  
  
        /**  
         * @return the age  
         */  
        public String getAge() {  
            return age;  
        }  
  
        /**  
         * @param age  
         *            the age to set  
         */  
        public void setAge(String age) {  
            this.age = age;  
        }  
  
        /**  
         * @return the company  
         */  
        public String getCompany() {  
            return company;  
        }  
  
        /**  
         * @param company  
         *            the company to set  
         */  
        public void setCompany(String company) {  
            this.company = company;  
        }  
  
        /**  
         * @return the workLocation  
         */  
        public String getWorkLocation() {  
            return workLocation;  
        }  
  
        /**  
         * @param workLocation  
         *            the workLocation to set  
         */  
        public void setWorkLocation(String workLocation) {  
            this.workLocation = workLocation;  
        }  
  
        /**  
         * @return the educational  
         */  
        public String getEducational() {  
            return educational;  
        }  
  
        /**  
         * @param educational  
         *            the educational to set  
         */  
        public void setEducational(String educational) {  
            this.educational = educational;  
        }  
  
        /**  
         * @return the workYear  
         */  
        public String getWorkYear() {  
            return workYear;  
        }  
  
        /**  
         * @param workYear  
         *            the workYear to set  
         */  
        public void setWorkYear(String workYear) {  
            this.workYear = workYear;  
        }  
  
        /**  
         * @return the phone  
         */  
        public String getPhone() {  
            return phone;  
        }  
  
        /**  
         * @param phone  
         *            the phone to set  
         */  
        public void setPhone(String phone) {  
            this.phone = phone;  
        }  
  
        /**  
         * @return the nativeLocation  
         */  
        public String getNativeLocation() {  
            return nativeLocation;  
        }  
  
        /**  
         * @param nativeLocation  
         *            the nativeLocation to set  
         */  
        public void setNativeLocation(String nativeLocation) {  
            this.nativeLocation = nativeLocation;  
        }  
  
        /**  
         * @return the school  
         */  
        public String getSchool() {  
            return school;  
        }  
  
        /**  
         * @param school  
         *            the school to set  
         */  
        public void setSchool(String school) {  
            this.school = school;  
        }  
    }  
}

The following code snippet is an example; it reads stream data from the data.txt file, uses netizen names as keys to query the secure Redis cluster, joins the log data with the CSV-formatted table, and prints the results. For details, see com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample.

package com.huawei.bigdata.flink.examples;  
  
import com.huawei.bigdata.security.LoginUtil;  
import org.apache.flink.api.common.functions.FilterFunction;  
import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.api.common.functions.ReduceFunction;  
import org.apache.flink.api.java.functions.KeySelector;  
import org.apache.flink.api.java.utils.ParameterTool;  
import org.apache.flink.configuration.Configuration;  
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;  
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;  
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;  
import org.apache.flink.streaming.api.datastream.AsyncDataStream;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.TimeCharacteristic;  
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;  
import org.apache.flink.streaming.api.functions.async.AsyncFunction;  
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;  
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;  
import org.apache.flink.streaming.api.watermark.Watermark;  
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;  
import org.apache.flink.streaming.api.windowing.time.Time;  
import redis.clients.jedis.HostAndPort;  
import redis.clients.jedis.JedisCluster;  
  
import java.util.*;  
import java.util.concurrent.TimeUnit;  
/**  
 * Read stream data and join from configure table from redis.  
 */   
public class FlinkConfigtableJavaExample {  
  
    public static void main(String[] args) throws Exception {  
        // print a usage hint for running the example on flink
        System.out.println("use command as: \n" +  
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +  
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +  
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt\n" +  
                "******************************************************************************************\n" +  
                "Especially you may write following content into config filePath, as in config/read.properties: \n" +  
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +  
                "Redis_Security=true\n" +  
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +  
                "Redis_Principal=test11@HADOOP.COM\n" +  
                "Redis_KeytabFile=config/user.keytab\n" +  
                "Redis_Krb5File=config/krb5.conf\n" +  
                "******************************************************************************************");  
  
        // set up the execution environment  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
        env.setParallelism(1);  
  
        // get configure and read data and transform to OriginalRecord  
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");  
        DataStream<OriginalRecord> originalStream = env.readTextFile(  
                dataPath  
        ).map(new MapFunction<String, OriginalRecord>() {  
            @Override  
            public OriginalRecord map(String value) throws Exception {  
                return getRecord(value);  
            }  
        }).assignTimestampsAndWatermarks(  
                new Record2TimestampExtractor()  
        ).disableChaining();  
  
        // read from redis and join to the whole user information  
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();  
        // timeout set to 2 minutes, max parallel request num set to 5, you can modify this to optimize  
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(  
                originalStream,  
                function,  
                2,  
                TimeUnit.MINUTES,  
                5);  
  
        // data transform  
        result.filter(new FilterFunction<UserRecord>() {  
            @Override  
            public boolean filter(UserRecord value) throws Exception {  
                return value.sexy.equals("female");  
            }  
        }).keyBy(  
                new UserRecordSelector()  
        ).window(  
                TumblingEventTimeWindows.of(Time.seconds(30))  
        ).reduce(new ReduceFunction<UserRecord>() {  
            @Override  
            public UserRecord reduce(UserRecord value1, UserRecord value2)  
                    throws Exception {  
                value1.shoppingTime += value2.shoppingTime;  
                return value1;  
            }  
        }).filter(new FilterFunction<UserRecord>() {  
            @Override  
            public boolean filter(UserRecord value) throws Exception {  
                return value.shoppingTime > 120;  
            }  
        }).print();  
  
        // execute program  
        env.execute("FlinkConfigtable java");  
    }  
  
    private static class UserRecordSelector implements KeySelector<UserRecord, String> {  
        @Override  
        public String getKey(UserRecord value) throws Exception {  
            return value.name;  
        }  
    }  
  
    // class to set watermark and timestamp  
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {  
  
        // add tag in the data of datastream elements  
        @Override  
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {  
            return System.currentTimeMillis();  
        }  
  
        // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready  
        @Override  
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {  
            return new Watermark(extractedTimestamp - 1);  
        }  
    }  
  
    private static OriginalRecord getRecord(String line) {  
        String[] elems = line.split(",");  
        assert elems.length == 3;  
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));  
    }  
  
    public static class OriginalRecord {  
        private String name;  
        private String sexy;  
        private int shoppingTime;  
  
        public OriginalRecord(String n, String s, int t) {  
            name = n;  
            sexy = s;  
            shoppingTime = t;  
        }  
    }  
  
    public static class UserRecord {  
        private String name;  
        private int age;  
        private String company;  
        private String workLocation;  
        private String educational;  
        private int workYear;  
        private String phone;  
        private String nativeLocation;  
        private String school;  
        private String sexy;  
        private int shoppingTime;  
  
        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {  
            name = nm;  
            age = a;  
            company = c;  
            workLocation = w;  
            educational = e;  
            workYear = wy;  
            phone = p;  
            nativeLocation = nl;  
            school = sc;  
            sexy = sx;  
            shoppingTime = st;  
        }  
  
        public void setInput(String input_nm, String input_sx, int input_st) {  
            name = input_nm;  
            sexy = input_sx;  
            shoppingTime = input_st;  
        }  
   
        public String toString() {  
            return "UserRecord-----name: " + name + "  age: " + age + "  company: " + company  
                    + "  workLocation: " + workLocation + "  educational: " + educational  
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school  
                    + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;  
        }  
    }  
  
    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord>{  
        private String fields = "";  
        private transient JedisCluster client;  
        private LoadingCache<String, UserRecord> cacheRecords;  
  
        @Override  
        public void open(Configuration parameters) throws Exception {  
            super.open(parameters);  
  
            // init cache builder  
            cacheRecords = CacheBuilder.newBuilder()  
                    .maximumSize(10000)  
                    .expireAfterAccess(7, TimeUnit.DAYS)  
                    .build(new CacheLoader<String, UserRecord>() {  
                        public UserRecord load(String key) throws Exception {  
                            //load from redis  
                            return loadFromRedis(key);  
                        }  
                    });  
  
            // get configure from config/read.properties, you must put this with commands:  
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or  
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar  
            String configPath = "config/read.properties";  
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");  
            final boolean isSecurity = ParameterTool.fromPropertiesFile(configPath).getBoolean("Redis_Security", true);  
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");  
            final String principal = ParameterTool.fromPropertiesFile(configPath).get("Redis_Principal");  
            final String keytab = ParameterTool.fromPropertiesFile(configPath).get("Redis_KeytabFile");  
            final String krb5 = ParameterTool.fromPropertiesFile(configPath).get("Redis_Krb5File");  
  
            // init redis security mode  
            System.setProperty("redis.authentication.jaas", isSecurity ? "true" : "false");  
            if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {  
                LoginUtil.setJaasFile(principal, keytab);  
                LoginUtil.setKrb5Config(krb5);  
            }  
  
            // create jedisCluster client  
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();  
            for (String node : hostPort.split(",")) {  
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));  
            }  
            client = new JedisCluster(hosts, 60000);  
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());  
        }  
  
        @Override  
        public void close() throws Exception {  
            super.close();  
  
            if (client != null) {  
                System.out.println("JedisCluster close!!!");  
                client.close();  
            }  
        }  
  
        public UserRecord loadFromRedis(final String key) throws Exception {  
            if (client.getClusterNodes().size() <= 0) {  
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());  
            }  
            if (!client.exists(key)) {  
                System.out.println("test-------cannot find data to key:  " + key);  
                return new UserRecord(  
                        "null",  
                        0,  
                        "null",  
                        "null",  
                        "null",  
                        0,  
                      "null",  
                        "null",  
                        "null",  
                        "null",  
                        0);  
            } else {  
                // get some fields  
                List<String> values = client.hmget(key, fields.split(","));  
                System.out.println("test-------key: " + key + "  get some fields:  " + values.toString());  
                return new UserRecord(  
                        values.get(0),  
                        Integer.parseInt(values.get(1)),  
                        values.get(2),  
                        values.get(3),  
                        values.get(4),  
                        Integer.parseInt(values.get(5)),  
                        values.get(6),  
                        values.get(7),  
                        values.get(8),  
                        "null",  
                        0);  
            }  
        }  
  
        @Override  
        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {  
            // set the key string; if your key spans more than one column, build the key string from those columns  
            String key = input.name;  
            UserRecord info = cacheRecords.get(key);  
            info.setInput(input.name, input.sexy, input.shoppingTime);  
            collector.collect(Collections.singletonList(info));  
        }  
    }  
}
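The asyncInvoke comment above notes that a lookup key spanning several columns must be assembled from those columns. A minimal sketch of one way to do that (the delimiter and column values are illustrative, not part of the example project):

```java
public class CompositeKey {
    // Build a composite lookup key from several columns; a delimiter that
    // cannot occur in the data keeps keys unambiguous.
    static String buildKey(String... columns) {
        return String.join(":", columns);
    }

    public static void main(String[] args) {
        System.out.println(buildKey("LiuYang"));           // prints LiuYang
        System.out.println(buildKey("LiuYang", "female")); // prints LiuYang:female
    }
}
```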

Scala Example Code

l   Function description

From the log file, collect real-time statistics on female netizens who spend more than 2 hours in total on online shopping during the weekend. Read personal information from the CSV-formatted table, and join the log data with the table using the netizen names as keys.
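Before reading the Flink code, the join-and-filter semantics can be sketched with plain collections: a map stands in for the Redis configuration table, log records are joined on the name field, dwell times are summed per female netizen, and only totals over 120 minutes are kept. All class and method names below are hypothetical, for illustration only.

```java
import java.util.*;

public class JoinSketch {
    // Sum shopping time per female netizen whose name appears in the table,
    // then keep only totals over 2 hours (120 minutes).
    public static Map<String, Integer> femaleOver2h(List<String[]> logs, Map<String, String> userTable) {
        Map<String, Integer> totals = new HashMap<>();
        for (String[] rec : logs) {                       // rec = {name, sex, minutes}
            if (!"female".equals(rec[1])) continue;
            if (!userTable.containsKey(rec[0])) continue; // inner join on the name field
            totals.merge(rec[0], Integer.parseInt(rec[2]), Integer::sum);
        }
        totals.values().removeIf(t -> t <= 120);
        return totals;
    }

    public static void main(String[] args) {
        // a stand-in for the Redis configuration table (name -> company)
        Map<String, String> table = Map.of("FangBo", "Huawei", "LiuYang", "Microsoft");
        List<String[]> logs = List.of(
                new String[]{"FangBo", "female", "50"},
                new String[]{"FangBo", "female", "60"},
                new String[]{"FangBo", "female", "50"},
                new String[]{"LiuYang", "female", "20"},
                new String[]{"NotExist", "female", "200"}); // not in the table, dropped by the join
        System.out.println(femaleOver2h(logs, table));      // prints {FangBo=160}
    }
}
```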

l   Example code

The following code snippet is an example that imports the configtable.csv file into a secure Redis cluster. For details, see com.huawei.bigdata.flink.examples.RedisDataImport.

package com.huawei.bigdata.flink.examples;  
  
import com.huawei.bigdata.security.LoginUtil;  
import org.apache.flink.api.java.utils.ParameterTool;  
  
import org.supercsv.cellprocessor.constraint.NotNull;  
import org.supercsv.cellprocessor.ift.CellProcessor;  
import org.supercsv.io.CsvBeanReader;  
import org.supercsv.io.ICsvBeanReader;  
import org.supercsv.prefs.CsvPreference;  
import redis.clients.jedis.HostAndPort;  
import redis.clients.jedis.JedisCluster;  
  
import java.io.File;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.*;  
  
/**  
 * Read data from the CSV file and import it into Redis.  
 */  
public class RedisDataImport {  
    public static void main(String[] args) throws Exception {  
        // print a usage hint showing how to run the tool  
        System.out.println("use command as: \n" +  
              "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +  
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>" +  
                "******************************************************************************************\n" +  
                "<config filePath> is for configure file to load\n" +  
                "you may write following content into config filePath: \n" +  
                "CsvPath=config/configtable.csv\n" +  
                "CsvHeaderExist=true\n" +  
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +  
                "Redis_Security=true\n" +  
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +  
                "Redis_Principal=test11@HADOOP.COM\n" +  
                "Redis_KeytabFile=config/user.keytab\n" +  
                "Redis_Krb5File=config/krb5.conf\n" +  
                "******************************************************************************************");  
  
        // read all configures  
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");  
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");  
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);  
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");  
        final boolean isSecurity = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("Redis_Security", true);  
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");  
        final String principal = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Principal");  
        final String keytab = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_KeytabFile");  
        final String krb5 = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_Krb5File");  
  
        // init redis client  
        initRedis(isSecurity, principal, keytab, krb5);  
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();  
        for (String hostAndPort : redisIPPort.split(",")) {  
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));  
        }  
        final JedisCluster client = new JedisCluster(hosts, 15000);  
  
        // get all files under csv file path  
        ArrayList<File> files = getListFiles(csvFilePath);  
        System.out.println("Read file or directory under  " + csvFilePath  
                + ", total file num: " + files.size() + ", columns: " + csvScheme);  
  
        // run read csv file and analyze it  
        for (int index = 0; index < files.size(); index++) {  
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);  
        }  
        client.close();  
        System.out.println("Data import finish!!!");  
    }  
  
    public static void initRedis(boolean isRedisSecurity, String userPrincipal, String keytabPath, String krb5Path) throws IOException {  
        // redis security  
        System.setProperty("redis.authentication.jaas", isRedisSecurity ? "true" : "false");  
  
        // check and set  
        if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {  
            LoginUtil.setJaasFile(userPrincipal, keytabPath);  
            LoginUtil.setKrb5Config(krb5Path);  
        }  
    }  
  
    public static ArrayList<File> getListFiles(Object obj) {  
        File directory = null;  
        if (obj instanceof File) {  
            directory = (File) obj;  
        } else {  
            directory = new File(obj.toString());  
        }  
        ArrayList<File> files = new ArrayList<File>();  
        if (directory.isFile()) {  
            files.add(directory);  
            return files;  
        } else if (directory.isDirectory()) {  
            File[] fileArr = directory.listFiles();  
            for (int i = 0; i < fileArr.length; i++) {  
                File fileOne = fileArr[i];  
                files.addAll(getListFiles(fileOne));  
            }  
        }  
        return files;  
    }  
  
    /**  
     * Sets up the cell processors used to read the CSV file. There are 9 CSV columns. Empty  
     * columns are read as null (hence the NotNull() for mandatory columns).  
     *  
     * @return the cell processors  
     */   
    private static CellProcessor[] getProcessors() {  
        final CellProcessor[] processors = new CellProcessor[] {  
                new NotNull(), // username  
                new NotNull(), // age  
                new NotNull(), // company   
                new NotNull(), // workLocation  
                new NotNull(), // educational  
                new NotNull(), // workYear  
                new NotNull(), // phone  
                new NotNull(), // nativeLocation  
                new NotNull(), // school  
        };  
  
        return processors;  
    }  
  
    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {  
        ICsvBeanReader beanReader = null;  
        try {  
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);  
  
            // the header elements are used to map the values to the bean (names must match)  
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");  
            final CellProcessor[] processors = getProcessors();  
  
            UserInfo userinfo;  
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {  
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),  
                        beanReader.getRowNumber(), userinfo));  
  
                // set redis key and value  
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());  
            }  
        }  
        finally {  
            if( beanReader != null ) {  
                beanReader.close();  
            }  
        }   
    }  
  
  
  
    // define the UserInfo structure  
    public static class UserInfo {  
        private String username;  
        private String age;  
        private String company;  
        private String workLocation;  
        private String educational;  
        private String workYear;  
        private String phone;  
        private String nativeLocation;  
        private String school;  
  
  
        public UserInfo() {  
  
        }  
  
        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {  
            username = nm;  
            age = a;  
            company = c;  
            workLocation = w;  
            educational = e;  
            workYear = wy;  
            phone = p;  
            nativeLocation = nl;  
            school = sc;  
        }  
  
        public String toString() {  
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company  
                    + "  workLocation: " + workLocation + "  educational: " + educational  
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";  
        }  
  
        // get key  
        public String getKeyValue() {  
            return username;  
        }  
  
        public Map<String, String> getMapInfo() {  
            Map<String, String> info = new HashMap<String, String>();  
            info.put("username", username);  
            info.put("age", age);  
            info.put("company", company);  
            info.put("workLocation", workLocation);  
            info.put("educational", educational);  
            info.put("workYear", workYear);  
            info.put("phone", phone);  
            info.put("nativeLocation", nativeLocation);  
            info.put("school", school);  
            return info;  
        }  
  
        /**  
         * @return the username   
         */  
        public String getUsername() {  
            return username;  
        }  
  
        /**  
         * @param username  
         *            the username to set  
         */  
        public void setUsername(String username) {  
            this.username = username;  
        }  
  
        /**  
         * @return the age  
         */  
        public String getAge() {  
            return age;  
        }  
  
        /**  
         * @param age  
         *            the age to set  
         */  
        public void setAge(String age) {  
            this.age = age;  
        }  
  
        /**  
         * @return the company  
         */  
        public String getCompany() {  
            return company;  
        }  
  
        /**  
         * @param company  
         *            the company to set  
         */  
        public void setCompany(String company) {  
            this.company = company;  
        }  
  
        /**  
         * @return the workLocation  
         */  
        public String getWorkLocation() {  
            return workLocation;  
        }  
  
        /**  
         * @param workLocation  
         *            the workLocation to set  
         */  
        public void setWorkLocation(String workLocation) {  
            this.workLocation = workLocation;  
        }  
  
        /**  
         * @return the educational  
         */  
        public String getEducational() {  
            return educational;  
        }  
  
        /**  
         * @param educational  
         *            the educational to set  
         */  
        public void setEducational(String educational) {  
            this.educational = educational;  
        }  
  
        /**  
         * @return the workYear  
         */  
        public String getWorkYear() {  
            return workYear;  
        }  
  
        /**  
         * @param workYear  
         *            the workYear to set  
         */  
        public void setWorkYear(String workYear) {  
            this.workYear = workYear;  
        }  
  
        /**  
         * @return the phone  
         */  
        public String getPhone() {  
            return phone;  
        }  
  
        /**  
         * @param phone  
         *            the phone to set  
         */  
        public void setPhone(String phone) {  
            this.phone = phone;  
        }  
  
        /**  
         * @return the nativeLocation  
         */  
        public String getNativeLocation() {  
            return nativeLocation;  
        }  
  
        /**  
         * @param nativeLocation  
         *            the nativeLocation to set  
         */  
        public void setNativeLocation(String nativeLocation) {  
            this.nativeLocation = nativeLocation;  
        }  
  
        /**  
         * @return the school  
         */  
        public String getSchool() {  
            return school;  
        }  
  
        /**  
         * @param school  
         *            the school to set  
         */  
        public void setSchool(String school) {  
            this.school = school;  
        }  
    }  
}  
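RedisDataImport stores each CSV row as a Redis hash via hmset, and the streaming job later reads selected fields back with hmget. The field-name contract between the two sides (ColumnNames at import time versus ReadFields at read time) can be illustrated with a plain map standing in for one Redis hash; a mismatched field name would simply come back as null. The sketch below is illustrative only and does not touch a live cluster.

```java
import java.util.*;

public class HashRoundTrip {
    // hmget semantics: fetch the requested fields, in order, from one hash;
    // a field that was never written comes back as null.
    static List<String> hmget(Map<String, String> hash, String... fields) {
        List<String> values = new ArrayList<>();
        for (String f : fields) values.add(hash.get(f));
        return values;
    }

    public static void main(String[] args) {
        // what RedisDataImport writes for one row of configtable.csv (abridged)
        Map<String, String> hash = new HashMap<>();
        hash.put("username", "FangBo");
        hash.put("age", "30");
        hash.put("company", "Huawei");
        // what the streaming job reads back via its ReadFields setting
        System.out.println(hmget(hash, "username", "age", "company")); // prints [FangBo, 30, Huawei]
    }
}
```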
    

The following code snippet is an example that reads stream data from the data.txt file, uses the netizen names as keys to query the secure Redis cluster, joins the log stream with the CSV-formatted table, and prints the result. For details, see com.huawei.bigdata.flink.examples.FlinkConfigtableScalaExample.

package com.huawei.bigdata.flink.examples  
  
import java.util.HashSet  
import java.util.Set  
import java.util.concurrent.TimeUnit  
  
import com.huawei.bigdata.security.LoginUtil  
import org.apache.flink.api.java.utils.ParameterTool  
import org.apache.flink.streaming.api.TimeCharacteristic  
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.scala.async.AsyncCollector  
import org.apache.flink.streaming.api.watermark.Watermark  
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows  
import org.apache.flink.streaming.api.windowing.time.Time  
import redis.clients.jedis.HostAndPort  
import redis.clients.jedis.JedisCluster  
  
import scala.concurrent.{ExecutionContext, Future}  
  
/**  
 * Read stream data and join it with the configuration table stored in Redis.  
 */  
object FlinkConfigtableScalaExample {  
  def main(args: Array[String]) {  
    // print a usage hint showing how to run the job  
    System.out.println("use command as: \n" +  
      "./bin/flink run -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +  
      "/opt/FlinkConfigtableScalaExample.jar --dataPath config/data.txt" +  
      "******************************************************************************************\n" +  
      "Especially you may write following content into config filePath, as in config/read.properties: \n" +  
      "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +  
      "Redis_Security=true\n" +  
      "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +  
      "Redis_Principal=test11@HADOOP.COM\n" +  
      "Redis_KeytabFile=config/user.keytab\n" +  
      "Redis_Krb5File=config/krb5.conf\n" +  
      "******************************************************************************************")  
  
    // set up the execution environment  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  
    env.setParallelism(1)  
  
    // get configure and read data and transform to OriginalRecord  
    val dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt")  
    val originalStream = env.readTextFile(dataPath)  
      .map(it => getRecord(it)).assignTimestampsAndWatermarks(new Record2TimestampExtractor).disableChaining()  
  
  
    // read from redis and join to the whole user information  
    val resultStream = AsyncDataStream.unorderedWait(  
      originalStream,  
      2,  
      TimeUnit.MINUTES,  
      5) {  
      (input, collector: AsyncCollector[UserRecord]) =>  
        Future {  
          // read the configuration from config/read.properties; ship the config directory with one of these commands:  
          // ./bin/yarn-session.sh -t /opt/config -n 3 -jm 1024 -tm 1024 or  
          // ./bin/flink run -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar  
          val configPath = "config/read.properties"  
          val fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields")  
          val isSecurity = ParameterTool.fromPropertiesFile(configPath).getBoolean("Redis_Security", true)  
          val hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port")  
          val principal = ParameterTool.fromPropertiesFile(configPath).get("Redis_Principal")  
          val keytab = ParameterTool.fromPropertiesFile(configPath).get("Redis_KeytabFile")  
          val krb5 = ParameterTool.fromPropertiesFile(configPath).get("Redis_Krb5File")  
  
          // init redis security mode  
          System.setProperty("redis.authentication.jaas", if (isSecurity) "true" else "false")  
          if (System.getProperty("redis.authentication.jaas", "false").equals("true")) {  
            LoginUtil.setJaasFile(principal, keytab)  
            LoginUtil.setKrb5Config(krb5)  
          }  
  
          // create jedisCluster client  
          val hosts: Set[HostAndPort]  = new HashSet[HostAndPort]()  
          hostPort.split(",").foreach(it => hosts.add(new HostAndPort(it.split(":").apply(0), Integer.parseInt(it.split(":").apply(1)))))  
          val client = new JedisCluster(hosts, 60000)  
          System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes.size())  
  
          if (client.getClusterNodes.size() <= 0) {  
            System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes.size())  
          }  
          // set the key string; if your key spans more than one column, build the key string from those columns  
          val key = input.name  
          if (!client.exists(key)) {  
            System.out.println("test-------cannot find data to key:  " + key)  
            collector.collect(Seq(new UserRecord(  
              input.name,  
              0,  
              "null",  
              "null",  
              "null",  
              0,  
              "null",  
              "null",  
              "null",  
              input.sexy,  
              input.shoppingTime)))  
          } else {  
            val values = client.hmget(key, fields.split(","):_*)  
            System.out.println("test-------key: " + key + "  get some fields:  " + values.toString)  
            collector.collect(Seq(new UserRecord(  
              values.get(0),  
              Integer.parseInt(values.get(1)),  
              values.get(2),  
              values.get(3),  
              values.get(4),  
              Integer.parseInt(values.get(5)),  
              values.get(6),  
              values.get(7),  
              values.get(8),  
              input.sexy,  
              input.shoppingTime)))  
          }  
          client.close()  
        } (ExecutionContext.global)  
    }  
  
    // data transform  
     resultStream.filter(_.sexy == "female")  
      .keyBy("name")  
      .window(TumblingEventTimeWindows.of(Time.seconds(30)))  
      .reduce((e1, e2) => UserRecord(e1.name, e2.age, e2.company, e2.workLocation, e2.educational, e2.workYear,  
        e2.phone, e2.nativeLocation, e2.school, e2.sexy, e1.shoppingTime + e2.shoppingTime))  
      .filter(_.shoppingTime > 120).print()  
  
    // execute program  
    env.execute("FlinkConfigtable scala")  
  }  
  
  // get enums of record  
  def getRecord(line: String): OriginalRecord = {  
    val elems = line.split(",")  
    assert(elems.length == 3)  
    val name = elems(0)  
    val sexy = elems(1)  
    val time = elems(2).toInt  
    OriginalRecord(name, sexy, time)  
  }  
  
  // the scheme of record read from txt  
  case class OriginalRecord(name: String, sexy: String, shoppingTime: Int)  
  
  case class UserRecord(name: String, age: Int, company: String, workLocation: String, educational: String, workYear: Int,  
                        phone: String, nativeLocation: String, school: String, sexy: String, shoppingTime: Int)  
  
  
  // class to set watermark and timestamp  
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[OriginalRecord] {  
  
    // add tag in the data of datastream elements  
    override def extractTimestamp(element: OriginalRecord, previousTimestamp: Long): Long = {  
      System.currentTimeMillis()  
    }  
  
    // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready  
    def checkAndGetNextWatermark(lastElement: OriginalRecord,  
                                 extractedTimestamp: Long): Watermark = {  
      new Watermark(extractedTimestamp - 1)  
    }  
  }  
}
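The job above aggregates inside 30-second tumbling event-time windows. Independently of Flink, a tumbling window simply buckets each timestamp into fixed, non-overlapping intervals; the start of the window a record falls into can be computed as below (a sketch of the arithmetic for non-negative timestamps, not Flink's internal API):

```java
public class TumblingWindow {
    // Start of the tumbling window (in ms) that a non-negative timestamp falls into.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows, as in the example job
        System.out.println(windowStart(29_999L, size)); // prints 0
        System.out.println(windowStart(30_000L, size)); // prints 30000
        System.out.println(windowStart(45_000L, size)); // prints 30000
    }
}
```

Records with window starts 0 and 30000 land in different windows, so their shopping times are reduced separately.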

1.1.1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the example projects, which are saved in the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.

l   In security mode, obtain FlinkConfigtableJavaExample and FlinkConfigtableScalaExample from the flink-examples-security directory.

l   In non-security mode, obtain FlinkConfigtableJavaExample and FlinkConfigtableScalaExample from the flink-examples-normal directory.

Using the Maven Project

Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

l   Security mode

           components/flink/flink-examples-maven-security/FlinkConfigtableJavaExample

           components/flink/flink-examples-maven-security/FlinkConfigtableScalaExample

l   Non-security mode

           components/flink/flink-examples-maven/FlinkConfigtableJavaExample

           components/flink/flink-examples-maven/FlinkConfigtableScalaExample

1.1.1.5 Debugging the Application

1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result

Scenarios

After a Flink application finishes running, you can view the running result, or use the Apache Flink Dashboard to check the application running status.

Procedure

l   View the running result of the Flink application.

If you want to check the execution result, view the Stdout log of TaskManager on the Apache Flink Dashboard.

If the execution result is exported to a file or a location specified by Flink, view the result from the exported file or the location. The checkpoint, pipeline, and join between configuration tables and streams are used as examples.

           View checkpoint results and files


Either of the following methods can be used to view the checkpoint files:

l   If the checkpoint snapshot information is saved in the HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view checkpoint files.

l   If the checkpoint snapshot information is saved to a local file, log in to each node to view checkpoint files.

           View pipeline results

The pipeline results are stored in the taskmanager.out file of Flink. To view them, log in to the Apache Flink Dashboard, click Task Managers, and open the out tab.

           View the JOIN result of the configuration table and streams

The join results are stored in the taskmanager.out file of Flink. To view them, log in to the Apache Flink Dashboard, click Task Managers, and open the out tab.

           View the Stream SQL Join result

The result is saved in the taskmanager.out file of Flink. To view it, open the out tab under Task Managers on the Apache Flink Dashboard.

l   Use Apache Flink Dashboard to view the running status of the Flink application.

The Apache Flink Dashboard mainly includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.

In the YARN web UI, find the desired Flink application. Click ApplicationMaster in the last column of the application to switch to the Apache Flink Dashboard.

To view the printed results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tab.

l   View Flink logs.

Either of the following methods can be used to obtain Flink logs:

           Log in to the Apache Flink Dashboard and view logs of TaskManagers and JobManager.

           Log in to the YARN web UI to view logs about JobManager and GC.

In the YARN web UI, find the desired Flink application and click its ID. On the displayed page, click Logs in the Logs column.

 

