How to make a GenStage consumer subscribe only to particular events in a stream
A GenStage consumer can subscribe to just some of the events a producer emits by using GenStage.BroadcastDispatcher as the producer's dispatcher and specifying a :selector function that picks out the events it is interested in.
By default, a GenStage consumer receives every event its producer emits. In some cases, though, a consumer should receive only a subset of those events, with the rest filtered out before they reach it.
This is easy to achieve by switching the producer's dispatcher to GenStage.BroadcastDispatcher and subscribing only to the events we need by passing the :selector option in the consumer.
First, update the producer's init/1 to set the dispatcher:
def init(counter) do
  {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
The next step is to specify the :selector option in the consumer. The option accepts a one-argument function that is applied to each event in the stream. The consumer receives only those events broadcast by the producer for which the selector function returns a truthy value.
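To make the selector's behaviour concrete, here is a minimal sketch in plain Elixir with no GenStage involved. It applies a selector to a hypothetical list of events with Enum.filter/2, which approximates the per-event filtering described above. The event shape (%{key: ...}) and the safe_selector variant are assumptions for illustration: a selector written with a single pattern-matching clause raises FunctionClauseError when called with an event that does not match the pattern, so a catch-all clause may be worth adding if the stream can carry other shapes.

```elixir
# A selector is just a one-argument function returning a truthy or falsy value.
selector = fn %{key: key} -> String.starts_with?(key, "foo-") end

# Hypothetical events, shaped like the ones the selector expects.
events = [%{key: "foo-1"}, %{key: "bar-1"}, %{key: "foo-2"}]

# Keep only the events for which the selector returns a truthy value.
kept = Enum.filter(events, selector)
IO.inspect(kept)
# prints [%{key: "foo-1"}, %{key: "foo-2"}]

# Variant with a catch-all clause, so events of another shape are
# rejected instead of raising FunctionClauseError.
safe_selector = fn
  %{key: key} when is_binary(key) -> String.starts_with?(key, "foo-")
  _other -> false
end

IO.inspect(safe_selector.(%{id: 42}))
# prints false
```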
For synchronous subscriptions, pass the selector to GenStage.sync_subscribe/2:
GenStage.sync_subscribe(consumer,
  to: producer,
  selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)
For asynchronous subscriptions, specify the selector function in the :subscribe_to list in the tuple returned from GenStage.init/1:
def init(:ok) do
  # `producer` stands for the producer's pid or registered name
  {:consumer, :ok, subscribe_to:
    [{producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end}]}
end
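Putting the pieces together, the following is a rough end-to-end sketch rather than a definitive implementation. The module names KeyProducer and FooConsumer, the list-backed producer state, and the message sent back to the calling process are all hypothetical and exist only for illustration. It assumes the gen_stage package can be fetched with Mix.install/1 (Elixir 1.12 or later, with network access).

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule KeyProducer do
  use GenStage

  def start_link(events), do: GenStage.start_link(__MODULE__, events)

  # Broadcast every event to all consumers; selectors filter per consumer.
  def init(events) do
    {:producer, events, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Emit up to `demand` events from the remaining list.
  def handle_demand(demand, events) when demand > 0 do
    {to_emit, rest} = Enum.split(events, demand)
    {:noreply, to_emit, rest}
  end
end

defmodule FooConsumer do
  use GenStage

  def start_link(args), do: GenStage.start_link(__MODULE__, args)

  # Subscribe asynchronously, keeping only events whose key starts with "foo-".
  def init({producer, parent}) do
    selector = fn %{key: key} -> String.starts_with?(key, "foo-") end
    {:consumer, parent, subscribe_to: [{producer, selector: selector}]}
  end

  # Forward each received batch to the parent process so it can be inspected.
  def handle_events(events, _from, parent) do
    send(parent, {:received, events})
    {:noreply, [], parent}
  end
end

{:ok, producer} = KeyProducer.start_link([%{key: "foo-1"}, %{key: "bar-1"}, %{key: "foo-2"}])
{:ok, _consumer} = FooConsumer.start_link({producer, self()})

# Wait for the first batch delivered to the consumer.
got =
  receive do
    {:received, batch} -> batch
  after
    1_000 -> :timeout
  end

IO.inspect(got)
```

In this sketch the %{key: "bar-1"} event should never reach FooConsumer, because the selector returns false for it. A second consumer subscribed to the same producer with a different selector, or with none, would receive its own view of the same broadcast.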
NB: The :selector option works only if the producer uses GenStage.BroadcastDispatcher as its dispatcher. Unfortunately, leaving the dispatcher out neither raises an error nor warns the user.