RxJs: updating values in a list from an other stream identified by an ID

I have two RX infinite streams (let's call them mainValues and decoratorValues). The one called mainValues are adding elements to a list called valueList. The other stream (decoratorValues) should assign properties to these elements in the list.

Elements in the two streams are arriving at random times in random order, and I need to solve it in either order that mainValues are put into the list as soon as available, while decoratorValues are not lost until their corresponding elements (identified by an ID) arrive.

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose 
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.fromArray(mainValues).delay(2000);
const decoratorStream = Rx.Observable.fromArray(decoratorValues);

const valueList = [];

mainValueStream.subscribe(
  val => valueList.push(val), 
  (err) => console.log(err),
  () => console.log('complete', valueList));

// CODE time independent merging of the two streams go here

// resulting valueList should be:  [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b', decoratedValue: false }, { id: 3, someValue: 'c', decoratedValue: true } ]

Solution idea

mainValueStream
  .concatMap(main => {
     decoratorStream
       .filter(dec => dec.id === main.id)
       .do(dec => {
          main.decoratedValue = dec.decoratedValue;
     }).subscribe(x => x);

     return [main];
  })
  .subscribe(
    val => valueList.push(val), 
    (err) => console.log(err),
    () => console.log('complete', valueList));

Unfortunately this only works when all of the decorator values has arrived before the main value stream.

My second idea would be adding a second stream that checks if the given value is already in valueList and updates it, if it has the element with the appropriate ID.

Is there a time-independent solution that results in only one stream? Or maybe I'm stuck because I want to solve it with one stream?

Answers:

Answer

It seems like the best option is forkJoin() here. I was thinking about using zip() but since you don't have the same order and number of items in each list you'll probably have to wait until both complete and then work with their results.

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.from(mainValues).delay(500).toArray();
const decoratorStream = Rx.Observable.from(decoratorValues).toArray();

Observable.forkJoin(mainValueStream, decoratorStream, (main, decorator) => {
    main.forEach(m => {
      decorator.forEach(d => {
        if (m.id === d.id) {
          Object.assign(m, d);
        }
      })
    });

    return main;
  })
  .subscribe(val => console.log(val));

I'm using two nested forEach()s within forkJoin()'s result selector function because it seems like the easies way (note that Object.assign exists since ES6). I could split it into two Observable streams then filter and merge them but that would get very messy.

This prints to console:

[ { id: 1, someValue: 'a' },
  { id: 2, someValue: 'b', decoratedValue: false },
  { id: 3, someValue: 'c', decoratedValue: true } ]

EDIT:

If you need everything to be asynchronous than you can use scan() to collect decorators as they arrive:

const mainValueStream = Rx.Observable.from(mainValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000));

const decoratorStream = Rx.Observable.from(decoratorValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000))
  .scan((acc, val) => {
    acc.push(val);
    return acc;
  }, [])
  .share();


mainValueStream
  .mergeMap(main => decoratorStream
    .mergeAll()
    .filter(d => d.id === main.id)
    .defaultIfEmpty(null)
    .map(d => {
      if (d) {
        return Object.assign(main, d);
      } else {
        return main;
      }
    })
    .take(1)
  )
  .subscribe(val => console.log(val));

This prints results in random order as they arrive:

{ id: 2, someValue: 'b', decoratedValue: false }
{ id: 1, someValue: 'a' }
{ id: 3, someValue: 'c', decoratedValue: true }

The core is collection items from decoratorStream as they arrive. Then on each item from mainValueStream I make another Observable that doesn't emit until it receives its matching decorate item.

Downside of this is that the accumulator in scan() is still growing and there's no easy way I can release already used items.

Tags

Recent Questions

Top Questions

Home Tags Terms of Service Privacy Policy DMCA Contact Us

©2020 All rights reserved.