利用Spark将Kafka数据流写入HDFS

news/2024/4/30 15:37:05

利用Spark将Kafka数据流写入HDFS

在当今的大数据时代,实时数据处理和分析变得越来越重要。Apache Kafka作为一个分布式流处理平台,已经成为处理实时数据的事实标准。而Apache Spark则是一个强大的大数据处理框架,它提供了对数据进行复杂处理的能力。
本篇博客将介绍如何使用Spark来读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。
环境准备
在开始之前,确保你的开发环境中已经安装了以下软件:

Apache Kafka

#启动zookeeper
zkServer start
#启动kafka服务
kafka-server-start /opt/homebrew/etc/kafka/server.properties

Apache Spark

<properties><scala.version>2.12.17</scala.version><spark.version>3.0.0</spark.version><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka Streaming dependency --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency>

Hadoop HDFS

#启动hdfs
start-dfs.sh

Java开发环境
此外,你需要在项目中包含Spark和Kafka的依赖库。

代码实现
首先,我们定义一个Scala case class Job 来表示从Kafka读取的每条记录的数据结构。

case class Job(Position: String,Company: String,Salary: String,Location: String,Experience: String,Education: String,Detail: String
)

接下来,我们编写一个Kafka2Hdfs对象,并在其中实现main方法。这个方法将创建一个SparkSession,配置Kafka读取选项,并从Kafka中读取数据流。

object Kafka2Hdfs {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Kafka2Hdfs").master("local[*]").getOrCreate()import spark.implicits._val kafkaOptions = Map[String, String]("kafka.bootstrap.servers" -> "127.0.0.1:9092","subscribe" -> "flume","startingOffsets" -> "earliest")val stream = spark.readStream.format("kafka").options(kafkaOptions).load()

我们使用subscribe选项指定Kafka中的topic名称,这里我们使用的是flume。startingOffsets选项设置为earliest,意味着我们从最早的记录开始读取数据。

接下来,我们将Kafka中的数据转换成DataFrame。我们首先将每条记录的value字段转换为字符串,然后使用map函数将每条记录解析为Job对象。

val jobDs = stream.selectExpr("CAST(value AS STRING)").as[String].map(line => {val fields = line.split(",")Job(Position = fields(0),Company = fields(1).trim,Salary = fields(2).trim,Location = fields(3).trim,Experience = fields(4).trim,Education = fields(5).trim,Detail = fields(6).trim)}).toDF()

现在,我们已经有了一个包含Job对象的DataFrame。接下来,我们将这个DataFrame以CSV格式写入到HDFS中。我们使用writeStream方法,并设置format为csv,同时指定输出路径和检查点位置。

val query: StreamingQuery = jobDs.writeStream.format("csv").option("header", "false").option("path", "/").option("checkpointLocation", "/ck").start()

注意,我们在这里将header选项设置为false,因为我们不打算在CSV文件中包含列名。path选项指定了输出文件的存储路径,而checkpointLocation选项指定了检查点的存储路径,这对于流处理的可靠性非常重要。

最后,我们调用awaitTermination方法来等待流处理的结束。在实际的生产环境中,你可能希望将这个流处理任务部署到一个集群上,并让它持续运行。

query.awaitTermination()

总结
在这篇博客中,我们介绍了如何使用Spark读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。这种方法可以用于各种实时数据处理场景,例如日志分析、事件监控等。通过这种方式,我们可以将实时数据转换为静态数据,以便进行更深入的分析和处理。

完整代码:

package com.lhy.sparkkafka2hdfsimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Row, SparkSession}case class Job(Position:String,Company:String,Salary:String,Location:String,Experience:String,Education:String,Detail:String)
object Kafka2Hdfs{def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Kafka2Hdfs").master("local[*]").getOrCreate()import spark.implicits._val kafkaOptions = Map[String, String]("kafka.bootstrap.servers" -> "127.0.0.1:9092","subscribe" -> "flume","startingOffsets" -> "earliest")val stream = spark.readStream.format("kafka").options(kafkaOptions).load()val jobDs = stream.selectExpr("CAST(value AS STRING)").as[String].map(line => {val fields = line.split(",")Job(Position = fields(0),Company = fields(1).trim,Salary = fields(2).trim,Location = fields(3).trim,Experience = fields(4).trim,Education = fields(5).trim,Detail = fields(6).trim)}).toDF()
//    val query = jobDs.writeStream.format("console").start()val query: StreamingQuery = jobDs.writeStream.format("csv").option("header", "false").option("path", "/").option("checkpointLocation", "/ck").start()query.awaitTermination()}

在这里插入图片描述
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.cpky.cn/p/11570.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

【Web爬虫】爬⾍⿊⻰江省与四川省农机补贴以及数据分析

目录 一、实验目的 二、实验内容 2.1 实验爬取数据选择 2.2python代码爬取数据 2.3数据处理与分析 三、实验原理 3.1 python 连接网页的两种方式 3.1.1 使用requests连接网页 3.1.2 使用from selenium import webdriver连接网页&#xff0c;创建网页测试 3.2 python …

HarmonyOS NEXT应用开发之@State装饰器:组件内状态

State装饰的变量&#xff0c;或称为状态变量&#xff0c;一旦变量拥有了状态属性&#xff0c;就和自定义组件的渲染绑定起来。当状态改变时&#xff0c;UI会发生对应的渲染改变。 在状态变量相关装饰器中&#xff0c;State是最基础的&#xff0c;使变量拥有状态属性的装饰器&a…

2024 年高效开发的 React 生态系统

要使用 React 制作应用程序&#xff0c;需要熟悉正确的库来添加您需要的功能。例如&#xff0c;要添加某个功能&#xff08;例如身份验证或样式&#xff09;&#xff0c;您需要找到一个好的第三方库来处理它。 在这份综合指南中&#xff0c;我将向您展示我建议您在 2024 年使用…

【JavaScript 漫游】【049】ES6 规范中对象的扩展

文章简介 本篇文章为【JavaScript 漫游】专栏的第 049 篇文章&#xff0c;对 ES6 规范中对象的扩展知识点进行了记录。具体包括&#xff1a; 属性的简洁表示法属性名表达式方法的 name 属性属性的可枚举性和遍历super 关键字对象的扩展运算符链判断运算符Null 判断运算符新增…

是否有替代U盘,可安全交换的医院文件摆渡方案?

医院内部网络存储着大量的敏感医疗数据&#xff0c;包括患者的个人信息、病历记录、诊断结果等。网络隔离可以有效防止未经授权的访问和数据泄露&#xff0c;确保这些敏感信息的安全。随着法律法规的不断完善&#xff0c;如《网络安全法》、《个人信息保护法》等&#xff0c;医…

Python 潮流周刊#44:Mojo 本周开源了;AI 学会生成音乐了

△△请给“Python猫”加星标 &#xff0c;以免错过文章推送 你好&#xff0c;我是猫哥。这里每周分享优质的 Python、AI 及通用技术内容&#xff0c;大部分为英文。本周刊开源&#xff0c;欢迎投稿[1]。另有电报频道[2]作为副刊&#xff0c;补充发布更加丰富的资讯&#xff0c;…