Reactor核心

发布时间:2024年01月17日

Mono和Flux简单数据

1.父pom锁定版本

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2023.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

子模块引入依赖

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
        </dependency>
    </dependencies>

1.数据流:数据的源头
2.变化传播:数据操作(中间操作)
3.异步编程模式:底层控制异步
数据流:每个元素从流的源头,开始源源不断,自己往下滚动
onNext():当某个元素到达后,可以自定义它的处理逻辑
onComplete():处理结束
onError():异常结束
一个数据库:元素(0-n)+信号(1: 正常/异常);
流经过运算符计算后得到一个新流

案例:
Mono: 0|1个元素的流
Flux: n个元素的流
flux

    public static void main(String[] args) throws IOException {
        //发布者发布数据流:源头
        //1.多元素的流
        Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);
        //流不消费就没用,消费就是订阅
        //一个数据流,可以有很多消费者
        just.subscribe(System.out::println);
        just.subscribe(e -> System.out.println("new just:" + e));

        //对于每个消费者来说,每个流都是一样的:广播模式

        //每秒产生一个从0开始的递增数字
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));
        flux.subscribe(System.out::println);

        //流是异步的
        System.in.read();
    }

mono:只有一个元素

    public static void main(String[] args) {
        Mono<Integer> just = Mono.just(1);
        just.subscribe(System.out::println);

        //空流,但有一个信号,此时代表流完成的信号
        Flux<Object> empty = Flux.empty();
    }

事件感知Api-doOnXxx

doOnXxx:当流发生什么事情时,触发一个回调
系统调用提前定义好的钩子函数
doOnXxx

定义一个空流,但有一个信号
此时代表流完成的信号
流不被消费,流不会触发完成回调

        //空流,但有一个信号,此时代表流完成的信号
        Flux<Object> empty = Flux.empty()
                .doOnComplete(() -> {
                    System.out.println("stream finished");
                });
        empty.subscribe(System.out::println);

链式api中,下面的操作符,操作的是上面的流
自定义订阅者MySubscriber

public class MySubscriber extends BaseSubscriber<Integer> {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("bind with subscription" + subscription);
        //订阅者找发布者要一个元素(背压)
        request(1);
    }

    @Override
    protected void hookOnNext(Integer value) {
        System.out.println("item arrive: value" + value);
        if (value < 5) {
            request(2);//继续要元素
            if (value == 3) {
                throw new RuntimeException("value == 3 error");
            }
        } else {
            cancel();//取消订阅
        }
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("stream finish");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("stream error:" + throwable.getMessage());
    }

    @Override
    protected void hookOnCancel() {
        System.out.println("stream cancel");
    }
}

测试

public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 7)
                .doOnComplete(() -> System.out.println("stream flux finished"))
                .doOnCancel(() -> System.out.println("stream flux cancel"))
                .doOnError(throwable -> System.out.println("stream err:" + throwable.getMessage()))
                .doOnNext( item -> System.out.println("next item arrive:" + item));
        flux.subscribe(new MySubscriber());
        Thread.sleep(2000);
    }

结果:
测试结果
希望流中元素出问题时执行
后续不再处理

        Flux.just(1,2,3,4,5,6,7,0,5,6)
                .map(item -> 10 / item)//流会出问题
                .doOnNext(item -> System.out.println("item arrive:" + item))//流中元素到达触发
                .doOnError( exception -> System.out.println("stream error:" + exception.getMessage()))
                .subscribe(System.out::println);

结果:
流出问题时,执行回调
注意:doInXxx(),要感知某个流的事件,写在这个流的后面,新流的前面
doOnNext():数据到达时,触发
doOnEach():封装的更详细,元素(数据+信号)到达时会触发
信号:正常/异常(取消)

总结

doOnNext():每个数据(流的数据)到达时触发
doOnEach():每个元素(流的数据和信号)到达时候触发
doOnRequest():消费者请求流元素的时候
doOnError():流发生错误时
doOnSubscribe():流被订阅时
doOnTerminal():发送取消/异常信号中断流
doOnCancel():流被取消
doOnDiscard():流中元素被忽略时

响应式流日志

.log():日志
示例:

        Flux.range(1,10)
                //.log() //日志 1-7
                .filter(item -> item > 3)
                .log() //日志 4-7
                .map(item -> "new stream:" + item)// new stream 4-7
                .subscribe(System.out::println);

结果:
打印日志

核心-subscribe()

订阅流:没订阅之前流什么也不做
流的元素开始流动, 发生数据变化
响应式编程:数据流+ 变化传播(操作)
订阅方法参数
案例
流正常时:只传入正常消费者(只消费正常元素)

        Flux<Integer> flux = Flux.range(1,10)
                .map(item -> return item *2;);
        flux.subscribe(System.out::println);

流出现异常时

        Flux<Integer> flux = Flux.range(1,10)
                .map(item -> {
                    if (item == 9) {
                        int i = 10 / ( 9 - item );
                    }
                    return item *2;
                });
        flux.subscribe(item -> {
            System.out.println("stream is correct item: " + item);
        }, error -> {
            System.out.println("stream is error:" + error.getMessage());
        });

流完成时,执行回调

        Flux<Integer> flux = Flux.range(1,10)
                .map(item -> item *2);
        flux.subscribe(
                item -> System.out.println("stream is correct item: " + item) ,
                error -> System.out.println("stream is error:" + error.getMessage()),
                () -> System.out.println("stream finished")
                );

自定义消费者

流的生命周期钩子可以传播给订阅者
doOnXxx:发生这个事件的时候产生一个回调,通知你(不能改变)
onXxx:发生这个事件以后,执行一个动作,这个动作可以改变元素/信号
例如:
Aop:普通通知(前置/后置/异常/返回)
环绕通知(ProceedingJoinPoint),可以干扰目标方法执行

Flux<Integer> flux = Flux.range(1,10)
                .map(item -> item *2);
        flux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                System.out.println("stream is subscribe");
                //流被订阅时,找发布者要1个数据
                request(1);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("next item:" + value);
                //数据到达,持续要数据
                request(2);
            }

            @Override
            protected void hookOnComplete() {
                System.out.println("stream finished");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("stream error:" + throwable);
            }

            @Override
            protected void hookOnCancel() {
                super.hookOnCancel();
            }
        });

自定义消费者-生命周期钩子

取消流:
消费者调用cancel()方法可以取消订阅

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("next item:" + value);
                //数据到达,持续要数据
                if (value == 5) {
                    this.cancel();
                }
                request(2);
            }

自定义消费者建议直接编写BaseSubscriber的逻辑

请求重塑-buffer

buffer(2),指定缓冲区大小,也就是消费者一次可以最多拿到2个元素
凑满数批量发送给消费者

        Flux.range(1,10)
                .buffer(2)//缓冲区缓存2个元素
                .subscribe(System.out::println);

结果
在这里插入图片描述
消费者每次request(1),拿到buffer大小个真正元素
request(n):找发布者请求n次数据
总共能得到n*bufferSize个数据

请求重塑-limitRate

        Flux.range(1,1000)
                .log() //限流触发看上游是怎么限流过去
                .limitRate(100)//一次预取100个元素
                .subscribe();

从上游每次请求100个元素
在这里插入图片描述

75%预取策略:第一次抓100个数据,
如果75%的元素已经处理,继续抓取新的75%元素
在这里插入图片描述

创建序列-generate,create

编程方式创建序列
generate
同步环境下
sink:接收器

        Flux.generate(() -> 0, //初始值
                        (state, sink) -> {
                            //0-10
                            if (state == 10 ) {
                                sink.complete();//发送完成
                            }
                            sink.next(state); //把元素传出去
                            return state + 1;
                        })
                .log()
                .subscribe();

create();异步多线程
创建自定义监听器MyListener

public class MyListener {
    FluxSink<Object> sink;
    public MyListener(FluxSink<Object> sink) {
        this.sink = sink;
    }
    //用户登录,触发online监听
    public void online(String username) {
        System.out.println("user login:" + username);
        sink.next(username);
    }
}

发布流

        Flux.create(fluxSink -> {
                    MyListener listener = new MyListener(fluxSink);
                    for (int i = 0; i < 10; i++) {
                        listener.online("张" + i);
                    }
                })
                .log()
                .subscribe();
        System.in.read();

结果:
在这里插入图片描述

自定义元素处理-handle

自定义流中元素处理规则
返回自定义处理后的流

        Flux.range(1,10)
                .handle((value,sink) -> {
                    System.out.println("get value:" + value);
                    sink.next(value * 2); //可以向下发送数据的通道
                })
                .log()
                .subscribe();

handle():处理后,流中可以有不同类型数据
map():处理后,流中只能有相同类型数据

自定义线程调度规则

响应式编程,默认还是用当前线程,生成整个流,发布流,流操作

        Flux.range(1,10)
                .handle((value,sink) -> {
                    sink.next(value * 2);
                })
                .log()
                .subscribe();

流的发布和中间操作,默认使用当前线程
默认在main线程
调度器(线程池)
publishOn():改变发布者所在线程
subscribeOn():改变订阅者所在线程池
Schedulers.immediate();//无执行上下文,在当前线程运行
Schedulers.single();//使用固定的一个单线程

        Flux.range(1,10)
                .publishOn(Schedulers.single())//在哪个线程池把这个流的数据和操作执行了
                .log()
                .handle((value,sink) -> {
                    sink.next(value * 2);
                })
                .subscribe();

单开一个新线程:
新开一个线程

Schedulers.boundedElastic();//有界,弹性调度,不是无限扩充的线程池
线程池中有10*cpu核心个线程,队列默认10W
Schedulers.fromExector(new ThreadPoolExecutor())
Schedulers.parallel();//并发池

        Scheduler schedulers =Schedulers.newParallel("parallel-scheduler",4);
        Flux<String> flux = Flux.range(1, 10)
                .map(item -> item + 10)
                .publishOn(schedulers)
                .map(item -> "value:" + item)
                .log();
        new Thread(() -> flux.subscribe(System.out::println)).start();

指定发布者线程池:
在这里插入图片描述
发布者没有执行线程池:

        Scheduler schedulers =Schedulers.newParallel("parallel-scheduler",4);
        Flux<String> flux = Flux.range(1, 10)
                .map(item -> item + 10)
                .log()
                .publishOn(schedulers)
                .map(item -> "value:" + item);
        new Thread(() -> flux.subscribe(System.out::println)).start();

若没有线程,发布者用的线程就是消费者的线程
在这里插入图片描述

常用操作

pom.xml引入单元测试类
filter:过滤出来满足条件的元素

        Flux.range(1,10)
                .filter(item -> item % 2 == 0)
                .log()
                .subscribe();

结果:
在这里插入图片描述
onSubscribe():流被订阅
request(unbounded):请求无限数据
onNext():每个元素到达
onComplete():流订阅结束
flatMap():扁平化

        Flux.just("zhang san","li si")
                .flatMap(item -> {
                    String[] split = item.split(" "); //2个人的名字,按照空格拆分,单独打印姓和名
                    return Flux.fromArray(split); //把数据包装成多元素流
                })
                .log()
                .subscribe();

结果:
在这里插入图片描述

concatMap():一个元素可以变很多个

        Flux.range(1,10)
                .concatMap(item -> Flux.just(item * 2,0))
                .log()
                .subscribe();

结果:映射成很多个
在这里插入图片描述

conact():将很多流合并成一个

        Flux.concat(Flux.range(1,10),Flux.range(10,20))
                .log()
                .subscribe();

合并后:
在这里插入图片描述
concatWith():老流连接一个新流,元素类型要一样
contactMap():对于元素的位置不限制

        Flux.range(1,10)
                .concatWith(Flux.range(20,25))
                .log()
                .subscribe();

transform():
自定义对流中元素的操作,把流编程新数据
有多个订阅者,各个订阅者之间不会共享外部变量值

        AtomicInteger atomic = new AtomicInteger(0);
        Flux<String> transform = Flux.just("a", "b", "c")
                .transform(values -> {
                    if (atomic.incrementAndGet() == 1) {
                        //如果是第一次调用,老流中的所有元素转成大写
                        return values.map(String::toUpperCase);
                    } else {
                        //如果不是第一次调用,原封不动返回小写
                        return values;
                    }
                });
        transform.subscribe(item -> System.out.println("subscriber1:" + item));
        transform.subscribe(item -> System.out.println("subscriber2:" + item));

结果:
无论多少个订阅者,transform只执行一次
在这里插入图片描述

transformDeferred():
有多个订阅者
多个订阅者之间共享外部变量值,每个订阅者执行一次

        AtomicInteger atomic = new AtomicInteger(0);
        Flux<String> transform = Flux.just("a", "b", "c")
                .transformDeferred(values -> {
                    if (atomic.incrementAndGet() == 1) {
                        //如果是第一次调用,老流中的所有元素转成大写
                        return values.map(String::toUpperCase);
                    } else {
                        //如果不是第一次调用,原封不动返回小写
                        return values;
                    }
                });
        transform.subscribe(item -> System.out.println("subscriber1:" + item));
        transform.subscribe(item -> System.out.println("subscriber2:" + item));

不同订阅者共享外部变量:
在这里插入图片描述

defaultIfEmpty()
如果发布者元素为空,指定默认值,否则用发布者的值
静态兜底数据

    @Test
    public void testEmpty() {
        this.returnStr()
                .defaultIfEmpty("is empty")
                .subscribe(System.out::println);
    }
    private Mono<String> returnStr() {
        return Mono.just("a");
        //return Mono.empty();
    }

Mono.just(null);流里面有一个null值元素
Mono.empty();流里面没有元素,只有结束信号
switchEmpty()
空转换
动态兜底方法

    @Test
    public void testSwitchEmpty() {
        this.returnStr()
                .switchIfEmpty(Mono.just("stream is empty"))
                .subscribe(System.out::println);
    }

merge()
mergeWith()
将多个流合并在一起,按照发布者顺序发布
contact():将流中元素追加/连接(a流中所有元素完成后,才进行b中流)
mergeSequential():按照哪个流先发元素排队

    @Test
    public void testMerge() throws IOException {
        Flux.merge(
                Flux.just(1,2,3).delayElements(Duration.ofSeconds(1)),
                Flux.just('a','b','c').delayElements(Duration.ofSeconds(2))
                )
                .log()
                .subscribe();
        System.in.read();
    }

结果:
在这里插入图片描述
zip()
在这里插入图片描述
将2个流中每个元素结成一对
无法结对的元素会被忽略
最多支持8个流压缩

    @Test
    public void testZip() {
        Flux.just(1,2,3)
                .zipWith(Flux.just(4,5,6))
                .log()
                .map(tuple -> {
                    Integer t1 = tuple.getT1(); //元组中的第一个元素
                    Integer t2 = tuple.getT2(); //元组中的第二个元素
                    return t1 + t2;
                })
                .subscribe(System.out::println);
    }

结果:

错误处理

捕获异常返回一个静态默认值

onErrorReturn()
吃掉异常,消费者无异常感知
返回一个兜底默认值,
流正常完成

    @Test
    public void testError() {
        Flux.just(1,2,0)
                .map(item -> "100 / " + item + " = " + (100/item))
                .onErrorReturn(ArithmeticException.class,"math exception")
                .subscribe(System.out::println);
    }

结果:

捕获异常,执行兜底方法

onErrorResume()
调用兜底方法
流正常完成

        Flux.just(1,2,0)
                .map(item -> "100 / " + item + " = " + (100/item))
                .onErrorResume(err -> Mono.just("stream err:" + err.getMessage()))
                .subscribe(System.out::println);
捕获并动态计算一个返回值

根据错误返回新值

    @Test
    public void testError() {
        Flux.just(1,2,0)
                .map(item -> "100 / " + item + " = " + (100/item))
                .onErrorResume(this::returnByErrType)
                .subscribe(System.out::println);
    }
    private Mono<String> returnByErrType (Throwable throwable) {
        if (throwable instanceof NullPointerException) {
            return Mono.just("value is required");
        }else {
            return Mono.just("system error:" + throwable.getMessage());
        }
    }
捕获并包装成业务异常,并重新抛出

只需要写正确的业务代码,
所有的业务异常,全部抛出自定义异常,由全局异常处理器统一处理
流异常完成
1.吃掉异常,抛新异常,消费者有感知
2.流异常完成
3.抛新异常

        Flux.just(1, 2, 0,5)
                .map(item -> "100 / " + item + " = " + (100 / item))
                .onErrorMap(err -> new BusinessException("value is invalid"))
                .subscribe(System.out::println,
                        err -> System.out.println("err happen:" + err.getMessage()),
                        () -> System.out.println("stream finished"));

流异常完成,流中后续元素不执行
在这里插入图片描述

捕获异常,记录特殊的错误日志,重新抛出

doOnError()
异常被捕获,做自己的事情
不影响异常继续顺着流水线传播
不吃掉异常,只在异常发生时做一件事,消费者有感知

    @Test
    public void testError() {
        Flux.just(1, 2, 0,5)
                .map(item -> "100 / " + item + " = " + (100 / item))
                .doOnError(err -> {
                    System.out.println("err saved");
                    throw new BusinessException("err happen");
                })
                .subscribe(System.out::println,
                        err -> System.out.println("err happen:" + err.getMessage()),
                        () -> System.out.println("stream finished"));
    }
使用finally

文件读写/网络操作
doFinally()
这件事一定会做

        Flux.just(1, 2, 0,5)
                .map(item -> "100 / " + item + " = " + (100 / item))
                .doFinally(signalType -> System.out.println("signalType :" + signalType))
                .subscribe(System.out::println,
                        err -> System.out.println("err happen:" + err.getMessage()),
                        () -> System.out.println("stream finished"));

在这里插入图片描述

发生错误后,流跳过错误元素,继续向下推进

onErrorContinue()
流中后续元素,继续执行

    @Test
    public void testErrContinue() {
        Flux.just(1,2,3,0,5)
                .map(item -> 10 /item)
                .onErrorContinue((err,value) -> {
                    System.out.println("err in stream:" + err.getMessage());
                    System.out.println("err value in stream:" + value);
                    //例如:记录日志
                })
                .subscribe(
                        item -> System.out.println("stream success:" + item),
                        err -> System.out.println("err happen:" + err.getMessage())
                        );
    }

在这里插入图片描述
onErrorStop()
流为源源不断的流,出现错误时,取消继续发布
有多个消费者也不会继续往下
从源头中断
onErrorComplete()
一旦发生错误,整个流结束,但是正常结束,后续元素不执行
把错误信号,替换成正确结束信号

超时与重试

retry():将流从头到尾重新请求一次

    @Test
    public void testTimeoutAndRetry() throws Exception {
        Flux.just(1,2,3)
                .delayElements(Duration.ofSeconds(3))
                .log()
                .timeout(Duration.ofSeconds(2))
                .retry(3)
                .onErrorReturn(0)
                .map(item -> item *2)
				.subscribe(System.out::println);
        System.in.read();
    }

结果:流超时,重新获取流3次,仍旧超时,抛异常
在这里插入图片描述

Sinks工具类

Sinks:数据管道/接收器,所有数据顺着这个管道往下走
Sinks.many();//发送Flux数据
Sinks.one();//发送Mono数据
Sinks.many().unicast();//单播:这个管道只能绑定单个订阅者
Sinks.many().multicast();//多播:这个管道可以绑定多个订阅者
Sinks.many().replay();//重放:这个管道能重放元素
是否给后来的订阅者把之前的元素依旧发给它
从头消费还是从订阅的那一刻消费

unicast()单播

只能有一个消费者

    @Test
    public void testSinks() throws Exception{
        Sinks.Many<Object> many = Sinks.many()
                .unicast()
                .onBackpressureBuffer(new LinkedBlockingQueue<>(5));//背压队列,最多能放多少个元素

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                many.tryEmitNext(i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        many.asFlux()
                .subscribe(System.out::println);

        System.in.read();
    }
多播

可以有多个消费者
存在问题:第二个消费者延迟到来,丢失前面元素
解决:replay()重放

@Test
    public void testSinks() throws Exception{
        Sinks.Many<Object> many = Sinks.many()
                .multicast()
                .onBackpressureBuffer();//背压队列,最多能放多少个元素

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                many.tryEmitNext(i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        many.asFlux()
                .subscribe(item -> System.out.println("consumer 1 get item:" + item));

        many.asFlux()
                .subscribe(item -> System.out.println("consumer 2 get item:" + item));

        System.in.read();
    }

丢失元素:
默认订阅者从订阅的那一刻开始接元素
在这里插入图片描述

重放replay()

解决:后到达的订阅者,丢失之前元素

        Sinks.Many<Object> many = Sinks.many()
                .replay()
                .all();

不再丢失元素:底层利用队列,缓存之前数据
replay()

缓存

range(),just() 默认没有缓存

    @Test
    public void testCache() throws Exception{
        Flux<Integer> cache = Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1))
                .cache(3);//缓存之前3个元素
        cache.subscribe();
        new  Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            cache.subscribe(item -> System.out.println("subscriber1 get item:" + item));

        }).start();
        System.in.read();
    }

结果:5秒后打印订阅者中元素,缓存3个
在这里插入图片描述

阻塞式api

block():获取元素

    @Test
    public void testBlock() {
        List<Integer> list = Flux.range(1, 10)
                .map(item -> item * 9)
                .collectList()
                .block();
        list.stream()
                .forEach(System.out::println);
    }

并发流

百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束

    @Test
    public void testParallelFlux() {
        Flux.range(1,100)
                .buffer(10)
                .parallel(8)
                .runOn(Schedulers.newParallel("stream"))
                .log()
                .subscribe(System.out::println);
    }

在这里插入图片描述
收集成一个集合

        Flux.range(1,100)
                .buffer(10)
                .parallel(8)
                .runOn(Schedulers.newParallel("stream"))
                .log()
                .flatMap(list -> Flux.fromIterable(list))
                .collectSortedList(Integer::compareTo)
                .subscribe(System.out::println);

Context-api

ThreadLocal在响应式编程中无法使用
响应式中,数据流期间共享数据,ContextApi
Context:读写
ContextView:只读
上下文数据传播

    @Test
    public void testContextApi() {
        //支持context的中间操作api
        Flux.just(1,2,3)
                .transformDeferredContextual((value,context) -> {
                    System.out.println("get value:" + value);
                    System.out.println("context:" +context);
                    return value.map(item -> context.get("prefix") + String.valueOf(item));
                })
                //上游能拿到下游最近一次数据
                .contextWrite(Context.of("prefix","myPrefix:")) //类似threadLocal,共享数据,上游的所有人能看到
                //context由下游传到上游
                .subscribe(System.out::println);
        //响应式编程:dao(数据源)->service-controller;从下游反向传播
    }

结果:
在这里插入图片描述

文章来源:https://blog.csdn.net/bennettzhou404/article/details/135436808
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。