Skip to content

Commit

Permalink
add notify/listen example.
Browse files Browse the repository at this point in the history
  • Loading branch information
hantsy committed Sep 15, 2020
1 parent 310c0de commit 4192603
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 11 deletions.
85 changes: 74 additions & 11 deletions boot/src/main/java/com/example/demo/DemoApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import com.fasterxml.jackson.databind.*;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.codec.EnumCodec;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlResult;
import io.r2dbc.postgresql.codec.Json;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -21,20 +23,14 @@
import org.springframework.boot.jackson.JsonComponent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.annotation.Version;
import org.springframework.data.convert.ReadingConverter;
import org.springframework.data.convert.WritingConverter;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.ReactiveAuditorAware;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.config.EnableR2dbcAuditing;
import org.springframework.data.r2dbc.convert.EnumWriteSupport;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.data.relational.core.mapping.Column;
Expand All @@ -51,11 +47,13 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand All @@ -77,7 +75,8 @@ ApplicationRunner initialize(
DatabaseClient databaseClient,
PostRepository posts,
CommentRepository comments,
TransactionalOperator operator) {
TransactionalOperator operator
) {
log.info("start data initialization...");
return args -> {
databaseClient
Expand Down Expand Up @@ -116,10 +115,25 @@ ApplicationRunner initialize(
err -> log.error("err: {}", err)
);


};

}

@Bean
@Qualifier("pgConnectionFactory")
public ConnectionFactory pgConnectionFactory() {
return new PostgresqlConnectionFactory(
PostgresqlConnectionConfiguration.builder()
.host("localhost")
.database("test")
.username("user")
.password("password")
//.codecRegistrar(EnumCodec.builder().withEnum("post_status", Post.Status.class).build())
.build()
);
}

@Bean
public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {

Expand Down Expand Up @@ -148,6 +162,41 @@ public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomiz
}
}

@Component
@Slf4j
class Listener {
@Autowired
@Qualifier("pgConnectionFactory")
ConnectionFactory pgConnectionFactory;

PostgresqlConnection postgresqlConnection;

@PostConstruct
public void initialize() throws InterruptedException {
postgresqlConnection = Mono.from(pgConnectionFactory.create())
.cast(PostgresqlConnection.class).block();

postgresqlConnection.createStatement("LISTEN mymessage")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.log("listen::").subscribe();

Flux.interval(Duration.ofSeconds(5))
.delayElements(Duration.ofSeconds(1))
.concatMap(l -> postgresqlConnection.getNotifications())
.log("getNotifications::")
.subscribe(
data -> log.info("notifications: {}", data)
);
}

@PreDestroy
public void destroy() {
postgresqlConnection.close().subscribe();
}

}

// need to register EnumCodec to handle post_status
// If use a varchar to store Enum, no need converter for it.
// eg. "status VARCHAR(255) default 'DRAFT'"
Expand Down Expand Up @@ -224,8 +273,22 @@ ReactiveAuditorAware<String> auditorAware() {
class WebConfig {

@Bean
public RouterFunction<ServerResponse> routes(PostHandler postController, CommentHandler commentHandler) {
public RouterFunction<ServerResponse> routes(
PostHandler postController,
CommentHandler commentHandler,
@Qualifier("pgConnectionFactory") ConnectionFactory pgConnectionFactory) {
return route()
.GET("/hello", request -> Mono.from(pgConnectionFactory.create())
.cast(PostgresqlConnection.class)
.flatMap(sender ->
sender.createStatement("NOTIFY mymessage, 'hello world'")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.log("sending notification::")
.then()
)
.flatMap((v) -> noContent().build())
)
.path("/posts", () -> route()
.nest(
path(""),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.example.demo;

import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.h2.H2ConnectionOption;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;

public class H2ConnectionFactories {
static ConnectionFactory fromUrl() {
return ConnectionFactories.get("r2dbc:h2:mem:https:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
}

static ConnectionFactory inMemory() {
// H2ConnectionFactory connectionFactory = new H2ConnectionFactory(H2ConnectionConfiguration.builder()
// .inMemory("...")
// .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
// .build());
return H2ConnectionFactory.inMemory("test");
}

static ConnectionFactory file() {
return new H2ConnectionFactory(
H2ConnectionConfiguration.builder()
//.inMemory("testdb")
.file("./testdb")
.username("sa")
.password("password")
.build()
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.example.demo;

import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;

import java.util.Map;

public class PgConnectionFactories {

static ConnectionFactory fromUrl() {
return ConnectionFactories.get("r2dbc:postgres:https://user:password@localhost/test");
}

static ConnectionFactory fromOptions() {
var options = ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.HOST, "localhost")
.option(ConnectionFactoryOptions.DATABASE, "test")
.option(ConnectionFactoryOptions.USER, "user")
.option(ConnectionFactoryOptions.PASSWORD, "password")
.option(ConnectionFactoryOptions.DRIVER, "postgresql")
//.option(PostgresqlConnectionFactoryProvider.OPTIONS, Map.of("lock_timeout", "30s"))
.build();
return ConnectionFactories.get(options);
}

static ConnectionFactory pgConnectionFactory() {
return new PostgresqlConnectionFactory(
PostgresqlConnectionConfiguration.builder()
.host("localhost")
.database("test")
.username("user")
.password("password")
//.codecRegistrar(EnumCodec.builder().withEnum("post_status", Post.Status.class).build())
.build()
);
}
}
Loading

0 comments on commit 4192603

Please sign in to comment.