Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
// Demo of unserialized emission: every click starts ten threads that each call
// subject.onNext directly 1000 times. Nothing in this snippet serializes those calls.
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer integer) {
        // Plain read-modify-write on unSafeCount (declared outside this snippet);
        // no lock or atomic protects it here.
        unSafeCount = unSafeCount + integer;
        Log.d("TAG", "onNext: " + unSafeCount);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onCompleted() {
    }
});
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        final int unit = 1;
        // Stateless task shared by all worker threads: push `unit` into the subject 1000 times.
        final Runnable emitter = new Runnable() {
            @Override
            public void run() {
                int remaining = 1000;
                while (remaining-- > 0) {
                    subject.onNext(unit);
                }
            }
        };
        int started = 0;
        while (started < 10) {
            new Thread(emitter).start();
            started++;
        }
    }
});
// Same experiment as the unserialized variant, but the worker threads emit through a
// SerializedSubject wrapper around the PublishSubject instead of the subject itself.
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer integer) {
        // Both counters are updated: the plain field and the atomic `count`
        // (both declared outside this snippet). Only `count` is logged.
        unSafeCount = unSafeCount + integer;
        count.addAndGet(integer);
        Log.d("TAG", "onNext: " + count);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onCompleted() {
    }
});
final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        final int unit = 1;
        // Stateless task shared by all worker threads: push `unit` into the wrapper 1000 times.
        final Runnable emitter = new Runnable() {
            @Override
            public void run() {
                int remaining = 1000;
                while (remaining-- > 0) {
                    ser.onNext(unit);
                }
            }
        };
        int started = 0;
        while (started < 10) {
            new Thread(emitter).start();
            started++;
        }
    }
});
/**
 * Delivers {@code t} to {@code actual}, or queues it when another call is already emitting.
 *
 * At most one caller at a time holds the "emitter" role (the {@code emitting} flag, changed only
 * under {@code synchronized (this)}). The emitter delivers its own value outside the lock and then
 * drains whatever other callers queued in the meantime. Once {@code terminated} is set, the value
 * is dropped.
 *
 * @param t the value to deliver or enqueue
 */
@Override
public void onNext(T t) {
// Check without the lock first; the same check is repeated under the lock below.
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
if (emitting) {
// Another call holds the emitter role: append the value (wrapped via nl.next) to the
// lazily created queue and return; the current emitter picks it up in its drain loop.
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
list.add(nl.next(t));
return;
}
// No emitter active: this call takes the role.
emitting = true;
}
// Deliver this call's own value outside the lock.
try {
actual.onNext(t);
} catch (Throwable e) {
// Mark terminated before reporting. `emitting` is left true on this path; later calls
// return early because of `terminated`.
// NOTE(review): throwOrReport presumably rethrows fatal errors and otherwise routes e
// (with t attached) to actual.onError — confirm against Exceptions.
terminated = true;
Exceptions.throwOrReport(e, actual, t);
return;
}
// Drain loop. The only exits are the return statements inside; when the inner bounded loop
// runs out of iterations the outer loop simply starts it again.
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
if (list == null) {
// Nothing was queued: give up the emitter role and finish.
emitting = false;
return;
}
// Detach the batch so other callers start a fresh queue while it is delivered.
queue = null;
}
// Deliver the detached batch outside the lock.
for (Object o : list.array) {
// A null entry ends the batch (presumably the unused tail of the backing array — confirm).
if (o == null) {
break;
}
try {
// A true result from nl.accept stops all further emission
// (presumably a terminal notification was delivered — confirm).
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
// The value attached to the error is t (this call's argument), not the queued element o.
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}
// Source 1: emits 1 and completes from a background thread after a one-second sleep.
Observable<Integer> o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(final Subscriber<? super Integer> subscriber) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and signal a terminal event; otherwise the
                    // downstream subscriber would never receive onCompleted or onError.
                    Thread.currentThread().interrupt();
                    subscriber.onError(e);
                    return;
                }
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        }).start();
    }
});
// Source 2: emits 2 and completes synchronously on the subscribing thread.
Observable<Integer> o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(2);
        subscriber.onCompleted();
    }
});
// Merge both sources; their values arrive from different threads.
Observable.merge(o1, o2)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                // Surface failures (e.g. the interrupted sleep in o1) instead of dropping them.
                Log.e("TAG", "onError", e);
            }

            @Override
            public void onNext(Integer i) {
                Log.d("TAG", "onNext: " + i);
            }
        });
/**
 * Handles one inner Observable arriving from the outer source.
 *
 * A null source is ignored, the shared empty Observable goes to {@code emitEmpty()}, a
 * {@code ScalarSynchronousObservable} has its single value passed straight to {@code tryEmit},
 * and any other source gets its own {@code InnerSubscriber}.
 *
 * @param t the inner Observable to merge; may be null
 */
@Override
public void onNext(Observable<? extends T> t) {
    if (t == null) {
        return;
    }
    if (t == Observable.empty()) {
        emitEmpty();
        return;
    }
    if (t instanceof ScalarSynchronousObservable) {
        // Scalar source: take its value directly, without subscribing to it.
        ScalarSynchronousObservable<? extends T> scalar = (ScalarSynchronousObservable<? extends T>) t;
        tryEmit(scalar.get());
        return;
    }
    // General case: register a new inner subscriber under the next id, subscribe, then run emit().
    InnerSubscriber<T> innerSubscriber = new InnerSubscriber<T>(this, uniqueId++);
    addInner(innerSubscriber);
    t.unsafeSubscribe(innerSubscriber);
    emit();
}
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
// Drain-loop excerpt. Only the caller that moves wip away from 0 runs the loop; every other
// caller just records its request in wip and returns.
if (wip.getAndIncrement() != 0) {
return;
}
final int delayErrorMode = this.delayErrorMode;
for (;;) {
// Stop as soon as the downstream subscriber has unsubscribed.
if (actual.isUnsubscribed()) {
return;
}
// Only look at the queue while `active` is false; it is set to true below right before an
// inner source is started, so the next source is not mapped while one is in progress.
if (!active) {
if (delayErrorMode == BOUNDARY) {
// In BOUNDARY mode a recorded error is emitted here, before the next queued value is taken.
if (error.get() != null) {
Throwable ex = ExceptionsUtils.terminate(error);
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
}
// `done` is read before polling the queue.
// NOTE(review): presumably so a value enqueued just before `done` was set is not missed — confirm.
boolean mainDone = done;
Object v = queue.poll();
boolean empty = v == null;
// Main source finished and nothing left to map: emit the terminal event
// (onCompleted when no error was recorded, otherwise onError) and stop.
if (mainDone && empty) {
Throwable ex = ExceptionsUtils.terminate(error);
if (ex == null) {
actual.onCompleted();
} else
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
if (!empty) {
Observable<? extends R> source;
try {
// Unwrap the queued value and map it to the next inner Observable.
source = mapper.call(NotificationLite.<T>instance().getValue(v));
} catch (Throwable mapperError) {
Exceptions.throwIfFatal(mapperError);
drainError(mapperError);
return;
}
// A null result from the mapper is reported through drainError as a NullPointerException.
if (source == null) {
drainError(new NullPointerException("The source returned by the mapper was null"));
return;
}
if (source != Observable.empty()) {
if (source instanceof ScalarSynchronousObservable) {
// Scalar source: no subscription; its single value is handed to the arbiter via a producer.
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
active = true;
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
} else {
// General source: install a fresh inner subscriber and subscribe it, unless it is already
// unsubscribed after inner.set, in which case draining stops.
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
inner.set(innerSubscriber);
if (!innerSubscriber.isUnsubscribed()) {
active = true;
source.unsafeSubscribe(innerSubscriber);
} else {
return;
}
}
// NOTE(review): request(1) presumably asks the main source for one more item — confirm.
request(1);
} else {
// Empty inner source: nothing to emit. Call request(1) and loop again immediately;
// `continue` skips the wip decrement below.
request(1);
continue;
}
}
}
// Leave the loop once no further drain requests were recorded while it was running.
if (wip.decrementAndGet() == 0) {
break;
}
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有