互联网技术 / 互联网资讯 · 2024年4月10日

学习Flink流计算常用算子的硬核指南

直入正题!

Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。

所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。

DataSet

一、Source算子

1. fromCollection

fromCollection:从本地集合读取数据

例:

val env = ExecutionEnvironment.getExecutionEnvironment val textDataSet: DataSet[String] = env.fromCollection( List(“1,张三”, “2,李四”, “3,王五”, “4,赵六”) )

2. ReadTextfile

ReadTextfile:从文件中读取:

val textDataSet: DataSet[String] = env.ReadTextfile(“/data/a.txt”)

3. ReadTextfile:遍历目录

ReadTextfile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式:

val parameters = new configuration // Recursive.file.enumeration 开启递归 parameters.setBoolean(“Recursive.file.enumeration”, true) val file = env.ReadTextfile(“/data”).withParameters(parameters)

4. ReadTextfile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputFormat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩方法 文件扩展名 是否可并行读取 DEFLATE .deflate no GZIP .gz .gzip no BZIP2 .bz2 no XZ .xz no val file = env.ReadTextfile(“/data/file.gz”)

二、Transform转换算子

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:

val env = ExecutionEnvironment.getExecutionEnvironment val textDataSet: DataSet[String] = env.fromCollection( List(“张三,1”, “李四,2”, “王五,3”, “张三,4”) )

1. Map

将DataSet中的每一个元素转换为另外一个元素:

// 使用Map将List转换为一个Scala的样例类 case class User(name: String, id: String) val UserDataSet: DataSet[User] = textDataSet.Map { text => val fieldArr = text.split(“,”) User(fieldArr(0), fieldArr(1)) } UserDataSet.print()

2. flatMap

将DataSet中的每一个元素转换为0…n个元素:

// 使用flatMap操作,将集合中的数据: // 根据第一个元素,进行分组 // 根据第二个元素,进行聚合求值 val Result = textDataSet.flatMap(line => line) .groupBy(0) // 根据第一个元素,进行分组 .sum(1) // 根据第二个元素,进行聚合求值 Result.print()

3. MappaRtition

将一个分区中的元素转换为另一个元素:

// 使用MappaRtition操作,将List转换为一个scala的样例类 case class User(name: String, id: String) val Result: DataSet[User] = textDataSet.MappaRtition(line => { line.Map(index => User(index._1, index._2)) }) Result.print()

4. filter

过滤出来一些符合条件的元素,返回boolean值为true的元素:

val source: DataSet[String] = env.fromElements(“java”, “scala”, “java”) val filter:DataSet[String] = source.filter(line => line.contains(“java”))//过滤出带java的数据 filter.print()

5. Reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素:

// 使用 fromElements 构建数据源 val source = env.fromElements((“java”, 1), (“scala”, 1), (“java”, 1)) // 使用Map转换成DataSet元组 val MapData: DataSet[(String, Int)] = source.Map(line => line) // 根据首个元素分组 val groupData = MapData.groupBy(_._1) // 使用Reduce聚合 val ReduceData = groupData.Reduce((x, y) => (x._1, x._2 + y._2)) // 打印测试 ReduceData.print()

6. ReduceGroup

将一个dataset或者一个group聚合成一个或多个元素。

// 使用 fromElements 构建数据源 val source: DataSet[(String, Int)] = env.fromElements((“java”, 1), (“scala”, 1), (“java”, 1)) // 根据首个元素分组 val groupData = source.groupBy(_._1) // 使用 ReduceGroup 聚合 val Result: DataSet[(String, Int)] = groupData.ReduceGroup { (in: Iterator[(String, Int)], out: Collector[(String, Int)]) => val tuple = in.Reduce((x, y) => (x._1, x._2 + y._2)) out.collect(tuple) } // 打印测试 Result.print()

7. MinBy和MaxBy

选择具有最小值或最大值的元素:

// 使用MinBy操作,求List中每个人的最小值 // List(“张三,1”, “李四,2”, “王五,3”, “张三,4”) case class User(name: String, id: String) // 将List转换为一个scala的样例类 val text: DataSet[User] = textDataSet.MappaRtition(line => { line.Map(index => User(index._1, index._2)) }) val Result = text .groupBy(0) // 按照姓名分组 .MinBy(1) // 每个人的最小值

8. Aggregate

在数据集上进行聚合求最值(最大值、最小值):

val data = new Mutable.MutableList[(Int, String, Double)] data.+=((1, “yuwen”, 89.0)) data.+=((2, “shuxue”, 92.2)) data.+=((3, “yuwen”, 89.99)) // 使用 fromCollection 构建数据源 val input: DataSet[(Int, String, Double)] = env.fromCollection(data) // 使用 group 执行分组操作 val value = input.groupBy(1) // 使用 aggregate 求最大值元素 .aggregate(Aggregations.Max, 2) // 打印测试 value.print()

Aggregate只能作用于元组上

注意:

要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0),否则会报以下错误:

Exception in thread “Main” java.lang.UnsupportedOperationException: Aggregate does not support grouping with keyselector functions, yet.

9. distinct

去除重复的数据:

// 数据源使用上一题的 // 使用distinct操作,根据科目去除集合中重复的元组数据 val value: DataSet[(Int, String, Double)] = input.distinct(1) value.print()

10. first

取前N个数:

input.first(2) // 取前两个数

11. join

将两个DataSet按照一定条件连接到一起,形成新的DataSet:

// s1 和 s2 数据集格式如下: // DataSet[(Int, String, String, Double)] val joinData = s1.join(s2) // s1数据集 join s2数据集 .where(0).equalTo(0) { // join的条件 (s1, s2) => (s1._1, s1._2, s2._2, s1._3) }

12. leftOuterJoin

左外连接,左边的Dataset中的每一个元素,去连接右边的元素

此外还有:

RightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素

FullOuterJoin:全外连接,左右两边的元素,全部连接

下面以 leftOuterJoin 进行示例:

val data1 = ListBuffer[Tuple2[Int, String>>() data1.append((1, “zhangsan”)) data1.append((2, “lisi”)) data1.append((3, “wangwu”)) data1.append((4, “zhaoliu”)) val data2 = ListBuffer[Tuple2[Int, String>>() data2.append((1, “beijing”)) data2.append((2, “shanghai”)) data2.append((4, “guangzhou”)) val text1 = env.fromCollection(data1) val text2 = env.fromCollection(data2) text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{ if(second==null){ (first._1,first._2,”null”) }else{ (first._1,first._2,second._2) } }).print()

13. cross

交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集

和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作:

val cross = input1.cross(input2){ (input1 , input2) => (input1._1,input1._2,input1._3,input2._2) } cross.print()

14. union

联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重:

val unionData: DataSet[String] = elements1.union(elements2).union(elements3) // 去除重复数据 val value = unionData.distinct(line => line)

15. Rebalance

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况:

硬核!一文学完Flink流计算常用算子

这个时候本来总体数据量只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;所以在实际的工作中,出现这种情况比较好的解决方案就是接下来要介绍的——Rebalance(内部使用Round Robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。)

硬核!一文学完Flink流计算常用算子

// 使用Rebalance操作,避免数据倾斜 val Rebalance = filterData.Rebalance()

16. partitionByHash

按照指定的key进行hash分区:

val data = new Mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, “Hi”)) data.+=((2, 2L, “Hello”)) data.+=((3, 2L, “Hello world”)) val collection = env.fromCollection(data) val unique = collection.partitionByHash(1).MappaRtition(line => line.Map(x => (x._1 , x._2 , x._3))) unique.writeAsText(“hashPartition”, WRITeMode.NO_OVERWRITE) env.execute()

17. partitionByRange

根据指定的key对数据集进行范围分区:

val data = new Mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, “Hi”)) data.+=((2, 2L, “Hello”)) data.+=((3, 2L, “Hello world”)) data.+=((4, 3L, “Hello world, how are you?”)) val collection = env.fromCollection(data) val unique = collection.partitionByRange(x => x._1).MappaRtition(line => line.Map{ x => (x._1 , x._2 , x._3) }) unique.writeAsText(“RangePartition”, WRITeMode.OVERWRITE) env.execute()

18. sortPartition

根据指定的字段值进行分区的排序:

val data = new Mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, “Hi”)) data.+=((2, 2L, “Hello”)) data.+=((3, 2L, “Hello world”)) data.+=((4, 3L, “Hello world, how are you?”)) val ds = env.fromCollection(data) val Result = ds .Map { x => x } .setParallelism(2) .sortPartition(1, ORDER.DESCENDING)//第一个参数代表按照哪个字段进行分区 .MappaRtition(line => line) .collect() println(Result)

三、Sink算子

1. collect

将数据输出到本地集合:

Result.collect()

2. writeAsText

将数据输出到文件

Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等

Flink支持多种文件的存储格式,包括text文件,CSV文件等

// 将数据写入本地文件 Result.writeAsText(“/data/a”, WRITeMode.OVERWRITE) // 将数据写入HDFS Result.writeAsText(“hdfs://node01:9000/data/a”, WRITeMode.OVERWRITE)

DataStream

和DataSet一样,DataStream也包括一系列的Transformation操作。

一、Source算子

Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。

Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自定义非并行的source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:

基于本地集合的source(Collection-based-source) 基于文件的source(file-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。 自定义的source(Custom-source)

下面使用addSource将Kafka数据写入Flink为例:

如果需要外部数据源对接,可使用addSource,如将Kafka数据写入Flink, 先引入依赖:

oRg.Apache.flink flink-connector-kafka-0.11_2.11 1.10.0

将Kafka数据写入Flink:

val Properties = new Properties() Properties.setProperty(“bootstrap.servers”, “localhost:9092”) Properties.setProperty(“group.id”, “consumer-group”) Properties.setProperty(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”) Properties.setProperty(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”) Properties.setProperty(“auto.offset.reset”, “latest”) val source = env.addSource(new FlinkKafkaConsumer011[String](“sensor”, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), Properties))

基于

OpenMagic API

Need more than content? Move into the product flow.

If you are here for model access, pricing, developer docs, or the future API console, the dedicated product path now lives on api.openmagic.ai.

登录免费注册