概念
- 最基础的 Observable 是观察者模式
- 生产者消费者模式的话 需要 使用
Flowable
, 因为需要积压
流式过程
- 起点,生产事件
- 过程, 操作函数进行加工
- 终点,处理事件
例子:
1 2 3 4 5 6 7 8 9
| Observable.just(1,2,3,4).map(new Function<Integer, Integer>() { public Integer apply(Integer integer) throws Exception { return integer; } }).subscribe(new Consumer<Integer>() { public void accept(Integer integer) throws Exception { System.out.println(integer); } });
|
关键点
谁,如何触发开始执行任务?
触发任务由最末端的 subscribe 触发.
他内部不断让上层的 对象调用 subscribeActual
1 2 3
| public final void subscribe(Observer<? super T> observer) { subscribeActual(observer); }
|
哪一个线程执行的 rx 计算?
主要是这2个 subscribeOn
和 observeOn
如何判断
他们和上面一样都是 Observable 都是通过 subscribeActual
, 触发。
ObservableObserveOn 关键代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| protected void subscribeActual(Observer<? super T> observer) { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { public void onNext(T t) { queue.offer(t); } }
|
再看看 ObservableMap 的代码
1 2 3
| public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
|
也就是 和 map 一样, 从上层取数据, 只不过 onNext 吧任务派发到queue 内。
上一个例子
1 2 3 4 5 6 7 8 9 10 11 12
| Observable.just(1, 2, 3, 4) .map(new Function<Integer, Integer>() { public Integer apply(Integer integer) throws Exception { return integer; } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { public void accept(Integer integer) throws Exception { System.out.println(integer); } });
|
执行图:
也就是说 observeOn 将后面 前面的任务 一个个的作为 自任务,派发到queue 内, 线程内将 上层的 数据源里的数据 一个一个给 后面放到 线程池内执行。
ObservableSubscribeOn 关键代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
final class SubscribeTask implements Runnable { @Override public void run() { source.subscribe(parent); } }
|
可见他和上面的 向下执行的不同, 他是用了 onSubscribe
也就是说, 他将 subscribe 的行为放到线程池内执行
上一个例子
1 2 3 4 5 6 7 8 9 10 11 12
| Observable.just(1, 2, 3, 4) .map(new Function<Integer, Integer>() { public Integer apply(Integer integer) throws Exception { return integer; } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { public void accept(Integer integer) throws Exception { System.out.println(integer); } });
|
这里 map 和 just 的操作 会在 线程内执行,也就是说,代码上面的 会在线程内执行
参考
https://zhuanlan.zhihu.com/p/23584382
https://zhuanlan.zhihu.com/p/23585300
https://segmentfault.com/a/1190000004856071