Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
return doSomething();
}).subscribe(System.out::println);
}
/**
 * Declaration of the two-source {@code zip} overload, quoted for reference (no body is shown here).
 *
 * NOTE(review): how values from {@code o1} and {@code o2} are paired is not visible in this
 * excerpt; presumably they are combined in emission order — confirm against the library Javadoc.
 *
 * @param o1 first source, producing values assignable to {@code T1}
 * @param o2 second source, producing values assignable to {@code T2}
 * @param zipFunction function that accepts a {@code T1} and a {@code T2} and yields an {@code R}
 * @return an {@code Observable} of {@code R}
 */
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
// Zip a 5 ms timer with the value to emit; the combiner discards the timer value (x)
// and forwards only the second element (y), which is then printed.
// Note: doSomething() is an argument expression of Observable.just(...), so Java evaluates it
// while this statement is being assembled, before zip(...) is even invoked.
Observable.zip(
        Observable.timer(5, TimeUnit.MILLISECONDS),
        Observable.just(doSomething()),
        (x, y) -> y)
    .subscribe(System.out::println);
Observable.just(doSomething())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->Utils.printlnWithThread(v.toString()););
Observable.create(s->{s.onNext(doSomething());})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{
Utils.printlnWithThread(v.toString());
});
/**
 * Declaration of {@code defer}, quoted for reference (no body is shown here).
 *
 * NOTE(review): when and how often {@code observableFactory} is invoked is not visible in this
 * excerpt; presumably once per subscription — confirm against the library Javadoc.
 *
 * @param observableFactory zero-argument function that returns an {@code Observable<T>}
 * @return an {@code Observable} of {@code T}
 */
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
Observable.defer(()->Observable.just(doSomething()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
// Baseline chain whose scheduler calls are factored out further below.
// doSomething() is evaluated eagerly as the argument of just(...); each received value is
// printed via Utils.printlnWithThread.
Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v -> {
        Utils.printlnWithThread(v.toString());
    });
/**
 * Applies the scheduler pair used throughout these examples to the given source:
 * {@code subscribeOn(Schedulers.io())} followed by {@code observeOn(Schedulers.computation())}.
 *
 * @param observable the source to decorate
 * @return the Observable returned by the {@code observeOn} call
 */
private static <T> Observable<T> applySchedulers(Observable<T> observable) {
    Observable<T> subscribedOnIo = observable.subscribeOn(Schedulers.io());
    return subscribedOnIo.observeOn(Schedulers.computation());
}
// Mapping step: passes each Data element through manipulate(...).
Func1<Data, Data> manipulator = new Func1<Data, Data>() {
    @Override
    public Data call(Data data) {
        return manipulate(data);
    }
};

// Terminal step: hands each resulting Data element to doSomething(...).
Action1<Data> consumer = new Action1<Data>() {
    @Override
    public void call(Data data) {
        doSomething(data);
    }
};

// The helper has to wrap the whole upstream expression, which breaks the fluent chain.
applySchedulers(Observable.from(someSource).map(manipulator)).subscribe(consumer);
/**
 * Declaration of {@code compose}, quoted for reference (no body is shown here).
 * {@code R} is declared as a method type parameter and the return type is parameterized with it,
 * instead of leaving {@code R} undeclared and returning a raw {@code Observable}.
 *
 * NOTE(review): {@code T} is presumably the element type of the enclosing {@code Observable<T>}
 * class, which is not visible in this excerpt.
 *
 * @param transformer function object that accepts an Observable of {@code T} and yields an
 *                    Observable of {@code R}
 * @return an {@code Observable} of {@code R}
 */
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer);
/**
 * Builds a {@code Transformer} (anonymous-class form) that applies
 * {@code subscribeOn(Schedulers.io())} followed by {@code observeOn(Schedulers.computation())}
 * to whatever Observable it is given.
 *
 * @return a new transformer instance on every call
 */
private static <T> Transformer<T, T> applySchedulers() {
    final Transformer<T, T> schedulerTransformer = new Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> upstream) {
            Observable<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(Schedulers.computation());
        }
    };
    return schedulerTransformer;
}
Observable.just(doSomething()).compose(applySchedulers())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
/**
 * Lambda form of the scheduler transformer: applies {@code subscribeOn(Schedulers.io())}
 * followed by {@code observeOn(Schedulers.computation())} to the Observable it receives.
 *
 * @return a transformer expressed as a lambda
 */
private static <T> Transformer<T, T> applySchedulers() {
    return upstream -> {
        Observable<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
        return subscribedOnIo.observeOn(Schedulers.computation());
    };
}
Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
.subscribe(v->System.out.println("result:"+v));
/**
 * Fetches data from the cache (simulated with a random number).
 *
 * Emits a single "data from cache:..." string when the random value modulo 2 is non-zero;
 * otherwise emits nothing. Either way the sequence is completed afterwards.
 *
 * @return an Observable that emits at most one cache entry and then completes
 */
private static Observable<String> getDataFromCache() {
    return Observable.create(subscriber -> {
        // Stand-in for a real cache lookup: remainder of a random int divided by 2.
        int remainder = new Random().nextInt() % 2;
        boolean cacheHit = remainder != 0;
        if (cacheHit) {
            // Produce the cached data.
            subscriber.onNext("data from cache:" + remainder);
        }
        // A miss is signalled by completing without emitting, not by onError.
        subscriber.onCompleted();
    });
}
/**
 * Fetches data from the network (simulated).
 *
 * Logs "obs2 generate i" and emits "data from network:i" for i = 0..9, then completes.
 * Production stops early, without further logging or emissions, as soon as the subscriber
 * reports that it has unsubscribed.
 *
 * @return an Observable that emits up to ten strings and then completes
 */
private static Observable<String> getDataFromNetwork() {
    return Observable.create(s -> {
        for (int i = 0; i < 10; i++) {
            // Don't keep generating items nobody is listening to (e.g. downstream of first()).
            if (s.isUnsubscribed()) {
                return;
            }
            Utils.println("obs2 generate " + i);
            // Produce the data.
            s.onNext("data from network:" + i);
        }
        s.onCompleted();
    });
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有