जब REMOTE_DATA_STARTED कार्रवाई भेजी जाती है तो मेरा महाकाव्य जाग जाता है और यह action.url और action.owner का उपयोग करके डेटा प्राप्त करता है।

मुझे यह सुनिश्चित करने की ज़रूरत है कि मैं एक ही मालिक/यूआरएल को दो समवर्ती कॉल शुरू नहीं करता हूं। एक बार मालिक/यूआरएल के लिए कॉल पूरा हो जाने के बाद, उसी मालिक/यूआरएल के लिए बाद में एक और कॉल शुरू करना ठीक है।

रद्द करना वह नहीं है जिसे मैं यहां ढूंढ रहा हूं क्योंकि मैं मौजूदा अनुरोध (अनुरोधों) को रद्द नहीं करना चाहता, मैं एक नया अनुरोध शुरू करने से रोकना चाहता हूं।

मुझे लगता है कि मुझे exhaustMap और groupBy के मिश्रण की जरूरत है, लेकिन मुझे नहीं पता कि यहां से कहां जाना है।

इस बिंदु पर यह मेरा महाकाव्य है, यह सभी समवर्ती कॉलों को अस्वीकार करता है, स्वामी/यूआरएल द्वारा नहीं

const myEpic = action$ =>
  action$.ofType("REMOTE_DATA_STARTED").exhaustMap(action =>
    fakeAjaxCall().map(() => {
      return { type: "COMPLETED", owner: action.owner, url: action.url };
    })
  );

लाइव ट्राई करें

मैंने इस परीक्षण प्रोजेक्ट को एक असफल परीक्षण मामले के साथ बनाया है। क्या आप यह काम करने में मेरी मदद कर सकते हैं?

https://codesandbox.io/s/l71zq6x8zl

जैसा कि आप देखेंगे, test1_exhaustMapByActionType_easy ठीक काम करता है, यह test2_exhaustMapByActionTypeOwnerAndUrl विफल रहता है।

सुनिश्चित करें कि आपने परीक्षण के परिणाम देखने के लिए कंसोल का विस्तार किया है।

4
Sylvain 29 मार्च 2018, 21:44

2 जवाब

समाधान तक पहुंचने के कई तरीके हैं। मेरा पहला विचार यह था कि आप कार्रवाइयों को स्वामी+URL विषयों में विभाजित कर सकते हैं और फिर उनके साथ काम कर सकते हैं:

const myEpic = (action$) => {
    const completed$ = new Subject();
    const flights = new DefaultMap((pair$) =>
        pair$.exhaustMap((action) =>
            fakeAjaxCall().map(() => ({
                ...action,
                type: 'COMPLETED',
            }))
        )
        .subscribe((action) => completed$.next(action))
    );
    action$.ofType('REMOTE_DATA_STARTED')
        .subscribe((action) => {
            flights.get(`${action.owner}+${action.url}`).next(action);
        });

    return completed$;
};

यह काम करता है, लेकिन माना जाता है कि इसे किसी प्रकार का "डिफ़ॉल्ट नक्शा" बनाए रखने की आवश्यकता होती है जहां नए मालिक + यूआरएल जोड़े को एक नया Subject मिलता है (मैंने एक त्वरित कार्यान्वयन लिखा)। एक परीक्षण मामला जो इसे पास करता है:

test('myEpic does both drop actions and NOT drop actions for two owner+url pairs', async () => {
    const arrayOfAtMost = (action$, limit) => action$.take(limit)
        .timeoutWith(1000, Observable.empty())
        .toArray().toPromise();
    const action$ = new ActionsObservable(
        Observable.create((observer) => {
            // Jim #1 emits four (4) concurrent calls—we expect only two to be COMPLETED, one per URL
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.com', owner: 'jim1' });
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.com', owner: 'jim1' });
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.org', owner: 'jim1' });
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.org', owner: 'jim1' });

            // Jim #2 emits two (2) calls at the same time as Jim #1—we expect only one to be COMPLETED, deduped URLs
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.biz', owner: 'jim2' });
            observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.biz', owner: 'jim2' });

            // Once all of the above calls are completed, Jim #1 and Jim #2 make calls simultaneously
            // We expect both to be COMPLETED
            setTimeout(() => {
                const url = 'https://stackoverflow.com/q/49563059/1267663';
                observer.next({ type: 'REMOTE_DATA_STARTED', url, owner: 'jim1' });
                observer.next({ type: 'REMOTE_DATA_STARTED', url, owner: 'jim2' });
            }, 505);
        })
    );
    const resultant$ = myEpic(action$);
    const results = await arrayOfAtMost(resultant$, 5);

    expect(results).toEqual([
        { type: 'COMPLETED', url: 'google.com', owner: 'jim1' },
        { type: 'COMPLETED', url: 'google.org', owner: 'jim1' },
        { type: 'COMPLETED', url: 'google.biz', owner: 'jim2' },
        { type: 'COMPLETED', url: 'https://stackoverflow.com/q/49563059/1267663', owner: 'jim1' },
        { type: 'COMPLETED', url: 'https://stackoverflow.com/q/49563059/1267663', owner: 'jim2' },
    ]);
});

DefaultMap कार्यान्वयन सहित संपूर्ण समाधान:

const { Observable, Subject } = require('rxjs');

class DefaultMap extends Map {
    constructor(initializeValue) {
        super();
        this._initializeValue = initializeValue || (() => {});
    }

    get(key) {
        if (this.has(key)) {
            return super.get(key);
        }

        const subject = new Subject();
        this._initializeValue(subject);
        this.set(key, subject);
        return subject;
    }
}

const fakeAjaxCall = () => Observable.timer(500);
const myEpic = (action$) => {
    const completed$ = new Subject();
    const flights = new DefaultMap((uniquePair) =>
        uniquePair.exhaustMap((action) =>
            fakeAjaxCall().map(() => ({
                ...action,
                type: 'COMPLETED',
            }))
        )
        .subscribe((action) => completed$.next(action))
    );
    action$.ofType('REMOTE_DATA_STARTED')
        .subscribe((action) => {
            flights.get(`${action.owner}+${action.url}`).next(action);
        });

    return completed$;
};

* उपरोक्त स्निपेट वास्तव में चलने योग्य नहीं है, मैं बस इसे संक्षिप्त करना चाहता था।

परीक्षण मामलों के साथ चलने योग्य उदाहरण

मैंने गिटहब पर परीक्षण मामलों के साथ एक चलने योग्य उदाहरण रखा है।

groupBy और exhaustMap ऑपरेटरों का उपयोग करना

मैंने मूल समाधान केवल परीक्षणों के साथ लिखा था, यह पता लगाने के लिए कि, हाँ, मौजूदा ऑपरेटरों के साथ यह संभव है, groupBy और exhaustMap आपने सुझाव दिया था:

const myEpic = action$ =>
    action$.ofType('REMOTE_DATA_STARTED')
        .groupBy((action) => `${action.owner}+${action.url}`)
        .flatMap((pair$) =>
            pair$.exhaustMap(action =>
                fakeAjaxCall().map(() => ({
                    ...action,
                    type: 'COMPLETED',
                }))
            )
        );

चल रहा है कि ऊपर एक ही परीक्षण सूट के खिलाफ पारित हो जाएगा।

2
Whymarrh 30 मार्च 2018, 21:27

यह निश्चित रूप से एक सुरुचिपूर्ण तरीके से groupBy और exhastMap के साथ किया जा सकता है:

const groupedByExhaustMap = (keySelector, project) => 
  source$ => source$.pipe(
    groupBy(keySelector),
    mergeMap(groupedCalls => 
      groupedCalls.pipe(
        exhaustMap(project)
      )
    )
  );
const { delay, groupBy, mergeMap, exhaustMap } = Rx.operators;

const groupedByExhaustMap = (keySelector, project) => 
  source$ => source$.pipe(
    groupBy(keySelector),
    mergeMap(groupedCalls => 
      groupedCalls.pipe(
        exhaustMap(project)
      )
    )
  );

const calls = [ // every call takes 500ms
  {startTime: 0, owner: 1, url: 'url1'}, 
  {startTime: 200, owner: 2, url: 'url2'},
  {startTime: 400, owner: 1, url: 'url1'}, // dropped
  {startTime: 400, owner: 1, url: 'url2'},
  {startTime: 600, owner: 1, url: 'url1'}
];

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

const simulateCallsOverTime$ = Rx.Observable.from(calls)  
  .pipe(
    mergeMap(call => Rx.Observable.of(call)
      .pipe(
        delay(call.startTime)
      )
    )
  );

simulateCallsOverTime$
  .pipe(
    groupedByExhaustMap(
      call => `${call.owner}_${call.url}`,
      async call => {
        await sleep(500); // http call goes here
        return call;
      }
    )
  )
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
2
ZahiC 30 मार्च 2018, 14:15