content[-]
Map
package com.nielsen.chapter05
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
object TransformMapTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream : DataStream[Event] = env.fromElements(Event("fei", "./aws", 1000L),
Event("lisa", "./aws", 2000L)
)
// extract every user name
// 1 use lambda
stream.map( _.user ).print("1")
//2. implement MapFunction interface
stream.map(new UserExtractor).print("2")
env.execute()
}
class UserExtractor extends MapFunction[Event, String]{
override def map(t: Event): String = t.user
}
}
Filter
package com.nielsen.chapter05
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object TransformFilterTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream : DataStream[Event] = env.fromElements(Event("fei", "./aws", 1000L),
Event("lisa", "./aws", 2000L)
)
// filter user is fei
// 1 use lambda
stream.filter(_.user == "fei" ).print("1")
// 2 implement FilterFunction interface
stream.filter(new UserFilter).print("2")
env.execute()
}
class UserFilter extends FilterFunction[Event] {
override def filter(t: Event): Boolean = t.user== "lisa"
}
}
Flatmap
package com.nielsen.chapter05
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TransformFlatmapTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream : DataStream[Event] = env.fromElements(Event("fei", "./aws", 1000L),
Event("lisa", "./aws", 2000L), Event("mia", "./aws", 2000L)
)
stream.flatMap(new MyFlatMap).print()
env.execute()
}
class MyFlatMap extends FlatMapFunction[Event, String] {
override def flatMap(t: Event, collector: Collector[String]): Unit = {
// if current data is fei then print user
if (t.user == "fei"){
collector.collect(t.user)
}
// if data is lisa then print user and url
else if (t.user == "lisa"){
collector.collect(t.user)
collector.collect(t.url)
}
}
}
}