How to make a Genstage consumer subscribe only to particular events or stream

By default, a Genstage consumer is subscribed to all the events from the producer. But in some cases, it would be necessary to filter the events which the consumer subscribes. We can subscribe to only some events or even can filter out some of the events.

This can be easily achieved by tweaking our producer despatcher to use GenStage.BroadcastDispatcher and subscribing only to events that we require by specifying the :selector option in the consumer.

So, let's update our producer and update the dispatcher

def init(counter) do
  {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end

Next step is to specify our selector option in the consumer. The selection option accepts a function that can be used to filter out the stream of events which we want to subscribe.

The consumer will receive only the events broadcast from the producer for which the selector function returns a truthy value.

For sync subscriptions this can be done by:

GenStage.sync_subscribe(consumer,
  to: producer,
  selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)

For async subscriptions, this can be done by specifying the selector function in the :subscribe_to list in the return tuple of GenStage.init/1.

def init(:ok) do
  {:consumer, :ok, subscribe_to:
    [{producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end}]}
end

NB: It is to be noted that the selector option works only if use GenStage.BroadcastDispatcher for the dispatcher. Unfortunately skipping that does not throw any error or warn the user.