百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

停止使用Pandas并开始使用Spark+Scala

toyiye 2024-06-21 12:37 13 浏览 0 评论

为什么数据科学家和工程师应该考虑将Spark与Scala结合使用以替代Pandas,以及如何入门

> Source: https://unsplash.com/photos/8IGKYypIZ9k

使用Scala从Pandas迁移到Spark并不像您想象的那么困难,因此,您的代码将运行得更快,并且最终可能会编写出更好的代码。

以数据工程师的经验,我发现在Pandas中建立数据管道经常需要我们定期增加资源,以跟上不断增加的内存使用量。 此外,由于意外的数据类型或空值,我们经常会看到许多运行时错误。 通过将Spark与Scala结合使用,解决方案感觉更强大,重构和扩展更容易。

在本文中,我们将介绍以下内容:

· 为什么要在Spark上使用Scala和Pandas

· Scala Spark API与Pandas API的实际区别不大

· 如何开始使用Jupyter笔记本电脑或您喜欢的IDE

什么是Spark?

· Spark是Apache开源框架

· 它可用作库并在"本地"集群上运行,或在Spark集群上运行

· 在Spark集群上,可以以分布式方式执行代码,其中一个主节点和多个工作节点共享负载

· 即使在本地群集上,您仍然可以看到与Pandas相比的性能提升,我们将在下面介绍原因

为什么要使用Spark?

Spark由于能够快速处理大型数据集而变得流行

· 默认情况下,Spark是多线程的,而Pandas是单线程的

· 可以在Spark集群上以分布式方式执行Spark代码,而Pandas可以在一台机器上运行

· Spark是懒惰的,这意味着它只会在您收集时(即,当您实际上需要退还东西时)才执行,同时它会建立执行计划并找到执行代码的最佳方式

· 这与立即执行的Pandas不同,它会在到达每个步骤时执行每个步骤

· Spark也不太可能用完内存,因为当达到内存限制时它将开始使用磁盘

对于运行时的直观比较,请参见Databricks的以下图表,我们可以看到Spark的运行速度明显快于Pandas,并且Pandas的内存不足以较低的阈值运行。

> https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html

Spark拥有丰富的生态系统

· 数据科学库,例如内置的Spark ML或用于图形算法的Graph X

· Spark Streaming用于实时数据处理

· 与其他系统和文件类型(orc,镶木地板等)的互操作性

为什么要使用Scala而不是PySpark?

Spark提供了熟悉的API,因此使用Scala而不是Python不会感觉像是一个巨大的学习曲线。 以下是一些您可能要使用Scala的原因:

· Scala是一种静态类型的语言,这意味着您会发现代码运行时错误可能比Python少

· Scala还允许您创建不可变的对象,这意味着在引用对象时,您可以确信在创建和调用对象之间其状态没有发生变化。

· Spark是用Scala编写的,因此在Python之前的Scala中可以使用新功能

· 对于数据科学家和数据工程师一起工作,由于Scala代码的类型安全性和不变性,使用Scala可以帮助进行协作。

Spark核心概念

· DataFrame:spark DataFrame是一种与Pandas DataFrame非常相似的数据结构

· Dataset:数据集是类型化的数据框,对于确保数据符合预期的架构非常有用

· RDD:这是Spark中的核心数据结构,在其上构建了DataFrame和Dataset

通常,我们会尽可能使用数据集,因为它们很安全,更有效并且可以提高可读性,因为很明显,我们可以从数据集中获得期望的数据。

数据集 Dataset

要创建数据集,我们首先需要创建一个case类,该类与Python中的数据类相似,并且实际上只是一种指定数据结构的方式。

例如,我们创建一个名为FootballTeam的案例类,其中包含几个字段:

case class FootballTeam(
    name: String,
    league: String,
    matches_played: Int,
    goals_this_season: Int,
    top_goal_scorer: String,
    wins: Int
)

现在,让我们创建这个case类的实例:

val brighton: FootballTeam =
    FootballTeam(
      "Brighton and Hove Albion",
      "Premier League",
      matches_played = 29,
      goals_this_season = 32,
      top_goal_scorer = "Neil Maupay",
      wins = 6
    )

让我们创建另一个名为man City的实例,现在我们将创建具有这两个足球队的数据集:

val teams: Dataset[FootballTeam] = spark.createDataset(Seq(brighton,  
      manCity))

另一种方法是:

val teams: Dataset[FootballTeam] = 
      spark.createDataFrame(Seq(brighton, manCity)).as[FootballTeam]

从外部数据源读取数据并返回DataFrame时,第二种方法很有用,因为您可以将其强制转换为Dataset,这样我们现在有了一个类型化的集合。

数据转换

您可以在Pandas DataFrame中应用的大多数(如果不是全部)数据转换在Spark中可用。 当然,语法上会有差异,有时还有其他需要注意的地方,其中一些我们现在将介绍。

总的来说,我发现Spark与Pandas相比在符号上更一致,并且由于Scala是静态类型的,因此您通常可以执行myDataset。 等待编译器告诉您可用的方法!

让我们从一个简单的转换开始,我们只想向我们的数据集添加一个新列,并为其分配常量值。 在Pandas中,这看起来像:

Pandas

df_teams['sport'] = 'football'

除了语法外,Spark还有一个小的区别,那就是在此新字段中添加一个常量值需要我们导入一个名为lit的spark函数。

Spark

import org.apache.spark.sql.functions.litval 
newTeams = teams.withColumn("sport", lit("football"))

请注意,我们已经创建了一个新对象,因为我们原始的团队数据集是一个val,这意味着它是不可变的。 众所周知,这是一件好事,每当我们使用团队数据集时,我们总是会得到相同的对象。

现在,我们基于函数添加一列。 在Pandas中,它看起来像:

Pandas

def is_prem(league):
    if league == 'Premier League':
        return True
    else:
        return False
df_teams['premier_league'] = df_teams['league'].apply(lambda x: 
                                    is_prem(x))

为了在Spark中执行相同的操作,我们需要序列化该函数,以便Spark可以应用它。 这可以通过使用UserDefinedFunctions来完成。 我们还使用了大小写匹配,因为这在Scala中比if-else更好,但是两者都可以。

我们还需要导入另一个有用的spark函数col,该函数用于引用列。

Spark

import org.apache.spark.sql.functions.col
def isPrem(league: String): Boolean =
    league match {
      case "Premier League" => true
      case _                => false
    }
val isPremUDF: UserDefinedFunction =
    udf[Boolean, String](isPrem)
val teamsWithLeague: DataFrame = teams.withColumn("premier_league",                                                            
          isPremUDF(col("league")))

现在,我们添加了case类中没有的新列,这会将其转换回DataFrame。 因此,我们需要在原始案例类中添加另一个字段(并使用Options使其可以为空),或创建一个新的案例类。

Scala中的Option仅表示该字段可为空。 如果值是null,则使用None;如果填充,则使用Some(" value")。 可选字符串的示例:

val optionalString : Option[String] = Some("something")

要从中获取字符串,我们可以调用optionalString.get(),这将仅返回" something"。 请注意,如果不确定是否为null,则可以使用optionalString.getOrElse(" nothing"),如果为null,它将返回字符串" nothing"。

过滤数据集是另一个常见要求,这是一个示例,说明Spark比Pandas更一致,因为它遵循与其他转换相同的模式,在该转换中我们执行数据集"点"转换(即,dataset.filter(…) )。

Pandas
df_teams = df_teams[df_teams['goals_this_season'] > 50]

Spark
val filteredTeams = teams.filter(col("goals_this_season") > 50)

我们可能需要对数据集执行一些聚合,这在Pandas和Spark中非常相似。

Pandas
df_teams.groupby(['league']).count()

Spark
teams.groupBy("league").count()

对于多个聚合,我们可以再次执行类似于Pandas的操作,并具有映射到聚合的字段。 如果我们要进行自己的聚合,则可以使用UserDefinedAggregations。

teams.agg(Map( "matches_played" -> "avg", "goals_this_season" -> "count"))

通常,我们还想结合多个数据集,这可能与并集:

Pandas
pd.concat([teams, another_teams], ignore_index=True)

Spark
teams.unionByName(anotherTeams)

…或加入:

val players: Dataset[Player] = spark
        .createDataset(Seq(neilMaupey, sergioAguero))
teams.join(players,
        teams.col("top_goal_scorer") === players.col("player_name"),
        "left"
      ).drop("player_name")

在此示例中,我们还使用名为Player的案例类创建了一个新的数据集。 请注意,此案例类有现场伤害,可以为null。

case class Player(player_name: String, goals_scored: Int, injury: Option[String])

请注意,我们删除了player_name列,因为它与top_goal_scorer相同。

我们可能还希望代码的某些部分仅使用Scala本机数据结构,例如Arrays,Lists等。要获得列之一作为Array,我们需要映射到我们的值并调用.collect()。

val teamNames: Array[String] = teams.map(team => team.name) .collect()

请注意,我们可以使用案例类的内置getter返回name字段,如果name不是我们的FootballTeam类中的字段,则不会编译该字段。

顺便说一句,我们也可以将函数添加到case类中,并且在使用诸如IntelliJ或带有Metals插件的vs代码的IDE时,值和函数都将作为自动完成的选项出现。

为了根据数据集是否存在于此数组中来过滤数据集,我们需要通过调用_ *将其视为args序列。

val filteredPlayers: Dataset[Player] = players .filter(col("team").isin(teamNames: _*))

运行一些代码

希望在这一点上,您愿意尝试编写一些Spark代码,即使只是为了看看我是否认为它与Pandas差别不大也是如此。

首先,我们有两种选择。 我们可以使用笔记本,这是获取一些数据并开始玩耍的快速方法。 另外,我们可以建立一个简单的项目。 无论哪种方式,您都需要安装Java 8。

Notebooks

在此示例中,我们将在Jupyter笔记本中使用spylon内核。 https://pypi.org/project/spylon-kernel/。 首先运行以下命令来设置您的笔记本,这应该在浏览器中打开您的笔记本。 然后从可用的内核中选择spylon内核。

 pip install spylon-kernel
 python -m spylon_kernel install
 jupyter notebook

通过将以下内容添加到单元格中,检查我们是否具有正确的Java版本:

!java -version

输出应为:

java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

如果不是,请在您的bash配置文件中检查JAVA_HOME,并确保它指向Java 8。

下一步是安装一些依赖项。 为此,我们可以将以下代码片段添加到新的单元格中。 这设置了一些spark配置,还允许您添加依赖项。 在这里,我添加了一个名为vegas的可视化库。

%%init_spark
launcher.num_executors = 4
launcher.executor_cores = 2
launcher.driver_memory = '4g'
launcher.conf.set("spark.sql.catalogImplementation", "hive")
launcher.packages = ["org.vegas-viz:vegas_2.11:0.3.11",
                    "org.vegas-viz:vegas-spark_2.11:0.3.11"]

要连接到我们的数据源,我们可以定义一个函数,也许像这样:

def getData(file: String): DataFrame = 
        spark.read
        .format("csv")
        .option("header", "true")
        .load(file)

这是与csv文件的连接,但是我们可以连接许多其他数据源。 此函数返回一个DataFrame,我们可能要将其转换为Dataset:

val footballTeams: Dataset[FootballTeam] = 
  getData("footballs_teams.csv").as[FootballTeam]

然后,我们可以开始使用这些数据,并进行我们讨论过的一些数据转换,还有更多。

建立一个项目

现在您已经可以处理一些数据了,您可能想建立一个项目。

要包括的两个主要内容:

· build.sbt-以前我们在一个笔记本单元中添加了依赖项,现在我们需要将它们添加到build.sbt文件中

· SparkSession-在笔记本中我们已经有一个spark会话,这意味着我们能够执行spark.createDataFrame之类的事情。 在我们的项目中,我们需要创建此Spark会话

示例build.sbt:

name := "spark-template"
version := "0.1" 
scalaVersion := "2.12.11"
val sparkVersion = "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

示例SparkSession:

import org.apache.spark.sql.SparkSession 
trait SparkSessionWrapper {
       val spark: SparkSession = SparkSession
           .builder()
           .master("local")
           .appName("spark-example")
           .getOrCreate()
}

然后,我们可以使用此包装器扩展对象,这给了我们一个Spark会话。

object RunMyCode extends SparkSessionWrapper { //your code here}

然后,您可以开始编写您的Spark代码!

总而言之,Spark是用于快速数据处理的出色工具,并且在数据世界中越来越流行。 因此,Scala也正变得越来越流行,并且由于其类型安全性,对于可能更熟悉Python和Pandas的数据工程师和数据科学家来说,它是一个不错的选择。 Spark是该语言的出色介绍,因为我们可以使用熟悉的概念(例如DataFrames),因此它并不像一个巨大的学习曲线。

希望这可以给您一个快速的概述,也许使您能够在笔记本或新项目中开始探索Spark。 祝好运!

(本文翻译自Chloe Connor的文章《Stop using Pandas and start using Spark with Scala》,参考:https://towardsdatascience.com/stop-using-pandas-and-start-using-spark-with-scala-f7364077c2e0)

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码