In an Angular 2 application I'd like to build an Observable from chained promises. Since a promise delivers a one-time result, the first step was to use Observable.fromPromise() and then mergeAll() on a meta observable to handle the completion of each fromPromise. I found this SO question useful here: RxJS: How to have one Observer process multiple Observables?
The accepted answer to that question deals with simple events, so I easily prepared my own solution using Observable.fromPromise(someComposedPromise) in place of Observable.fromEvent(someEvent). Unfortunately, while everything works fine for a simple single promise, a problem arises when the promise is composed of two promises, because of the order in which the promises are resolved.
For the sake of simplicity and to isolate the case, let's assume we have some existing external DumbCache (what I'd actually like to use is Ionic 2 LocalStorage), whose simplest variant looks similar to this:
class DumbCache {
    cache = {};

    get(key) {
        return new Promise((resolve, reject) => {
            var value = this.cache[key];
            resolve(value);
        });
    }

    set(key, value) {
        return new Promise((resolve, reject) => {
            this.cache[key] = value;
            resolve();
        });
    }
}
then the approach described above is:
class CacheValueObservable {
    private cache: DumbCache;

    constructor(private key: string) {
        this.cache = new DumbCache();
    }

    /*
     * meta observer to handle promises from observables with all results and errors
     * thanks to ReplaySubject(1) current value is available immediately after subscribing
     */
    private _valueSource$$ = new Rx.ReplaySubject(1);
    private _value$ = this._valueSource$$.mergeAll();

    public value$() { return this._value$; }

    public updateValue(value) {
        this._valueSource$$.next(
            Rx.Observable.fromPromise(
                this.cache.set(this.key, value)
                    .then(() => this.cache.get(this.key))
            )
        );
    }
}
now for the following code:
let cacheValueObservable = new CacheValueObservable("TEST_KEY");
cacheValueObservable.updateValue('VALUE 0');
cacheValueObservable.value$().subscribe(
    val => {
        console.log('val:' + val);
    },
    val => console.log('err', val.stack),
    () => (console.log('complete'))
);
cacheValueObservable.updateValue('VALUE 1');
cacheValueObservable.updateValue('VALUE 2');
console.log('end');
the result is:
starting...
end
val:VALUE 2
val:VALUE 2
val:VALUE 2
while obviously I'd like to achieve
starting...
end
val:VALUE 0
val:VALUE 1
val:VALUE 2
complete example here: http://jsbin.com/wiheki/edit?js,console
While trying to describe the question well, I kept investigating and came to understand the issue better. The main point is that the first promise, this.cache.set(this.key, value), may actually be resolved immediately, while Observable.fromPromise makes no guarantee about the order in which the subsequent chained promises are resolved.
The problem was caused by the fact that the last promise of each chain was executed only after the state had been altered by the first promise of the last chain (thus VALUE 2).
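The ordering described above can be reproduced without RxJS at all. What follows is only a minimal sketch: the typed DumbCache is a stand-in for the one in the question, and eagerChains is a hypothetical helper name that starts three set-then-get chains back to back, the way three updateValue() calls would.

```typescript
// Stand-in for the DumbCache from the question: set() writes to the store
// synchronously inside the promise executor.
class DumbCache {
    private cache: { [key: string]: string } = {};

    get(key: string): Promise<string | undefined> {
        return new Promise(resolve => resolve(this.cache[key]));
    }

    set(key: string, value: string): Promise<void> {
        return new Promise<void>(resolve => {
            this.cache[key] = value;
            resolve();
        });
    }
}

// Hypothetical helper: start three chains immediately, one after another.
function eagerChains(): Promise<(string | undefined)[]> {
    const cache = new DumbCache();
    const chains = ['VALUE 0', 'VALUE 1', 'VALUE 2'].map(value =>
        // set() has already written by the time it returns; the get() inside
        // then() only runs later, after all three writes have happened.
        cache.set('TEST_KEY', value).then(() => cache.get('TEST_KEY'))
    );
    return Promise.all(chains);
}

eagerChains().then(values => console.log(values));
// all three entries are 'VALUE 2'
```

All three writes happen while the chains are being created, so every deferred read sees the last written value.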
In the end the solution looks pretty simple from a code point of view, yet it is not that obvious, as it consists of two key changes:
- defer promise resolution until the mergeAll phase by using Observable.defer instead of Observable.fromPromise
- disable merge concurrency by using mergeAll(1)
Thus the working solution looks like this:
class CacheValueObservable {
    private cache: DumbCache;

    constructor(private key: string) {
        this.cache = new DumbCache();
    }

    /*
     * meta observer to handle promises from observables with all results and errors
     * thanks to ReplaySubject(1) current value is available immediately after subscribing
     */
    private _valueSource$$ = new Rx.ReplaySubject(1);
    // disable merge concurrency
    private _value$ = this._valueSource$$.mergeAll(1);

    public value$() { return this._value$; }

    public updateValue(value) {
        this._valueSource$$.next(
            // defer promise resolution to ensure they will be fully resolved
            // one by one thanks to no concurrency in mergeAll
            Rx.Observable.defer(() =>
                this.cache.set(this.key, value)
                    .then(() => this.cache.get(this.key))
            )
        );
    }
}
and here is live example: http://jsbin.com/bikawo/edit?html,js,console
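To show why deferring plus running one chain at a time fixes the order, here is a rough plain-promise analogy. It is not the RxJS code itself: SerialQueue and deferredChains are hypothetical names introduced only for this sketch, and the typed DumbCache is a stand-in for the one in the question. Passing a function instead of a promise plays the role of Observable.defer, and the queue plays the role of mergeAll(1).

```typescript
// Stand-in for the DumbCache from the question.
class DumbCache {
    private cache: { [key: string]: string } = {};

    get(key: string): Promise<string | undefined> {
        return new Promise(resolve => resolve(this.cache[key]));
    }

    set(key: string, value: string): Promise<void> {
        return new Promise<void>(resolve => {
            this.cache[key] = value;
            resolve();
        });
    }
}

// Hypothetical helper: runs queued tasks strictly one after another.
class SerialQueue {
    private tail: Promise<unknown> = Promise.resolve();

    // A task is a function, so nothing is executed until the queue calls it.
    enqueue<T>(task: () => Promise<T>): Promise<T> {
        const result = this.tail.then(task);
        // Keep the queue usable even if a task rejects.
        this.tail = result.catch(() => undefined);
        return result;
    }
}

function deferredChains(): Promise<(string | undefined)[]> {
    const cache = new DumbCache();
    const queue = new SerialQueue();
    const results = ['VALUE 0', 'VALUE 1', 'VALUE 2'].map(value =>
        // The whole set-then-get chain starts only when its turn comes.
        queue.enqueue(() =>
            cache.set('TEST_KEY', value).then(() => cache.get('TEST_KEY'))
        )
    );
    return Promise.all(results);
}

deferredChains().then(values => console.log(values));
// logs the values in write order: 'VALUE 0', 'VALUE 1', 'VALUE 2'
```

Each write now happens only after the previous chain has finished its read, so every read sees its own value.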