概念
- 最基础的 Observable 是观察者模式
- 生产者消费者模式的话 需要 使用 Flowable, 因为需要积压
流式过程
- 起点,生产事件
- 过程, 操作函数进行加工
- 终点,处理事件
例子:
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | Observable.just(1,2,3,4).map(new Function<Integer, Integer>() {public Integer apply(Integer integer) throws Exception {
 return integer;
 }
 }).subscribe(new Consumer<Integer>() {
 public void accept(Integer integer) throws Exception {
 System.out.println(integer);
 }
 });
 
 | 
关键点
谁,如何触发开始执行任务?
触发任务由最末端的 subscribe 触发.
他内部不断让上层的 对象调用 subscribeActual 
| 12
 3
 
 | public final void subscribe(Observer<? super T> observer) {subscribeActual(observer);
 }
 
 | 

哪一个线程执行的 rx 计算?
主要是这2个 subscribeOn 和 observeOn 如何判断
他们和上面一样都是 Observable 都是通过 subscribeActual, 触发。
ObservableObserveOn 关键代码
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | protected void subscribeActual(Observer<? super T> observer) {
 Scheduler.Worker w = scheduler.createWorker();
 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
 }
 
 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
 implements Observer<T>, Runnable {
 
 public void onNext(T t) {
 queue.offer(t);
 }
 }
 
 | 
再看看 ObservableMap 的代码
| 12
 3
 
 | public void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));
 }
 
 | 
也就是 和 map 一样, 从上层取数据, 只不过 onNext 吧任务派发到queue 内。
上一个例子
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | Observable.just(1, 2, 3, 4).map(new Function<Integer, Integer>() {
 public Integer apply(Integer integer) throws Exception {
 return integer;
 }
 })
 .observeOn(Schedulers.io())
 .subscribe(new Consumer<Integer>() {
 public void accept(Integer integer) throws Exception {
 System.out.println(integer);
 }
 });
 
 | 
执行图:

也就是说 observeOn 将后面 前面的任务 一个个的作为 自任务,派发到queue 内, 线程内将 上层的 数据源里的数据 一个一个给 后面放到 线程池内执行。
ObservableSubscribeOn 关键代码
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | public void subscribeActual(final Observer<? super T> s) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
 
 s.onSubscribe(parent);
 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
 }
 
 final class SubscribeTask implements Runnable {
 @Override
 public void run() {
 
 
 source.subscribe(parent);
 }
 }
 
 | 
可见他和上面的 向下执行的不同, 他是用了 onSubscribe 也就是说, 他将 subscribe 的行为放到线程池内执行
上一个例子
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | Observable.just(1, 2, 3, 4).map(new Function<Integer, Integer>() {
 public Integer apply(Integer integer) throws Exception {
 return integer;
 }
 })
 .subscribeOn(Schedulers.io())
 .subscribe(new Consumer<Integer>() {
 public void accept(Integer integer) throws Exception {
 System.out.println(integer);
 }
 });
 
 | 

这里 map 和 just 的操作 会在 线程内执行,也就是说,代码上面的 会在线程内执行
参考
https://zhuanlan.zhihu.com/p/23584382
https://zhuanlan.zhihu.com/p/23585300
https://segmentfault.com/a/1190000004856071