netty

概念

  1. 最基础的 Observable 是观察者模式
  2. 生产者消费者模式的话 需要 使用 Flowable, 因为需要积压

流式过程

  1. 起点,生产事件
  2. 过程, 操作函数进行加工
  3. 终点,处理事件

例子:

1
2
3
4
5
6
7
8
9
Observable.just(1,2,3,4).map(new Function<Integer, Integer>() {
public Integer apply(Integer integer) throws Exception {
return integer;
}
}).subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

关键点

谁,如何触发开始执行任务?

触发任务由最末端的 subscribe 触发.

他内部不断让上层的 对象调用 subscribeActual

1
2
3
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}

哪一个线程执行的 rx 计算?

主要是这2个 subscribeOnobserveOn 如何判断

他们和上面一样都是 Observable 都是通过 subscribeActual, 触发。

ObservableObserveOn 关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void subscribeActual(Observer<? super T> observer) {
// 新建一个线程
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

// 上层任务直接push到 queue
public void onNext(T t) {
queue.offer(t);
}
}

再看看 ObservableMap 的代码

1
2
3
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

也就是 和 map 一样, 从上层取数据, 只不过 onNext 吧任务派发到queue 内。

上一个例子

1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4)
.map(new Function<Integer, Integer>() {
public Integer apply(Integer integer) throws Exception {
return integer;
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

执行图:

也就是说 observeOn 将后面 前面的任务 一个个的作为 自任务,派发到queue 内, 线程内将 上层的 数据源里的数据 一个一个给 后面放到 线程池内执行。

ObservableSubscribeOn 关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 后来的观察者 如果被订阅了,则触发他。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

final class SubscribeTask implements Runnable {
@Override
public void run() {
// 整个 subscribe 行为在 线程内执行
// source 是 代码上的上一层
source.subscribe(parent);
}
}

可见他和上面的 向下执行的不同, 他是用了 onSubscribe 也就是说, 他将 subscribe 的行为放到线程池内执行

上一个例子

1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4)
.map(new Function<Integer, Integer>() {
public Integer apply(Integer integer) throws Exception {
return integer;
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

这里 map 和 just 的操作 会在 线程内执行,也就是说,代码上面的 会在线程内执行

参考

https://zhuanlan.zhihu.com/p/23584382

https://zhuanlan.zhihu.com/p/23585300

https://segmentfault.com/a/1190000004856071

avatar

lelouchcr's blog