首页>源码>java>SpringBoot-Flink

SpringBoot-Flink

声明:资源链接索引至第三方,平台不作任何存储,仅提供信息检索服务,若有版权问题,请https://help.coders100.com提交工单反馈
SpringBoot与Flink的集成主要涉及到以下几个方面:

1. 引入Flink依赖:在pom.xml文件中添加Flink的依赖,例如:


org.apache.flink
flink-connector-kafka
1.13.2


org.apache.flink
flink-connector-jdbc
1.13.2


2. 配置Kafka和JDBC连接:在application.properties或application.yml文件中配置Kafka和JDBC的相关信息,例如:

spring.datasource.url=jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest

spring.flink.config.uris=classpath:flink-conf/flink-env.yaml


3. 创建Flink配置文件:在classpath下的flink-conf目录下创建一个名为flink-env.yaml的文件,用于配置Flink的相关参数,例如:

# Flink Configuration
cluster_name = cluster1
taskmanager_id = taskmanager-1
network_timeout_ms = 5000
max_parallelism = 4
checkpoint_interval = 10000
log_level = WARN
streaming_batch_size = 16384


4. 创建数据源:在Flink的SQLContext中,需要创建一个数据源,例如:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironmentFactory;
import java.util.Properties;

public class FlinkIntegrationDemo {
public static void main(String[] args) throws Exception {
// 创建环境设置
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建表环境
StreamTableEnvironment tEnv = StreamTableEnvironmentFactory.create(settings);
// 创建数据源
DataSet kafkaSource = tEnv.fromElements("1", "2", "3");
// 创建表并注册为数据源
Table table = tEnv.fromDataSet(kafkaSource, "myTable");
// 注册数据源
table.registerTemporaryTable("tempTable");
// 执行任务
Table result = table.sqlQuery("SELECT FROM tempTable");
System.out.println(result);
// 关闭环境
env.execute("Flink integration demo");
}
}


5. 运行程序:运行FlinkIntegrationDemo类,可以看到Flink已经成功接收到来自Kafka的数据,并将其存储在临时表中。SpringBoot与Flink代码的简单集成,通过写一些简单的代码来梳理其中的逻辑。
电信网络下载

访问申明(访问视为同意此申明)

1.在网站平台的任何操作视为已阅读和同意网站底部的版权及免责申明
2.部分网络用户分享TXT文件内容为网盘地址有可能会失效(此类多为视频教程,如发生失效情况【联系客服】自助退回)
3.请多看看评论和内容介绍大数据情况下资源并不能保证每一条都是完美的资源
4.是否访问均为用户自主行为,本站只提供搜索服务不提供技术支持,感谢您的支持
意见反馈 联系客服 返回顶部

登录注册找回密码

捐赠账单

可选择微信或支付宝捐赠

*请依据自身情况量力选择捐赠类型并点击“确认”按钮

*依据中国相关法规,捐赠金额平台将不予提供发票

*感谢您的捐赠,我们竭诚为您提供更好的搜索服务

*本着平台非营利,请自主选择捐赠或分享资源获得积分

*您的捐赠仅代表平台的搜索服务费,如有疑问请通过联系客服反馈

*推荐用chrome浏览器访问本站,禁用360/Edge浏览器

*请务必认真阅读上诉声明,捐赠视为理解同意上诉声明

账号剩余积分: 0
啥都没有哦