console.blog( ,

Fluxbus 2: 2 Flux 2 Furious

This again.

I've talked about Fluxbus in the past. If you haven't read about Fluxbus, I (not very) humbly suggest it's the application architecture you need but have never heard of.

Astute readers might notice that I bemoan one of the worst things about Flux:

Every single listener is called for every single change

[…]

Since the subscription pattern in Redux is store.subscribe( someCallback ), there is no way to filter how many functions are run when the state updates.
Imagine a moderately complex application that has a state value like { messages: { loading: true } } and a few hundred subscribers.
How many of those subscribers care that the messages stop loading at some point? Probably 1. Wherever your chat module is, it's probably the only place that cares when the messages stop loading and it can hide a spinner.
But in this subscription paradigm, all of your few hundred subscribers are called, and the logic is checked to determine if they need to update. The inefficiency is mind-boggling.

Again, Flux doesn't quite have the exact same problem: not only because of the fact that you can have multiple stores for specific intents, but also because it doesn't prescribe any particular subscription pattern.
You could implement a pre-call filter that allows subscribers to only be called for particular actions that they care about, but this is so far off the beaten path it's never been seen by human eyes before.

And then go on to suggest the alternative - Fluxbus - which has an implementation like this:


function publish( message ){
	return bus.next( message );
}

function subscribe( map ){
	// Returns an unsubscriber
	return bus.subscribe( ( message ) => {
		if( map[ message.name ] ){
			map[ message.name ]( message );
		}
	} );
}

This is just wrapping the native Rx methods with a simple helper (in the case of publish) or our filter magic (in the case of subscribe). The subscriber takes a map of functions keyed on a message name. If that message name is seen, that subscriber is triggered.
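To make the filtering concrete, here is a minimal sketch of those two helpers in use. The `bus` object is a tiny stand-in for an RxJS Subject (an assumption, so the sketch runs with no dependencies), and the message names and payload are hypothetical.

```javascript
// Tiny stand-in for an RxJS Subject (assumption: only next/subscribe are needed here).
const observers = [];
const bus = {
	next: ( message ) => observers.forEach( ( observer ) => observer( message ) ),
	subscribe: ( observer ) => { observers.push( observer ); },
};

function publish( message ){
	return bus.next( message );
}

function subscribe( map ){
	return bus.subscribe( ( message ) => {
		// Only hand off to a handler when the message name is a key in the map
		if( map[ message.name ] ){
			map[ message.name ]( message );
		}
	} );
}

// Hypothetical message names, for illustration only
const seen = [];

subscribe( {
	"MESSAGES_LOADED": ( message ) => seen.push( message.name ),
} );

publish( { name: "MESSAGES_LOADED", data: [] } ); // handler runs
publish( { name: "USER_LOGGED_OUT" } ); // ignored: no matching key in the map

console.log( seen ); // only MESSAGES_LOADED was handled
```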

Ladies and gentlefolk, this is what's called a Minimum Viable Product.

Explain yourself.

The problem here is that in a worst case scenario, this is exactly as bad as the Flux pattern where it calls every subscriber for every change.
Granted, that's the worst case behavior, compared to Flux's intended average behavior, but it's still not great.

Let me explain what I mean by worst case.
Let's say you have 1000 bus messages that you'd like to subscribe to. In a normal application, you might subscribe to 5 in one component, and 10 in another, and so on. This might result in close to 200 different subscribers.
As a refresher, you might subscribe like this:

subscribe( {
	"SOMETHING_I_CARE_ABOUT": ( message ) => { ... },
	"SOMETHING_ELSE": ( message ) => { ... },
	"ANOTHER_THING": ( message ) => { ... },
	"ONE_MORE_THING": ( message ) => { ... },
} );

In practice, you've added one subscriber, but it will hand off control to any of those four handlers when the bus receives any of those four events.
Pretty decent, but the outer call is still dumb. There's no filtering at the top level, so every time you call subscribe, that's another outer subscriber that will get called on every message through the bus.

Again, I want to reiterate: this is significantly better than binding into every component and calling every handler on any message (or any data change, in Flux terms). Once a function like this is called, the Message Bus will determine that none of the message names match, and it won't call the handlers.

At this point, I'm sure you can imagine the worst case scenario. I'll rewrite the above:

subscribe( {
	"SOMETHING_I_CARE_ABOUT": ( message ) => { ... }
} );

subscribe( {
	"SOMETHING_ELSE": ( message ) => { ... }
} );

subscribe( {
	"ANOTHER_THING": ( message ) => { ... }
} );

subscribe( {
	"ONE_MORE_THING": ( message ) => { ... }
} );

Now we're close to Flux territory. While we're not running expressions inside component bindings to check whether the update is relevant to us or not, we're still calling every outer subscriber for every single message sent through the bus.
Reiterating one last time: even the worst case scenario here avoids unnecessary component cycles by never calling into the handlers if the message doesn't match.
Fluxbus with both hands tied behind its back still beats Flux on a normal day. But it's not ideal.
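If you want to see the worst case with your own eyes, here is a rough sketch that counts calls. The array of observers stands in for the RxJS Subject, and the two counters are instrumentation I've added for illustration; neither is part of Fluxbus.

```javascript
// Stand-in for the bus: a plain list of outer subscribers (assumption, not RxJS).
const observers = [];

// Counters added purely for illustration
let outerCalls = 0;
let handlerCalls = 0;

function publish( message ){
	observers.forEach( ( observer ) => observer( message ) );
}

function subscribe( map ){
	observers.push( ( message ) => {
		outerCalls++; // every outer subscriber runs for every message
		if( map[ message.name ] ){
			handlerCalls++; // only matching handlers run
			map[ message.name ]( message );
		}
	} );
}

// The worst case: one subscribe call per message name
[ "SOMETHING_I_CARE_ABOUT", "SOMETHING_ELSE", "ANOTHER_THING", "ONE_MORE_THING" ]
	.forEach( ( name ) => subscribe( { [ name ]: () => {} } ) );

publish( { name: "SOMETHING_ELSE" } );

console.log( { outerCalls, handlerCalls } ); // 4 outer calls, 1 handler call
```

Four outer calls to find one handler. Scale that up to a couple hundred subscribers and you can see where this is headed.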

Fix it.

The key here is that Fluxbus was just the absolute thinnest of wrappers around an RxJS Subject. It's barely even there. Here's the code again:

function publish( message ){
	return bus.next( message );
}

function subscribe( map ){
	// Returns an unsubscriber
	return bus.subscribe( ( message ) => {
		if( map[ message.name ] ){
			map[ message.name ]( message );
		}
	} );
}

This is the absolute minimum extra code possible to implement subscriptions-by-message-name.

We can do a lot better by being just a little more clever.

The basic premise is that a Fluxbus instance can track handlers separately from handling inbound messages. So when a consumer calls subscribe, the internals of Fluxbus turn the message-name-to-handler mapping into a message-name-to-identifier mapping. Then it pushes the handler into a dictionary of all handlers and it's done.
Of course, that's not quite the whole picture - you need to reasonably handle unsubscriptions, too, and there are other little optimizations you can do along the way.
For the sake of our brains here, I'll present the MVP for this - sans abstractions and optimizations.

It gives us the code or it gets the hose again.

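import { Subject } from "rxjs";
import { v4 as uuidv4 } from "uuid"; // assuming the uuid package provides uuidv4

var bus = new Subject();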
var handlers = {};
var listeners = {};

bus.subscribe( ( message ) => {
	( listeners[ message.name ] || [] )
		.map( ( id ) => handlers[ id ] )
		.forEach( ( handler ) => {
			handler( message );
		} );
} );

export function publish( message ){
	bus.next( message );
}

export function subscribe( messageMap ){
	var names = Object.keys( messageMap );
	var callbacks = Object.values( messageMap );
	var idMap = [];

	callbacks.forEach( ( cb, i ) => {
		let name = names[ i ];
		let id = uuidv4();

		handlers[ id ] = cb;

		if( listeners[ name ] ){
			listeners[ name ].push( id );
		}
		else{
			listeners[ name ] = [ id ];
		}

		idMap.push( { name, id } );
	} );

	return () => {
		idMap.forEach( ( { name, id } ) => {
			delete handlers[ id ];

			if( listeners[ name ] ){
				listeners[ name ] = listeners[ name ].filter( ( listener ) => listener !== id );

				// Checked inside the guard so a repeated unsubscribe can't read .length of undefined
				if( listeners[ name ].length === 0 ){
					delete listeners[ name ];
				}
			}
		} );
	};
}

My eyes.

You said you'd give me the hose if I didn't give you the code.

What's happening here?

Let's break it down.

var bus = new Subject();
var handlers = {};
var listeners = {};

bus.subscribe( ( message ) => {
	( listeners[ message.name ] || [] )
		.map( ( id ) => handlers[ id ] )
		.forEach( ( handler ) => {
			handler( message );
		} );
} );

In the first section, we set up an RxJS Subject as our bus, just like before. We also define two variables that are going to cause all of this to be "singletonish" which is a very technical term for "singleton-y" things. It's a closure, basically.
Then, we immediately subscribe to the bus. This is the only bus subscriber. It will determine which handlers to trigger when a message comes in.
Roughly, this subscriber goes something like this:

  1. Give me all the IDs of listeners for this message name
  2. Convert all those IDs into their real handler functions
  3. Call each handler function with the message

export function publish( message ){
	bus.next( message );
}

Then we have our normal publish function.

boring 🥱

export function subscribe( messageMap ){
	var names = Object.keys( messageMap );
	var callbacks = Object.values( messageMap );
	var idMap = [];

	...
}

Then, we get to the real magic: subscribe.
subscribe still accepts a dictionary that maps a message name to a handler function to be called. But now, we immediately split that map into the name keys we're listening for and the handler values to be called. It's going to be very convenient later to have these as discrete arrays. We need a way to keep track of the mappings we're going to create here, so we also initialize an empty array to store mappings.

export function subscribe( messageMap ){
	...

	callbacks.forEach( ( cb, i ) => {
		let name = names[ i ];
		let id = uuidv4();

		handlers[ id ] = cb;

		if( listeners[ name ] ){
			listeners[ name ].push( id );
		}
		else{
			listeners[ name ] = [ id ];
		}

		idMap.push( { name, id } );
	} );

	...
}

The next chunk iterates over each of our handler functions, grabs the message name associated with it, and generates a new unique ID for that handler.

Say there fella, couldn't this be
Object
	.entries( messageMap )
	.forEach( ( [ name, cb ] ) => { ... } );
to avoid two extra variables and two iterations over the object to extract them?
……………………………… stop yelling at me i'm the internet's sweetie pie

Once we have a unique ID, we immediately store our handler in the handlers dictionary under that ID.

Then, we check our global listeners object for the message name. If we're already listening to that message name, we push our handler ID into the list. If we're not, we just create a new list.

To keep track of all our mappings in this subscriber, we push an object into the list of ids that includes both the message name and the handler ID.
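For what it's worth, the Object.entries version suggested above might look something like the sketch below. Assumptions: `register` is a hypothetical name for just the registration half of subscribe, and a simple counter stands in for uuidv4() so the snippet runs on its own.

```javascript
const handlers = {};
const listeners = {};
let nextId = 0; // stand-in for uuidv4(), an assumption to keep this dependency-free

// Hypothetical helper: only the registration half of subscribe
function register( messageMap ){
	const idMap = [];

	Object
		.entries( messageMap )
		.forEach( ( [ name, cb ] ) => {
			const id = String( nextId++ );

			handlers[ id ] = cb;

			// Create the list for this message name if needed, then add our ID
			listeners[ name ] = listeners[ name ] || [];
			listeners[ name ].push( id );

			idMap.push( { name, id } );
		} );

	return idMap;
}

const idMap = register( {
	"SOME_EVENT": () => {},
	"OTHER_EVENT": () => {},
} );
register( { "SOME_EVENT": () => {} } );

console.log( listeners ); // SOME_EVENT has two IDs, OTHER_EVENT has one
console.log( idMap ); // the two name/ID pairs from the first call
```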

export function subscribe( messageMap ){
	...

	return () => {
		idMap.forEach( ( { name, id } ) => {
			delete handlers[ id ];

			if( listeners[ name ] ){
				listeners[ name ] = listeners[ name ].filter( ( listener ) => listener !== id );

				// Checked inside the guard so a repeated unsubscribe can't read .length of undefined
				if( listeners[ name ].length === 0 ){
					delete listeners[ name ];
				}
			}
		} );
	};
}

Finally, we return an unsubscribe function. When a consumer calls subscribe with a map of message names and their handlers, they expect to get back a single unsubscriber that removes all of those handlers at once, just as they passed them in as a single dictionary.

So we take our handy array of { "name": message.name, "id": handlerId } mappings and we loop over the whole thing.

We delete the handler associated with the ID we're iterating over. Then, we remove the listener ID from the list of all the listeners for that message name.
As a last cleanup check, if that message name no longer has any associated listeners, we just delete it from the dictionary of message names that we check in the bus subscriber.
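Here is a rough end-to-end sketch of that lifecycle: subscribe, publish, unsubscribe, and check that nothing is left behind. To keep it dependency-free I've made two assumptions: publish dispatches directly (playing the part of the single bus subscriber), and a counter stands in for uuidv4(). The consumer names A and B are hypothetical.

```javascript
const handlers = {};
const listeners = {};
let nextId = 0; // stand-in for uuidv4()

// Plays the part of the single bus subscriber: look up IDs, resolve handlers, call them
function publish( message ){
	( listeners[ message.name ] || [] )
		.map( ( id ) => handlers[ id ] )
		.forEach( ( handler ) => handler( message ) );
}

function subscribe( messageMap ){
	const idMap = Object.entries( messageMap ).map( ( [ name, cb ] ) => {
		const id = String( nextId++ );

		handlers[ id ] = cb;
		listeners[ name ] = listeners[ name ] || [];
		listeners[ name ].push( id );

		return { name, id };
	} );

	// One unsubscriber for everything this call registered
	return () => {
		idMap.forEach( ( { name, id } ) => {
			delete handlers[ id ];

			if( listeners[ name ] ){
				listeners[ name ] = listeners[ name ].filter( ( listener ) => listener !== id );

				if( listeners[ name ].length === 0 ){
					delete listeners[ name ];
				}
			}
		} );
	};
}

const received = [];

const unsubscribeA = subscribe( {
	"SOME_EVENT": () => received.push( "A" ),
	"OTHER_EVENT": () => received.push( "A-other" ),
} );
const unsubscribeB = subscribe( {
	"SOME_EVENT": () => received.push( "B" ),
} );

publish( { name: "SOME_EVENT" } ); // A, then B
unsubscribeA();
publish( { name: "SOME_EVENT" } ); // only B is left
publish( { name: "OTHER_EVENT" } ); // nobody is listening anymore
unsubscribeB();

console.log( received ); // A, B, B
console.log( Object.keys( handlers ).length, Object.keys( listeners ).length ); // both dictionaries are empty
```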

Do math.

If we let n be the total number of calls to subscribe and x be the total number of subscriptions for a given message (like "SOME_EVENT": ( message ) => { ... }), then...

The previous version of Fluxbus was O(n + x) for every published message.
That is: if your application calls subscribe 100 times, and in just one, single instance of those calls you subscribe to SOME_EVENT, there will be 101 function calls when a single SOME_EVENT is published into the bus.

This new version of Fluxbus is O(x + 1) for every published message.
That is: if your application calls subscribe 100 times, and in just one, single instance of those calls you subscribe to SOME_EVENT, there will be 2 function calls when a single SOME_EVENT is published into the bus.
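Don't take my word for the arithmetic. Below is a small counting sketch of both approaches, under some assumptions: plain arrays and objects stand in for the RxJS Subject, the new version stores callbacks directly instead of going through IDs (the call count is the same), and all the function names and counters are hypothetical instrumentation.

```javascript
// Old approach: one outer subscriber per subscribe call
let oldCalls = 0;
const outerSubscribers = [];

function oldSubscribe( map ){
	outerSubscribers.push( ( message ) => {
		oldCalls++; // the outer subscriber itself
		if( map[ message.name ] ){
			oldCalls++; // the matching handler
			map[ message.name ]( message );
		}
	} );
}

function oldPublish( message ){
	outerSubscribers.forEach( ( subscriber ) => subscriber( message ) );
}

// New approach: a single dispatcher plus a lookup by message name
let newCalls = 0;
const listeners = {};

function newSubscribe( map ){
	Object.entries( map ).forEach( ( [ name, cb ] ) => {
		listeners[ name ] = listeners[ name ] || [];
		listeners[ name ].push( cb );
	} );
}

function newPublish( message ){
	newCalls++; // the single bus subscriber
	( listeners[ message.name ] || [] ).forEach( ( handler ) => {
		newCalls++; // the matching handler
		handler( message );
	} );
}

// 100 subscribe calls, exactly one of which cares about SOME_EVENT
for( let i = 0; i < 100; i++ ){
	const map = i === 0
		? { "SOME_EVENT": () => {} }
		: { [ "UNRELATED_" + i ]: () => {} };

	oldSubscribe( map );
	newSubscribe( map );
}

oldPublish( { name: "SOME_EVENT" } );
newPublish( { name: "SOME_EVENT" } );

console.log( { oldCalls, newCalls } ); // 101 calls before, 2 calls now
```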

Done?

Of course not.

The immediately obvious next step is improving this code. You can see such an improvement in the library I built to reduce my own application boilerplate. I've very cleverly named the implementation MessageBus.

Of course then there's syntactical stuff like what you were yelling at me about up above.
Code is never finished.

What do you think about Fluxbus 2?