
SparkKafkaConsumer

A Spark Streaming job that subscribes to the "first" topic through the Kafka 0-10 direct stream API and prints the record values of each 3-second batch.

package com.joe.spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // 1. Environment: local streaming context with a 3-second batch interval
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // 2. Consumer parameters: brokers, key/value deserializers, consumer group
    val kafkaPara = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "test"
    )

    // Direct stream subscribed to the "first" topic
    val kafkaDStream = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)
    )

    // Extract the record values and print each batch
    val valueDStream = kafkaDStream.map(record => record.value())
    valueDStream.print()

    // 3. Start the job and block until termination
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
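
With default settings the consumer auto-commits offsets, which can mark a batch as consumed before it has actually been processed. For at-least-once delivery, the 0-10 integration lets you commit offsets back to Kafka only after a batch succeeds. A minimal sketch, assuming the same kafkaDStream as above and ENABLE_AUTO_COMMIT_CONFIG set to (false: java.lang.Boolean) in kafkaPara; the asInstanceOf casts are how spark-streaming-kafka-0-10 exposes offset ranges:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaDStream.foreachRDD { rdd =>
  // Capture the offset ranges before transforming the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch (side effects go here)
  rdd.map(_.value()).foreach(println)

  // Commit the consumed offsets back to Kafka only after processing succeeds
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}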

SparkKafkaProducer

A plain Kafka producer in Scala that sends five records to the "first" topic.

package com.joe.spark

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // 0. Configuration: brokers and key/value serializers
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 1. Create a producer
    val producer = new KafkaProducer[String, String](properties)

    // 2. Send five records to the "first" topic
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))
    }

    // 3. Close the producer to flush buffered records and release resources
    producer.close()
  }
}
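
The sends above are fire-and-forget: the loop finishes without knowing whether any record actually reached the broker. The producer's standard callback overload of send reports per-record metadata or an exception. A minimal sketch reusing the producer and topic from above (the record value is illustrative):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(new ProducerRecord[String, String]("first", "atguigu-cb"), new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace() // delivery failed after retries
    else println(s"delivered to ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
  }
})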

CustomProducer

A standalone Java producer that sends 50 records to the "ERROR" topic.

package com.nielsen.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        // 0. Configuration properties
        Properties properties = new Properties();
        // Connect to the Kafka cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.130:9092,192.168.42.131:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer instance
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. Send 50 records to the "ERROR" topic
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("ERROR", "joe" + i));
        }

        // 3. Close the producer
        kafkaProducer.close();
    }
}
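
To verify the 50 records landed, a plain Kafka consumer can poll the same topic. A minimal Scala sketch, using the broker addresses and topic from the producer above; the group id "error-check" is made up for this example:

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

object ErrorTopicChecker {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.130:9092,192.168.42.131:9092")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "error-check")       // hypothetical group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // read from the beginning

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("ERROR"))

    // One poll is enough for a quick check; production code would loop
    val records = consumer.poll(Duration.ofSeconds(5))
    records.asScala.foreach(r => println(s"offset ${r.offset()}: ${r.value()}"))
    consumer.close()
  }
}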