order of execution for RxJS Observable from chained promises

In an AnuglarJS 2 application I'd like to make Observable from chained promises. As promises provide one time result the first step was to use Observable.fromPromise() and then mapAll() on meta observable to deal with complete of each fromPromise. I've here found useful this SO question RxJS: How to have one Observer process multiple Observables?

As the accepted answer from above question addresses simple events, I've easily prepared own solution using Observable.fromPromise(someComposedPromise) in place of Observable.fromEvent(someEvent). Unfortunately while all works fine for simple single promise, the problem arises where promise is composed from two promises because of the the order in which promises are resolved.

For the sake of simplicity and case isolation lets assume we have some existing external DumbCache (and whats I'd like to use is Ionic 2 LocalStorage where the most simple variant looks similar to this one):

class DumbCache {
  cache = {};

  get(key) {
    return new Promise((resolve, reject) => {
      var value = this.cache[key];
      resolve(value);
    });
  }

  set(key, value) {
    return new Promise((resolve, reject) => {
      this.cache[key] = value;
      resolve();
    });
  }
}

then the approach described above is:

class CacheValueObservable {
  private cache: DumbCache;

  constructor(private key: string) {
    this.cache = new DumbCache();
  }

  /*
   * meta observer to handle promises from observables with all results and errors
   * thanks to ReplaySubject(1) current value is available immediately after subscribing 
   */
  private _valueSource$$ = new Rx.ReplaySubject(1);
  private _value$ = this._valueSource$$.mergeAll();

  public value$() { return this._value$; }

  public updateValue(value) {
    this._valueSource$$.next(
      Rx.Observable.fromPromise(
        this.cache.set(this.key, value)
        .then(() => this.cache.get(this.key))
      )
    );
  }
}

now for the following code:

let cacheValueObservable = new CacheValueObservable("TEST_KEY");
cacheValueObservable.updateValue('VALUE 0');

cacheValueObservable.value$().subscribe(
    val => {
      console.log('val:' + val);
    },
    val => console.log('err', val.stack),
    () => (console.log('complete'))
);

cacheValueObservable.updateValue('VALUE 1');
cacheValueObservable.updateValue('VALUE 2');

console.log('end');

the result is:

starting...
end
val:VALUE 2
val:VALUE 2
val:VALUE 2

while obviously I'd like to achieve

starting...
end
val:VALUE 0
val:VALUE 1
val:VALUE 2

complete example here: http://jsbin.com/wiheki/edit?js,console

Answers:

Answer

While trying to express the question to be well described I was still investigating and understanding the issue better and better. The main point is that first promise this.cache.set(this.key, value) may be actually immediately resolved while Observable.fromPromise makes no guarantee in which order all the following chained promises are being resolved.

The problem was caused by the fact that the last promise from each chain was executed only after state was altered by first promise of the last chain (thus VALUE 2).

After all the solution looks pretty simple from code point of view yet it is not that obvious as consist of two key changes:

  • defer the initial promise execution until mergeAll phase by using Observable.defer instead of Observable.fromPromise
  • limit (or actually disable) the concurrency of merging the promises by using mergeAll(1)

thus working solution looks like this:

class CacheValueObservable {
  private cache: DumbCache;

  constructor(private key: string) {
    this.cache = new DumbCache();
  }

  /*
   * meta observer to handle promises from observables with all results and errors
   * thanks to ReplaySubject(1) current value is available immediately after subscribing 
   */
  private _valueSource$$ = new Rx.ReplaySubject(1);
  // disable merge concurrency 
  private _value$ = this._valueSource$$.mergeAll(1);

  public value$() { return this._value$; }

  public updateValue(value) {
    this._valueSource$$.next(
      // defer promise resolution to ensure they will be fully resolved
      // one by one thanks to no concurrency in mergeAll
      Rx.Observable.defer(() =>
        this.cache.set(this.key, value)
        .then(() => this.cache.get(this.key))
      )
    );
  }
}

and here is live example: http://jsbin.com/bikawo/edit?html,js,console

Tags

Recent Questions

Top Questions

Home Tags Terms of Service Privacy Policy DMCA Contact Us

©2020 All rights reserved.