Что лучший способ состоит в том, чтобы ограничить параллелизм при использовании Promise.all ES6 ()?

У меня есть некоторый код, который выполняет итерации по списку, который был запрошен из базы данных и создания Запроса HTTP для каждого элемента в том списке. Тот список может иногда быть довольно большим количеством (в тысячах), и я хотел бы удостовериться, что я не поражаю веб-сервер тысячами параллельных Запросов HTTP.

Сокращенная версия этого кода в настоящее время выглядит примерно так...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Этот код работает на Узле 4.3.2. Для повторения, может Promise.all управляйтесь так, чтобы только определенное число Обещаний произошло в любой момент времени?

60
задан 16 November 2016 в 21:19

3 ответа

Здесь идет основной пример для потоковой передачи и 'p-предела'. Это передает поток чтения http потоком к дб монго.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
1
ответ дан 1 November 2019 в 10:29

Это может быть разрешено с помощью рекурсии.

идея состоит в том, что первоначально Вы отправляете максимальное позволенное количество запросов, и каждый из этих запросов должен рекурсивно продолжить отправлять себя на своем завершении.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
1
ответ дан 1 November 2019 в 10:29

Рекурсия является ответом, если Вы не хотите пользоваться внешними библиотеками

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
0
ответ дан 1 November 2019 в 10:29

Другие вопросы по тегам:

Похожие вопросы: