博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink简介与快速入门
阅读量:3937 次
发布时间:2019-05-23

本文共 3020 字,大约阅读时间需要 10 分钟。

1.1  初识Flink

Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。

Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

1.2  Flink应用场景

Flink官方对Flink的应用进行了分类,分为三类应用,如下图所示:

场景一:Event-driven Applications【事件驱动】

Event-driven Applications执行流程:比如采集的数据Events可以不断的放入消息队列,Flink应用会不断ingest(消费)消息队列中的数据,Flink 应用内部维护着一段时间的数据(state),隔一段时间会将数据持久化存储(Persistent sstorage),防止Flink应用死掉。Flink应用每接受一条数据,就会处理一条数据,处理之后就会触发(trigger)一个动作(Action),同时也可以将处理结果写入外部消息队列中,其他Flink应用再消费。

典型的事件驱动类应用:

1.欺诈检测(Fraud detection)

2.异常检测(Anomaly detection)

3.基于规则的告警(Rule-based alerting)

4.业务流程监控(Business process monitoring)

5.Web应用程序(社交网络)

场景二:Data Analytics Applications【分析】

Streaming analytics可以理解为连续性查询:比如实时展示某平台的销售GMV,用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。

场景三:Data Pipeline Applications【管道式ETL】

Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。

Flink官方对api调用进行分层,如下:

最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。

DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。

Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。

Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。

1.4 Flink 的程序结构

Flink对数据处理可以分为三个阶段:定义source,编写transform,定义sink。

source:定义接受的数据源。

transform:编写一些内部转换或者统计的逻辑。

sink:定义统计的结果最后保存的介质。

 

1.5 Flink 快速上手案例

1)批处理wordcount

object WordCount {  def main(args: Array[String]): Unit = {    // 创建执行环境    val env = ExecutionEnvironment.getExecutionEnvironment    // 从文件中读取数据    val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"    val inputDS: DataSet[String] = env.readTextFile(inputPath)    // 分词之后,对单词进行groupby分组,然后用sum进行聚合    val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)    // 打印输出    wordCountDS.print()  }}

2) 流处理 wordcount

object StreamWordCount {  def main(args: Array[String]): Unit = {    // 从外部命令中获取参数    val params: ParameterTool =  ParameterTool.fromArgs(args)    val host: String = params.get("host")    val port: Int = params.getInt("port")    // 创建流处理环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 接收socket文本流    val textDstream: DataStream[String] = env.socketTextStream(host, port)    // flatMap和Map需要引用的隐式转换    import org.apache.flink.api.scala._    val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)    dataStream.print().setParallelism(1)    // 启动executor,执行任务    env.execute("Socket stream word count")  }}

测试——在linux系统中用netcat命令进行发送测试。

 nc -lk  7777

 

转载地址:http://jkuwi.baihongyu.com/

你可能感兴趣的文章
SQL Server中VARCHAR(MAX)和NVARCHAR(MAX)的使用(转载)
查看>>
C# 对json字符串转换操作
查看>>
jQuery plugin: Validation 使用说明
查看>>
外部程序用process.start调用 其它exe文件时可以传入参数.
查看>>
动态 增加/删除 控件的关联事件
查看>>
Datagridview 相关
查看>>
一个带事务的Sql Server存储过程例子
查看>>
VS WinForm 中 父Datagridview嵌套子DatagridView
查看>>
Winform datagridview绑定数据源后,无法以 Rows.Insert方法插入新行
查看>>
Linq平行作業的例子
查看>>
Datagridview 綁定list注意事項(轉)
查看>>
DataGridView綁定數據相關
查看>>
合并兩個List,對方不存在相等的元素.則用null代替
查看>>
Linq GroupJoin(一)
查看>>
C#利用正则表达式获取特定格式的字串符
查看>>
C#将Excel文件中选择的内容,复制粘贴到 winform datagridview
查看>>
SQL 判断当前数据库是否存在某个表/临时表
查看>>
SQL列出数据库表的结构
查看>>
關于Enumerable distinct 的學習筆記
查看>>
使用Invoke,BeginInvoke 在多线程中更新UI主线程的元素
查看>>