RXJS Wait for all observables in an array to complete (or error)

I'm pushing observables into an array like such...

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

I want an Observable that emits when all tasks$ have completed. Keep in mind, in practice, tasks$ doesn't have a known number of Observables.

I've tried Observable.zip(tasks$).subscribe() but this seems to fail in the event that there is only 1 task, and is leading me to believe that ZIP requires an even number of elements in order to work the way I would expect.

I've tried Observable.concat(tasks$).subscribe() but the result of the concat operator just seems to be an array of observables... e.g. basically the same as the input. You can't even call subscribe on it.

In C# this would be akin to Task.WhenAll(). In ES6 promise it would be akin to Promise.all().

I've come across a number of SO questions but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).

Answers:

Answer

If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
Answer

You can make usage of zip.

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);

Things to consider:

  • Doesn't need to be an even number of observables.
  • zip visually
  • enter image description here As said here,

    The zip operator will subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.

  • When one of the observables throws an error (or even both of them), the subscription is closed (onComplete on complete is called), and with a onError method, you only get the first error.
  • zip.subscribe(
      result => console.log(result), // result is an array with the responses [respA, respB]
      error => console.log(error), // will return the error message of the first observable that throws error and then finish it
      () => console.log ('completed after first error or if first observable finishes)
    );
    
    Answer
    // waits for all Observables no matter of success/fails each of them
    // returns array of items
    // each item represent even first value of Observable or it's error
    export function waitAll(args: Observable<any>[]): Observable<any[]> {
      const final = new Subject<any[]>();
      const flags = new Array(args.length);
      const result = new Array(args.length);
      let total = args.length;
      for (let i = 0; i < args.length; i++) {
        flags[i] = false;
        args[i].subscribe(
          res => {
            console.info('waitAll ' + i + ' ok ', res);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = res;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          },
          error => {
            console.error('waitAll ' + i + ' failed ', error);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = error;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          }
        );
      }
      return final.asObservable();
    }
    

    unit test:

    describe('waitAll', () => {
      it('should wait for all observables', async () => {
        const o1 = new Subject();
        const o2 = new Subject();
        const o3 = new Subject();
    
        const o = waitAll([o1, o2, o3]);
        const res = {arr: []};
        o.subscribe(result => res.arr = result, err => res.arr = []);
    
        expect(res.arr).toEqual([]);
        o1.next('success1');
        expect(res.arr).toEqual([]);
        o2.error('failed2')
        expect(res.arr).toEqual([]);
        o3.next('success3')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    
        o1.next('success1*');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o2.error('failed2*')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o3.next('success3*')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
      });
    });
    
    Answer

    For me this sample was best solution.

    const source = Observable.interval(500);
    const example = source.sample(Observable.interval(2000));
    const subscribe = example.subscribe(val => console.log('sample', val));
    

    So.. only when second (example) emit - you will see last emited value of first (source).

    In my task, I wait form validation and other DOM event.

    Tags

    Recent Questions

    Top Questions

    Home Tags Terms of Service Privacy Policy DMCA Contact Us

    ©2020 All rights reserved.