Add More Consumers and Buffer Events
Learn how adding more consumers and buffering events will affect the output of our scrape_pages function.
Add more consumers
Rather than working in batches within a single process, we can easily scale our data processing pipeline by running more than one consumer, each responsible for scraping one page at a time. This takes two small changes: starting a second PageConsumer and adjusting its demand so that it takes a single event at a time. First, let's add another PageConsumer to the supervision tree:
```elixir
# scraper/lib/scraper/application.ex
children = [
  PageProducer,
  Supervisor.child_spec(PageConsumer, id: :consumer_a),
  Supervisor.child_spec(PageConsumer, id: :consumer_b)
]
```
Adjust the demand in init
Next, let's update the subscription options in PageConsumer's init function so that each consumer requests only one event at a time from PageProducer:
```elixir
# scraper/lib/page_consumer.ex
def init(initial_state) do
  Logger.info("PageConsumer init")

  sub_opts = [{PageProducer, min_demand: 0, max_demand: 1}]

  {:consumer, initial_state, subscribe_to: sub_opts}
end
```
Now each consumer takes only one event at a time, but we have two consumer processes running concurrently. As soon as one becomes free, it will issue demand to scrape another page.
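To see how this plays out, here is a minimal sketch of what PageConsumer's handle_events/3 callback might look like, assuming the scraping work is simulated with a random sleep (the real callback may differ):

```elixir
# Sketch of scraper/lib/page_consumer.ex; the scraping work is
# simulated with a random sleep for illustration.
def handle_events(events, _from, state) do
  # With max_demand: 1, `events` contains a single page.
  Logger.info("PageConsumer received #{inspect(events)}")

  Enum.each(events, fn _page ->
    # Pretend to scrape the page.
    Process.sleep(Enum.random(1_000..3_000))
  end)

  # Consumers emit no events; once this returns, GenStage
  # automatically issues demand for the next page.
  {:noreply, [], state}
end
```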
Notice that when we add another PageConsumer, we use Supervisor.child_spec/2. Each process must have a unique ID in the supervision tree; in the example above, the processes are called :consumer_a and :consumer_b. If we don't provide unique IDs, the main supervisor raises an error on initialization, because both children would default to the same ID (the module name). We can also use the Registry module to assign a name to each process.
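If we wanted more than two consumers, a comprehension can generate a unique ID for each child spec. This is a hypothetical variation, not part of the lesson's code:

```elixir
# scraper/lib/scraper/application.ex (hypothetical variation)
consumers =
  for id <- 1..4 do
    # Any unique term works as an ID, e.g. {:consumer, 1}.
    Supervisor.child_spec(PageConsumer, id: {:consumer, id})
  end

children = [PageProducer | consumers]
```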
With this approach, we can add as many consumer processes as needed, and GenStage will distribute the events for us, acting as a load balancer. Let's try the scrape_pages/1 function and compare the results: ...
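As a reminder, the call might look like the following, assuming scrape_pages/1 is the PageProducer helper from earlier that casts a list of pages to the producer; the page names here are placeholders:

```elixir
# In an IEx session; "page_1" etc. are placeholder page names.
pages = ["page_1", "page_2", "page_3", "page_4", "page_5"]
PageProducer.scrape_pages(pages)
```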