# BatchWordCount
package com.pengfei.chapter02
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} // or import org.apache.flink.api.scala._
/**
 * Batch word count over a text file using Flink's DataSet API.
 * Create by pengfei
 */
object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Set up the batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Read the input file as a DataSet of lines.
    val lines = env.readTextFile("/home/pengfei/IdeaProjects/FlinkTutorial/input/words.txt")
    // 3. Split each line into lowercase words, drop empty tokens,
    //    and pair every word with an initial count of 1.
    val pairs = lines
      .flatMap(line => line.toLowerCase.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
    // 4. Group by the word (tuple field 0).
    val grouped = pairs.groupBy(0)
    // 5. Sum the counts (tuple field 1) within each group.
    val counts = grouped.sum(1)
    // 6. Print the result to stdout.
    counts.print()
  }
}
# BoundedStreamWordCOunt
package com.nielsen.chapter02
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} // or import org.apache.flink.api.scala._
/** Word count over a bounded stream: the DataStream API reading a finite file source. */
object BoundedStreamWordCOunt {
  def main(args: Array[String]): Unit = {
    // 1. Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the input file; a file source yields a bounded stream.
    val lines = env.readTextFile("input/words.txt")
    // 3. Split each line into lowercase words, drop empty tokens,
    //    and pair every word with an initial count of 1.
    val pairs = lines
      .flatMap(line => line.toLowerCase.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
    // 4. Key the stream by the word itself.
    val keyed = pairs.keyBy(pair => pair._1)
    // 5. Maintain a running sum of the counts per key.
    val counts = keyed.sum(1)
    // 6. Print the running results.
    counts.print()
    // 7. Trigger job execution (required for the streaming API).
    env.execute()
  }
}
# StreamWordCount
package com.nielsen.chapter02
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
object StreamWordCount {
  /**
   * Unbounded word count over a socket text stream.
   *
   * Start a local text source first, e.g.: nc -lk 7777
   * Pass the connection via program arguments: --host <hostname> --port <port>
   */
  def main(args: Array[String]): Unit = {
    // 1. Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read host/port from the program arguments, e.g. --host NLLH7477M3 --port 7777.
    //    Defaults keep the job runnable when the arguments are omitted: without them,
    //    get("host") returns null and getInt("port") throws.
    val parameterTool = ParameterTool.fromArgs(args)
    val hostname = parameterTool.get("host", "localhost")
    val port = parameterTool.getInt("port", 7777)
    val lineDataStream = env.socketTextStream(hostname, port)
    // 3. Split each line into lowercase words, drop empty tokens,
    //    and pair every word with an initial count of 1.
    val wordAndOne = lineDataStream.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))
    // 4. Key the stream by the word itself.
    val wordAndOneGroup = wordAndOne.keyBy(data => data._1)
    // 5. Maintain a running sum of the counts per key.
    val sum = wordAndOneGroup.sum(1)
    // 6. Print the running results.
    sum.print()
    // 7. Trigger job execution.
    env.execute()
  }
}
# StreamWordCount (alternative version)
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
 * Streaming word count reading text lines from a socket.
 * Created by pengfeiqiao on 2019/9/16 14:08
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Read host and port from the program arguments.
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")
    // Create a streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    // env.disableOperatorChaining()
    // Receive the socket text stream.
    val socketLines = env.socketTextStream(host, port)
    // Read line by line, split into words, then run the word count.
    val counts = socketLines
      .flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    // Print the running results with a single printing task.
    counts.print().setParallelism(1)
    // Trigger job execution.
    env.execute("stream word count job")
  }
}
# WordCount
import org.apache.flink.api.scala._
/**
 * Batch word count over a text file using the DataSet API.
 * Created by pengfeiqiao on 2019/9/16 11:48
 */
// Batch-processing word count.
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read data from a file. The path may be supplied as the first program
    // argument; the original hard-coded location remains the fallback so
    // existing invocations keep working.
    val inputPath =
      if (args.nonEmpty) args(0)
      else "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)
    // Split lines into words, then count occurrences per word.
    val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
      .map( (_, 1) )
      .groupBy(0)
      .sum(1)
    // Print the result.
    wordCountDataSet.print()
  }
}
# Submit a Flink job to the cluster from the command line
```shell
./bin/flink run -m 0.0.0.0:8081 -c com.nielsen.chapter02.StreamWordCount -p 1 /home/pengfei/IdeaProjects/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host NLLH7477M3 --port 7777
# check the Flink job list
./bin/flink list
./bin/flink list -a
# cancel a running job by its id
./bin/flink cancel job_id
```