Spark Streaming: Receiving Kafka Data and Storing It in HBase

2023-06-13


fly · spark · hbase · kafka

This post is based mainly on https://yq.aliyun.com/articles/60712, which appears to use Spark 1.x; here the code has been updated to the Spark 2.x API.
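
For reference, below is a minimal sbt build sketch for the two programs in this post. The artifact versions are assumptions (Spark 2.4.x on Scala 2.11, Kafka 2.x client, HBase 1.4.x) and should be matched to your own cluster:

    // build.sbt (sketch; versions are assumptions, adjust to your environment)
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"                  % "2.4.8",
      "org.apache.spark" %% "spark-streaming"            % "2.4.8",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
      "org.apache.kafka" %  "kafka-clients"              % "2.0.0",
      // TableOutputFormat lives in hbase-server for HBase 1.x
      // (it moved to hbase-mapreduce in HBase 2.x)
      "org.apache.hbase" %  "hbase-client"               % "1.4.13",
      "org.apache.hbase" %  "hbase-server"               % "1.4.13"
    )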

Producing data to Kafka

Without further ado, here is the code:

    import java.util.{Properties, UUID}

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    import scala.util.Random

    object KafkaProducerTest {

      def main(args: Array[String]): Unit = {
        val rnd = new Random()

        // val topics = "world"
        val topics = "test"
        val brokers = "localhost:9092"

        val props = new Properties()
        props.put("delete.topic.enable", "true") // broker-side setting; the producer itself ignores it
        props.put("key.serializer", classOf[StringSerializer])
        // props.put("value.serializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.serializer", classOf[StringSerializer])
        props.put("bootstrap.servers", brokers)
        // props.put("batch.num.messages", "10")
        // props.put("queue.buffering.max.messages", "20")

        // linger.ms should be 0~100 ms
        props.put("linger.ms", "50")
        // props.put("block.on.buffer.full", "true")
        // props.put("max.block.ms", "100000")

        // batch.size and buffer.memory should be tuned to the message size and sending speed
        props.put("batch.size", "16384")
        props.put("buffer.memory", "1638400")

        // legacy (old Scala producer) settings; the new Java producer ignores them
        props.put("queue.buffering.max.messages", "1000000")
        props.put("queue.enqueue.timeout.ms", "20000000")
        props.put("producer.type", "sync")

        val producer = new KafkaProducer[String, String](props)
        for (i <- 1001 to 2000) {
          val key = UUID.randomUUID().toString.substring(0, 5)
          val value = "fly_" + i + "_" + key
          producer.send(new ProducerRecord[String, String](topics, key, value)) //.get()
        }

        producer.flush()
        producer.close()
      }
    }

Each produced record has the form (key, value) = (uuid, fly_i_key): the key is a 5-character UUID prefix, and the value is the string "fly_" + i + "_" + key.
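
To make the parsing done by the streaming job in the next section concrete, a single value splits on "_" into the three fields name, age (the loop counter i) and key; the sample key "ab3f2" below is just an illustrative 5-character UUID prefix:

    // "fly_1001_ab3f2".split("_") == Array("fly", "1001", "ab3f2")
    // which the streaming job maps to (name, age, key) = ("fly", 1001, "ab3f2")
    val Array(name, age, key) = "fly_1001_ab3f2".split("_")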

Reading from Kafka with Spark Streaming and saving to HBase

Once there is data in Kafka, read it with Spark Streaming and write it to HBase. Here is the code:

    import java.util.UUID

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Mutation, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.OutputFormat
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Spark Streaming writes to HBase:
      * read Kafka messages, process them with Spark SQL, and save the result to HBase.
      */
    object OBDSQL {

      case class Person(name: String, age: Int, key: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("sparkSql")
          .master("local[4]")
          .getOrCreate()

        val sc = spark.sparkContext

        val ssc = new StreamingContext(sc, Seconds(5))

        val topics = Array("test")
        val kafkaParams = Map(
          "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          // "group.id" -> "use_a_separate_group_id_for_each_stream",
          "group.id" -> "use_a_separate_group_id_for_each_stream_fly",
          // "auto.offset.reset" -> "latest",
          "auto.offset.reset" -> "earliest",
          // "auto.offset.reset" -> "none",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val lines = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        // lines.map(record => (record.key, record.value)).print()
        // lines.map(record => record.value.split("_")).map(x => (x(0), x(1), x(2))).print()

        lines.foreachRDD((rdd: RDD[ConsumerRecord[String, String]]) => {
          import spark.implicits._
          if (!rdd.isEmpty()) {

            // register the parsed records as a temp table
            rdd.map(_.value.split("_"))
              .map(p => Person(p(0), p(1).trim.toInt, p(2)))
              .toDF
              .createOrReplaceTempView("temp")

            // use spark sql
            val rs = spark.sql("select * from temp")

            // create hbase conf
            val hconf = HBaseConfiguration.create
            hconf.set("hbase.zookeeper.quorum", "localhost") // ZooKeeper quorum
            hconf.set("hbase.zookeeper.property.clientPort", "2181")
            hconf.set("hbase.defaults.for.version.skip", "true")
            hconf.set(TableOutputFormat.OUTPUT_TABLE, "t1") // t1 is the table name; it has one column family, cf1
            hconf.setClass("mapreduce.job.outputformat.class",
              classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
            val jobConf = new JobConf(hconf)

            // convert every row into an HBase Put
            rs.rdd.map(line => {
              val put = new Put(Bytes.toBytes(UUID.randomUUID().toString.substring(0, 9)))
              put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(line.get(0).toString))
              put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(line.get(1).toString))
              put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("key"), Bytes.toBytes(line.get(2).toString))
              (new ImmutableBytesWritable, put)
            }).saveAsNewAPIHadoopDataset(jobConf)
          }
        })

        lines.map(record => record.value.split("_")).map(x => (x(0), x(1), x(2))).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
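
The job above assumes the target table t1 with column family cf1 already exists in HBase (it can also be created in the HBase shell with create 't1', 'cf1'). Here is a minimal sketch of creating it from Scala with the HBase 1.x client API, using the same ZooKeeper settings as the streaming job:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateHBaseTable {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "localhost")
        conf.set("hbase.zookeeper.property.clientPort", "2181")

        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        try {
          val table = TableName.valueOf("t1")
          if (!admin.tableExists(table)) {
            // one column family "cf1", matching the Puts built in the streaming job
            val desc = new HTableDescriptor(table)       // HTableDescriptor is the HBase 1.x API
            desc.addFamily(new HColumnDescriptor("cf1")) // (HBase 2.x uses TableDescriptorBuilder instead)
            admin.createTable(desc)
          }
        } finally {
          admin.close()
          connection.close()
        }
      }
    }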





