开发者

Spring Boot 整合 Apache Flink 的详细过程

开发者 https://www.devze.com 2025-06-29 12:26 出处:网络 作者: 嘵奇
目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添加依赖1. 创建Spring Boot项目2. 添加Flink依赖四、基础整合示例1. 编写Flink流处理作业2. 在Spring Boot中启动作业五、进阶
目录
  • Spring Boot 整合 Apache Flink 教程
  • 一、背景与目标
  • 二、环境准备
  • 三、创建项目 & 添加依赖
    • 1. 创建Spring Boot项目
    • 2. 添加Flink依赖
  • 四、基础整合示例
    • 1. 编写Flink流处理作业
    • 2. 在Spring Boot中启动作业
  • 五、进阶整合 - 通过REST API动态提交作业
    • 1. 创建Job提交服务
    • 2. 创建REST控制器
  • 六、关键配置说明
    • 1. application.properties
    • 2. 解决依赖冲突
  • 七、运行与验证
    • 八、生产环境注意事项
      • 九、完整项目结构

        Spring Boot 整合 Apache Flink 教程

        一、背景与目标

        Apache Flink 是一个高性能的分布式流处理框架,而Spring Boot提供了快速构建企业级应用的能力。整合二者可实现:

        • 利用Spring Boot的依赖注入、配置管理等功能简化Flink作业开发
        • 构建完整的微服务架构,将流处理嵌入Spring生态
        • 实现动态作业提交与管理

        二、环境准备

        • JDK 17+
        • Maven 3.8+
        • Spring Boot 3.1.5
        • Flink 1.17.2

        三、创建项目 & 添加依赖

        1. 创建Spring Boot项目

        使用Spring Initializr生成基础项目,选择:

        • Maven
        • Spring Web(可选,用于创建REST接口)

        2. 添加Flink依赖

        <!-- pom.XML -->
        <dependencies>
            <!-- Spring Boot Starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!-- Flink核心依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-Java</artifactId>
                <version>1.17.2</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>1.17.2</version>
                <scope>provided</scope>
            </dependency>
            <!-- 本地执行时需添加 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime</artifactId>
                <version>1.17.2</version>
                <scope>test</scope>
            </dependency>
        </dependencies>

        四、基础整合示例

        1. 编写Flink流处理作业

        // src/main/java/com/example/demo/flink/WordCountJob.java
        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;
        public class WordCountJob {
            public static void ex编程ecute() throws Exception {
                final StreamExecutionEnvironment env = 
                    StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<String> text = env.fromElements(
           android         "Spring Boot整合Flink",
                    "Flink实时流处理",
                    "Spring生态集成"
                );
                DataStr编程客栈eam<WordCount> counts = text
                    .flatMap(new FlatMapFunction<String, WordCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordCount>编程客栈 out) {
                            for (String word : value.split("\\s")) {
                                out.collect(new WordCount(word, 1L));
                            }
                        }
                    })
                    .keyBy(value -> value.word)
                    .sum("count");
                counts.print();
                env.execute("Spring Boot Flink Job");
            }
            public static class WordCount {
                public String word;
                public long count;
                public WordCount() {}
                public WordCount(String word, long count) {
                    this.word = word;
                    this.count = count;
                }
                @Override
                public String toString() {
                    return word + " : " + count;
                }
            }
        }

        2. 在Spring Boot中启动作业

        // src/main/java/com/example/demo/DemoApplication.java
        @SpringBootApplication
        public class DemoApplication implements CommandLineRunner {
            public static void main(String[] args) {
                SpringApplication.run(DemoApplication.class, args);
            }
            @Override
            public void run(String... args) throws Exception {
                WordCountJob.execute(); // 启动Flink作业
            }
        }

        五、进阶整合 - 通过REST API动态提交作业

        1. 创建Job提交服务

        // src/main/java/com/example/demo/service/FlinkJobService.java
        @Service
        public class FlinkJobService {
            public String submitWordCountJob(List<String> inputLines) {
                try {
                    final StreamExecutionEnvironment env = 
                        StreamExecutionEnvironment.getExecutionEnvironment();
                    DataStream<String> texjavascriptt = env.fromCollection(inputLines);
                    // ...(同上WordCount逻辑)
                    JobExecutionResult result = env.execute();
                    return "JobID: " + result.getJobID();
                } catch (Exception e) {
                    return "Job Failed: " + e.getMessage();
                }
            }
        }

        2. 创建REST控制器

        // src/main/java/com/example/demo/controller/JobController.java
        @RestController
        @RequestMapping("/jobs")
        public class JobController {
            @Autowired
            private FlinkJobService flinkJobService;
            @PostMapping("/wordcount")
            public String submitWordCount(@RequestBody List<String> inputs) {
                return flinkJobService.submitWordCountJob(inputs);
            }
        }

        六、关键配置说明

        1. application.properties

        # 设置Flink本地执行环境
        spring.flink.local.enabled=true
        spring.flink.job.name=SpringBootFlinkJob
        # 调整并行度(根据CPU核心数)
        spring.flink.parallelism=4

        2. 解决依赖冲突

        在pom.xml中排除冲突依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.2</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        七、运行与验证

        启动Spring Boot应用:

        mvn spring-boot:run

        调用API提交作业:

        curl -X POST -H "Content-Type: application/json" \
        -d '["Hello Flink", "Spring Boot Integration"]' \
        http://localhost:8080/jobs/wordcount

        查看控制台输出:

        Flink> Spring : 1

        Flink> Boot : 1

        Flink> Integration : 1

        ...

        八、生产环境注意事项

        集群部署:将打包后的jar提交到Flink集群

        flink run -c com.example.demo.DemoApplication your-application.jar

        状态管理:集成Flink State Backend(如RocksDB)

        监控集成:通过Micrometer接入Spring Boot Actuator

        资源隔离:使用YarnKubernetes部署模式

        九、完整项目结构

        src/
        ├── main/
        │   ├── java/
        │   │   ├── com/example/demo/
        │   │   │   ├── DemoApplication.java
        │   │   │   ├── flink/
        │   │   │   │   └── WordCountJob.java
        │   │   │   ├── controller/
        │   │   │   ├── service/
        │   ├── resources/
        │   │   └── application.properties
        pom.xml

        通过以上步骤,即可实现Spring Boot与Apache Flink的深度整合。这种架构特别适合需要将实时流处理能力嵌入微服务体系的场景,如实时风控系统、IoT数据处理平台等。后续可扩展集成Kafka、HBase等大数据组件。

        到此这篇关于Spring Boot 整合 Apache Flink 教程的文章就介绍到这了,更多相关Spring Boot 整合 Apache Flink内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

        0

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        关注公众号