# BatchWordCount

```scala
package com.pengfei.chapter02
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} // or import org.apache.flink.api.scala._
/**
 * Created by pengfei
 */
object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. create an execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Read file data
    val lineDataset = env.readTextFile("/home/pengfei/IdeaProjects/FlinkTutorial/input/words.txt")
    // 3. transform
    val wordAndOne = lineDataset.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))
    // 4. group by based on words
    val wordAndOneGroup = wordAndOne.groupBy(0)
    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)
    // 6. print
    sum.print()
  }
}
```
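For a quick sanity check, suppose `input/words.txt` contains the two lines shown below (hypothetical content, not part of the project). `sum.print()` then writes one Scala tuple per distinct word, in no particular order:

```
# words.txt (hypothetical)
hello world
hello flink

# printed result (order not guaranteed)
(hello,2)
(world,1)
(flink,1)
```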
# BoundedStreamWordCount

```scala
package com.nielsen.chapter02
import org.apache.flink.streaming.api.scala._ // StreamExecutionEnvironment plus the implicit TypeInformation
object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. create a stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. read file
    val lineDataStream = env.readTextFile("input/words.txt")
    // 3. transform
    val wordAndOne = lineDataStream.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))
    // 4. group by based on words
    val wordAndOneGroup = wordAndOne.keyBy(data => data._1)
    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)
    // 6. print
    sum.print()
    // 7. execute the job
    env.execute()
  }
}
```
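Unlike the batch job, the streaming version prints an updated running count for every record it receives, and `print()` prefixes each line with the index of the parallel subtask that produced it. Feeding the same two hypothetical lines through the pipeline yields something like this (the exact prefixes depend on your parallelism):

```
3> (hello,1)
7> (world,1)
3> (hello,2)
2> (flink,1)
```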
# StreamWordCount

```scala
package com.nielsen.chapter02
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._ // StreamExecutionEnvironment plus the implicit TypeInformation
object StreamWordCount {
  /**
   * Start a socket text source on the target host first: nc -lk 7777
   */
  def main(args: Array[String]): Unit = {
    // 1. create a stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. read stream data
    // val lineDataStream = env.socketTextStream("NLLH7477M3", 7777)
    val parameterTool = ParameterTool.fromArgs(args) // --host NLLH7477M3 --port 7777
    val hostname = parameterTool.get("host")
    val port = parameterTool.getInt("port")
    val lineDataStream = env.socketTextStream(hostname, port)
    // 3. transform
    val wordAndOne = lineDataStream.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))
    // 4. group by based on words
    val wordAndOneGroup = wordAndOne.keyBy(data => data._1)
    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)
    // 6. print
    sum.print()
    // 7. execute the job
    env.execute()
  }
}
```
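`ParameterTool` also accepts a per-key default value, which makes the job runnable without any program arguments. A minimal sketch, assuming `localhost:7777` as illustrative defaults (the object name `ParamDefaults` is hypothetical, not part of the tutorial):

```scala
import org.apache.flink.api.java.utils.ParameterTool

object ParamDefaults {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    // fall back to localhost:7777 when --host/--port are not supplied (hypothetical defaults)
    val hostname = params.get("host", "localhost")
    val port = params.getInt("port", 7777)
    println(s"would connect to $hostname:$port")
  }
}
```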
# StreamWordCount (alternative version)

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
  * Created by pengfeiqiao on 2019/9/16 14:08
  */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")
    // create a stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(1)
//    env.disableOperatorChaining()
    // receive the socket text stream
    val textDataStream = env.socketTextStream(host, port)
    // read the data line by line, split into words, then do the word count
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map( (_, 1) )
      .keyBy(0)
      .sum(1)
    // print the result
    wordCountDataStream.print().setParallelism(1)
    // execute the job
    env.execute("stream word count job")
  }
}
```
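For context on the chaining calls in this version: `startNewChain()` forces Flink to begin a new operator chain at the `filter`, so it is not fused with the preceding `flatMap`, while the commented-out `env.disableOperatorChaining()` would switch chaining off for the whole job. Both are mainly useful for inspecting operators separately in the Flink web UI; the default chaining is usually what you want in production.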
# WordCount

```scala
import org.apache.flink.api.scala._
/**
  * Created by pengfeiqiao on 2019/9/16 11:48
  */
// batch processing version
object WordCount {
  def main(args: Array[String]): Unit = {
    // create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // read data from a file
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)
    // split into words, then count
    val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
      .map( (_, 1) )
      .groupBy(0)
      .sum(1)
    // print the result
    wordCountDataSet.print()
  }
}
```
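Note that, unlike `BatchWordCount` above, this version neither lowercases the input nor filters out empty tokens. `split(" ")` keeps interior empty strings when a line contains consecutive spaces, so a stray double space would produce a `("",n)` entry; adding `.filter(_.nonEmpty)` after the `flatMap`, as the earlier listings do, avoids this.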
# Submit a Flink job to the cluster from the command line
```shell
# submit the job to the JobManager at 0.0.0.0:8081 with parallelism 1
./bin/flink run -m 0.0.0.0:8081 -c com.nielsen.chapter02.StreamWordCount -p 1 /home/pengfei/IdeaProjects/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host NLLH7477M3 --port 7777

# check the flink job list
./bin/flink list
# list all jobs, including finished and cancelled ones
./bin/flink list -a

# cancel a running job (take the job id from the `flink list` output)
./bin/flink cancel <job_id>
```