Spark写数据到Kafka

news/2024/4/28 7:16:26

创建KafkaSink对象:


import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}import java.util.concurrent.Future/**** Author:jianjipan@kanzhun.com* Date:2024/2/26 10:50*/
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {lazy val producer = createProducer()def send(topic: String, key: K, value: V): Future[RecordMetadata] =producer.send(new ProducerRecord[K, V](topic, key, value))def send(topic: String, value: V): Future[RecordMetadata] =producer.send(new ProducerRecord[K, V](topic, value))
}
  • 该对象接受一个类型为() => KafkaProducer[K, V]的函数类型参数createProducer。这是一种高阶函数,允许在实例化时提供创建KafkaProducer对象的具体逻辑。

  • 使用了lazy关键字进行声明。这意味着producer属性在首次访问时才会被初始化,延迟了对象的创建,提高了性能。

然后创建KafkaSink单例对象,用来实例化KafkaSink对象


import com.zhipin.model.factory.spark.kafka.KafkaSink
import org.apache.kafka.clients.producer.KafkaProducer/**** Author:jianjipan@kanzhun.com* Date:2024/2/26 10:59*/
object KafkaSink {import scala.collection.JavaConversions._def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {val createProducerFunc = () => {val producer = new KafkaProducer[K, V](config)sys.addShutdownHook {producer.close()}producer}new KafkaSink(createProducerFunc)}def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

在Scala中,apply方法是一种特殊的方法,可以在对象名后面使用圆括号调用,就像调用一个函数一样。具体调用的方式有以下几种情况:
对象名():当对象的apply方法没有参数时,可以直接使用圆括号调用,例如obj()。
对象名(参数1, 参数2, …):当对象的apply方法具有参数时,可以通过将参数放入圆括号中来调用,例如obj(arg1, arg2)。
对象名.apply():也可以显式地使用.apply方法来调用。例如obj.apply()。
除了上述示例,还可以在类似于集合的场景下使用apply方法。例如,对于一个List对象list,可以通过下标来访问元素,实际上是调用了list的apply方法。例如list(0)实际上调用了list.apply(0)。
总之,Scala中的apply方法可以让对象像函数一样被调用,提供了一种简洁的语法来创建和调用对象。

然后应用上述方法实现DataFrame数据导入Kafka的逻辑

    val sparkConf = new SparkConf().setAppName("DatasetToKafka")sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)val spark = SparkSession.builder().config(sparkConf).getOrCreate()val taskId=args(0)val paramEntity = JobArgsService.queryJobArgs(taskId,classOf[DataSetToKafkaEntity])//构建kafkaProducer广播变量val kafkaProducer: Broadcast[KafkaSink[String, String]] = {val kafkaProducerConf = {val p = new Properties()val userName=paramEntity.getMqUserNameval password=paramEntity.getMqPassWordp.setProperty("bootstrap.servers", paramEntity.getMqBrokenIps)p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p.setProperty("acks","1")p.setProperty("retries","3")p.setProperty("security.protocol","SASL_PLAINTEXT")p.setProperty("sasl.mechanism","SCRAM-SHA-256")p.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +"username=\"" + userName + "\" password=\"" + password + "\";")p}spark.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConf))}//从dataset取数val topic = paramEntity.getMqTopicval sqlLogic = paramEntity.getSqlLogicval df = spark.sql(sqlLogic).withColumn("taskId",lit(taskId)).toJSON//写入Kafkadf.foreach(row => {kafkaProducer.value.send(topic, row)println("推送完成:" + row)})

通过使用广播变量,可以将KafkaSink实例在集群中的多个任务中共享,减少了每个任务中创建KafkaSink的开销,提高了效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.cpky.cn/p/10733.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

深入理解mysql 从入门到精通

1. MySQL结构 由下图可得MySQL的体系构架划分为:1.网络接入层 2.服务层 3.存储引擎层 4.文件系统层 1.网络接入层 提供了应用程序接入MySQL服务的接口。客户端与服务端建立连接,客户端发送SQL到服务端,Java中通过JDBC来实现连接数据库。 …

鸿蒙Harmony应用开发—ArkTS声明式开发(绘制组件:Ellipse)

椭圆绘制组件。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 子组件 无 接口 Ellipse(options?: {width?: string | number, height?: string | number}) 从API version 9开始&#xff0…

Yolov8有效涨点:YOLOv8-AM,添加多种注意力模块提高检测精度,含代码,超详细

前言 2023 年,Ultralytics 推出了最新版本的 YOLO 模型。注意力机制是提高模型性能最热门的方法之一。 本次介绍的是YOLOv8-AM,它将注意力机制融入到原始的YOLOv8架构中。具体来说,我们分别采用四个注意力模块:卷积块注意力模块…

算法——贪心算法

《算法图解》——贪心算法 # 首先创建一个表,包含所覆盖的州 states_needed set([mt,wa,or,id,nv,ut,az]) # 传入一个数组,转换成一个集合#可供选择的广播台清单 stations {} stations[kone] set([id,nv,ut]) #用集合表示想要覆盖的州,且不…

Linux环境搭建Jenkins(详细图文)

目录 简介Jenkins 特点 一、环境准备 1.jdk环境准备 2.maven环境准备 3.git环境准备 二、安装部署Jenkins(采用war包方式) 1.下载Jenkins ​2.启动war包 1)将下载好的Jenkins的war包上传到服务器上 2)编辑启动脚本,方便…

雷龙科技Nand flash芯片试用体验

一、项目背景 最近自己开始准备了一个智能家居控制系统项目,需要包含室内的温湿度、空气质量、烟雾浓度以及气体含量,能够存储相应的数据,并进行显示。 Nand-flash存储器是flash存储器的一种,其内部采用非线性宏单元模式,为固态大容量内存的实现提供了廉价有效的解决方案…