content[-]

Map

package com.nielsen.chapter05
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object TransformMapTest {

  /** Demonstrates two equivalent ways of turning each [[Event]] into its user name.
    *
    * Both pipelines read the same two in-memory events; their results go to
    * print sinks labelled "1" (lambda) and "2" (MapFunction class).
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // run every operator with a single parallel instance
    env.setParallelism(1)

    val events: DataStream[Event] = env.fromElements(
      Event("fei", "./aws", 1000L),
      Event("lisa", "./aws", 2000L)
    )

    // Approach 1: an anonymous function passed straight to map
    val namesViaLambda: DataStream[String] = events.map(event => event.user)
    namesViaLambda.print("1")

    // Approach 2: a named MapFunction implementation
    val namesViaFunction: DataStream[String] = events.map(new UserExtractor)
    namesViaFunction.print("2")

    env.execute()
  }

  /** Maps an [[Event]] to the `user` field it carries. */
  class UserExtractor extends MapFunction[Event, String] {
    override def map(t: Event): String = t.user
  }
}

Filter

package com.nielsen.chapter05


import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object TransformFilterTest {

  /** Demonstrates two equivalent ways of keeping only the events whose user is "fei".
    *
    * The lambda (sink "1") and the [[UserFilter]] class (sink "2") apply the same
    * predicate, so both sinks should receive the same events.
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("fei", "./aws", 1000L),
      Event("lisa", "./aws", 2000L)
    )

    // keep only the events whose user is fei
    // 1. use a lambda
    stream.filter(_.user == "fei").print("1")

    // 2. implement the FilterFunction interface; the same target user is passed
    //    so that both approaches express the same filter
    stream.filter(new UserFilter("fei")).print("2")
    env.execute()
  }

  /** Keeps only the events whose `user` equals `targetUser`.
    *
    * @param targetUser the user name to keep. It defaults to "lisa" so that
    *                   existing `new UserFilter` call sites behave as before.
    */
  class UserFilter(targetUser: String = "lisa") extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.user == targetUser
  }
}

Flatmap

package com.nielsen.chapter05

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TransformFlatmapTest {

  /** Shows that a flatMap may emit zero, one or several records per input event. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val events: DataStream[Event] = env.fromElements(
      Event("fei", "./aws", 1000L),
      Event("lisa", "./aws", 2000L),
      Event("mia", "./aws", 2000L)
    )

    val emitted: DataStream[String] = events.flatMap(new MyFlatMap)
    emitted.print()
    env.execute()
  }

  /** Emits a variable number of strings depending on the event's user:
    *  - "fei": the user name only
    *  - "lisa": the user name, then the url
    *  - any other user: nothing
    */
  class MyFlatMap extends FlatMapFunction[Event, String] {

    override def flatMap(t: Event, collector: Collector[String]): Unit =
      t.user match {
        case "fei" =>
          collector.collect(t.user)
        case "lisa" =>
          collector.collect(t.user)
          collector.collect(t.url)
        case _ =>
          // every other user produces no output record
      }
  }
}