本文共 3020 字,大约阅读时间需要 10 分钟。
Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
Flink官方对Flink的应用进行了分类,分为三类应用,如下图所示:
Event-driven Applications执行流程:比如采集的数据Events可以不断的放入消息队列,Flink应用会不断ingest(消费)消息队列中的数据,Flink 应用内部维护着一段时间的数据(state),隔一段时间会将数据持久化存储(Persistent sstorage),防止Flink应用死掉。Flink应用每接受一条数据,就会处理一条数据,处理之后就会触发(trigger)一个动作(Action),同时也可以将处理结果写入外部消息队列中,其他Flink应用再消费。
典型的事件驱动类应用:
1.欺诈检测(Fraud detection)
2.异常检测(Anomaly detection)
3.基于规则的告警(Rule-based alerting)
4.业务流程监控(Business process monitoring)
5.Web应用程序(社交网络)
Streaming analytics可以理解为连续性查询:比如实时展示某平台的销售GMV,用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。
Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。
Flink官方对api调用进行分层,如下:
最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。
DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。
Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。
Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。
Flink对数据处理可以分为三个阶段:定义source,编写transform,定义sink。
source:定义接受的数据源。
transform:编写一些内部转换或者统计的逻辑。
sink:定义统计的结果最后保存的介质。
1)批处理wordcount
object WordCount { def main(args: Array[String]): Unit = { // 创建执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt" val inputDS: DataSet[String] = env.readTextFile(inputPath) // 分词之后,对单词进行groupby分组,然后用sum进行聚合 val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1) // 打印输出 wordCountDS.print() }}
object StreamWordCount { def main(args: Array[String]): Unit = { // 从外部命令中获取参数 val params: ParameterTool = ParameterTool.fromArgs(args) val host: String = params.get("host") val port: Int = params.getInt("port") // 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host, port) // flatMap和Map需要引用的隐式转换 import org.apache.flink.api.scala._ val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1) dataStream.print().setParallelism(1) // 启动executor,执行任务 env.execute("Socket stream word count") }}
测试——在linux系统中用netcat命令进行发送测试。
nc -lk 7777
转载地址:http://jkuwi.baihongyu.com/