<!-- Maven dependencies for the reactive WebFlux + R2DBC (MySQL) demo. -->
<dependencies>
<!-- Reactive web stack (Spring WebFlux). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Lombok annotation processor; optional so it is not passed on transitively. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!-- R2DBC MySQL driver: https://mvnrepository.com/artifact/dev.miku/r2dbc-mysql -->
<!-- NOTE(review): the original comment linked to the io.asyncer/r2dbc-mysql artifact, but the
     coordinates declared below are dev.miku:r2dbc-mysql. Confirm which artifact matches the
     Spring Boot / R2DBC SPI version used by this project. -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- Reactive Spring Data R2DBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
</dependencies>
# Application configuration for the WebFlux + R2DBC demo.
# The nesting below restores the hierarchy that Spring Boot expects
# (server.port, spring.r2dbc.*); with every key at column 0 none of
# these properties would be bound.
server:
  port: 8081
spring:
  r2dbc:
    # NOTE(review): demo credentials only - do not commit real passwords;
    # prefer environment variables or an external secret store.
    password: 123456
    username: root
    url: r2dbc:mysql://localhost:3306/test?serverZoneId=Asia/Shanghai
    # NOTE(review): assumed to be spring.r2dbc.name (the original indentation
    # was lost) - confirm against the original file.
    name: test
package com.example.webfluxdemo.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
/**
 * Enables Spring Data R2DBC repository support for the demo application.
 *
 * <p>The base package is given explicitly: without it,
 * {@code @EnableR2dbcRepositories} scans the package of the annotated class
 * ({@code com.example.webfluxdemo.configuration}), which contains no
 * repository interfaces. The repositories are declared in
 * {@code com.example.webfluxdemo.repository}.
 */
@Configuration
@EnableR2dbcRepositories(basePackages = "com.example.webfluxdemo.repository")
public class MyConfiguration {
}
package com.example.webfluxdemo.Entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
/**
 * Author entity mapped to the {@code t_author} table.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString},
 * a no-args constructor and an all-args {@code (Long, String)} constructor.
 */
@Table("t_author")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TAuthor {

    /** Identifier of the row; marked as the entity id for Spring Data. */
    @Id
    private Long id;

    /**
     * Author name.
     *
     * <p>Named in lowerCamelCase so the field matches the bean property exposed by the
     * Lombok-generated {@code getName()}/{@code setName(String)}; those accessor names are
     * the same as they were for the previous field spelling {@code Name}.
     */
    private String name;

    /**
     * Creates an author with only a name; {@code id} is left {@code null}.
     *
     * @param name the author name
     */
    public TAuthor(String name) {
        this.name = name;
    }
}
package com.example.webfluxdemo.repository;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
/**
 * Shared base interface that combines reactive CRUD and reactive sorting
 * repository operations for an entity type.
 *
 * <p>Annotated with {@link NoRepositoryBean} so that Spring Data does not try
 * to create a repository instance for this intermediate interface itself.
 *
 * @param <T>  the entity type
 * @param <ID> the type of the entity identifier
 */
@NoRepositoryBean
public interface R2dbcAuthorRepository<T, ID>
        extends ReactiveCrudRepository<T, ID>, ReactiveSortingRepository<T, ID> {
}
package com.example.webfluxdemo.repository;
import com.example.webfluxdemo.Entity.TAuthor;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import java.util.Collection;
/**
 * Reactive repository for {@link TAuthor} entities keyed by {@code Long} id.
 */
@Repository
public interface AuthorRepository extends R2dbcAuthorRepository<TAuthor, Long> {

    /**
     * Derived query: finds authors whose id is contained in {@code ids} and whose
     * name equals {@code name}. Arguments are bound by position.
     *
     * @param ids  the candidate ids; must not be {@code null}
     * @param name the author name to match; must not be {@code null}
     * @return a {@link Flux} emitting the matching authors
     */
    Flux<TAuthor> findByIdInAndName(@NonNull Collection<Long> ids, @NonNull String name);
}
package com.example.webfluxdemo.Service;
import com.example.webfluxdemo.Entity.TAuthor;
import com.example.webfluxdemo.repository.AuthorRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
/**
 * Service layer that delegates author persistence and lookup to
 * {@link AuthorRepository}. Every method returns a lazy reactive publisher;
 * nothing runs until the caller subscribes.
 */
@Service
public class AuthorService {

    @Autowired
    private AuthorRepository authorRepository;

    /**
     * Saves the given author.
     *
     * @param author the author to persist
     * @return a {@link Mono} emitting the saved author, or an empty {@code Mono}
     *         when the saved entity has no id
     */
    public Mono<TAuthor> save(TAuthor author) {
        return authorRepository.save(author)
                .filter(saved -> saved.getId() != null);
    }

    /**
     * Looks up a single author by id.
     *
     * @param id the author id
     * @return the publisher returned by the repository for that id
     */
    public Mono<TAuthor> findById(Long id) {
        return authorRepository.findById(id);
    }

    /**
     * Lists all authors.
     *
     * @return a {@link Flux} of every author known to the repository
     */
    public Flux<TAuthor> findAll() {
        return authorRepository.findAll();
    }

    /**
     * Finds authors whose id is in the given list and whose name matches.
     *
     * @param Ids  the candidate ids
     * @param name the author name to match
     * @return a {@link Flux} of the matching authors
     */
    public Flux<TAuthor> findByIdInAndName(List<Long> Ids, String name) {
        return authorRepository.findByIdInAndName(Ids, name);
    }
}
package com.example.webfluxdemo.controller;
import com.example.webfluxdemo.Entity.TAuthor;
import com.example.webfluxdemo.Service.AuthorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
/**
 * REST endpoints for authors under {@code /author}.
 *
 * <p>All handlers return a {@link Mono} and let WebFlux subscribe to it. The
 * handlers must not call {@code block()}: blocking is rejected with an
 * {@code IllegalStateException} on Reactor's non-blocking HTTP threads. A
 * publisher that is never subscribed to (as in the previous {@code save})
 * performs no work at all.
 */
@RestController
@RequestMapping("/author")
public class AuthorController {

    @Autowired
    private AuthorService authorService;

    /**
     * Persists the author sent in the request body.
     *
     * @param requestBody the author to save
     * @return a {@code Mono} that emits {@code 200 OK} with body {@code "success!"} once the
     *         save pipeline has completed; a failure of the save is propagated as an error
     *         instead of being silently dropped
     */
    @PostMapping("/save")
    public Mono<ResponseEntity<String>> save(@RequestBody TAuthor requestBody) {
        // thenReturn waits for completion, so the response is only sent after the save ran,
        // even when the service filters the saved entity out (empty Mono).
        return authorService.save(requestBody)
                .thenReturn(ResponseEntity.ok("success!"));
    }

    /**
     * Looks up one author by id.
     *
     * @param id the author id taken from the path
     * @return a {@code Mono} emitting {@code 200 OK} with the author, or {@code 404 Not Found}
     *         when no author is emitted for that id
     */
    @GetMapping("/findById/{id}")
    public Mono<ResponseEntity<TAuthor>> findById(@PathVariable Long id) {
        return authorService.findById(id)
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    /**
     * Lists all authors.
     *
     * @return a {@code Mono} emitting {@code 200 OK} with all authors collected into one
     *         list (empty list when there are none)
     */
    @GetMapping("/findAll")
    public Mono<ResponseEntity<Iterable<TAuthor>>> findAll() {
        return authorService.findAll()
                .collectList()
                .map(authors -> ResponseEntity.<Iterable<TAuthor>>ok(authors));
    }
}
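R2dbc 的仓储类用法和语法都类似于 Jpa,但返回的对象是响应式流对象:Flux(包含 0 到 N 个元素的流)或者 Mono(包含 0 或 1 个元素的流)。可以使用 .block() 将 Mono<T> 接收为 T 对象,使用 .collectList().block() 将 Flux<T> 接收为 List<T> 对象。注意:block() 会阻塞当前线程,在 WebFlux 的请求处理线程中调用会抛出 IllegalStateException,因此控制器中应直接返回 Mono/Flux。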
这里是两种异步执行的代码:其中 Runnable 没有返回值;Callable<Integer> 有返回值,提交给线程池后得到的对象为 Future<Integer>,可以使用 get() 获取到 Integer 结果。
package org.example;
import java.util.concurrent.*;
/**
 * Demonstrates two ways of running work on another thread: a {@link Runnable}
 * started on a plain {@link Thread} (no result) and a {@link Callable}
 * submitted to an {@link ExecutorService} (result obtained through a
 * {@link Future}).
 */
public class Main {

    /**
     * Runs the {@code Callable} demo and prints the result together with the
     * name of the calling thread.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the result
     * @throws ExecutionException   if the task failed
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Main myMain = new Main();
        // myMain.thread(1,2);
        Future<Integer> integerFuture = myMain.thread2(1, 2);
        Integer result = integerFuture.get();
        Thread thread = Thread.currentThread();
        System.out.println("thread name = " + thread.getName());
        System.out.println("Main result: a+b = " + result);
    }

    /**
     * Runs {@link MyTask} on a dedicated thread named {@code myTask}, prints the
     * thread's name and state, sleeps for one second, interrupts the thread if
     * it is still alive, and prints its state again.
     *
     * @param a first operand
     * @param b second operand
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public void thread(int a, int b) throws InterruptedException {
        Thread thread = new Thread(new MyTask(a, b));
        thread.setName("myTask");
        thread.start();
        String name = thread.getName();
        System.out.println("name = " + name);
        Thread.State state = thread.getState();
        System.out.println("state = " + state);
        Thread.sleep(1000);
        if (thread.isAlive()) {
            thread.interrupt();
        }
        System.out.println("Now = " + thread.getState());
    }

    /**
     * Submits {@link MyTask2} to a thread pool, waits for the result, prints it,
     * and returns the (already completed) future.
     *
     * <p>The executor is shut down in a {@code finally} block so that its
     * non-daemon worker threads are released even when waiting for the result
     * fails. If the calling thread is interrupted, its interrupt flag is
     * restored before the exception is rethrown.
     *
     * @param a first operand
     * @param b second operand
     * @return the completed future holding {@code a + b}
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *                          {@link InterruptedException} raised while waiting
     */
    public Future<Integer> thread2(int a, int b) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> res = executorService.submit(new MyTask2(a, b));
            System.out.println("thread result: a+b = " + res.get());
            return res;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }
    }

    /** Task without a result: prints the executing thread's name and {@code a+b}. */
    public static class MyTask implements Runnable {
        private final int a;
        private final int b;

        /**
         * @param a first operand
         * @param b second operand
         */
        public MyTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            System.out.println("thread name = " + thread.getName());
            System.out.println("a+b = " + (a + b));
        }
    }

    /** Task with a result: prints the executing thread's name and {@code a+b}, then returns the sum. */
    public static class MyTask2 implements Callable<Integer> {
        private final int a;
        private final int b;

        /**
         * @param a first operand
         * @param b second operand
         */
        public MyTask2(int a, int b) {
            this.a = a;
            this.b = b;
        }

        /**
         * @return {@code a + b}
         */
        @Override
        public Integer call() {
            Thread thread = Thread.currentThread();
            System.out.println("thread name = " + thread.getName());
            System.out.println("a+b = " + (a + b));
            return a + b;
        }
    }
}
感谢阅读!