百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

spark与spring集成做web接口

toyiye 2024-06-21 12:18 12 浏览 0 评论

需要实现的功能:

写访问spark的接口,也就是从web上输入网址就能把我们需要的信息通过提交一个job然后返回给我们json数据。

成果展示:

通过url请求,然后的到一个wordcount的json结果(借助的是谷歌浏览器postman插件显示的,直接在浏览器上输入网址是一样的效果)

使用的关键技术:

java语言编程,springmvc框架,tomcat容器,spark框架,scala相关依赖

成体架构:

我使用的是maven构建的一个web工程,pom文件如下:

<dependencies>

<!-- https://mvnrepository.com/artifact/junit/junit -->

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.12</version>

<scope>test</scope>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.11</artifactId>

<version>1.6.3</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>1.6.3</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>2.11.11</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-reflect</artifactId>

<version>2.11.11</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-compiler -->

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-compiler</artifactId>

<version>2.11.11</version>

</dependency>

<!-- spring框架的相关jar包 -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-context</artifactId>

<version>4.3.4.RELEASE</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jdbc</artifactId>

<version>4.3.4.RELEASE</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-web</artifactId>

<version>4.3.4.RELEASE</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-webmvc</artifactId>

<version>4.3.4.RELEASE</version>

</dependency>

<!--添加持久层框架(mybatise)-->

<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->

<dependency>

<groupId>org.mybatis</groupId>

<artifactId>mybatis</artifactId>

<version>3.4.1</version>

</dependency>

<!--mybatise和spring整合包-->

<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->

<dependency>

<groupId>org.mybatis</groupId>

<artifactId>mybatis-spring</artifactId>

<version>1.3.0</version>

</dependency>

<!-- -->

<dependency>

<groupId>commons-DBCP</groupId>

<artifactId>commons-DBCP</artifactId>

<version>1.4</version>

</dependency>

<dependency>

<groupId>org.aspectj</groupId>

<artifactId>aspectjweaver</artifactId>

<version>1.8.9</version>

</dependency>

<!--添加连接池的jar包-->

<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->

<dependency>

<groupId>com.alibaba</groupId>

<artifactId>druid</artifactId>

<version>1.0.18</version>

</dependency>

<!--添加数据库驱动-->

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.39</version>

</dependency>

<!-- 日志处理 -->

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

<version>1.7.21</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>1.7.21</version>

</dependency>

<!-- https://mvnrepository.com/artifact/log4j/log4j -->

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version>

</dependency>

<!--json相关的依赖,不要使用jackson的依赖-->

<dependency>

<groupId>net.sf.json-lib</groupId>

<artifactId>json-lib</artifactId>

<version>2.4</version>

<classifier>jdk15</classifier>

</dependency>

</dependencies>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

web.xml的配置(这里只配置了springmvc容器)

<?xml version="1.0" encoding="UTF-8"?>

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">

<display-name>Archetype Created Web Application</display-name>

<!-- springmvc的前端控制器 -->

<servlet>

<servlet-name>manager</servlet-name>

<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

<!-- contextConfigLocation不是必须的, 如果不配置contextConfigLocation, springmvc的配置文件默认在:WEB-INF/servlet的name+"-servlet.xml" -->

<init-param>

<param-name>contextConfigLocation</param-name>

<param-value>classpath:springmvc.xml</param-value>

</init-param>

<load-on-startup>1</load-on-startup>

</servlet>

<servlet-mapping>

<servlet-name>manager</servlet-name>

<url-pattern>/</url-pattern>

</servlet-mapping>

<!-- 解决post乱码 -->

<filter>

<filter-name>CharacterEncodingFilter</filter-name>

<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>

<init-param>

<param-name>encoding</param-name>

<param-value>utf-8</param-value>

</init-param>

</filter>

<filter-mapping>

<filter-name>CharacterEncodingFilter</filter-name>

<url-pattern>/*</url-pattern>

</filter-mapping>

<!-- 日志配置 -->

<context-param>

<param-name>log4jConfigLocation</param-name>

<param-value>classpath:log4j.properties</param-value>

</context-param>

<listener>

<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>

</listener>

</web-app>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

然后就是springMVC的配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:mvc="http://www.springframework.org/schema/mvc"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd

http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!-- 配置包扫描器 -->

<context:component-scan base-package="com.zzrenfeng.zhsx.controller" />

<!-- 配置注解驱动 -->

<mvc:annotation-driven />

<context:component-scan base-package="com.zzrenfeng.zhsx.service"></context:component-scan>

<context:component-scan base-package="com.zzrenfeng.zhsx.spark.service"></context:component-scan>

<context:component-scan base-package="com.zzrenfeng.zhsx.spark.conf"></context:component-scan>

</beans>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

配置文件就就没有了,如果有需要可以再去集成其他的,下面进入编码的介绍

对象和json相互转换的工具类:

(为什么使用手动的去转换,而没有使用jackson的相关依赖进行自动转换,是我在使用的时候发现使用jackson会对咱们的spark作业有影响,spark作业会异常终止掉)

package com.zzrenfeng.zhsx.util;

import java.lang.reflect.Field;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Set;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

import net.sf.json.JsonConfig;

/**

* Json与javaBean之间的转换工具类

*

* @author

* @version

*

* {@code 现使用json-lib组件实现

* 需要

* json-lib-2.4-jdk15.jar

* ezmorph-1.0.6.jar

* commons-collections-3.1.jar

* commons-lang-2.0.jar

* 支持

* }

*/

public class JsonUtil {

/**

* 从一个JSON 对象字符格式中得到一个java对象

*

* @param jsonString

* @param beanCalss

* @return

*/

@SuppressWarnings("unchecked")

public static <T> T jsonToBean(String jsonString, Class<T> beanCalss) {

JSONObject jsonObject = JSONObject.fromObject(jsonString);

T bean = (T) JSONObject.toBean(jsonObject, beanCalss);

return bean;

}

/**

* 将java对象转换成json字符串

*

* @param bean

* @return

*/

public static String beanToJson(Object bean) {

JSONObject json = JSONObject.fromObject(bean);

return json.toString();

}

/**

* 将java对象转换成json字符串

*

* @param bean

* @return

*/

public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {

JSONObject json = null;

if(nory){//转换_nory_changes里的属性

Field[] fields = bean.getClass().getDeclaredFields();

String str = "";

for(Field field : fields){

// System.out.println(field.getName());

str+=(":"+field.getName());

}

fields = bean.getClass().getSuperclass().getDeclaredFields();

for(Field field : fields){

// System.out.println(field.getName());

str+=(":"+field.getName());

}

str+=":";

for(String s : _nory_changes){

str = str.replace(":"+s+":", ":");

}

json = JSONObject.fromObject(bean,configJson(str.split(":")));

}else{//转换除了_nory_changes里的属性

json = JSONObject.fromObject(bean,configJson(_nory_changes));

}

return json.toString();

}

private static JsonConfig configJson(String[] excludes) {

JsonConfig jsonConfig = new JsonConfig();

jsonConfig.setExcludes(excludes);

//

jsonConfig.setIgnoreDefaultExcludes(false);

//

// jsonConfig.setCycleDetectionStrategy(CycleDetectionStrategy.LENIENT);

// jsonConfig.registerJsonValueProcessor(Date.class,

//

// new DateJsonValueProcessor(datePattern));

return jsonConfig;

}

/**

* 将java对象List集合转换成json字符串

* @param beans

* @return

*/

@SuppressWarnings("unchecked")

public static String beanListToJson(List beans) {

StringBuffer rest = new StringBuffer();

rest.append("[");

int size = beans.size();

for (int i = 0; i < size; i++) {

rest.append(beanToJson(beans.get(i))+((i<size-1)?",":""));

}

rest.append("]");

return rest.toString();

}

/**

*

* @param beans

* @param _no_changes

* @return

*/

@SuppressWarnings("unchecked")

public static String beanListToJson(List beans, String[] _nory_changes, boolean nory) {

StringBuffer rest = new StringBuffer();

rest.append("[");

int size = beans.size();

for (int i = 0; i < size; i++) {

try{

rest.append(beanToJson(beans.get(i),_nory_changes,nory));

if(i<size-1){

rest.append(",");

}

}catch(Exception e){

e.printStackTrace();

}

}

rest.append("]");

return rest.toString();

}

/**

* 从json HASH表达式中获取一个map,改map支持嵌套功能

*

* @param jsonString

* @return

*/

@SuppressWarnings({ "unchecked" })

public static Map jsonToMap(String jsonString) {

JSONObject jsonObject = JSONObject.fromObject(jsonString);

Iterator keyIter = jsonObject.keys();

String key;

Object value;

Map valueMap = new HashMap();

while (keyIter.hasNext()) {

key = (String) keyIter.next();

value = jsonObject.get(key).toString();

valueMap.put(key, value);

}

return valueMap;

}

/**

* map集合转换成json格式数据

* @param map

* @return

*/

public static String mapToJson(Map<String, ?> map, String[] _nory_changes, boolean nory){

String s_json = "{";

Set<String> key = map.keySet();

for (Iterator<?> it = key.iterator(); it.hasNext();) {

String s = (String) it.next();

if(map.get(s) == null){

}else if(map.get(s) instanceof List<?>){

s_json+=(s+":"+JsonUtil.beanListToJson((List<?>)map.get(s), _nory_changes, nory));

}else{

JSONObject json = JSONObject.fromObject(map);

s_json += (s+":"+json.toString());;

}

if(it.hasNext()){

s_json+=",";

}

}

s_json+="}";

return s_json;

}

/**

* 从json数组中得到相应java数组

*

* @param jsonString

* @return

*/

public static Object[] jsonToObjectArray(String jsonString) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

return jsonArray.toArray();

}

public static String listToJson(List<?> list) {

JSONArray jsonArray = JSONArray.fromObject(list);

return jsonArray.toString();

}

/**

* 从json对象集合表达式中得到一个java对象列表

*

* @param jsonString

* @param beanClass

* @return

*/

@SuppressWarnings("unchecked")

public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

JSONObject jsonObject;

T bean;

int size = jsonArray.size();

List<T> list = new ArrayList<T>(size);

for (int i = 0; i < size; i++) {

jsonObject = jsonArray.getJSONObject(i);

bean = (T) JSONObject.toBean(jsonObject, beanClass);

list.add(bean);

}

return list;

}

/**

* 从json数组中解析出java字符串数组

*

* @param jsonString

* @return

*/

public static String[] jsonToStringArray(String jsonString) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

String[] stringArray = new String[jsonArray.size()];

int size = jsonArray.size();

for (int i = 0; i < size; i++) {

stringArray[i] = jsonArray.getString(i);

}

return stringArray;

}

/**

* 从json数组中解析出javaLong型对象数组

*

* @param jsonString

* @return

*/

public static Long[] jsonToLongArray(String jsonString) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

int size = jsonArray.size();

Long[] longArray = new Long[size];

for (int i = 0; i < size; i++) {

longArray[i] = jsonArray.getLong(i);

}

return longArray;

}

/**

* 从json数组中解析出java Integer型对象数组

*

* @param jsonString

* @return

*/

public static Integer[] jsonToIntegerArray(String jsonString) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

int size = jsonArray.size();

Integer[] integerArray = new Integer[size];

for (int i = 0; i < size; i++) {

integerArray[i] = jsonArray.getInt(i);

}

return integerArray;

}

/**

* 从json数组中解析出java Double型对象数组

*

* @param jsonString

* @return

*/

public static Double[] jsonToDoubleArray(String jsonString) {

JSONArray jsonArray = JSONArray.fromObject(jsonString);

int size = jsonArray.size();

Double[] doubleArray = new Double[size];

for (int i = 0; i < size; i++) {

doubleArray[i] = jsonArray.getDouble(i);

}

return doubleArray;

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381

spark的工具类:(主要负责sparkcontext的初始化工作)

package com.zzrenfeng.zhsx.spark.conf;

import java.io.Serializable;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

import org.springframework.stereotype.Component;

@Component

public class ApplicationConfiguration implements Serializable{

private static final long serialVersionUID = 1L;

public SparkConf sparkconf(){

SparkConf conf = new SparkConf()

.setMaster("local[*]")

.setAppName("wc");

return conf;

}

public JavaSparkContext javaSparkContext(){

return new JavaSparkContext(sparkconf());

}

public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {

return new PropertySourcesPlaceholderConfigurer();

}

public String filePath(){

return "E:\\测试文件\\nlog.txt";

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

wordcount model类(对wordcount进行封装)

package com.zzrenfeng.zhsx.spark.domain;

import scala.Serializable;

public class WordCount implements Serializable{

/**

*

*/

private static final long serialVersionUID = 1L;

private String word;

private Integer count;

public WordCount(){}

public WordCount(String v1, int l) {

word = v1;

count = l;

}

public String getWord() {

return word;

}

public void setWord(String word) {

this.word = word;

}

public int getCount() {

return count;

}

public void setCount(int count) {

this.count = count;

}

@Override

public String toString() {

return "WordCount [word=" + word + ", count=" + count + "]";

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

spark service类,主要是负责spark word count的job任务逻辑

package com.zzrenfeng.zhsx.spark.service;

import java.util.Arrays;

import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import scala.Tuple2;

import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;

import com.zzrenfeng.zhsx.spark.domain.WordCount;

@Component

public class SparkServiceTest implements java.io.Serializable{

@Autowired

ApplicationConfiguration applicationConfiguration;

public List<WordCount> doWordCount(){

JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();

System.out.println(javaSparkContext);

JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());

JavaRDD<String> worlds = file.flatMap(new FlatMapFunction<String, String>() {

@Override

public Iterable<String> call(String t) throws Exception {

// TODO Auto-generated method stub

List<String> list = Arrays.asList(t.split(" "));

return list;

}

});

JavaRDD<WordCount> wordcount = worlds.map(new Function<String, WordCount>() {

@Override

public WordCount call(String v1) throws Exception {

return new WordCount(v1,1);

}

});

JavaPairRDD<String, Integer> pairwordCount = wordcount.mapToPair(new PairFunction<WordCount, String, Integer>() {

@Override

public Tuple2<String, Integer> call(WordCount t) throws Exception {

// TODO Auto-generated method stub

return new Tuple2<>(t.getWord() , new Integer(t.getCount()));

}

});

JavaPairRDD<String, Integer> worldCounts = pairwordCount.reduceByKey(new Function2<Integer, Integer, Integer>() {

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

// TODO Auto-generated method stub

return v1+v2;

}

});

JavaRDD result = worldCounts.map(new Function<Tuple2<String,Integer>, WordCount>() {

@Override

public WordCount call(Tuple2<String, Integer> v1) throws Exception {

// TODO Auto-generated method stub

return new WordCount(v1._1,v1._2);

}

});

List<WordCount> list = result.collect();

javaSparkContext.close();

System.out.println(list.toString());

return list;

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

controller层,主要负责请求的拦截

package com.zzrenfeng.zhsx.controller;

import java.util.ArrayList;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.ui.Model;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.ResponseBody;

import com.zzrenfeng.zhsx.spark.domain.WordCount;

import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;

import com.zzrenfeng.zhsx.util.JsonUtil;

@Controller

@RequestMapping("hello")

public class ControllerTest {

@Autowired

private SparkServiceTest sparkServiceTest;

@RequestMapping("wc")

@ResponseBody

public String wordCount(){

List<WordCount> list = sparkServiceTest.doWordCount();

return JsonUtil.listToJson(list);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

进行启动,然后在浏览器上输入上面的拦截的url就可以看到开始出现的结果了。

应为这是个web接口,所以可以从各个端去调用,甚至可以用其他语言去调用。

现在可以愉快的去撸spark代码了,也许有人会问spark不应该用scala开发更好吗?

个人认为如果是纯粹的数据处理可以使用scala,编写起来太爽了,但是跟其他的集成的时候最好还是用java,毕竟有问题了还可以跟java大牛去讨论讨论。

欢迎有兴趣的一起来探讨

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码