在任何基于 rest-api 的应用程序中,需要拦截对应用程序的请求并执行多个操作只是时间问题。如果这些操作是需要应用于应用程序的所有请求的操作,那么过滤器的使用是有意义的,例如安全性。
在基于 Servlet 的应用程序中,我们曾经有ContentCachingRequestWrapper和ContentCachingResponseWrapper。我们在 WebFlux 环境中寻找与上述相同的品质。
等价的解决方案是webflux包提供的装饰器类:ServerHttpRequestDecorator、ServerHttpResponeDecorator、ServerWebExchangeDecorator。
让我们从一个简单的基于 Flux 的 api 开始。
首先我们导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
我们为发布请求创建一个简单的模型。
package com.gkatzioura.reactor.fluxfiltercapture;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Info {
private String description;
}
和回应
package com.gkatzioura.reactor.fluxfiltercapture;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InfoResponse {
private boolean success;
public static InfoResponse successful() {
return InfoResponse.builder().success(true).build();
}
}
将实现使用模型的控制器。控制器将是一个简单的回声。
package com.gkatzioura.reactor.fluxfiltercapture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class InfoController {
@PostMapping("/info")
public Mono<InfoResponse> getInfo(@RequestBody Info info) {
return Mono.just(InfoResponse.builder().success(true).build());
}
}
curl POST 可以帮助我们调试。
curl --location --request POST 'http://localhost:8080/info' \
--header 'Content-Type: application/json' \
--data-raw '{
"description": "Check"
}'
您在 Webflux 上的典型过滤器必须实现 WebFilter 接口,然后如果已注释,则运行时将拾取。
@Component
public class ExampleFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange,
WebFilterChain webFilterChain) {
return webFilterChain.filter(serverWebExchange);
}
}
在我们的例子中,我们希望同时跟踪响应和请求正文。
让我们从创建一个 ServerHttpRequestDecorator 实现开始。
package com.gkatzioura.reactor.fluxfiltercapture;
import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;
public class BodyCaptureRequest extends ServerHttpRequestDecorator {
private final StringBuilder body = new StringBuilder();
public BodyCaptureRequest(ServerHttpRequest delegate) {
super(delegate);
}
public Flux<DataBuffer> getBody() {
return super.getBody().doOnNext(this::capture);
}
private void capture(DataBuffer buffer) {
this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
}
public String getFullBody() {
return this.body.toString();
}
}
正如我们在 getBody 实现中看到的那样,我们添加了一个方法,该方法将捕获在实际服务读取正文时流动的字节块。
一旦请求完成,累积的数据将形成实际的主体。
相同的模式将适用于 ServerHttpResponeDecorator 实现。
package com.gkatzioura.reactor.fluxfiltercapture;
import java.nio.charset.StandardCharsets;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class BodyCaptureResponse extends ServerHttpResponseDecorator {
private final StringBuilder body = new StringBuilder();
public BodyCaptureResponse(ServerHttpResponse delegate) {
super(delegate);
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<DataBuffer> buffer = Flux.from(body);
return super.writeWith(buffer.doOnNext(this::capture));
}
private void capture(DataBuffer buffer) {
this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
}
public String getFullBody() {
return this.body.toString();
}
}
这里我们重写了 writeWith 函数。这些数据被写入并推送到我们用 Flux 装饰参数的流中,以便能够在 doOnNext 上使用方法。
在这两种情况下,正文和响应的字节都会累积。这可能适用于特定用例,例如更改请求/响应。如果您的用例仅通过将字节流式传输到另一个系统来涵盖,则无需累积,只需更改 getBody 和 writeWith 上的函数,流式传输数据即可完成工作。
让我们转到扩展 ServerWebExchangeDecorator 的父装饰器。
package com.gkatzioura.reactor.fluxfiltercapture;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
public class BodyCaptureExchange extends ServerWebExchangeDecorator {
private BodyCaptureRequest bodyCaptureRequest;
private BodyCaptureResponse bodyCaptureResponse;
public BodyCaptureExchange(ServerWebExchange exchange) {
super(exchange);
this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest());
this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse());
}
@Override
public BodyCaptureRequest getRequest() {
return bodyCaptureRequest;
}
@Override
public BodyCaptureResponse getResponse() {
return bodyCaptureResponse;
}
}
是时候关注我们的过滤器了。为了使示例简单,我们将在控制台上打印请求和响应正文。
package com.gkatzioura.reactor.fluxfiltercapture;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Component
public class CustomWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange,
WebFilterChain webFilterChain) {
BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange);
return webFilterChain.filter(bodyCaptureExchange).doOnSuccess( (se) -> {
System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody());
System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody());
});
}
}
如果我们最终运行上面的 Curl,我们将打印请求和响应的正文。你可以在github
上找到源代码。
学习更多JAVA知识与技巧,关注与私信博主(学习)