Quick Start

Flink ships with a rich set of APIs for reading, transforming, and writing data: we create a DataStream and apply transformations to it to implement our processing logic.

Runtime Environment

  • Java 8

  • Flink 1.12.2

  • Maven

    Flink dependencies (pom.xml):

    <properties>
        <flink.version>1.12.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Example

This example treats any transfer whose amount exceeds 10,000 as a large transfer and emits an alert for it.

Run the main method directly to start the job.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        alerts
                .addSink(new AlertSink())
                .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

Steps:

  • Create the execution environment

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • Create the data source

    Specify where the data comes from, e.g. a database, Kafka, a file, or some other third-party system. A source is defined by implementing the SourceFunction interface; in this example TransactionSource implements SourceFunction and serves its records from an in-memory list.

    DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

    TransactionSource.java

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class TransactionSource implements SourceFunction<Transaction> {

        private final List<Transaction> transactionList = Arrays.asList(
                new Transaction("NO001", 0.1d),
                new Transaction("NO001", 101000d),
                new Transaction("NO002", 100d),
                new Transaction("NO002", 220d));

        @Override
        public void run(SourceContext<Transaction> ctx) throws Exception {
            // Emit every prepared transaction downstream.
            for (Transaction transaction : transactionList) {
                ctx.collect(transaction);
            }
        }

        @Override
        public void cancel() {
            // The source is finite, so there is nothing to interrupt here.
        }
    }

    Transaction.java

    The data entity class; it must implement the Serializable interface.

    import java.io.Serializable;

    public class Transaction implements Serializable {

        private static final long serialVersionUID = 1L;

        private String accountId;
        private double amount;

        public Transaction(String accountId, double amount) {
            this.accountId = accountId;
            this.amount = amount;
        }

        public String getAccountId() {
            return accountId;
        }

        public double getAmount() {
            return amount;
        }

        public void setAmount(double amount) {
            this.amount = amount;
        }
    }
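
    Note: because Transaction implements Serializable, Flink can always fall back to generic serialization for it. If you additionally give the class a public no-argument constructor and getters/setters for every field, it also qualifies as a Flink POJO, which lets Flink use its more efficient built-in POJO serializer.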
  • Process the data

    Process the records produced by TransactionSource. keyBy(Transaction::getAccountId) partitions the stream by accountId, so all records with the same accountId are handled by the same parallel subtask (and therefore the same thread). When a record has been processed, results are passed downstream by calling Collector#collect. In this example we simply build an alert and emit it; the Alert class itself is not shown here, so a sketch is given after the FraudDetector code below.

    DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

    FraudDetector.java

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // The key type is String because keyBy uses Transaction::getAccountId,
    // which returns a String.
    public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(
                Transaction transaction,
                Context context,
                Collector<Alert> collector) throws Exception {
            // Flag any transfer larger than 10,000 as a large transfer.
            if (transaction.getAmount() > 10000d) {
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
        }
    }
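
    Alert.java

    The Alert class is not included in the original walkthrough. Below is a minimal sketch; the field name id and the toString format are assumptions made here for completeness:

    import java.io.Serializable;

    public class Alert implements Serializable {

        private static final long serialVersionUID = 1L;

        // Assumed field: the account that triggered the alert.
        private String id;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Alert{id='" + id + "'}";
        }
    }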
  • Emit the results

    Specify the output side, i.e. the sink. A sink implements the SinkFunction interface; its invoke method handles each outgoing record. In this example AlertSink implements SinkFunction and prints the alerts to the console; in a real job you would more likely write them to a database or Kafka, or call a remote API.

    alerts.addSink(new AlertSink())
            .name("send-alerts");

    AlertSink.java

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class AlertSink implements SinkFunction<Alert> {

        @Override
        public void invoke(Alert value, Context context) {
            System.out.println("alert : " + value);
        }
    }
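
    With the sample data above, only the 101,000 transfer on account NO001 exceeds the 10,000 threshold, so running the job prints exactly one alert (its exact rendering depends on Alert#toString).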

Table API

Flink also provides the Table API, a unified relational API for batch and stream processing: instead of worrying about differences in the underlying data layers, we map the physical data structures to logical tables and process the data through the Table API (or directly with SQL).

Runtime Environment

  • Maven

    On top of the Flink DataStream dependencies above, the following dependencies are also required:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
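
    Note: flink-table-planner is the legacy planner, while flink-table-planner-blink is the Blink planner, which is the default in Flink 1.12; in practice you normally only need one of the two.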

Example

This example reads records from a CSV file whose format is: user id, product, quantity. We then compute, for each user and product, the total quantity purchased.

Run the main method to start the job.

import java.io.File;
import java.io.IOException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;

public class File2PrintExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tEnv = StreamTableEnvironment.create(env);

        String contents = "1,diaper,3\n" +
                "1,diaper,4\n" +
                "2,pen,3\n" +
                "2,rubber,3\n" +
                "3,rubber,2\n" +
                "4,beer,1";
        String path = createTempFile(contents);

        String orders = "CREATE TABLE orders (\n" +
                "  user_id INT,\n" +
                "  product STRING,\n" +
                "  amount INT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";

        String print = "CREATE TABLE print (\n" +
                "  user_id INT,\n" +
                "  product STRING,\n" +
                "  amount INT\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        String query = "INSERT INTO print SELECT\n" +
                "  user_id,\n" +
                "  product,\n" +
                "  sum(amount) AS amount\n" +
                "FROM orders GROUP BY user_id, product";

        tEnv.executeSql(orders);
        tEnv.executeSql(print);
        tEnv.executeSql(query);
    }

    /**
     * Creates a temporary file with the contents and returns the absolute path.
     */
    private static String createTempFile(String contents) throws IOException {
        File tempFile = File.createTempFile("orders", ".csv");
        tempFile.deleteOnExit();
        FileUtils.writeFileUtf8(tempFile, contents);
        return tempFile.toURI().toString();
    }
}

Steps

  • Create the execution environment

    Create an execution environment, which represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment. (An optional sketch for selecting the planner explicitly follows the snippet below.)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableEnvironment tEnv = StreamTableEnvironment.create(env);
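
    If you want to choose the planner explicitly instead of relying on the default, StreamTableEnvironment.create in Flink 1.12 also accepts an EnvironmentSettings argument. A minimal sketch (in 1.12 the Blink planner is already the default, so this step is optional):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()   // explicitly pick the Blink planner
            .inStreamingMode()   // process the input as an unbounded stream
            .build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);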
  • Register the tables

    Next, register the tables in the current environment. Tables connect to external storage systems, so through the Table API we can access external data: subscribe to Kafka messages, write rows into a relational database, send alerts, and so on. In this example the orders table reads the records from the CSV file, and the print table writes the processed rows to the console.

    String orders = "CREATE TABLE orders (\n" +
            "  user_id INT,\n" +
            "  product STRING,\n" +
            "  amount INT\n" +
            ") WITH (\n" +
            "  'connector.type' = 'filesystem',\n" +
            "  'connector.path' = '" + path + "',\n" +
            "  'format.type' = 'csv'\n" +
            ")";

    String print = "CREATE TABLE print (\n" +
            "  user_id INT,\n" +
            "  product STRING,\n" +
            "  amount INT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";

    tEnv.executeSql(orders);
    tEnv.executeSql(print);
  • Query

    Once the tables are registered, we can read data through the Table API, transform it with functions, and finally write it to the target table. The two approaches below produce the same result.

    1. Via the Table API
    // Requires: import static org.apache.flink.table.api.Expressions.$;
    Table ordersTable = tEnv.from("orders")
            .groupBy($("user_id"), $("product"))
            .select(
                    $("user_id"),
                    $("product"),
                    $("amount").sum().as("amount"));
    ordersTable.executeInsert("print");
    2. Via SQL
    String query = "INSERT INTO print SELECT\n" +
            "  user_id,\n" +
            "  product,\n" +
            "  sum(amount) AS amount\n" +
            "FROM orders GROUP BY user_id, product";
    tEnv.executeSql(query);
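
    With the sample data, user 1 bought 3 + 4 = 7 diapers, so the aggregated row for (1, diaper) has amount 7. Because this is a continuous streaming aggregation, the print sink emits incremental changelog rows (an insert for the first value of a key, then update-before/update-after pairs as it changes) rather than a single final row per key.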