当前位置: 首页 > news >正文

spark3 streaming 读kafka写es

1. 代码

package data_import
import org.apache.spark.sql.{DataFrame, Row, SparkSession, SaveMode}
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.commons.lang3.exception.ExceptionUtils
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import org.apache.spark._
import org.apache.spark.streaming._
import alarm.Alarm
import org.elasticsearch.spark.sql._

/**
 * User feature records.
 *
 * Consumes JSON events from a Kafka topic through a Spark Streaming direct
 * stream, extracts a few fields from each event and upserts the result into
 * an Elasticsearch index, committing Kafka offsets after each written batch.
 */
object KafkaImport {

  import scala.util.control.NonFatal

  /**
   * Job parameters, parsed from the command line by `utils.parseParam`.
   *
   * @param env            environment name forwarded to `Alarm.alarm`
   * @param dst_table      not read by this job (the target index is fixed below)
   * @param kafka_address  Kafka `bootstrap.servers` value
   * @param kafka_topic    topic to subscribe to
   * @param kafka_group_id Kafka consumer group id
   * @param trigger_time   batch interval in seconds, as a decimal string
   */
  case class Parram(
      env: String,
      dst_table: String,
      kafka_address: String,
      kafka_topic: String,
      kafka_group_id: String,
      trigger_time: String)

  // Elasticsearch connection settings, kept in one place so that the reader,
  // the writer and the session configuration cannot drift apart.
  // NOTE(review): credentials are committed in source; move them to a secret
  // store or job configuration before using this outside a test environment.
  private val EsNodes = "192.168.145.43"
  private val EsPort = "9200"
  private val EsUser = "elastic"
  private val EsPassword = "YX2021@greendog"
  private val EsIndex = "test_saylo_user_feature_30033"
  private val EsFormat = "org.elasticsearch.spark.sql"

  /** Options shared by every Elasticsearch read and write issued by this job. */
  private val EsConnectionOptions: Map[String, String] = Map(
    "es.nodes" -> EsNodes,
    "es.port" -> EsPort,
    "es.net.http.auth.user" -> EsUser,
    "es.net.http.auth.pass" -> EsPassword
  )

  /** Column UDF that delegates to `utils.getDayTime` for the event timestamp. */
  private val getDayHourTime = udf((timestamp: Long) => {
    utils.getDayTime(timestamp)
  })

  /**
   * Entry point: parses the arguments, runs the streaming job and reports the
   * outcome through `Alarm.alarm`. Failures are logged and alarmed, not rethrown.
   *
   * @param args command-line arguments understood by `utils.parseParam`
   */
  def main(args: Array[String]): Unit = {
    val param: Parram = utils.parseParam[Parram](args)
    println(s"args:$param")
    try {
      processTable(param)
      Alarm.alarm(env = param.env, level = Alarm.LevelInfo, content = "UserFeature Success")
    } catch {
      // NonFatal lets VM errors and thread interruption propagate untouched.
      case NonFatal(e) =>
        val msg = s"UserFeature handle failed,Error message:${e.getClass()}:${e.getMessage()}===>${ExceptionUtils.getStackTrace(e)}==>argsMap:${param.toString()}"
        println(msg)
        Alarm.alarm(env = param.env, level = Alarm.LevelWarning, content = msg)
    }
  }

  /**
   * Builds the Kafka direct stream and, for every non-empty micro-batch,
   * upserts the extracted rows into Elasticsearch and then commits the batch's
   * Kafka offsets. Blocks until the streaming context terminates.
   *
   * @param param job parameters (Kafka address/topic/group and batch interval)
   */
  def processTable(param: Parram): Unit = {
    val conf = new SparkConf().setAppName("appName").setMaster("yarn")
    val ssc = new StreamingContext(conf, Seconds(param.trigger_time.toInt))
    // NOTE(review): the two mergeSmallFiles size settings are set to "true";
    // they look like they expect numeric values - confirm against the platform docs.
    val ss = SparkSession.builder
      .appName("KafkaImport")
      .config("spark.sql.mergeSmallFiles.enabled", "true")
      .config("spark.sql.mergeSmallFiles.threshold.avgSize", "true")
      .config("spark.sql.mergeSmallFiles.maxSizePerTask", "true")
      .config("es.net.http.auth.user", EsUser)
      .config("es.net.http.auth.pass", EsPassword)
      .getOrCreate()

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> param.kafka_address,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> param.kafka_group_id,
      "auto.offset.reset" -> "earliest",
      // Offsets are committed manually after each batch has been written.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(param.kafka_topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val schema: StructType = StructType(List(
      StructField("key", StringType, true),
      StructField("value", StringType, true)
    ))

    // One-time connectivity check. Counting the index inside every batch would
    // scan the whole index on each trigger, so it is done once up front.
    println("尝试连接ES...")
    val esDF = ss.read
      .format(EsFormat)
      .options(EsConnectionOptions)
      .load(EsIndex)
    println(s"索引中存在 ${esDF.count()} 条记录")

    stream.foreachRDD { rdd =>
      // Nothing to show, write or commit for an empty batch.
      if (!rdd.isEmpty()) {
        // Read the offset ranges from the untransformed Kafka RDD.
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        val rows = rdd.map { record => Row(record.key, record.value) }
        val userData = ss.createDataFrame(rows, schema)
          .withColumn("id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
          .withColumn("app_id", get_json_object(col("value"), "$.ctx.app_id").cast(StringType))
          .withColumn("user_id", get_json_object(col("value"), "$.ctx.user_id").cast(StringType))
          .withColumn("session_id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
          .withColumn("time", get_json_object(col("value"), "$.time").cast(LongType))
          .withColumn("datetime", getDayHourTime(col("time")).cast(TimestampType))
        userData.show()

        userData
          .select(col("id"), col("session_id"), col("value"), col("app_id"), col("datetime"))
          // Rows without a usable id cannot be upserted by document id.
          .filter(col("id").isNotNull && col("id") =!= "")
          .write
          .format(EsFormat)
          .options(EsConnectionOptions)
          .option("es.nodes.wan.only", "true")
          .option("es.mapping.id", "id") // replace with your actual id field name
          // .option("es.mapping.type", "user_id:keyword")
          .option("es.write.operation", "upsert")
          .mode("append")
          .save(EsIndex)

        // Commit only after the write succeeded; a failed batch is re-read.
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    ssc.start()
    // Wait for termination.
    ssc.awaitTermination()
  }
}

2. 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.xverse.saylo.rec</groupId>
  <artifactId>saylo_rec_data_offline_v2</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.2.2</spark.version>
    <jackson.version>2.14.0</jackson.version>
    <!-- NOTE(review): this value already ends in ".jar" and is used as the shade
         plugin's finalName below; check the name of the produced artifact. -->
    <shade.jar.name>${project.artifactId}-${project.version}-jar-with-dependencies.jar</shade.jar.name>
  </properties>

  <dependencies>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- spark-sql was declared twice; Maven requires each dependency to be unique. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Elasticsearch connector for Spark 3 / Scala 2.12 -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-30_2.12</artifactId>
      <version>7.12.0</version>
    </dependency>
    <!-- Added by hand to fix a ClassNotFoundException for
         ...httpclient.HttpConnectionManager; or use the version you need. -->
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>

    <!-- Use the latest version -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version>
    </dependency>
    <dependency>
      <groupId>com.qcloud</groupId>
      <artifactId>cos_api</artifactId>
      <version>5.6.227</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.play</groupId>
      <artifactId>play-json_2.12</artifactId>
      <version>2.9.2</version>
    </dependency>

    <!-- Jackson dependencies -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>

    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${spark.version}</version>
    </dependency>
    -->

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>1.64.0</version>
    </dependency>

    <!--
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.6</version>
    </dependency>
    -->

    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-core_2.12</artifactId>
      <version>3.6.6</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.6.3</version>
    </dependency>
    <!-- Use the latest version -->
    <dependency>
      <groupId>com.tencentcloudapi</groupId>
      <artifactId>tencentcloud-sdk-java-cls</artifactId>
      <version>3.1.1174</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi.cls</groupId>
      <artifactId>tencentcloud-cls-sdk-java</artifactId>
      <version>1.0.15</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.0</version>
      </extension>
    </extensions>

    <plugins>
      <!--
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.9.1</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      -->

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>

      <!-- Compiles Scala sources before the Java compiler runs. -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Builds the fat jar that is submitted to the cluster. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${shade.jar.name}</finalName>
              <filters>
                <!-- Strip signature files; they no longer match the merged jar. -->
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <!-- Left disabled. NOTE(review): the missing class
                     com.acme.shaded.apachecommons.httpclient.HttpConnectionManager
                     carries this shaded prefix, so enabling this relocation
                     presumably requires commons-httpclient in the jar - confirm.
                <relocation>
                  <pattern>org.apache.commons</pattern>
                  <shadedPattern>com.acme.shaded.apachecommons</shadedPattern>
                </relocation>
                -->
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>my.project.shaded.protobuf</shadedPattern>
                </relocation>
              </relocations>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <!-- NOTE(review): my.Application looks like a placeholder; set the
                     real entry point or pass the main class at submit time. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>my.Application</mainClass>
                  <manifestEntries>
                    <Implementation-Version>${project.version}</Implementation-Version>
                    <Main-Class>my.Application</Main-Class>
                  </manifestEntries>
                </transformer>
                <!-- Merges META-INF/services entries from all bundled jars. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3. 注意事项

  1. Elasticsearch 7.x 默认不再支持指定索引类型
    如果es版本是7.+
    需注意
    save 应写为
    .save("test_saylo_user_feature_30033")
    而不是
    .save("test_saylo_user_feature_30033/docs")
    否则会报类型转换错误。例如
    [user_id] cannot be changed from type [keyword] to [text]

  2. 依赖冲突
    找不到类

    Caused by: java.lang.ClassNotFoundException: com.acme.shaded.apachecommons.httpclient.HttpConnectionManager
    

解决办法:在 pom 文件中手动加入以下依赖,具体参见上面的 pom 文件:

    <dependency><groupId>commons-httpclient</groupId><artifactId>commons-httpclient</artifactId><version>3.1</version>   </dependency>
http://www.xdnf.cn/news/1101367.html

相关文章:

  • 使用浏览器inspect调试wx小程序
  • 【HTML】俄罗斯方块(精美版)
  • AI产品经理面试宝典第7天:核心算法面试题-上
  • GPT3/chatGPT/T5/PaLM/LLaMA/GLM主流大语言模型的原理和差异
  • flutter redux状态管理
  • 文章发布易优CMS(Eyoucms)网站技巧
  • oracle
  • 【InnoDB存储引擎4】行结构
  • PDF转图片
  • 2025 年第十五届 APMCM 亚太地区大学生数学建模竞赛-B题 疾病的预测与大数据分析
  • 滑动窗口-3.无重复字符的最长子串-力扣(LeetCode)
  • 使用Python和AkShare轻松获取新闻联播文字稿:从数据获取到文本挖掘
  • vue3+ts div自由拖拽改变元素宽度
  • C++——构造函数的补充:初始化列表
  • UML 与 SysML 图表对比全解析:软件工程 vs 系统工程建模语言
  • ContextMenu的Item如何绑定命令
  • “28项评测23项SOTA——GLM-4.1V-9B-Thinking本地部署教程:10B级视觉语言模型的性能天花板!
  • 【AI大模型】BERT微调文本分类任务实战
  • 拼数(字符串排序)
  • 力扣面试150(29/100)
  • 问题 C: 为美好的世界献上爆炎(博弈论)
  • 如何在 Windows 10 上安装设置 Apache Kafka
  • 聊聊AI大模型的上下文工程(Context Engineering)
  • 你见过的最差的程序员是怎样的?
  • Redis底层数据结构
  • CSS3的核心功能介绍及实战使用示例
  • 提示工程:解锁大模型潜力的核心密码
  • 库存订单管理系统:3月份开源项目汇总
  • linux中cmake编译项目
  • Django母婴商城项目实践(二)