侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

webflux提供响应式API,玩出不一样的花样

2023-12-18 星期一 / 0 评论 / 0 点赞 / 22 阅读 / 13290 字

先说说什么是响应式 响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。 WebFlux又是什么呢

先说说什么是响应式

        响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。

WebFlux又是什么呢

        WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。

        spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。

        Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是基于 Reactor 的响应式方式。

实践走起

    我在网找了下发现现在支持的DAL包有:
    spring-boot-starter-data-redis-reactive、spring-boot-starter-data-mongodb-reactive
    也许还有别的,我本意是想要spring-boot-starter-data-mysql-reactive,然而并木有。那就说下上面2个包的实践把。

spring-boot-starter-data-redis-reactive

用到的包

        <dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-webflux</artifactId>		</dependency>		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-data-redis-reactive</artifactId>		</dependency>		<dependency>			<groupId>org.apache.commons</groupId>			<artifactId>commons-pool2</artifactId>		</dependency>		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-configuration-processor</artifactId>			<optional>true</optional>		</dependency>		<dependency>			<groupId>org.projectlombok</groupId>			<artifactId>lombok</artifactId>			<optional>true</optional>		</dependency>		<dependency>			<groupId>com.github.flying-cattle</groupId>			<artifactId>mybatis-dsc-generator</artifactId>			<version>${mybatis-dsc-generator.version}</version>		</dependency>		<dependency>			<groupId>com.alibaba</groupId>			<artifactId>fastjson</artifactId>			<version>${fastjson.version}</version>		</dependency>

YMl配置

server:  port: 8080spring:  application:    name: webFlux-test  redis:    host: 127.0.0.1    port: 6379    password: pwd2020    timeout: 5000    lettuce:      pool:        max-active: 200         max-idle: 20         min-idle: 5         max-wait: 1000 

整合redis-reactive

        虽然包是starter,但是还是要有自己的配置才能用不然报错如下:

Description:Field redisTemplate in com.flying.cattle.wf.service.impl.RedisServiceImpl required a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' that could not be found.The injection point has the following annotations:	- @org.springframework.beans.factory.annotation.Autowired(required=true)Action:Consider defining a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' in your configuration.

看了下官方文档需要加上如下:

@Bean	public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {		ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string());		return reactiveRedisTemplate;	}

发现了么是ReactiveRedisTemplate<String, String> 感觉就不很友好了,本来我是想声明成ReactiveRedisTemplate<String, Serializable>,搞古了一会儿木有搞定。有那个大佬有好的方案,望指点哈

Service代码:

@Servicepublic class RedisServiceImpl implements RedisService {	@Autowired	private ReactiveRedisTemplate<String, String> redisTemplate;		@Override	public Mono<String> getById(String key) {		// TODO Auto-generated method stub		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();		return operations.get(key);	}	@Override	public Mono<String> addUser(String key,User user) {		// TODO Auto-generated method stub		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();		return operations.getAndSet(key, JSON.toJSONString(user));	}	@Override	public Mono<Boolean> deleteById(String key) {		// TODO Auto-generated method stub		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();		return operations.delete(key);	}	@Override	public Mono<String> updateById(String key,User user) {		// TODO Auto-generated method stub		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();		return operations.getAndSet(key, JSON.toJSONString(user));	}	@Override	public Flux<String> findAll(String key) {		// TODO Auto-generated method stub		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();		return operations.range(key, 0, -1);	}	@Override	public Mono<Long> addlist(String key,List<String> list) {		// TODO Auto-generated method stub		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();		return operations.leftPushAll(key, list);	}		@Override	public Flux<String> findUsers(String key) {		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();		return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId));	}}

Controller代码

@RestController@RequestMapping("/user")public class UserController {		public final static String USER_KEY="user";		@Autowired	private RedisService redisService;		@Autowired	private RedisGenerateId redisGenerateId;		@GetMapping("/getId")	public Long getUserId(){		return redisGenerateId.generate(USER_KEY);			}		public String getKey(Long id) {		return USER_KEY+"_"+id;	}		@GetMapping("/getById/{id}")	public Mono<String> getUserById(@PathVariable("id")Long id){		return redisService.getById(getKey(id));	}		@GetMapping("/add")	public Mono<String> add(User user){		user = new User();		user.setAccount("admin1");		user.setPassword("123123");		user.setNickname("admin");		user.setEmail("[email protected]");		user.setPhone("13666275002");		user.setSex(true);		String bd="1990-01-01";		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");		try {			user.setBirthday(fmt.parse(bd));		} catch (ParseException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		user.setProvince("四川省");		user.setCity("成都市");		user.setCounty("高新区");		user.setAddress("天 府大道XXd段XX号");		user.setState("1");		// 以上是模拟数据		ValidationResult vr=ValidationUtils.validateEntity(user);		if (!vr.isHasErrors()) {			user.setId(getUserId());			System.out.println(JSON.toJSONString(user));			return redisService.addUser(getKey(user.getId()),user);		}else {			return Mono.just(vr.getFirstErrors());		}			}		@GetMapping("/addlist")	public Mono<Long> addlist(){		List<String> list=new ArrayList<String>();		User user = new User();		user.setAccount("admin1");		user.setPassword("123123");		user.setNickname("admin");		user.setEmail("[email protected]");		user.setPhone("13666275002");		user.setSex(true);		user.setBirthday(new Date());		user.setProvince("四川省");		user.setCity("成都市");		user.setCounty("高新区");		user.setAddress("天 府大道XXd段XX号");		user.setState("1");		//添加第一条数据		Long id=redisGenerateId.generate("user");		user.setId(id);		list.add(JSON.toJSONString(user));		//添加第二条数据		id=redisGenerateId.generate("user");		user.setId(id);		list.add(JSON.toJSONString(user));		//添加第三条数据		id=redisGenerateId.generate("user");		user.setId(id);		list.add(JSON.toJSONString(user));				return redisService.addlist("list", list);	}		/**	 *	这个就是流响应式的接口了,是一个一个的返回数据的,异步返回 	 *  delayElements(Duration.ofSeconds(2))这个是不要的,只是方便看效果	 *  redis 直接就是一个一个返回,不需要produces,不知道为什么...还木有深究。	 */	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)	public Flux<String> findAll(){		return redisService.findAll("list").delayElements(Duration.ofSeconds(2));	}		@GetMapping("/getUsers")	public Flux<String> findUsers() {		// TODO Auto-generated method stub		return redisService.findUsers(USER_KEY+"_"+"*").delayElements(Duration.ofSeconds(2));	}}

一个是差list数据类型,一个是匹配key查询的,都是一个一个返回的,实际开发中去掉.delayElements(Duration.ofSeconds(2))就好

整合mongodb-reactive

需要的包,只需要在redis的基础上下面的jar

        <dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>		</dependency>

MongoDB就很人性化了,感觉就很友好。而且是真的starter包,配置好数据库连接,就不需要其他配置了,直接可用

DAO

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;import com.flying.cattle.wf.entity.User;public interface UserRepository extends ReactiveMongoRepository<User, Long>{}

SERVICE(接口层我就不贴代码了)

@Servicepublic class MongoServiceImpl implements MongoService {		@Autowired	private UserRepository userRepository;		@Override	public Mono<User> getById(Long id) {		// TODO Auto-generated method stub		return userRepository.findById(id);	}	@Override	public Mono<User> addUser(User user) {		// TODO Auto-generated method stub		return userRepository.save(user);	}	@Override	public Mono<Boolean> deleteById(Long id) {		// TODO Auto-generated method stub		 userRepository.deleteById(id);		 return Mono.create(userMonoSink -> userMonoSink.success());	}	@Override	public Mono<User> updateById(User user) {		// TODO Auto-generated method stub		return userRepository.save(user);	}	@Override	public Flux<User> findAllUser() {		// TODO Auto-generated method stub		return userRepository.findAll();	}}

CONTROLLER

@RestController@RequestMapping("/usermg")public class UserMongoController {		public final static String USER_KEY="user";		@Autowired	private RedisGenerateId redisGenerateId;		@Autowired	private MongoService mongoService;		@GetMapping("/getId")	public Long getUserId(){		return redisGenerateId.generate(USER_KEY);	}	@GetMapping("/add")	public Mono<User> add(User user) {		user = new User();		user.setAccount("admin1");		user.setPassword("123123");		user.setNickname("admin");		user.setEmail("[email protected]");		user.setPhone("13666275002");		user.setSex(true);		String bd = "1990-01-01";		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");		try {			user.setBirthday(fmt.parse(bd));		} catch (ParseException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		user.setProvince("四川省");		user.setCity("成都市");		user.setCounty("高新区");		user.setAddress("天 府大道XXd段XX号");		user.setState("1");		// 以上是模拟数据		ValidationResult vr = ValidationUtils.validateEntity(user);		if (!vr.isHasErrors()) {			user.setId(getUserId());			System.out.println(JSON.toJSONString(user));			return mongoService.addUser(user);		} else {			 System.err.println(vr.getFirstErrors());		}		return null;	}		/**	 *	注意这里produces = MediaType.APPLICATION_STREAM_JSON_VALUE	 *	如果不是application/stream+json则调用端无法滚动得到结果,将一直阻塞等待数据流结束或超时。	 */	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)	public Flux<User> findAll(){		return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));	}}

代码就这些,大家要体验这个框架,建议还是用MongoDB把,毕竟redis主要是做缓存的。

给大家看下数据结构图

 

 

源码地址:https://gitee.com/flying-cattle/infrastructure/tree/master/webFluxTest

广告 广告

评论区