百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

亲手搭建个spark 环境试试

toyiye 2024-06-21 12:18 7 浏览 0 评论


1. spark 介绍

由加州大学伯克利分校AMP实验室 开发的大数据计算引擎,以RDD (弹性分布式数据集)为最小数据单,基于内存的列式数据结构DataFrame,计算速度较快。spark 专注于大数据一栈式服务生态,包含sparkCore/sparkSql/sparkStreaming/sparkMllib 等。

2. 测试环境架构

Linux 7.0 + zk3.4 集群 + kafka 2.11 集群 + scala 2.12 + jdk 1.8 + spark 2.1.0 集群

3. 环境搭建步骤

特别说明:

· scala2.12 版本需要jdk 1.8 环境支持:

· spark2.0 及以上版本默认配置scala2.1l

· Springboot 1.5 基于的SpringFrameWork4.3 不支持jdk8 ,因此只能用kafka 0.11 版本,即Spring-kafka 最高为1.37版

· SpringBoot 2.0 基于的SpringFrameWorld5.0 支持jdk8, 因此可以使用kafka 2.0 及以上版本

首先修改host:

1) 安装Scala(与jdk 安装方式一样),以192.168.52.40 服务器为例。

a. 上传安装包到服务器路径下解压,例如 /opt

b.设置环境变量 SCALA_HOME, vim /etc/profile

使配置生效 source /etc/profile

测试安装结果:

分别在192.168.52.35 和192.168.52.41 服务器按照相同的方法安装scala

2) 安装spark (前提是需要先安装hadoop、jdk、scala)

(Hadoop 集群之前已经发出过安装说明)

根据架构配置,40 服务器为master,35 和41 服务器都当做worker计算节点使用,以40服务器安装为例。

A.传压缩包到 服务器路径下并且解压:

B. 修改环境变量

source /etc/profile

C. 修改spark配置环境

cd /opt/spark/conf

//复制一份配置模板

cp spark-env.sh.template spark-env.sh

//做如下修改,vim spark-env.sh

SCALA_HOME

JAVA_HOME

HADOOP_CONF_DIR//hadoop 配置文件路径

SPARK_MASTER_IP//master 地址

SPARK_WORKER_MEMORY//每个work节点所能够分配到的内存

D.配置slave(worker)

cp slaves.template slaves

vim slaves //此处严格地讲是不应该将master 也当做worker的,40 服务器已经装太多东东了

E.在35 和41 服务器以相同的方式安装spark

(或者 将scp /opt/spark 到35 和41 服务器,但是确保scala、jdk和hadoop的安装路径相同,否则需要单独修改)

F. 启动spark 集群

cd /opt/spark/sbin

./start-all.sh

写个测试用例试试:

Spark 集群环境搭建完成。

3) 安装ZK集群,以40服务器安装为例

上传安装包并且解压

修改配置文件

cd ./conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

在dataDir 文件路径下创建myid文件

touch myid

vim myid

为了在集群环境下,各机器之间的识别标识,唯一

将40服务器的ZK 分发到其他服务器,分别修改myid 文件为2/3,然后分别启动ZKServer

./zkServer.sh

// 查看节点状态

./zkServer.sh status

可以看出41 的ZK是leader,另两台是follower

4) 安装kafka集群

上传安装包并且进行解压

修改配置文件

cd ./config

vim server.properties

配置完成后,分发到35 和41 服务器,并且分别修改broker.id为1 和2 ,并且将listeners 改为对应服务器地址即可。

以守护进程方式 启动三台服务器的kafka

./kafka-server-start.sh -daemon ../config/server.properties

测试:

测试成功,kafka 集群安装完成。

至此环境已经搭建完成,接下来从开发层面demo展示

4. Spark 项目Demo开发

本地开发调试也需要安装scala、jdk环境,方式同上

应用场景

A.读取静态数据

通过SparkSql 从静态数据源(RDDs、csv、json、hive、jdbc 数据源等)读取数据进行处理。

创建测试文件sparkSql.txt(json格式)

{"chuangwei":"H44ddddddddddddddddd01","pv":13}

{"chuangwei":"H4402","pv":11}

{"chuangwei":"H4401","pv":12}

{"chuangwei":"H4403","pv":10}

{"chuangwei":"H4405","pv":13}

{"chuangwei":"H4401","pv":140}

SparkSQL 分为SparkContext 和 HiveContext(继承自SparkContext),可以通过SparkSession创建

B.流式数据处理

从kafka、flume等数据源读取数据进行实时分析

· Kafka与SpringBoot整合

由于Springboot不同的版本支持不同的jdk,因此需要不同版本的kafka 支持

Springboot 1.5 最高只能使用1.37 版本的Spring-kafka

Springboot2.0 可以使用2.0 以上版本的Spring-kafka

在 SpringbootApplication.yml 主配置文件关于kafak 配置如下

引入依赖包

模拟生产者发出消息(Spring 容器启动后,注入Bean成功后将每个1 s 发出消息)

模拟消费者接受消息

· 通过SparkStreaming 实时读取流数据

由于 idea 编辑器对scala 函数式——面向对象编程有更加友好的支持,因此测试过程中spark 项目都是idea 编辑

关于idea 的使用不做赘述,针对spark 项目开发只需要安装插件scala 即可

添加依赖:

<properties> <spark.version>2.3.2</spark.version> <scala.version>2.11</scala.version></properties><dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version> </dependency></dependencies>

测试案例

package com.fengmang.statimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.joda.time.DateTimeobject StatKafkaStreaming { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("statKafkaStreaming").setMaster("local[2]"); val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); val sc = sparkSession.sparkContext; sc.setLogLevel("WARN") //设置日志级别 val kafkaParams = Map( "bootstrap.servers" -> "192.168.52.40:9092,192.168.52.35:9092,192.168.52.41:9092", //brokers "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark_group" //kafka 消费者group.id ) val streamingContext = new StreamingContext(sc,Seconds(10)); 
	//实时流数据对象,每10s 从kafka 读取一次数据 val topic = Array("stat") //主题 val inputDStream = KafkaUtils.createDirectStream[String, String]( 
	//读取数据流 streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) val date = new DateTime()// inputDStream.foreachRDD(_.foreach(println(_))) //输出数据 inputDStream.foreachRDD(_.foreach(reccord => println( date + ":" + reccord.value()))) //输出数据,与上面函数表达式功能相同 streamingContext.start() //开启实时数据流任务 streamingContext.awaitTermination() }}

同时开启上面springboot 项目的消息生产者和StatKafkaStreaming 流数据读取任务

5. 问题点

1) 异常问题一:调试Spark遇到的问题

spark 程序项目打包执行时,出现scala 版本与spark 不匹配所致

Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcII$sp

查看当前运行版本spark 所支持的scala 版本,在spark 安装目录下查看jar 包中的scala 包版本,然后替换成相对应的版本

2) 异常问题二

component.AbstractLifeCycle: FAILED ServerConnector@4a9cc6cb{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use

java.net.BindException: Address already in use

每启动一个SparkContext 时sparkUI 就会默认使用4040 端口,当其被占用(即已经开启了SaprkCotext),新开启的SparkContext 会尝试连接4041端口

3) SparkContext 冲突的问题

一个JVM 中默认只运行一个SparkContext,SparkContext 可以通过new StreamingContext(sc,Seconds(10)) 获取

但是通过new StreamingContext(sparkConf.Seconds(10)) 就会冲突

该报错是因为创建了多个sparkContext, 一般是因为在创建StreamingContext的时候使用了SparkContext而非SparkConf,如下:

val sc = new SparkContext(new SparkConf()) val ssc = new StreamingContext(sc,Minutes(5))

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

4) DataFrame 操作时内存溢出

dataFrame.take()

dataFrame.takeAsList(n) //获取前n 行并且以list 的方式展示

take 和 takeAsList 会将获取的数据返回到driver端,因此需要特别注意使用这两个方法时,返回的数据量,避免OOM 内存溢出

5) 引入SparkSession 下的隐式函数失败

val sparkSession= SparkSession.builder.appName("SparkSqlTest").master("local").getOrCreate()

//需要引入当前sparkSession 下的隐式函数,否则 $ 将被当做不可识别符号

import sparkSession.implicits._

dataFrame.select($"weight" + 10)

6) 远程服务无法访问producer和consumer,异常报连接超时和

Marking the coordinator dead for group(Kafka)

问题在于服务器没有配置kafka 服务器需要监听的端口号,如果服务是在本地运行可不用配置,会使用localhost默认地址。

如是远程访问得必须配置 listeners=PLAINTEXT:// 192.168.1.1:9092

7) Kafka 与springboot 版本兼容问题(上文已经提到)

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码