# BatchWordCount

```scala
package com.pengfei.chapter02

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} // or import org.apache.flink.api.scala._

/**
 * Created by pengfei
 */

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. create an execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. read file data
    val lineDataset = env.readTextFile(filePath = "/home/pengfei/IdeaProjects/FlinkTutorial/input/words.txt")

    // 3. transform
    val wordAndOne = lineDataset.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))

    // 4. group by word
    val wordAndOneGroup = wordAndOne.groupBy(0)

    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)

    // 6. print
    sum.print()

  }
}
```
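
For reference, with a hypothetical `input/words.txt` like this (the file contents are an assumption, not taken from the project):

```text
hello flink
hello world
```

the batch job prints one final `(word, count)` tuple per word, in no guaranteed order:

```text
(hello,2)
(world,1)
(flink,1)
```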

# BoundedStreamWordCount

```scala
package com.nielsen.chapter02

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} // or import org.apache.flink.streaming.api.scala._

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. create a stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. read file
    val lineDataStream = env.readTextFile("input/words.txt")

    // 3. transform
    val wordAndOne = lineDataStream.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))

    // 4. group by word
    val wordAndOneGroup = wordAndOne.keyBy(data => data._1)

    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)

    // 6. print
    sum.print()

    // 7. execute the task
    env.execute()
  }
}
```
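
Unlike the batch version, the streaming job emits an updated partial count for every input record, and `print()` on a DataStream prefixes each line with the index of the printing subtask when the parallelism is greater than one. With the same hypothetical input, the output would look roughly like this (subtask indices depend on parallelism and key distribution):

```text
1> (hello,1)
3> (flink,1)
1> (hello,2)
2> (world,1)
```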

# StreamWordCount

```scala
package com.nielsen.chapter02

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} // or import org.apache.flink.streaming.api.scala._


object StreamWordCount {
  /**
   * Start a socket text source first, e.g. with netcat: nc -lk 7777
   */
  def main(args: Array[String]): Unit = {
    // 1. create a stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. read stream data
    // val lineDataStream = env.socketTextStream("NLLH7477M3", 7777)
    val parameterTool = ParameterTool.fromArgs(args) // --host NLLH7477M3 --port 7777
    val hostname = parameterTool.get("host")
    val port = parameterTool.getInt("port")
    val lineDataStream = env.socketTextStream(hostname, port)


    // 3. transform

    val wordAndOne = lineDataStream.flatMap(_.toLowerCase.split(" ")).filter(_.nonEmpty).map(word => (word, 1))

    // 4. group by word
    val wordAndOneGroup = wordAndOne.keyBy(data => data._1)

    // 5. sum by group
    val sum = wordAndOneGroup.sum(1)

    // 6. print
    sum.print()

    // 7. execute the task
    env.execute()
  }
}
```
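
If the job should also be able to start without command-line arguments, `ParameterTool` offers two-argument overloads that fall back to a default value. A minimal sketch of step 2 with defaults (the `localhost`/`7777` fallbacks are assumptions for illustration):

```scala
// hypothetical defaults so the job starts even without --host/--port
val hostname = parameterTool.get("host", "localhost")
val port = parameterTool.getInt("port", 7777)
```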

# StreamWordCount (alternative version)

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
  * Created by pengfeiqiao on 2019/9/16 14:08
  */
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // create a stream processing execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(1)
//    env.disableOperatorChaining()

    // receive the socket text stream
    val textDataStream = env.socketTextStream(host, port)

    // read the data line by line, split into words, then do the word count
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map( (_, 1) )
      .keyBy(0)
      .sum(1)

    // print the output
    wordCountDataStream.print().setParallelism(1)

    // execute the task
    env.execute("stream word count job")
  }
}
```
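
This version also touches Flink's operator-chaining controls: `env.disableOperatorChaining()` (commented out above) disables chaining for the whole job, while `startNewChain()` only breaks the chain in front of the operator it is called on. A minimal standalone sketch of the per-operator options (the job itself is made up for illustration):

```scala
import org.apache.flink.streaming.api.scala._

// hypothetical example, not part of the tutorial project
object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.disableOperatorChaining() // would turn chaining off globally

    env.fromElements("hello flink", "hello world")
      .flatMap(_.split("\\s"))
      .startNewChain()   // break the chain before this operator
      .map((_, 1))
      .disableChaining() // never chain this operator with its neighbors
      .print()

    env.execute("chaining sketch")
  }
}
```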

# WordCount

```scala
import org.apache.flink.api.scala._

/**
  * Created by pengfeiqiao on 2019/9/16 11:48
  */

// batch processing code
object WordCount {
  def main(args: Array[String]): Unit = {
    // create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read data from a file
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)

    // split into words, then count
    val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
      .map( (_, 1) )
      .groupBy(0)
      .sum(1)

    // print the output
    wordCountDataSet.print()
  }
}
```

# Submit a Flink job to the cluster from the command line

```shell
./bin/flink run -m 0.0.0.0:8081 -c com.nielsen.chapter02.StreamWordCount -p 1 /home/pengfei/IdeaProjects/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host NLLH7477M3 --port 7777
```

# Check the Flink job list and cancel a job

```shell
./bin/flink list
./bin/flink list -a
./bin/flink cancel <job_id>
```
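
The `run` command above keeps the client attached until the job terminates. The flink CLI also supports the `-d` (detached) flag, which returns right after submission; the command is otherwise identical:

```shell
# detached submission: the client returns as soon as the job is accepted
./bin/flink run -d -m 0.0.0.0:8081 -c com.nielsen.chapter02.StreamWordCount -p 1 /home/pengfei/IdeaProjects/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host NLLH7477M3 --port 7777
```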