【开发案例】Flink案例:Stream SQL Join程序

[复制链接]
发表于 : 2018-5-29 14:43:35 最新回复:2018-10-15 14:46:46
2022 9
lWX387225  版主  

Flink案例6Stream SQL Join程序

1.1 场景说明

场景说明

假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。

基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

数据规划

l   业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。Kafka配置参见样例数据规划章节。

l   业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。

           使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。

           启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。

1.2 开发思路

1.         启动Flink Kafka Producer应用向Kafka发送数据。

2.         启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topicproducer一致。

3.         soket中读取数据,构造Table2

4.         使用Flink SQLTable1Table2进行联合查询,并进行打印。

1.3 样例代码说明

JAVA样例代码

l   功能介绍

Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

l   代码样例

用户在开发前需要使用对接安全模式的FusionInsight Kafka,则需要引入FusionInsightkafka-client-0.11.x.x.jar,该jar包可在FusionInsight client目录下获取。

下面列出producerconsumer,以及Flink Stream SQL Join使用主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafkacom.huawei.bigdata.flink.examples.SqlJoinWithSocket

a.         每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。

//producer代码
public class WriteIntoKafka {
 
      public static void main(String[] args) throws Exception {
 
      // 打印出执行flink run的参考命令
        System.out.println("use command as: ");
 
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
 
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
 
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
 
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
 
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
 
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
 
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
 
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
 
        System.out.println("******************************************************************************************");
 
        System.out.println("<topic> is the kafka topic name");
 
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
 
        System.out.println("******************************************************************************************");
       
        // 构造执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并发度
        env.setParallelism(1);
        // 解析运行参数
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // 构造流图,将自定义Source生成的数据写入Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
 
        FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),
 
           new SimpleStringSchema(),
 
           paraTool.getProperties()));
 
        messageStream.addSink(producer);
 
        // 调用execute触发执行
        env.execute();
     }
 
// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
        static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
 
        static final String[] SEX = {"MALE", "FEMALE"};
 
        static final int COUNT = NAME.length;   
 
        boolean running = true;
 
        Random rand = new Random(47);
 
       @Override
        //rand随机产生名字,性别,年龄的组合信息
         public void run(SourceContext<String> ctx) throws Exception {
 
            while (running) {
 
                int i = rand.nextInt(COUNT);
 
                int age = rand.nextInt(70);
 
                String sexy = SEX[rand.nextInt(2)];
 
                ctx.collect(NAME[i] + "," + age + "," + sexy);
 
                thread.sleep(1000);
 
            }
 
    }
 
       @Override
 
       public void cancel() {
 
         running = false;
 
       }
 
     }
 
   }

b.         生成Table1Table2,并使用JoinTable1Table2进行联合查询,打印输出结果。

public class SqlJoinWithSocket {
    public static void main(String[] args) throws Exception{
 
        final String hostname;
 
        final int port;
 
        System.out.println("use command as: ");
 
        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx");
 
        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                + "--hostname xxx.xxx.xxx.xxx --port xxx");
 
        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");
 
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");
 
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
 
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
 
            port = params.getInt("port");
 
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
 
            System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                    "type the input text into the command line");
 
            return;
        }
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
        //基于EventTime进行处理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
        env.setParallelism(1);
 
        ParameterTool paraTool = ParameterTool.fromArgs(args);
 
        //Stream1,从Kafka中读取数据
        DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                new SimpleStringSchema(),
                paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                String[] word = s.split(",");
 
                return new Tuple3<>(word[0], word[1], word[2]);
            }
        });
 
        //Stream1注册为Table1
        tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
 
        //Stream2,从Socket中读取数据
        DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] words = s.split("\\s");
                        if (words.length < 2) {
                            return new Tuple2<>();
                        }
 
                        return new Tuple2<>(words[0], words[1]);
                    }
                });
 
        //Stream2注册为Table2
        tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
 
        //执行SQL Join进行联合查询
        Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                "FROM Table1 AS t1\n" +
                "JOIN Table2 AS t2\n" +
                "ON t1.name = t2.name\n" +
                "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
 
        //将查询结果转换为Stream,并打印输出
        tableEnv.toAppendStream(result, Row.class).print();
 
        env.execute();
    }
}

1.4 样例代码获取

1.5 调测程序

1.5.1 编包并运行程序

操作场景

在程序代码完成开发后,建议您上传至Linux客户端环境中运行应用。使用ScalaJava语言开发的应用程序在Flink客户端的运行步骤是一样的。

说明

基于YARN集群的Flink应用程序不支持在Windows环境下运行,只支持在Linux环境下运行。

操作步骤

                                步骤 1      IntelliJ IDEA中,在生成Jar包之前配置工程的Artifacts信息。

1.         IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。

2.         在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。

图1-1 添加Artifacts

20180529144139644002.png

 

3.         您可以根据实际情况设置Jar包的名称、类型以及输出路径。

图1-2 设置基本信息

20180529144139987003.png

 

4.         选中“'FlinkStreamJavaExample' compile output”,右键选择“Put into Output Root”。然后单击“Apply”。

图1-3 Put into Output Root

20180529144140371004.png

 

5.         最后单击“OK”完成配置。

                                步骤 2      生成Jar包。

1.         IDEA主页面,选择“Build > Build Artifacts...”。

图1-4 Build Artifacts

20180529144141572005.png

 

2.         在弹出的菜单中,选择“FlinkStreamJavaExample > Build”开始生成Jar包。

图1-5 Build

20180529144141576006.png

 

3.         Event log中出现如下类似日志时,表示Jar包生成成功。您可以从步骤1.3中配置的路径下获取到Jar包。

21:25:43 Compilation completed successfully in 36 sec

                                步骤 3      步骤2中生成的Jar包(如FlinkStreamJavaExample.jar)拷贝到Linux环境的Flink运行环境下(即Flink客户端),如“/opt/Flink_test”。运行Flink应用程序。

Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn  session命令,启动flink集群。执行命令例如:

bin/yarn-session.sh -n 3 -jm 1024 -tm 1024

说明

l  执行yarn-session.sh之前,应预先将Flink应用程序的运行依赖包拷贝到客户端目录#{client_install_home}/Flink/flink/lib下,应用程序运行依赖包请参考参考信息。

l  Flink任务运行过程中禁止重启HDFS服务或者重启所有DataNode实例,否则可能会导致任务失败,并可能导致应用部分临时数据无法清空。

l   运行DataStreamScalaJava)样例程序。

在终端另开一个窗口,进入Flink客户端目录,调用bin/flink  run脚本运行代码,例如:

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/Flink_test/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

表1-1 参数说明

参数名称

说明

<filePath>

指本地文件系统中文件路径,每个节点都需要放一份/opt/log1.txt/opt/log2.txt。可以默认,也可以设置。

<windowTime>

指窗口时间大小,以分钟为单位。可以默认,也可以设置

 

l   运行向Kafka生产并消费数据样例程序(ScalaJava语言)。

生产数据的执行命令启动程序。

bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/Flink_test/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]

消费数据的执行命令启动程序。

bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/Flink_test/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]

表1-2 参数说明

参数名称

说明

是否必须配置

topic

表示Kafka主题名。

bootstrap.server

表示broker集群ip/port列表。

security.protocol

运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。

l  如果配置了SASL,则必须配置sasl.kerberos.service.namekafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。

l  如果配置了SSL,则必须配置ssl.truststore.locationssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。

说明

该参数未配置时为非安全Kafka

 

说明

Kafka应用需要添加如下所示的jar文件:

l  Flink服务端安装路径的lib目录下“flink-dist_2.11-1.3.0.jar”。

l  Flink服务端安装路径的connectors目录下的“flink-connector-kafka-*.jar”。

l  FusionInsight HDKafka客户端或Kafka服务端安装路径中的lib目录下“kafka-clients-0.11.0.0.jar”。

四种类型实际命令示例,以ReadFromKafka为例如下:

bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/Flink_test/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005

bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/Flink_test/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka

bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/Flink_test/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei

bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/Flink_test/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei

l   运行异步Checkpoint机制样例程序(ScalaJava语言)。

为了丰富样例代码,Java版本使用了Processing Time作为数据流的时间戳,而Scala版本使用Event  Time作为数据流的时间戳。具体执行命令参考如下:

           Checkpoint的快照信息保存到HDFS

n   Java

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/Flink_test/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink-checkpoint/

n   Scala

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIMain /opt/Flink_test/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink-checkpoint/

           Checkpoint的快照信息保存到本地文件。

n   Java

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/Flink_test/FlinkCheckpointJavaExample.jar --chkPath file:///home/zzz/flink-checkpoint/

n   Scala

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIMain
/opt/Flink_test/FlinkCheckpointScalaExample.jar --ckkPath file:///home/zzz/flink-checkpoint/

说明

l  Checkpoint源文件路径:flink-checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

flink-checkpoint/checkpoint                             //指定的根目录。

fd5f5b3d08628d83038a30302b611                 //jobID命名的第二次目录。

chk-X                                                                // "X"checkpoint编号,第三层目录。

4f854bf4-ea54-4595-a9d9-9b9080779ffe       //checkpoint源文件。

l  Flink在集群模式下checkpoint将文件放到HDFS,本地路径只支持Flinklocal模式,便于调测。

l   运行Pipeline样例程序。

           Java

i.          启动发布者Job

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/Flink_test/FlinkPipelineJavaExample.jar

ii.        启动订阅者Job1

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/Flink_test/FlinkPipelineJavaExample.jar

iii.      启动订阅者Job2

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/Flink_test/FlinkPipelineJavaExample.jar

           Scala

i.          启动发布者Job

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/Flink_test/FlinkPipelineScalaExample.jar

ii.        启动订阅者Job1

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/Flink_test/FlinkPipelineScalaExample.jar

iii.      启动订阅者Job2

bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/Flink_test/FlinkPipelineScalaExample.jar

l   运行配置表与流JOIN样例程序(以FlinkConfigtableJavaExampleFlinkConfigtableScalaExample为例)。

Java,运行方式有如下两种:

           yarn-session方式

i.          导入configtable.csvRedis集群。

java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/Flink_test/FlinkConfigtableJavaExample.jar  com.huawei.bigdata.flink.examples.RedisDataImport --configPath config/import.properties

ii.        启动Flink集群。

bin/yarn-session.sh -n 3 -t config -jm 1024 -tm 1024

iii.      流数据以网民姓名为关键字,读取Redis内的网民个人信息并JOIN输出。

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample /opt/Flink_test/FlinkConfigtableJavaExample.jar --dataPath config/data.txt

           yarn-cluster方式

i.          导入configtable.csvRedis集群。

java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/Flink_test/FlinkConfigtableJavaExample.jar  com.huawei.bigdata.flink.examples.RedisDataImport --configPath config/import.properties

ii.        启动Flink集群,流数据以网民姓名为关键字,读取Redis内的网民个人信息并JOIN输出。

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/Flink_test/FlinkConfigtableJavaExample.jar --dataPath config/data.txt

Scala,运行方式有如下两种:

           yarn-session方式

i.          导入configtable.csvRedis集群。

java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/Flink_test/FlinkConfigtableScalaExample.jar  com.huawei.bigdata.flink.examples.RedisDataImport --configPath config/import.properties

ii.        启动Flink集群。

bin/yarn-session.sh -n 3 -t config -jm 1024 -tm 1024

iii.      流数据以网民姓名为关键字,读取Redis内的网民个人信息并JOIN输出。

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableScalaExample /opt/Flink_test/FlinkConfigtableScalaExample.jar --dataPath config/data.txt

           yarn-cluster方式

i.          导入configtable.csvRedis集群。

java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/Flink_test/FlinkConfigtableScalaExample.jar  com.huawei.bigdata.flink.examples.RedisDataImport --configPath config/import.properties

ii.        启动Flink集群,流数据以网民姓名为关键字,读取Redis内的网民个人信息并JOIN输出。

bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableScalaExample -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/Flink_test/FlinkConfigtableScalaExample.jar --dataPath config/data.txt

l   运行Stream SQL Join样例程序

a.         启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序。

bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/Flink_test/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005

b.         在集群内任一节点启动netcat命令,等待应用程序连接。

netcat -l -p 9000

c.         启动程序接受Socket数据,并执行联合查询。

bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/Flink_test/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port 9000

----结束

说明

针对Flink提供的几个样例工程,其对应的运行依赖包请参考参考信息。

1.5.2 查看调测结果

操作场景

Flink应用程序运行完成后,您可以查看运行结果数据,也可以通过Flink WebUI查看应用程序运行情况。

操作步骤

l   查看Flink应用运行结果数据。

当用户查看执行结果时,需要在Flinkweb页面上查看Task ManagerStdout日志。

当执行结果输出到文件或者其他,由Flink应用程序指定,您可以通过指定文件或其他获取到运行结果数据。以下用CheckpointPipeline和配置表与流JOIN为例:

           查看Checkpoint结果和文件

n   结果在flink的“taskmanager.out”文件中,用户可以通过FlinkWebUI查看“task manager”标签下的out按钮查看。

n   有两种方式查看Checkpoint文件。

  若将checkpoint的快照信息保存到HDFS,则通过执行hdfs dfs -ls hdfs://hacluster/flink-checkpoint/命令查看。

  若将checkpoint的快照信息保存到本地文件,则可直接登录到各个节点查看。

           查看Pipeline结果

n   结果在flink的“taskmanager.out”文件中,用户可以通过FlinkWebUI查看“task manager”标签下的out按钮查看。

           查看配置表与流JOIN结果

n   结果在flink的“taskmanager.out”文件中,用户可以通过FlinkWebUI查看“task manager”标签下的out按钮查看。

           查看Stream SQL Join结果

n   结果在flink的“taskmanager.out”文件中,用户可以通过FlinkWebUI查看“task manager”标签下的out按钮查看。

l   使用Flink Web页面查看Flink应用程序运行情况。

Flink Web页面主要包括了OverviewRunning JobsCompleted JobsTask ManagersJob ManagerLogout等部分。

YARNWeb UI界面,查找到对应的Flink应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入Flink Web页面

查看程序执行的打印结果:找到对应的Task Manager,查看对应的Stdout标签日志信息。

l   查看Flink日志获取应用运行情况。

有两种方式获取Flink日志,分别为通过Flink Web页面或者Yarn的日志

           Flink Web页面可以查看Task ManagersJob Manager部分的日志。

           Yarn页面主要包括了Job Manager日志以及GC日志等。

页面入口:在YARNWeb UI界面,查找到对应的Flink应用程序。单击应用信息的第一列ID,然后选择Logs列点击进去即可打开。

 

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?注册

x
  • x
  • 常规:

点评 回复

跳转到指定楼层
lWX387225  版主   发表于 2018-5-29 14:47:35 已赞(0) 赞(0)

欢迎使用!
  • x
  • 常规:

点评 回复

潜行者     发表于 2018-7-1 11:46:29 已赞(0) 赞(0)

版主你好,这个例子你怎么没有scala的代码,
还有个问题,这个双流join,历史状态管理怎么控制,
不可能我运行一个月,两个流把一个月的明细数据都缓存着吧,
能控制状态缓存时间么
  • x
  • 常规:

点评 回复

潜行者     发表于 2018-7-2 13:35:46 已赞(0) 赞(0)

版主你好,这个例子你怎么没有scala的代码,
还有个问题,这个双流join,历史状态管理怎么控制,
不可能我运行一个月,两个流把一个月的明细数据都缓存着吧,
能控制状态缓存时间么
  • x
  • 常规:

点评 回复

yktq     发表于 5 天前 已赞(0) 赞(0)

双流join,可不可以改成流与维表Join
  • x
  • 常规:

点评 回复

yktq     发表于 5 天前 已赞(0) 赞(0)

Thread thead;缺少个声明 This post was last edited by yktq at 2018-10-12 16:27.
  • x
  • 常规:

点评 回复

yktq     发表于 5 天前 已赞(0) 赞(0)

2 This post was last edited by yktq at 2018-10-12 16:26.
  • x
  • 常规:

点评 回复

yktq     发表于 5 天前 已赞(0) 赞(0)

1 This post was last edited by yktq at 2018-10-12 16:26.
  • x
  • 常规:

点评 回复

yktq     发表于 5 天前 已赞(0) 赞(0)

proctime.proctime这个字段的作用是?
  • x
  • 常规:

点评 回复

yktq     发表于 前天 14:46 已赞(0) 赞(0)

proctime这个类呢,既然要放就放点正确的代码,差评
  • x
  • 常规:

点评 回复

发表回复
您需要登录后才可以回帖 登录 | 注册

内容安全提示:尊敬的用户您好,为了保障您、社区及第三方的合法权益,请勿发布可能给各方带来法律风险的内容,包括但不限于政治敏感内容,涉黄赌毒内容,泄露、侵犯他人商业秘密的内容,侵犯他人商标、版本、专利等知识产权的内容,侵犯个人隐私的内容等。也请勿向他人共享您的账号及密码,通过您的账号执行的所有操作,将视同您本人的行为,由您本人承担操作后果。详情请参看“隐私声明
如果附件按钮无法使用,请将Adobe Flash Player 更新到最新版本!
快速回复 返回顶部