Flink Quick Start
1. Using the Flink DataStream API
Flink itself provides a rich set of APIs for reading, transforming, and writing data. We can create a DataStream and apply transformations to it to process the data.
Runtime environment
Java 8
Flink 1.12.2
Maven
<properties>
    <flink.version>1.12.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Example
In this example, transfer transactions with an amount greater than 10000 are treated as large transfers, and an alert is raised for each of them.
The job can be started directly by running the main method.
public class FraudDetectionJob { ... }
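The body of FraudDetectionJob is not listed above; the following is a minimal sketch of how the complete job could wire together the source, processor, and sink built in the steps below (the imports and the job name passed to execute() are assumptions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read transactions from the custom source defined below.
        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        // Partition by account id and flag large transfers.
        DataStream<FraudDetector.Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        // Print alerts to the console.
        alerts.addSink(new AlertSink())
                .name("send-alerts");

        // Nothing runs until execute() is called; the job name is an arbitrary choice.
        env.execute("fraud-detection");
    }
}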
Steps:
Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
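getExecutionEnvironment() picks a suitable environment automatically: a local one when the program is run directly (e.g. from the IDE), and the cluster environment when the job is submitted to a cluster. If you want an explicitly local environment with a fixed parallelism for debugging, a possible variation (not part of the original example) is:

// Assumption: run locally with parallelism 1, useful when stepping through the job in an IDE.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);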
Create the data source
Specify where the data comes from, e.g. a database, Kafka, a file, or some other external system. A source is defined by implementing the SourceFunction interface; in this example TransactionSource implements SourceFunction and reads its records from an in-memory list.
DataStream<Transaction> transactions = env
        .addSource(new TransactionSource())
        .name("transactions");

TransactionSource.java
public class TransactionSource implements SourceFunction<Transaction> {

    private List<Transaction> transactionList = Lists.newArrayList(
            new Transaction("NO001", 0.1d),
            new Transaction("NO001", 101000d),
            new Transaction("NO002", 100d),
            new Transaction("NO002", 220d)
    );

    @Override
    public void run(SourceContext<Transaction> ctx) throws Exception {
        for (Transaction transaction : transactionList) {
            // Emit each record to the downstream operators.
            ctx.collect(transaction);
        }
    }

    @Override
    public void cancel() {
    }
}

Transaction.java
The data entity class; it must implement the Serializable interface.
public class Transaction implements Serializable {

    private static final long serialVersionUID = 1L;

    private String accountId;
    private double amount;

    public Transaction(String accountId, double amount) {
        this.accountId = accountId;
        this.amount = amount;
    }

    public String getAccountId() {
        return accountId;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }
}

Process the data
Process the data produced by TransactionSource. keyBy(Transaction::getAccountId) partitions the stream by accountId, so records with the same accountId are processed in the same parallel task. Once a record has been processed, the result is passed downstream by calling Collector#collect. In this example, an alert is generated and forwarded downstream.
DataStream<FraudDetector.Alert> alerts = transactions
        .keyBy(Transaction::getAccountId)
        .process(new FraudDetector())
        .name("fraud-detector");

FraudDetector.java
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
        // Any transaction above the 10000 threshold triggers an alert.
        if (transaction.getAmount() > 10000d) {
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());
            collector.collect(alert);
        }
    }
}

Output the result
Specify the sink. A sink implements the SinkFunction interface and its invoke method, which handles each outgoing record. In this example, AlertSink implements SinkFunction and prints the alerts to the console; in a real job the data could be written to a database or Kafka, or sent to an external service over the network.
alerts.addSink(new AlertSink())
        .name("send-alerts");

AlertSink.java
public class AlertSink implements SinkFunction<Alert> {

    @Override
    public void invoke(Alert value, Context context) {
        // Print every alert to the console.
        System.out.println("alert : " + value);
    }
}
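The Alert type used by FraudDetector and AlertSink is not listed in the article; since the job refers to it as FraudDetector.Alert, it is presumably a static nested class of FraudDetector. A minimal sketch, assuming it only carries the id of the account that triggered the alert:

// Assumed shape of the alert record; only setId(...) is called in the example.
public static class Alert implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Alert{id='" + id + "'}";
    }
}

Overriding toString() is worthwhile here because AlertSink prints the alert with System.out.println.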
2. Using the Flink Table API
Flink also provides the Table API, a unified relational API for batch and stream processing. With it we do not have to care about differences in the underlying data layer: physical data structures are mapped to logical tables, and the data is processed through the Table API (or directly through SQL).
Runtime environment
Maven
On top of the dependencies used for the Flink DataStream example, the following dependencies are also required:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Example
This example reads data from a CSV file whose format is: user id, product, amount. We then compute, for each user, the total amount purchased of each product.
The job can be started by running the main method.
public class File2PrintExample { ... }
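The body of File2PrintExample is not listed above; the following is a minimal sketch of how the steps below could be combined into one class (the imports, the CSV path, and the sample file contents are assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class File2PrintExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumed location of the input file; each line looks like "1,beer,3".
        String path = "/tmp/orders.csv";

        // Register the source and sink tables (same DDL as in the "Register the tables" step).
        tEnv.executeSql("CREATE TABLE orders (\n" +
                "  user_id INT,\n" +
                "  product STRING,\n" +
                "  amount INT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")");
        tEnv.executeSql("CREATE TABLE print (\n" +
                "  user_id INT,\n" +
                "  product STRING,\n" +
                "  amount INT\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");

        // Aggregate per user and product, and write the result into the print table.
        tEnv.executeSql("insert into print SELECT user_id, product, sum(amount) as amount " +
                "FROM orders group by user_id, product");
    }
}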
Steps
Create the execution environment
Create an execution environment, which represents the context in which the current program is executed. If the program is invoked standalone, this method returns a local execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = StreamTableEnvironment.create(env);

Register the tables
Next, register the tables in the current environment. Tables can be connected to external storage systems, and through the Table API we can then access the data in those systems, e.g. subscribe to Kafka messages, write data to a relational database, or send alerts. In this example, the orders table reads the data from the CSV file and the print table writes the processed data to the console.
String orders = "CREATE TABLE orders (\n" +
        "  user_id INT,\n" +
        "  product STRING,\n" +
        "  amount INT\n" +
        ") WITH (\n" +
        "  'connector.type' = 'filesystem',\n" +
        "  'connector.path' = '" + path + "',\n" +
        "  'format.type' = 'csv'\n" +
        ")";

String print = "CREATE TABLE print (\n" +
        "  user_id INT,\n" +
        "  product STRING,\n" +
        "  amount INT\n" +
        ") WITH (\n" +
        "  'connector' = 'print'\n" +
        ")";

tEnv.executeSql(orders);
tEnv.executeSql(print);

Query
Once the tables are registered, we can read the data through the Table API, process it with functions, and finally write it to the target table. The two approaches below produce the same result.
- Using the Table API
Table ordersTable = tEnv.from("orders")
        .groupBy($("user_id"), $("product"))
        .select(
                $("user_id"),
                $("product"),
                $("amount").sum().as("amount"));
ordersTable.executeInsert("print");

- Using SQL
String query = "insert into print SELECT\n" +
        "  user_id,\n" +
        "  product,\n" +
        "  sum(amount) as amount\n" +
        "FROM orders group by user_id, product";
tEnv.executeSql(query);