建网站公司用什么网站程序,网站服务器的工作原理,英语培训机构前十名,做多级分销的网站Flink开发环境搭建与WordCount实战
前言
上一篇我们从宏观角度认识了 Flink#xff0c;知道它是干什么的。但光说不练假把式#xff0c;这篇文章我们要动手搞起来——从零搭建 Flink 开发环境#xff0c;并写出人生中第一个 Flink 程序#xff1a;WordCount#xff08;单…Flink开发环境搭建与WordCount实战前言上一篇我们从宏观角度认识了 Flink知道它是干什么的。但光说不练假把式这篇文章我们要动手搞起来——从零搭建 Flink 开发环境并写出人生中第一个 Flink 程序WordCount单词计数。别担心环境问题我会一步步带你配置保证你能跑起来。如果中间遇到问题文末有常见问题汇总。个人主页你的主页目录一、环境准备清单二、JDK安装与配置三、Maven安装与配置四、IDEA创建Flink项目五、第一个WordCount程序六、运行与验证七、代码逐行解析八、常见问题与解决九、总结一、环境准备清单在开始之前先确认你的电脑上有以下环境环境要求推荐版本说明JDK必须JDK 8 或 JDK 11Flink 官方推荐JDK 17 需要额外配置Maven必须3.6依赖管理工具IDE推荐IntelliJ IDEA社区版免费够用Git可选最新版版本管理为什么推荐 JDK 8 或 11Flink 对 JDK 版本有一定要求JDK 8最稳定兼容性最好JDK 11官方支持长期维护版本JDK 17需要额外的 JVM 参数初学者不推荐如果你电脑上已经有 JDK可以用命令查看版本java -version输出类似java version 1.8.0_301 Java(TM) SE Runtime Environment (build 1.8.0_301-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)看到1.8就是 JDK 8看到11.x.x就是 JDK 11。二、JDK安装与配置如果你还没安装 JDK按以下步骤操作Windows 用户下载 JDKOracle JDK 下载 或 AdoptOpenJDK双击安装记住安装路径如C:\Program Files\Java\jdk1.8.0_301配置环境变量新建JAVA_HOMEC:\Program Files\Java\jdk1.8.0_301在Path中添加%JAVA_HOME%\binMac 用户推荐用 Homebrew 安装# 安装 JDK 8brewinstallopenjdk8# 配置环境变量添加到 ~/.zshrc 或 ~/.bash_profileexportJAVA_HOME$(/usr/libexec/java_home -v1.8)exportPATH$JAVA_HOME/bin:$PATH# 使配置生效source~/.zshrcLinux 用户# Ubuntu/Debiansudoaptupdatesudoaptinstallopenjdk-8-jdk# CentOS/RHELsudoyuminstalljava-1.8.0-openjdk-devel# 配置环境变量exportJAVA_HOME/usr/lib/jvm/java-8-openjdk-amd64exportPATH$JAVA_HOME/bin:$PATH三、Maven安装与配置安装 MavenWindows下载Maven 官网选择apache-maven-x.x.x-bin.zip解压到某个目录如C:\apache-maven-3.9.5配置环境变量新建MAVEN_HOMEC:\apache-maven-3.9.5在Path中添加%MAVEN_HOME%\binMacbrewinstallmavenLinux# Ubuntu/Debiansudoaptinstallmaven# CentOSsudoyuminstallmaven验证安装mvn -v输出类似Apache Maven 3.9.5 Maven home: /usr/local/Cellar/maven/3.9.5/libexec Java version: 1.8.0_301, vendor: Oracle Corporation配置国内镜像重要Maven 默认从国外下载依赖速度很慢。强烈建议配置阿里云镜像。找到 Maven 配置文件settings.xmlWindowsC:\Users\你的用户名\.m2\settings.xmlMac/Linux~/.m2/settings.xml如果文件不存在从{MAVEN_HOME}/conf/settings.xml复制一份。在mirrors标签内添加mirroridaliyunmaven/idmirrorOf*/mirrorOfname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public/url/mirror四、IDEA创建Flink项目第一步创建 Maven 项目打开 IDEA →File→New→Project选择Maven不要勾选 Create from archetype填写项目信息Nameflink-tutorialGroupIdcom.exampleArtifactIdflink-tutorial点击Create第二步配置 pom.xml这是最关键的一步。把pom.xml替换成以下内容?xml version1.0 encodingUTF-8?projectxmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.example/groupIdartifactIdflink-tutorial/artifactIdversion1.0-SNAPSHOT/versionpackagingjar/packagingpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding!-- Flink 版本推荐使用 1.17.x 或 1.18.x --flink.version1.17.2/flink.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.36/slf4j.version/propertiesdependencies!-- Flink 核心依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- Flink 客户端本地运行需要 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- 本地运行时需要的运行时依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- 日志依赖 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-simple/artifactIdversion${slf4j.version}/version/dependency/dependenciesbuildplugins!-- 编译插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.11.0/versionconfigurationsource8/sourcetarget8/target/configuration/plugin!-- 打包插件用于生成包含依赖的 jar --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.5.0/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationtransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClasscom.example.WordCount/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins/build/project第三步刷新 Maven 依赖在 IDEA 右侧找到Maven面板点击刷新按钮等待依赖下载完成。如果下载很慢检查是否配置了阿里云镜像。第四步配置运行参数因为 Flink 依赖是provided作用域生产环境由集群提供本地运行需要额外配置点击Run→Edit Configurations选择你的运行配置勾选Include dependencies with Provided scope或者直接在 pom.xml 中把scopeprovided/scope改成scopecompile/scope仅开发阶段。五、第一个WordCount程序创建代码目录结构src/main/java/com/example/ └── WordCount.javaWordCount.java 完整代码packagecom.example;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/** * Flink 入门程序WordCount单词计数 * * 功能统计输入文本中每个单词出现的次数 * * author 山沐与山 */publicclassWordCount{publicstaticvoidmain(String[]args)throwsException{// 第一步创建执行环境 // 这是所有 Flink 程序的入口类似于 SparkContextStreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();// 第二步读取数据源Source // 这里我们用一个简单的集合作为数据源// 实际生产中数据源通常是 Kafka、文件、数据库等DataStreamStringtextStreamenv.fromElements(hello flink,hello world,flink is awesome,hello flink world);// 第三步数据转换Transformation DataStreamTuple2String,IntegerwordCountStreamtextStream// 3.1 将每行文本切分成单词并转换为 (单词, 1) 的形式.flatMap(newTokenizer())// 3.2 按单词分组keyBy.keyBy(tuple-tuple.f0)// 3.3 对每组数据求和.sum(1);// 第四步输出结果Sink // 这里直接打印到控制台实际生产中会写入 Kafka、数据库等wordCountStream.print(WordCount);// 第五步触发执行 // Flink 是懒执行的只有调用 execute() 才会真正开始运行env.execute(Flink WordCount Job);}/** * 自定义 FlatMapFunction将一行文本切分成多个单词 * * 输入一行文本如 hello flink * 输出多个 (单词, 1) 元组如 (hello, 1), (flink, 1) */publicstaticclassTokenizerimplementsFlatMapFunctionString,Tuple2String,Integer{OverridepublicvoidflatMap(Stringline,CollectorTuple2String,Integerout){// 按空格切分String[]wordsline.toLowerCase().split(\\s);// 遍历每个单词输出 (单词, 1)for(Stringword:words){if(word.length()0){out.collect(newTuple2(word,1));}}}}}六、运行与验证在 IDEA 中运行右键点击WordCount.java选择Run WordCount.main()预期输出WordCount:3 (awesome,1) WordCount:7 (flink,3) WordCount:6 (world,2) WordCount:1 (hello,3) WordCount:4 (is,1)输出解释WordCount:3表示这条数据由第3个并行任务处理(flink,3)表示单词 “flink” 出现了 3 次恭喜你你的第一个 Flink 程序已经成功运行了可能遇到的问题如果报错ClassNotFoundException说明 Flink 依赖没有正确加载。检查Maven 依赖是否下载完成是否配置了Include dependencies with Provided scope或者把pom.xml中的provided改成compile七、代码逐行解析让我们把代码拆开来一块块理解7.1 创建执行环境StreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();这行代码是所有 Flink 流处理程序的起点。StreamExecutionEnvironment可以理解为 Flink 的总管家它负责配置并行度、状态后端等参数管理数据流的整个生命周期最终提交作业到集群执行类比一下如果 Flink 程序是一部电影env就是导演。7.2 读取数据源DataStreamStringtextStreamenv.fromElements(hello flink,hello world,// ...);fromElements()是最简单的数据源直接从内存中的集合读取数据。常用的数据源还有方法说明场景fromElements()从集合读取测试、演示fromCollection()从 List 读取测试readTextFile()从文件读取批处理socketTextStream()从 Socket 读取实时测试addSource(new FlinkKafkaConsumer())从 Kafka 读取生产环境7.3 数据转换这是 Flink 程序的核心部分textStream.flatMap(newTokenizer())// 切分单词.keyBy(tuple-tuple.f0)// 按单词分组.sum(1);// 求和flatMap一对多的转换。一行文本 → 多个 (单词, 1) 元组输入hello flink 输出(hello, 1), (flink, 1)keyBy按 key 分组。相同 key 的数据会被发送到同一个 Task 处理所有 keyhello 的数据 → Task A 所有 keyflink 的数据 → Task Bsum(1)对 Tuple 的第 2 个字段索引为 1求和(hello, 1) (hello, 1) (hello, 1) (hello, 3)7.4 数据流转换过程图解原始数据流 │ │ hello flink │ hello world │ flink is awesome │ hello flink world │ ▼ ┌─────────────────────────────────────────────────────────┐ │ flatMap(Tokenizer) │ │ │ │ hello flink → (hello,1), (flink,1) │ │ hello world → (hello,1), (world,1) │ │ flink is awesome → (flink,1), (is,1), (awesome,1) │ │ hello flink world → (hello,1), (flink,1), (world,1) │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ keyBy(word) │ │ │ │ hello组 (hello,1), (hello,1), (hello,1) │ │ flink组 (flink,1), (flink,1), (flink,1) │ │ world组 (world,1), (world,1) │ │ is组 (is,1) │ │ awesome组(awesome,1) │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ sum(1) │ │ │ │ (hello, 3) │ │ (flink, 3) │ │ (world, 2) │ │ (is, 1) │ │ (awesome, 1) │ └─────────────────────────────────────────────────────────┘7.5 触发执行env.execute(Flink WordCount Job);这行代码非常关键Flink 程序是懒执行的前面所有的代码只是在构建执行计划DAG只有调用execute()才会真正开始执行。这和 Spark 的action操作很像。八、常见问题与解决Q1报错java.lang.NoClassDefFoundError原因Flink 依赖没有正确加载解决在 IDEA 运行配置中勾选Include dependencies with Provided scope或者把 pom.xml 中的scopeprovided/scope改成scopecompile/scopeQ2Maven 下载依赖很慢原因默认使用国外仓库解决配置阿里云镜像见第三节Q3报错Could not find or load main class原因包路径不对解决确保WordCount.java的 package 声明和实际目录结构一致Q4运行后没有输出原因可能是日志级别问题解决在src/main/resources下创建log4j.propertieslog4j.rootLoggerINFO, console log4j.appender.consoleorg.apache.log4j.ConsoleAppender log4j.appender.console.layoutorg.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%nQ5JDK 版本不兼容原因使用了 JDK 17解决切换到 JDK 8 或 11或者添加 JVM 参数--add-opens java.base/java.langALL-UNNAMED九、总结这篇文章我们完成了环境准备JDK 8/11 Maven IDEA项目配置创建 Maven 项目配置 Flink 依赖第一个程序WordCount 单词计数代码理解StreamExecutionEnvironment程序入口Source数据从哪来Transformation数据怎么处理flatMap、keyBy、sumSink数据到哪去execute()触发执行核心知识点概念说明DataStreamFlink 中的数据流抽象flatMap一对多转换keyBy按 key 分组相同 key 的数据去同一个 Tasksum聚合操作对指定字段求和懒执行必须调用 execute() 才会真正运行下一篇文章我们将深入剖析Flink 的架构看看 JobManager 和 TaskManager 是怎么协作的理解 Flink 是如何把任务分配到多台机器上并行执行的。热门专栏推荐Agent小册Java基础合集Python基础合集Go基础合集大数据合集前端小册数据库合集Redis 合集Spring 全家桶微服务全家桶数据结构与算法合集设计模式小册Ai工具小册等等等还有许多优秀的合集在主页等着大家的光顾感谢大家的支持文章到这里就结束了如果有什么疑问的地方请指出诸佬们一起来评论区一起讨论希望能和诸佬们一起努力今后我们一起观看感谢您的阅读如果帮助到您不妨3连支持一下创造不易您们的支持是我的动力