Spark Streaming接收Kafka数据存储到Hbase
fly
spark
hbase
kafka
主要参考了这篇文章https://yq.aliyun.com/articles/60712([点我])(https://yq.aliyun.com/articles/60712), 不过这篇文章使用的spark貌似是spark1.x的。我这里主要是改为了spark2.x的方式
kafka生产数据
闲话少叙,直接上代码:
- import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
object KafkaProducerTest {
def main(args: Array[String]): Unit = {
val rnd = new Random()
// val topics = "world"
val topics = "test"
val brokers = "localhost:9092"
val props = new Properties()
props.put("delete.topic.enable", "true")
props.put("key.serializer", classOf[StringSerializer])
// props.put("value.serializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.serializer", classOf[StringSerializer])
props.put("bootstrap.servers", brokers)
//props.put("batch.num.messages","10");//props.put("batch.num.messages","10");
//props.put("queue.buffering.max.messages", "20");
//linger.ms should be 0~100 ms
props.put("linger.ms", "50")
//props.put("block.on.buffer.full", "true");
//props.put("max.block.ms", "100000");
//batch.size and buffer.memory should be changed with "the kafka message size and message sending speed"
props.put("batch.size", "16384")
props.put("buffer.memory", "1638400")
props.put("queue.buffering.max.messages", "1000000")
props.put("queue.enqueue.timeout.ms", "20000000")
props.put("producer.type", "sync")
val producer = new KafkaProducer[String,String](props)
for(i <- 1001 to 2000){
val key = UUID.randomUUID().toString.substring(0,5)
val value = "fly_" + i + "_" + key
producer.send(new ProducerRecord[String, String](topics,key, value))//.get()
}
producer.flush()
}
}
生产的数据格式为(key,value) = (uuid, fly_i_key) 的形式
spark streaming 读取kafka并保存到hbase
当kafka里面有数据后,使用spark streaming 读取,并存。直接上代码:
- import java.util.UUID
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* spark streaming 写入到hbase
* Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase
*/
object OBDSQL {
case class Person(name: String, age: Int, key: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("sparkSql")
.master("local[4]")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
val topics = Array("test")
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// "group.id" -> "use_a_separate_group_id_for_each_stream",
"group.id" -> "use_a_separate_group_id_for_each_stream_fly",
// "auto.offset.reset" -> "latest",
"auto.offset.reset" -> "earliest",
// "auto.offset.reset" -> "none",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// lines.map(record => (record.key, record.value)).print()
// lines.map(record => record.value.split("_")).map(x=> (x(0),x(1), x(2))).print()
lines.foreachRDD((rdd: RDD[ConsumerRecord[String, String]]) => {
import spark.implicits._
if (!rdd.isEmpty()) {
// temp table
rdd.map(_.value.split("_")).map(p => Person(p(0), p(1).trim.toInt, p(2))).toDF.createOrReplaceTempView("temp")
// use spark sql
val rs = spark.sql("select * from temp")
// create hbase conf
val hconf = HBaseConfiguration.create
hconf.set("hbase.zookeeper.quorum", "localhost"); //ZKFC
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set("hbase.defaults.for.version.skip", "true")
hconf.set(TableOutputFormat.OUTPUT_TABLE, "t1") // t1是表名, 表里面有一个列簇 cf1
hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
val jobConf = new JobConf(hconf)
// convert every line to hbase lines
rs.rdd.map(line => {
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString.substring(0, 9)))
put.addColumn(Bytes.toBytes("cf1")
, Bytes.toBytes("name")
, Bytes.toBytes(line.get(0).toString)
)
put.addColumn(Bytes.toBytes("cf1")
, Bytes.toBytes("age")
, Bytes.toBytes(line.get(1).toString)
)
put.addColumn(Bytes.toBytes("cf1")
, Bytes.toBytes("key")
, Bytes.toBytes(line.get(2).toString)
)
(new ImmutableBytesWritable, put)
}).saveAsNewAPIHadoopDataset(jobConf)
}
})
lines.map(record => record.value.split("_")).map(x=> (x(0),x(1), x(2))).print()
ssc start()
ssc awaitTermination()
}
}