第1章分析Flink的基本原理、架构和组件。本章简单实现了一个Flink的入门案例,可以加深你对前面内容的理解。
2.1 Flink开发环境分析
2.1.1 推荐开发工具
在开始之前,我们需要先解释一下开发工具的问题。官方推荐使用IntelliJ IDEA。 IntelliJ IDEA 默认集成了Scala 和Maven 环境,使用起来更加方便。当然,你也可以使用Eclipse。
开发Flink程序时,可以使用Java或Scala语言。我个人推荐使用Scala,因为它更容易实现函数式编程。当然也可以使用Java,但是代码逻辑实现起来会比较复杂。
在开发Flink程序时,我们推荐使用Maven来管理依赖。对于Maven仓库,我们推荐使用全国镜像仓库地址。海外仓库下载较慢,可以使用国内阿里云Maven仓库。
注意:如果您依赖国内源而无法下载,记得切换到国际源。如果使用国内阿里云Maven仓库镜像进行相关配置,则需要修改$Maven_HOME/conf/settings.xml文件。
镜像idaliMaven/id namealiyun Maven/name urlhttp://Maven.aliyun.com/nexus/content/groups/public//url MirrorOfcentral/mirrorOf /mirror2.1.2 Flink 程序依赖设置
在使用Maven管理Flink程序相关依赖之前,必须先对其进行配置。创建对应的Maven项目后,还必须在该项目的pom.xml文件中进行相关配置。
使用Java语言开发Flink程序时,需要添加以下设置。
注:这里使用的Flink版本是1.6.1。如果您使用的是不同版本,则需要进入Maven仓库,找到对应版本的Maven配置。
依赖groupIdorg.apache.flink/groupId artifactIdflink-java/artifactId version1.6.1/versionscopeprovided/scope /dependency 依赖groupIdorg.apache.flink/groupId artifactIdflink-streaming-java_2.11/artifactId version1.6.1/versionscopeprovided/scope /dependency 开发时使用Scala语言的Flink程序,需要添加以下设置:
依赖项groupIdorg.apache.flink/groupId artifactIdflink-scala_2.11/artifactId version1.6.1/versionscopeprovided/scope /Dependency 依赖项groupIdorg.apache.flink/groupId artifactIdflink-streaming-scala_2.11/artifactId version1.6.1/versionscopeprovided/scope /dependency注意:如果您在IDEA等开发工具中运行代码,则必须注释掉依赖配置中的scope属性。集群本身就有Flink相关的依赖,所以在编译JAR包时,必须开启scope属性,避免最终的JAR包中包含这些依赖包。
2.2 Flink 程序开发流程
开发Flink程序有一套固定的流程。
(1)获取执行环境。
(2) 读取并创建初始化数据。
(3)指定对数据进行操作的事务操作符。
(4) 指定计算数据的保存位置。
(5) 调用execute()触发执行程序。
注意:Flink 程序存在计算延迟,执行程序只有在最后一次调用execute()方法时才会被触发。
惰性计算的优点:可以开发复杂的程序。 Flink 将这个复杂的程序转换为一个计划并作为一个整体执行该计划。
现在,首先创建一个Flink Maven 项目并将其命名为FlinkExample。结果如图2.1所示。
图2.1 项目目录
后续所有Java代码都存放在src/main/Java目录下,所有Scala代码都存放在src/main/Scala目录下,流计算相关代码存放在对应的流目录下,批处理相关代码存放在对应的流目录。代码对应批处理目录。
2.3 开发Flink流处理(streaming)案例
需求分析:通过Socket手动实时生成一些单词,使用Flink实时接收数据,并对指定时间窗口(例如2秒)内的数据进行聚合统计,输出该时间范围内的计算结果。
2.3.1 开发Java代码
首先,为您的Java 代码添加Maven 依赖项。参见第2.1.2 节。请注意,以下代码创建了一个WordWithCount 类。该类主要用于帮助统计每个单词出现的总次数。
要求:实现对过去2秒内数据的1秒汇总计算。
分析:通过Socket模拟生成单词,并使用Flink程序对数据进行汇总和计算。
代码实现如下。
包xuwei.tech.streaming;导入org.apache.Flink.api.common.functions.FlatMapFunction;导入org.apache.Flink.api.Java.utils.ParameterTool;导入org.apache.Flink.contrib.streaming.state。 RocksDBStateBackend;导入org.apache.Flink.runtime.state.filesystem.FsStateBackend;导入org.apache.Flink.runtime.state.memory.MemoryStateBackend;导入org.apache.Flink.streaming.api.DataStream.DataStream;导入org. apache.Flink.streaming.api.DataStream.DataStreamSource;导入org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;导入org.apache.Flink.streaming.api.windowing.time.Time;导入org.apache.Flink .util.Collector;/** * 滑动窗口字数统计* * Created by xuwei.tech */public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception{ //获取需要的端口号int try ; { ParameterTool ParameterTool=ParameterTool.fromArgs(args); Port=ParameterTool.getInt('port'); }catch (Exception e){ System.err.println('端口未设置。默认使用端口9000' ); } //获取Flink StreamExecutionEnvironment的执行环境env=StreamExecutionEnvironment.getExecutionEnvironment(); //连接一个socket来获取输入数据text=env .socketTextStream(hostname, port, delimiter); //a a c //a 1 //a 1 //c 1 DataStreamWordWithCount windowCounts=text. flatMap(new FlatMapFunction String, WordWithCount() { public void flatMap(String value, CollectorWordWithCount out) 抛出异常{ String[] splits=value.split('\\\s' ); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); ('word ') .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒。 sum('count');//这里使用sum或reduce Can/*.reduce(new ReduceFunctionWordWithCount() { public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count+ b.count); } })*///打印数据到控制台并设置并行度windowCounts.print().setParallelism(1); //这行代码必须实现。如果你不实现它,你的程序将不会运行env.execute('Socket window count' ); } public static class WordWithCount{ public String word; public WordWithCount(){} public WordWithCount(String word,long count) { } @Override public String toString () { return 'WordWithCount{' + 'word='' + word + '\'' + ', count=' + count + '}' } }}2.3.2 开发Scala 代码
首先,为您的Scala 代码添加Maven 依赖项。参见第2.1.2 节。这里我们将使用Scala 中的case 类创建一个类。
要求:实现对过去2秒内数据的1秒汇总计算。
分析:通过Socket模拟生成单词,并使用Flink程序对数据进行汇总和计算。
代码实现如下。
包xuwei.tech.streamingimport org.apache.Flink.api.Java.utils.ParameterToolimport org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironmentimport org.apache.Flink.streaming.api.windowing.time.Time/** * 滑动窗口计算字数* * Created by xuwei.tech */object SocketWindowWordCountScala { def main(args: Array[String]): Unit={ //获取socket端口号val port: Int=try { ParameterTool.fromArgs(args ) .getInt('port') }catch { case e: Exception={ System.err.println('Port not set. Default port 9000--Scala') } 9000 } //获取执行环境val env: StreamExecutionEnvironment=StreamExecutionEnvironment .getExecutionEnvironment //连接到Socket以获取输入数据。 val text=env.socketTextStream('hadoop100',port,'\n') //解析数据(展平数据)、分组、窗口和聚合以求和。 //注意:必需添加此行需要隐式换行符。否则下面的FlatMap方法会报错。 import org.apache.Flink.api.Scala._ val windowCounts=text. flatMap(line=line.split) ('\\\s'))//Flat,剪切每一行单词。 map(w=WordWithCount(w,1))//将单词转换为单词。在这种格式中它是1。 keyBy('word')//Group.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小和间隔time.sum('count');//总计或者可以使用要么reduce //.reduce ((a,b)=WordWithCount(a .word,a.count+b.count)) //打印到控制台windowCounts.print().setParallelism(1) //执行任务env。 execute('Socket 窗口计数'); } case class WordWithCount(word: String,count: Long)}2.3.3 程序执行
在前面的案例代码中,主机名指定为hadoop100,端口默认设置为9000。这意味着流处理程序将默认监听该主机上的9000端口。因此,在运行程序之前,需要在hadoop100节点上监听该端口,并运行以下命令:
[root@hadoop100 Soft]# 执行nc -l 9000aba,在IDEA中运行完成的程序代码。结果如下。
WordWithCount{word='a', count=1}WordWithCount{word='b', count=1}WordWithCount{word='a', count=2}WordWithCount{word='b', count=1}WordWithCount{ word='a', count=1}2.4 Flink批处理(Batch)案例开发
我们使用Flink实现了一个典型的流计算案例。我们再看一下Flink的另一个应用场景:——Batch离线批处理。
2.4.1 开发Java代码
需求:统计某个单词在文件中出现的总次数,并将结果保存到文件中。
Java代码实现如下。
包xuwei.tech.batch;导入org.apache.Flink.api.common.functions.FlatMapFunction;导入org.apache.Flink.api.Java.DataSet;导入org.apache.Flink.api.Java.ExecutionEnvironment;导入org .apache.Flink.api.Java.operators.DataSource;import org.apache.Flink.api.Java.tuple.Tuple2;import org.apache.Flink.util.Collector;/** *离线字数计算* * 创建by xuwei.tech */public class BatchWordCountJava { public static void main(String[] args) throws Exception{ String inputPath='D:\\\data\\\file'; //获取执行环境。 env=ExecutionEnvironment.getExecutionEnvironment(); //获取文件的内容。 text=env.readTextFile(inputPath), 整数counts=text. groupBy(0).sum(1); counts.writeAsCsv(outPath,'\n',' ').setParallelism(1);公共静态类Tokenizer 实现FlatMapFunctionString。Tuple2String , Integer{ public void flatMap(String value, CollectorTuple2String, Integer out) throws Exception { String[] tokens=value.toLowerCase().split('\\\W+') for (String token: tokens) { if( tokens) .length()0){ out.collect(new Tuple2String, Integer(token,1)); } } }}2.4.2 开发Scala 代码
需求:统计某个单词在文件中出现的总次数,并将结果保存到文件中。
Scala代码实现如下。
package xuwei.tech.batchimport org.apache.Flink.api.Scala.ExecutionEnvironment/** * 离线计算字数* Created by xuwei.tech */object BatchWordCountScala { def main(args: Array[String]): Unit={ val inputPath='D:\\\data\\\file' val outPut='D:\\\data\\\result' val env=ExecutionEnvironment.getExecutionEnvironment val text=env.readTextFile(inputPath) //隐式转换导入组织介绍.apache.Flink.api.Scala._ val counts=text. flatMap(_.toLowerCase.split('\\\W+')) .filter(_.nonEmpty) .map((_,1)) . 0) .sum(1) counts.writeAsCsv(outPut,'\n',' ').setParallelism(1) env.execute('批量字数统计') }}2.4.3 执行程序
首先,代码中指定的inputPath是D:\data\file目录。我需要在这个目录下创建一些文件,并在该文件中输入一些单词。
D:\data\filedir2018/03/20 09:01 24 a.txtD:\data\filetype a.txthello a hello bhello a然后在IDEA中运行程序代码时,结果为outPut\指定的D:\\\data\将是已保存。 \结果文件。
D:\datatype resulthello 3b 1a 2本文摘自徐巍新发布的《Flink入门与实战》。
这是一本Flink 入门指南,力求对Flink 的基础理论和实际操作进行详细、完整的讲解。使用Flink 1.6版本编写的示例丰富实用,学以致用。我们将通过细节和示例,以简单易懂的方式解释Flink 技术的本质。 51CTO热门在线课程教材可以与在线课程结合,快速提高您的大数据开发技能。本书的目的是帮助读者从零开始快速学习Flink的基本原理和核心特性。本书首先介绍了Flink的基本原理、安装和部署,并对Flink的一些核心API进行了详细分析。然后支持相应的案例分析,每个案例分别使用Java代码和Scala代码实现。最后通过两个项目演示了Flink在实际工作中的几种应用场景,帮助读者快速掌握Flink开发。
为了学习本书,每个人都应该具备大数据的基础知识,包括Hadoop、Kafka、Redis、Elasticsearch等框架的基本安装和使用。本书也适合对大数据实时计算感兴趣的读者。
版权声明:本文转载于网络,版权归作者所有。如有侵权,请联系本站编辑删除。