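Goal:
Build an HTTP interface to Spark: entering a URL in a web client submits a Spark job, and the information we need comes back as JSON.
Result:
A request to the URL returns a word count result as JSON (displayed here with the Postman extension for Chrome; entering the URL directly in the browser gives the same result).
Key technologies:
Java, the Spring MVC framework, the Tomcat container, the Spark framework, and the Scala dependencies Spark needs
Overall architecture:
The project is a web application built with Maven. The dependencies in the pom file are as follows: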
<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-compiler -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.11</version>
</dependency>
<!-- Spring framework jars -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- Persistence framework (MyBatis) -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.1</version>
</dependency>
<!-- MyBatis-Spring integration package -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.0</version>
</dependency>
<!-- Commons DBCP connection pool (the published coordinates are lowercase) -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.9</version>
</dependency>
<!-- Druid connection pool -->
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.18</version>
</dependency>
<!-- Database driver -->
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<!-- Logging -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- JSON dependency; do not use the Jackson dependencies -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
web.xml configuration (only the Spring MVC container is configured here):
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
<display-name>Archetype Created Web Application</display-name>
<!-- Spring MVC front controller -->
<servlet>
<servlet-name>manager</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<!-- contextConfigLocation is optional; if it is not configured, the Spring MVC configuration file defaults to WEB-INF/{servlet name}-servlet.xml -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:springmvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>manager</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- Fix garbled characters in POST requests -->
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>utf-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Logging configuration -->
<context-param>
<param-name>log4jConfigLocation</param-name>
<param-value>classpath:log4j.properties</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>
</web-app>
Next is the Spring MVC configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Component scanning -->
<context:component-scan base-package="com.zzrenfeng.zhsx.controller" />
<!-- Annotation-driven MVC -->
<mvc:annotation-driven />
<context:component-scan base-package="com.zzrenfeng.zhsx.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.conf"></context:component-scan>
</beans>
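That is all the configuration; other components can be integrated later if needed. Now on to the code.
Utility class for converting between objects and JSON:
(Why convert by hand instead of letting the Jackson dependencies do it automatically? When I used Jackson, I found that it interfered with the Spark job, which terminated abnormally.)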
package com.zzrenfeng.zhsx.util;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.JsonConfig;

/**
 * Utility class for converting between JSON and JavaBeans.
 *
 * Implemented with the json-lib component, which requires:
 *   json-lib-2.4-jdk15.jar
 *   ezmorph-1.0.6.jar
 *   commons-collections-3.1.jar
 *   commons-lang-2.0.jar
 */
public class JsonUtil {

    /**
     * Builds a Java object from the string form of a JSON object.
     *
     * @param jsonString JSON text of a single object
     * @param beanClass  target bean class
     * @return the populated bean
     */
    @SuppressWarnings("unchecked")
    public static <T> T jsonToBean(String jsonString, Class<T> beanClass) {
        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        return (T) JSONObject.toBean(jsonObject, beanClass);
    }

    /**
     * Converts a Java object to a JSON string.
     *
     * @param bean the object to convert
     * @return the JSON string
     */
    public static String beanToJson(Object bean) {
        JSONObject json = JSONObject.fromObject(bean);
        return json.toString();
    }

    /**
     * Converts a Java object to a JSON string, filtering its properties.
     *
     * @param bean          the object to convert
     * @param _nory_changes property names to keep or to drop, depending on nory
     * @param nory          true: output only the properties in _nory_changes;
     *                      false: output everything except those properties
     * @return the JSON string
     */
    public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {
        JSONObject json = null;
        if (nory) { // keep only the properties listed in _nory_changes
            // Collect all field names of the class and its superclass as ":a:b:c:"
            Field[] fields = bean.getClass().getDeclaredFields();
            String str = "";
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            fields = bean.getClass().getSuperclass().getDeclaredFields();
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            str += ":";
            // Remove the names to keep; what remains is the exclude list
            for (String s : _nory_changes) {
                str = str.replace(":" + s + ":", ":");
            }
            json = JSONObject.fromObject(bean, configJson(str.split(":")));
        } else { // drop the properties listed in _nory_changes
            json = JSONObject.fromObject(bean, configJson(_nory_changes));
        }
        return json.toString();
    }

    private static JsonConfig configJson(String[] excludes) {
        JsonConfig jsonConfig = new JsonConfig();
        jsonConfig.setExcludes(excludes);
        jsonConfig.setIgnoreDefaultExcludes(false);
        return jsonConfig;
    }

    /**
     * Converts a list of Java objects to a JSON array string.
     *
     * @param beans the objects to convert
     * @return the JSON string
     */
    public static String beanListToJson(List<?> beans) {
        StringBuilder rest = new StringBuilder();
        rest.append("[");
        int size = beans.size();
        for (int i = 0; i < size; i++) {
            rest.append(beanToJson(beans.get(i)) + ((i < size - 1) ? "," : ""));
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Converts a list of Java objects to a JSON array string, filtering properties
     * in the same way as beanToJson(Object, String[], boolean).
     *
     * @param beans         the objects to convert
     * @param _nory_changes property names to keep or to drop, depending on nory
     * @param nory          true to keep only the listed properties, false to drop them
     * @return the JSON string
     */
    public static String beanListToJson(List<?> beans, String[] _nory_changes, boolean nory) {
        StringBuilder rest = new StringBuilder();
        rest.append("[");
        int size = beans.size();
        for (int i = 0; i < size; i++) {
            try {
                rest.append(beanToJson(beans.get(i), _nory_changes, nory));
                if (i < size - 1) {
                    rest.append(",");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Builds a map from a JSON object expression. Every value is stored in its
     * string form, so a nested object is kept as its JSON text.
     *
     * @param jsonString JSON text of a single object
     * @return a map of keys to string values
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static Map jsonToMap(String jsonString) {
        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        Iterator keyIter = jsonObject.keys();
        String key;
        Object value;
        Map valueMap = new HashMap();
        while (keyIter.hasNext()) {
            key = (String) keyIter.next();
            value = jsonObject.get(key).toString();
            valueMap.put(key, value);
        }
        return valueMap;
    }

    /**
     * Converts a map to a JSON object string. List values are serialized with
     * beanListToJson; any other non-null value is serialized as a bean.
     * Entries whose value is null are skipped.
     *
     * @param map           the map to convert
     * @param _nory_changes property names to keep or to drop in list elements
     * @param nory          true to keep only the listed properties, false to drop them
     * @return the JSON string
     */
    public static String mapToJson(Map<String, ?> map, String[] _nory_changes, boolean nory) {
        StringBuilder s_json = new StringBuilder("{");
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            Object value = entry.getValue();
            if (value == null) {
                continue;
            }
            if (s_json.length() > 1) {
                s_json.append(",");
            }
            // Quote the key so that the output is valid JSON
            // (assumes the key itself contains no quotes or backslashes)
            s_json.append("\"").append(entry.getKey()).append("\":");
            if (value instanceof List<?>) {
                s_json.append(beanListToJson((List<?>) value, _nory_changes, nory));
            } else {
                // Serialize this entry's value, not the whole map
                s_json.append(beanToJson(value));
            }
        }
        s_json.append("}");
        return s_json.toString();
    }

    /**
     * Builds a Java array from a JSON array.
     *
     * @param jsonString JSON text of an array
     * @return the array elements
     */
    public static Object[] jsonToObjectArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        return jsonArray.toArray();
    }

    /**
     * Converts a list to a JSON array string.
     */
    public static String listToJson(List<?> list) {
        JSONArray jsonArray = JSONArray.fromObject(list);
        return jsonArray.toString();
    }

    /**
     * Builds a list of Java objects from a JSON array of objects.
     *
     * @param jsonString JSON text of an array of objects
     * @param beanClass  target bean class
     * @return the populated beans
     */
    @SuppressWarnings("unchecked")
    public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        JSONObject jsonObject;
        T bean;
        int size = jsonArray.size();
        List<T> list = new ArrayList<T>(size);
        for (int i = 0; i < size; i++) {
            jsonObject = jsonArray.getJSONObject(i);
            bean = (T) JSONObject.toBean(jsonObject, beanClass);
            list.add(bean);
        }
        return list;
    }

    /**
     * Parses a JSON array into a Java String array.
     */
    public static String[] jsonToStringArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        String[] stringArray = new String[size];
        for (int i = 0; i < size; i++) {
            stringArray[i] = jsonArray.getString(i);
        }
        return stringArray;
    }

    /**
     * Parses a JSON array into a Java Long array.
     */
    public static Long[] jsonToLongArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Long[] longArray = new Long[size];
        for (int i = 0; i < size; i++) {
            longArray[i] = jsonArray.getLong(i);
        }
        return longArray;
    }

    /**
     * Parses a JSON array into a Java Integer array.
     */
    public static Integer[] jsonToIntegerArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Integer[] integerArray = new Integer[size];
        for (int i = 0; i < size; i++) {
            integerArray[i] = jsonArray.getInt(i);
        }
        return integerArray;
    }

    /**
     * Parses a JSON array into a Java Double array.
     */
    public static Double[] jsonToDoubleArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Double[] doubleArray = new Double[size];
        for (int i = 0; i < size; i++) {
            doubleArray[i] = jsonArray.getDouble(i);
        }
        return doubleArray;
    }
}
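To make the shape of the output concrete, the sketch below builds a JSON array of word/count objects by hand, using only the JDK. It is an illustration rather than part of the project: the class name `PlainJson` and its methods are hypothetical, and it only handles string keys with integer counts. The point it shows is that hand-built JSON needs quoted keys and escaped string values, which plain string concatenation does not give you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project: serializes
// word -> count pairs into a JSON array using only the JDK.
class PlainJson {

    // Wraps s in double quotes and escapes characters that may not appear raw in a JSON string.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become \u00XX escapes
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Produces [{"word":...,"count":...},...] in the map's iteration order.
    static String wordCountsToJson(Map<String, Integer> counts) {
        StringBuilder sb = new StringBuilder("[");
        boolean first = true;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append("{\"word\":").append(quote(e.getKey()))
              .append(",\"count\":").append(e.getValue()).append('}');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("spark", 2);
        counts.put("say \"hi\"", 1);
        System.out.println(wordCountsToJson(counts));
    }
}
```

In the project itself, `JsonUtil.listToJson` delegates this work to json-lib; the sketch is only meant to show what such a library has to take care of.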
Spark utility class (mainly responsible for initializing the SparkContext):
package com.zzrenfeng.zhsx.spark.conf;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

@Component
public class ApplicationConfiguration implements Serializable {

    private static final long serialVersionUID = 1L;

    // local[*] runs Spark inside this JVM with one worker thread per core
    public SparkConf sparkconf() {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("wc");
        return conf;
    }

    // Creates a new context on every call; the caller is responsible for closing it
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkconf());
    }

    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    // Input file for the word count; adjust the path to your environment
    public String filePath() {
        return "E:\\测试文件\\nlog.txt";
    }
}
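One thing to keep in mind: `javaSparkContext()` builds a new context on every call, and by default Spark allows only one active SparkContext per JVM, so two overlapping requests could collide. One possible alternative is to create the context once and share it. The sketch below shows only that create-once pattern in plain Java. `Lazy` is a hypothetical helper that is not in the project, and the string factory stands in for something like `() -> new JavaSparkContext(conf)`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not part of the original project: wraps an expensive
// factory so that it runs at most once, even when called from several threads.
class Lazy<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T value;
    private boolean initialized;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            value = factory.get();
            initialized = true;
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Stand-in for: new Lazy<>(() -> new JavaSparkContext(conf))
        Lazy<String> context = new Lazy<>(() -> "context-" + created.incrementAndGet());
        System.out.println(context.get());
        System.out.println(context.get());
        System.out.println("factory calls: " + created.get());
    }
}
```

With a shared context, the service would no longer close it after each request; it would be closed once when the application shuts down. In a Spring application the same effect is usually obtained by exposing the context as a singleton bean.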
WordCount model class (wraps one word and its count):
package com.zzrenfeng.zhsx.spark.domain;

import java.io.Serializable;

public class WordCount implements Serializable {

    private static final long serialVersionUID = 1L;

    private String word;
    // Primitive int, so getCount() cannot fail on a null value
    private int count;

    public WordCount() {}

    public WordCount(String v1, int l) {
        word = v1;
        count = l;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordCount [word=" + word + ", count=" + count + "]";
    }
}
Spark service class, which contains the logic of the Spark word count job:
package com.zzrenfeng.zhsx.spark.service;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import scala.Tuple2;

import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;
import com.zzrenfeng.zhsx.spark.domain.WordCount;

// Serializable because the anonymous functions below hold a reference to this
// object, and Spark serializes the functions it runs.
@Component
public class SparkServiceTest implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    ApplicationConfiguration applicationConfiguration;

    public List<WordCount> doWordCount() {
        JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();
        try {
            JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());

            // Split each line into words (in Spark 1.6, call() returns an Iterable)
            JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split(" "));
                }
            });

            // Wrap each word in a WordCount with an initial count of 1
            JavaRDD<WordCount> wordcount = words.map(new Function<String, WordCount>() {
                @Override
                public WordCount call(String v1) throws Exception {
                    return new WordCount(v1, 1);
                }
            });

            // Turn each WordCount into a (word, count) pair so it can be reduced by key
            JavaPairRDD<String, Integer> pairWordCount = wordcount.mapToPair(new PairFunction<WordCount, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(WordCount t) throws Exception {
                    return new Tuple2<String, Integer>(t.getWord(), t.getCount());
                }
            });

            // Sum the counts of identical words
            JavaPairRDD<String, Integer> wordCounts = pairWordCount.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Convert the pairs back into WordCount objects
            JavaRDD<WordCount> result = wordCounts.map(new Function<Tuple2<String, Integer>, WordCount>() {
                @Override
                public WordCount call(Tuple2<String, Integer> v1) throws Exception {
                    return new WordCount(v1._1, v1._2);
                }
            });

            List<WordCount> list = result.collect();
            System.out.println(list);
            return list;
        } finally {
            // Close the context even if the job fails
            javaSparkContext.close();
        }
    }
}
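The job above follows a split, pair, and reduce-by-key pipeline. To trace that logic without starting Spark, here is a minimal sketch of the same steps on plain Java collections. `LocalWordCount` is a hypothetical class that is not in the project, and it uses Java 8 streams as a stand-in for the RDD operations rather than Spark's own API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Spark job: same steps, plain Java collections.
class LocalWordCount {

    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // flatMap: one line becomes many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // mapToPair + reduceByKey: give each word the value 1, then sum per word
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("a b a", "b c")));
    }
}
```

Running `main` prints `{a=2, b=2, c=1}`. As in the Spark version, splitting on a single space means that consecutive spaces produce empty strings, which are counted as a word of their own.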
Controller layer, which handles the incoming requests:
package com.zzrenfeng.zhsx.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.zzrenfeng.zhsx.spark.domain.WordCount;
import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;
import com.zzrenfeng.zhsx.util.JsonUtil;

@Controller
@RequestMapping("hello")
public class ControllerTest {

    @Autowired
    private SparkServiceTest sparkServiceTest;

    // Handles hello/wc: runs the Spark job and returns the result as a JSON string.
    // "produces" declares the JSON content type with UTF-8 so that non-ASCII words are not garbled.
    @RequestMapping(value = "wc", produces = "application/json;charset=UTF-8")
    @ResponseBody
    public String wordCount() {
        List<WordCount> list = sparkServiceTest.doWordCount();
        return JsonUtil.listToJson(list);
    }
}
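Start the application, then enter the URL mapped above (hello/wc under the application's context path) in a browser to see the result shown at the beginning.
Because this is a web interface, it can be called from any kind of client, even one written in another language.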
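As an illustration of calling the interface from code, here is a minimal client sketch that uses only the JDK. The class `WordCountClient` is hypothetical, and the base URL `http://localhost:8080` is an assumption; the real host, port, and context path depend on how Tomcat is set up. Only the `hello/wc` path comes from the controller above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client, not part of the original project.
class WordCountClient {

    // Joins the base URL (scheme, host, port, context path) with the mapping hello/wc.
    static String endpoint(String baseUrl) {
        String trimmed = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return trimmed + "/hello/wc";
    }

    // Sends a GET request and returns the response body, decoded as UTF-8.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(60000); // the Spark job may take a while to finish
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default base URL; the server is contacted only when one is passed explicitly.
        String url = endpoint(args.length > 0 ? args[0] : "http://localhost:8080");
        System.out.println("GET " + url);
        if (args.length > 0) {
            System.out.println(fetch(url));
        }
    }
}
```

Passing the real base URL as the first argument prints the JSON returned by the word count job.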
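Now you can happily get on with writing Spark code. Some may ask: wouldn't it be better to develop Spark applications in Scala?
Personally, I think Scala is a fine choice for pure data processing, where it is a real pleasure to write. When integrating with other systems, though, Java is still the better choice; after all, when problems come up you can talk them over with Java experts.
Anyone interested is welcome to join the discussion.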