content[-]
SparkKafkaConsumer
package com.joe.spark
import org.apache.htrace.fasterxml.jackson.databind.deser.std.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming application that subscribes to the Kafka topic `first` and prints the
 * value of the records it receives, using 3-second batches.
 */
object SparkKafkaConsumer {

  /**
   * Entry point: builds the streaming context, attaches a direct Kafka stream, prints the
   * record values and then blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Environment: local master, 3-second batch interval.
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // 2. Kafka consumer configuration.
    // The deserializer class is written fully qualified on purpose: the simple name
    // `StringDeserializer` imported at the top of this file is the Jackson class shaded
    // inside htrace, which is not a Kafka Deserializer, so the Kafka consumer would
    // reject it when it is instantiated.
    val kafkaStringDeserializer =
      classOf[org.apache.kafka.common.serialization.StringDeserializer]

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> kafkaStringDeserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> kafkaStringDeserializer,
      ConsumerConfig.GROUP_ID_CONFIG -> "test"
    )

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaParams)
    )

    // Only the record payload is of interest; keys and metadata are dropped.
    val valueDStream = kafkaDStream.map(record => record.value())
    valueDStream.print()

    // 3. Start the job and block until it is stopped.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
SparkKafkaProducer
package com.joe.spark
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
/**
 * Minimal Kafka producer that sends five string messages ("atguigu1" .. "atguigu5")
 * to the topic `first`.
 */
object SparkKafkaProducer {

  /**
   * Entry point: configures a String/String producer, sends the messages and closes
   * the producer.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 0. Producer configuration: brokers plus key/value serializers.
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 1. Create the producer.
    val producer = new KafkaProducer[String, String](properties)

    try {
      // 2. Send the data.
      for (i <- 1 to 5) {
        producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))
      }
    } finally {
      // 3. Always release the producer, even if a send throws.
      producer.close()
    }
  }
}
CustomProducer
package com.nielsen.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
// 0 properties
Properties properties = new Properties();
// connect kafka cluster
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.42.130:9092,192.168.42.131:9092");
//SERIALIZER key and value
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 create kafka instance
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 send data
for (int i = 0; i < 50 ; i++) {
kafkaProducer.send(new ProducerRecord<>("ERROR","joe" + i));
}
//3 close kafka
kafkaProducer.close();
}
}