Skip to content

Commit

Permalink
Updates to use ReactiveStringRedisTemplate
Browse files Browse the repository at this point in the history
  • Loading branch information
spencergibb committed May 1, 2019
1 parent 5234f53 commit 9a1fb20
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.validation.Validator;
import org.springframework.web.reactive.DispatcherHandler;
Expand All @@ -58,22 +55,9 @@ public RedisScript redisRequestRateLimiterScript() {
return redisScript;
}

@Bean
// TODO: replace with ReactiveStringRedisTemplate in future
public ReactiveRedisTemplate<String, String> stringReactiveRedisTemplate(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> serializer = new StringRedisSerializer();
RedisSerializationContext<String, String> serializationContext = RedisSerializationContext
.<String, String>newSerializationContext().key(serializer)
.value(serializer).hashKey(serializer).hashValue(serializer).build();
return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory,
serializationContext);
}

@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(
ReactiveRedisTemplate<String, String> redisTemplate,
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new RedisRateLimiter(redisTemplate, redisScript, validator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -92,7 +92,7 @@ public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Confi

private Log log = LogFactory.getLog(getClass());

private ReactiveRedisTemplate<String, String> redisTemplate;
private ReactiveStringRedisTemplate redisTemplate;

private RedisScript<List<Long>> script;

Expand All @@ -119,7 +119,7 @@ public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Confi
/** The name of the header that returns the burst capacity configuration. */
private String burstCapacityHeader = BURST_CAPACITY_HEADER;

public RedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
public RedisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
RedisScript<List<Long>> script, Validator validator) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
this.redisTemplate = redisTemplate;
Expand Down Expand Up @@ -182,8 +182,9 @@ public void setBurstCapacityHeader(String burstCapacityHeader) {
@SuppressWarnings("unchecked")
public void setApplicationContext(ApplicationContext context) throws BeansException {
if (initialized.compareAndSet(false, true)) {
this.redisTemplate = context.getBean("stringReactiveRedisTemplate",
ReactiveRedisTemplate.class);
if (this.redisTemplate == null) {
this.redisTemplate = context.getBean(ReactiveStringRedisTemplate.class);
}
this.script = context.getBean(REDIS_SCRIPT_NAME, RedisScript.class);
if (context.getBeanNamesForType(Validator.class).length > 0) {
this.setValidator(context.getBean(Validator.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
* @author Spencer Gibb
*/
@Configuration
@ConditionalOnProperty(name = "spring.cloud.gateway.rsocket.enabled", matchIfMissing = true)
@ConditionalOnProperty(name = "spring.cloud.gateway.rsocket.enabled",
matchIfMissing = true)
@EnableConfigurationProperties
@ConditionalOnClass(RSocket.class)
public class GatewayRSocketAutoConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = PingPongApp.class, properties = {
"ping.take=5" }, webEnvironment = WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = PingPongApp.class, properties = { "ping.take=5" },
webEnvironment = WebEnvironment.RANDOM_PORT)
public class GatewayRSocketIntegrationTests {

private static int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,19 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
}

Publisher<? extends String> doPing(Integer take, RSocket socket) {
Flux<String> pong = socket.requestChannel(
Flux.interval(Duration.ofSeconds(1)).map(i -> {
ByteBuf data = ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT, "ping" + id);
ByteBuf routingMetadata = Metadata.from("pong")
.encode();
Flux<String> pong = socket
.requestChannel(Flux.interval(Duration.ofSeconds(1)).map(i -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
"ping" + id);
ByteBuf routingMetadata = Metadata.from("pong").encode();
return DefaultPayload.create(data, routingMetadata);
// onBackpressue is needed in case pong is not available yet
}).onBackpressureDrop(payload -> log.debug(
"Dropped payload " + payload.getDataUtf8()))
).map(Payload::getDataUtf8).doOnNext(str -> {
int received = pongsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Ping"
+ id);
}).doFinally(signal -> socket.dispose());
}).onBackpressureDrop(payload -> log
.debug("Dropped payload " + payload.getDataUtf8())))
.map(Payload::getDataUtf8).doOnNext(str -> {
int received = pongsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Ping" + id);
}).doFinally(signal -> socket.dispose());
if (take != null) {
return pong.take(take);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
* @author Spencer Gibb
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {
GatewaySampleApplicationTests.TestConfig.class }, webEnvironment = RANDOM_PORT, properties = "management.server.port=${test.port}")
@SpringBootTest(classes = { GatewaySampleApplicationTests.TestConfig.class },
webEnvironment = RANDOM_PORT, properties = "management.server.port=${test.port}")
public class GatewaySampleApplicationTests {

protected static int managementPort;
Expand Down

0 comments on commit 9a1fb20

Please sign in to comment.