Implementing GraphQL Subscriptions using PubSub

GraphQL does an amazing job of executing queries and mutations on the server. Both operations generally follow a request-response cycle.

Let's say the client opens up a connection to the server using the HTTP protocol. It sends the query and other required params along with this request. The GraphQL server parses and validates this query. This query is then executed by the corresponding resolvers and the response is sent back to the client.

If a client needs the result of some other query, it will again open up the connection and do the usual work. Please note: this connection between the server and the client is stateless in nature and is not persistent. The server is not aware of the state of the client. The client has to specifically ask for whatever it needs!

This request-response model won't work for real-time data updates. To stay updated about new events, we have to establish a persistent connection between the client and the server. We've already seen how GraphQL solves the real-time data fetching problem with GraphQL Subscriptions in the previous article.

Let's recap some of the important concepts that we have learned in the previous article:

  1. The client can stay up-to-date with the server by repeatedly querying the resources it needs. This technique can work for data that changes frequently over time, like stock prices, but it is not suited to many other use-cases.
  2. We can also query the server just to check whether the state of the data we're interested in has changed, and fetch the resources only when it has. This is called polling. Alternatively, we can use a push-based approach where the server pushes the changes to the client.
  3. GraphQL uses an event-based approach to implement its subscription operation. The client subscribes to particular events on the server, and the server informs the client whenever these events are triggered.
  4. The subscribe block in the GraphQL layer consists of a CreateSourceEventStream step that returns a source stream and a MapSourceToResponseEvent step that maps the source stream to the response stream. The event data is sent over the response stream to the client.
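
To make the last point concrete, here's a minimal sketch of a source stream being mapped to a response stream with async generators. The names createSourceStream and mapToResponseStream and the sample events are made up for illustration; this is not how any particular GraphQL library implements it.

```javascript
// Hypothetical source stream: yields raw events as they "happen".
async function* createSourceStream() {
  yield { orderId: 1, item: 'Veg Sandwich' }
  yield { orderId: 2, item: 'Cheese Pizza' }
}

// Maps every source event to the response shape that is sent to the client.
async function* mapToResponseStream(sourceStream, resolveEvent) {
  for await (const event of sourceStream) {
    yield { data: resolveEvent(event) }
  }
}

// Drains the response stream into an array so that we can inspect it.
async function collectResponses() {
  const responses = []
  const responseStream = mapToResponseStream(
    createSourceStream(),
    (event) => ({ newOrder: event })
  )
  for await (const response of responseStream) {
    responses.push(response)
  }
  return responses
}

collectResponses().then((responses) => {
  console.log(JSON.stringify(responses))
})
```

Each event on the source stream produces exactly one event on the response stream, which is the relationship the spec describes.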

If you're interested in reading more, here's a detailed guide on what GraphQL subscriptions are and how they work under the hood.

In this article, we're going to implement GraphQL Subscriptions using PubSub and WebSocket as the transport layer.

Let's revisit the food ordering project and check out the use-case for implementing subscriptions:

Revisiting the Food Ordering System

We've integrated the apollo-server-express as a dependency for building the GraphQL server. We've also connected a MongoDB cluster on the backend.

Here are some of the types that we've built as part of the use-case:

  • Restaurant
  • Menu
  • Customer
  • Order

The goal is to have a seamless food ordering system in place! The customers can select any item from the menu of a restaurant and place an order.

Here are the links to the previous articles:

How to connect MongoDB to a GraphQL Server?

Introducing the Apollo GraphQL Platform for implementing the GraphQL Specification

Building a GraphQL Server using NodeJs and Express

The use-case here is to build a real-time system for keeping track of the new orders.

The solution is obvious; we're going to use GraphQL Subscriptions to solve this problem. Let's get started!

GraphQL Subscription -- How to start?

How to start? That's exactly what a lot of people thought when the core GraphQL committee released the subscription specification. There was no concrete implementation back then and there were many different ways to do a particular thing.

The Apollo GraphQL committee came up with this proposal:

They propose using the subscriptions-transport-ws module to establish a connection between the client and the server using WebSockets. Please note: the subscriptions-transport-ws package can be used with any GraphQL client, including but not limited to Apollo and Relay. The graphql-subscriptions module is used for executing subscriptions. We'll see these components in detail later in the article.

We're going to implement this approach in our project. Please note that there are many other ways to implement GraphQL Subscriptions as well.

First off, let's see what the spec says about subscriptions:

If the operation is a subscription, the result is an event stream called the Response Stream where each event in the event stream is the result of executing the operation for each new event on an underlying Source Stream.

We've already learned about streams and have seen three ways of implementing them in JavaScript:

  1. Using NodeJs Streams
  2. Using RxJs Observables
  3. Using Async Iterators

We'll be using Async Iterators here. Before we start implementing this, I'd like to explain what async iterators are:

Overview on Iterators and Async Iterators

An iterator is an object that implements the iterator protocol. The iterator protocol defines a standard way to generate a sequence of values. The iterator object should have an implementation of the next() method, which is used for yielding (returning) the next item in the sequence.

The most common iterable is an Array. It has a [Symbol.iterator] method on its prototype that returns an iterator, which is used for walking over the elements of the array when we use operations like the spread operator, the for-of loop, etc.

javascript
let arr = [1, 2, 3, 4, 5]

for (let element of arr) {
  console.log(element)
}

/* Output
1
2
3
4
5
*/

The for-of loop in the above code internally uses the next() method to get the next element in the array arr.

Please note: the output of an iterator is of the type:

javascript
{
  value: <VALUE>,
  done: false // (or true)
}

The for-of loop extracts the value key and prints it. The boolean value of false in done indicates that the stream is not complete yet and there are more items to be processed.
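
Here's a small sketch of the protocol in action. It first drives an array's iterator by hand and then builds a tiny custom iterator; the countdown function is a made-up example, not part of the project.

```javascript
const arr = [1, 2, 3]

// Ask the array for a fresh iterator, just like for-of does internally.
const iterator = arr[Symbol.iterator]()

const steps = [
  iterator.next(), // { value: 1, done: false }
  iterator.next(), // { value: 2, done: false }
  iterator.next(), // { value: 3, done: false }
  iterator.next()  // { value: undefined, done: true }
]
console.log(steps)

// A hand-written iterator only needs a next() method that returns { value, done }.
// Returning `this` from [Symbol.iterator] lets for-of and spread consume it too.
function countdown(from) {
  let current = from
  return {
    next() {
      if (current > 0) {
        return { value: current--, done: false }
      }
      return { value: undefined, done: true }
    },
    [Symbol.iterator]() {
      return this
    }
  }
}

const counted = [...countdown(3)]
console.log(counted) // [ 3, 2, 1 ]
```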

Async iterators work the same way as iterators, except that next() returns a promise, so they can deliver pieces of data asynchronously. This is exactly what we need in our case!

A stream receives event data over a period of time. We can call next() on the stream to get our events in sequence. A done value of true is the completion signal for the stream.
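
As a quick illustration, the sketch below uses an async generator as the event source. The orderEvents generator and the 10 ms delay are stand-ins for real events arriving over time.

```javascript
// Resolves after `ms` milliseconds; stands in for waiting on a real event.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical event source: an async generator returns an async iterator.
async function* orderEvents() {
  for (const item of ['Veg Sandwich', 'Cheese Pizza']) {
    await delay(10)
    yield item
  }
}

// Calling next() by hand: each call returns a promise of { value, done }.
async function readManually() {
  const stream = orderEvents()
  const first = await stream.next()  // { value: 'Veg Sandwich', done: false }
  const second = await stream.next() // { value: 'Cheese Pizza', done: false }
  const last = await stream.next()   // { value: undefined, done: true }
  return [first, second, last]
}

// for await...of does the same thing and stops when done is true.
async function readWithLoop() {
  const seen = []
  for await (const item of orderEvents()) {
    seen.push(item)
  }
  return seen
}

readManually().then((steps) => console.log(steps))
readWithLoop().then((seen) => console.log(seen))
```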

The Iterators and Async Iterators terminology should not bother you anymore. With this out of the way, let's start building subscriptions for the food ordering project:

Installing the dependencies

As mentioned earlier, we'll be using the subscriptions-transport-ws and graphql-subscriptions modules. We won't install graphql-subscriptions separately, because the PubSub class we need is exported by apollo-server-express, so we'll only install the subscriptions-transport-ws package.

shell
npm install subscriptions-transport-ws --save

Let's explore more and understand the subscriptions-transport-ws module in detail:

The subscriptions-transport-ws module establishes a connection between the server and the client using WebSockets. Each WebSocket message follows this structure:

typescript
interface message {
  payload?: any;
  id?: string;
  type: string;
}

Let's look into different types that are used for communication:

Communication from the client to the server:

  • GQL_CONNECTION_INIT: The client sends this message to start communication with the server. If everything is good at the server end, it replies with GQL_CONNECTION_ACK and GQL_CONNECTION_KEEP_ALIVE. If there is an error, it sends GQL_CONNECTION_ERROR to the client over the WebSocket.
  • GQL_START: This is an indication to the server to start executing the GraphQL operation. The client sends the id and the payload for executing a GraphQL operation.
  • GQL_STOP: This is an indication to the server to stop executing an operation.
  • GQL_CONNECTION_TERMINATE: This is used to terminate the connection.

Communication from the server to the client:

  • GQL_CONNECTION_ERROR: The server sends this message to the client if there is an error in establishing a connection.
  • GQL_CONNECTION_ACK: This is an indication that the server accepted the connection.
  • GQL_DATA: The server sends the result of executing the GraphQL operation along with this message.
  • GQL_ERROR: The server sends this message if there is an error in executing the operation.
  • GQL_COMPLETE: This is an indication to the client that the GraphQL operation is completed.
  • GQL_CONNECTION_KEEP_ALIVE: This is an indication to keep the connection alive.

This is how the client and the server communicate over the WebSocket.
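
To get a feel for the message flow, here's a toy handler that takes one client message and returns the server messages to send back. It is only a sketch: handleClientMessage and runOperation are made-up names, the type values reuse the names listed above rather than the exact strings sent on the wire, and a real subscription would keep sending GQL_DATA for every new event instead of replying once.

```javascript
// Toy server-side handler: one client message in, a list of server messages out.
// `runOperation` is a hypothetical callback that executes the GraphQL operation.
function handleClientMessage(message, runOperation) {
  switch (message.type) {
    case 'GQL_CONNECTION_INIT':
      // Accept the connection and start the keep-alive signal.
      return [{ type: 'GQL_CONNECTION_ACK' }, { type: 'GQL_CONNECTION_KEEP_ALIVE' }]
    case 'GQL_START':
      try {
        // The id ties every reply to the operation that the client started.
        return [{ type: 'GQL_DATA', id: message.id, payload: runOperation(message.payload) }]
      } catch (err) {
        return [{ type: 'GQL_ERROR', id: message.id, payload: { message: err.message } }]
      }
    case 'GQL_STOP':
      return [{ type: 'GQL_COMPLETE', id: message.id }]
    case 'GQL_CONNECTION_TERMINATE':
      // Nothing to send back; the socket would be closed here.
      return []
    default:
      // Simplification for this sketch: report anything unexpected as an error.
      return [{ type: 'GQL_ERROR', id: message.id, payload: { message: 'Invalid message type' } }]
  }
}

const conversation = [
  { type: 'GQL_CONNECTION_INIT', payload: {} },
  { type: 'GQL_START', id: '1', payload: { query: 'subscription { newOrder { order } }' } },
  { type: 'GQL_STOP', id: '1' }
]

// Stand-in for real execution: always returns the same result.
const fakeRun = () => ({ data: { newOrder: { order: ['Veg Sandwich'] } } })

const replies = conversation.map((message) => handleClientMessage(message, fakeRun))
console.log(JSON.stringify(replies, null, 2))
```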

Let's now head over to the project and start coding!

If you get lost anywhere in the code below, you can always check out my GitHub repository and get up to speed.

Adding Subscription type in the schema

Head over to the schema.js file and add the Subscription type:

graphql
type Subscription {
  newOrder(restaurantId: String): Order
}

The newOrder field is what we're interested in listening to! It returns an object of type Order. We've already defined the Order type in our previous articles. Here's a quick recap of all the fields that are included in it:

graphql
type Order {
  _id: String,
  customerId: String,
  restaurantId: String,
  order: [String]
}

We've added Subscription as one of the types in the Schema definition. Let's now implement the next steps!

Adding a PubSub instance

Publish-subscribe is a messaging pattern in which the senders of messages, called publishers, publish messages on a queue. The messages are categorized into different types, and subscribers listen only for messages of the categories they're interested in.
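
Here's a bare-bones sketch of the pattern built on Node's EventEmitter. TinyPubSub is a made-up class for illustration and is much simpler than the PubSub class we'll use in the project.

```javascript
const { EventEmitter } = require('events')

// Minimal publish-subscribe: the category name doubles as the event name.
class TinyPubSub {
  constructor() {
    this.emitter = new EventEmitter()
  }

  publish(category, message) {
    this.emitter.emit(category, message)
  }

  // Returns a function that removes the subscription again.
  subscribe(category, onMessage) {
    this.emitter.on(category, onMessage)
    return () => this.emitter.off(category, onMessage)
  }
}

const queue = new TinyPubSub()
const received = []
const unsubscribe = queue.subscribe('NEW_ORDER', (message) => received.push(message))

queue.publish('NEW_ORDER', { order: ['Veg Sandwich'] }) // delivered
queue.publish('ORDER_CANCELLED', { orderId: 'abc' })    // ignored: nobody listens to this category
unsubscribe()
queue.publish('NEW_ORDER', { order: ['Cheese Pizza'] }) // ignored: we unsubscribed

console.log(received) // [ { order: [ 'Veg Sandwich' ] } ]
```

Notice that the publisher never knows who is listening. It only tags the message with a category, and that decoupling is what makes the pattern a good fit for subscriptions.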

The PubSub class is available in the apollo-server-express module. We'll create an instance of this class and use its publish method to post messages tagged in respective categories.

I suggest creating a new file to handle the PubSub configurations. Let's add pubsub.js at the root level in the project directory.

Add the following code in pubsub.js file:

javascript
const { PubSub } = require('apollo-server-express')

const pubsub = new PubSub()

module.exports = pubsub

PubSub extends PubSubEngine, and here's a snippet of PubSubEngine from the graphql-subscriptions module:

typescript
export abstract class PubSubEngine {
  public abstract publish(triggerName: string, payload: any): Promise<void>;
  public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
  public abstract unsubscribe(subId: number);
  public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
    return new PubSubAsyncIterator<T>(this, triggers);
  }
}

It has the publish, subscribe, unsubscribe and asyncIterator methods. You can check out the code for PubSubAsyncIterator class here.
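
If you're wondering how published events can be turned into an async iterator, the sketch below shows one possible approach with two queues. It is a simplified illustration under my own assumptions (the eventIterator helper and its internals are made up), not the actual PubSubAsyncIterator code.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper: exposes events emitted under `trigger` as an async iterator.
function eventIterator(emitter, trigger) {
  const pushQueue = [] // events that arrived before anyone asked for them
  const pullQueue = [] // resolvers of next() calls that are waiting for an event
  let listening = true

  const onEvent = (payload) => {
    if (pullQueue.length > 0) {
      // Someone is already waiting: hand the event over directly.
      pullQueue.shift()({ value: payload, done: false })
    } else {
      pushQueue.push(payload)
    }
  }
  emitter.on(trigger, onEvent)

  return {
    next() {
      if (!listening) {
        return Promise.resolve({ value: undefined, done: true })
      }
      if (pushQueue.length > 0) {
        return Promise.resolve({ value: pushQueue.shift(), done: false })
      }
      return new Promise((resolve) => pullQueue.push(resolve))
    },
    return() {
      // Stop listening and release every consumer that is still waiting.
      listening = false
      emitter.off(trigger, onEvent)
      pullQueue.splice(0).forEach((resolve) => resolve({ value: undefined, done: true }))
      return Promise.resolve({ value: undefined, done: true })
    },
    [Symbol.asyncIterator]() {
      return this
    }
  }
}

async function demo() {
  const emitter = new EventEmitter()
  const orders = eventIterator(emitter, 'NEW_ORDER')

  const pending = orders.next()        // nothing published yet, so this waits
  emitter.emit('NEW_ORDER', { id: 1 }) // resolves the pending next()
  emitter.emit('NEW_ORDER', { id: 2 }) // buffered until the next call

  const first = await pending
  const second = await orders.next()
  const closed = await orders.return()
  return [first, second, closed]
}

demo().then((results) => console.log(results))
```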

Please note: this PubSub implementation keeps everything in memory, so it works only for a single server instance and doesn't scale beyond a couple of connections. You can check out these reference implementations for production usage.

Subscription Server

Let's now set up our Subscriptions server. As we already know GraphQL operations are transport-agnostic, we can implement this in two ways:

  1. All three operations (query, mutation, and subscription) operate on WebSocket
  2. query and mutation operate on HTTP, and subscription operates on WebSocket

We're going to implement the second option here. Apollo Server 2 makes it super easy to integrate the subscription server along with the HTTP endpoints for query and mutation operations. You'll have to add the following code in your server.js file:

javascript
server.installSubscriptionHandlers(httpServer)

The httpServer refers to the Node HTTP server that wraps the Express app. The installSubscriptionHandlers function does all the heavy lifting for us and sets up the subscription server on top of it. The subscription server is connected to the client using WebSocket.

You'll also need to set the HTTP endpoint of the GraphQL server in the playground options while initializing it. After making all these changes, your server.js file should look like this:

javascript
const express = require('express')
const { ApolloServer } = require('apollo-server-express')
const { createServer } = require('http')
const mongoose = require('mongoose')

const typeDefs = require('./schema')
const resolvers = require('./resolvers')

const app = express()

const server = new ApolloServer({
  typeDefs,
  resolvers,
  playground: {
    endpoint: 'http://localhost:3000/graphql',
    settings: {
      'editor.theme': 'light'
    }
  }
})

// Serve queries and mutations over HTTP
server.applyMiddleware({ app })

// Wrap the Express app in an HTTP server and attach the subscription handlers to it
const httpServer = createServer(app)
server.installSubscriptionHandlers(httpServer)

mongoose
  .connect(`mongodb+srv://${process.env.mongoUserName}:${process.env.mongoUserPassword}@cluster0-yhukr.mongodb.net/${process.env.mongoDatabase}?retryWrites=true&w=majority`)
  .then(() => {
    // Start listening only after the database connection is established
    httpServer.listen(3000, () => {
      console.log('Server is running at http://localhost:3000/graphql')
    })
  })
  .catch((err) => {
    console.error('Error while connecting to MongoDB', err)
  })

Handling Resolvers

This is the main bit! We'll do 2 important things in this section:

Adding a resolver for Subscription type:

javascript
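Subscription: {
  newOrder: {
    // Optional: picks the value of the newOrder field out of the published payload
    resolve: (payload) => {
      return payload.newOrder
    },
    // Returns an async iterator that emits every payload published as NEW_ORDER
    subscribe: () => {
      return pubsub.asyncIterator('NEW_ORDER')
    }
  }
}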

Include the above Subscription object as one of the keys of the resolvers object. The key newOrder is what we're interested in listening to! You can add any custom name of your choice here.

Please note: newOrder is an object here that has subscribe and resolve functions. The subscribe method is executed once, when a client starts the subscription. It returns an async iterator that emits an event every time a trigger of type NEW_ORDER is published.

The resolve method is optional. If you want to do some processing on the event payload before it is sent to the client, you can do that in the resolve method.
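
To see how the two functions work together, here's a rough simulation of what happens for one subscribed client. The pubsub object is a stand-in that I've built on Node's events.on helper, and runSubscription is a made-up function; the real server does considerably more (validation, sending results over the WebSocket, and so on).

```javascript
const { EventEmitter, on } = require('events')

// Stand-in for the real PubSub, built on EventEmitter (an assumption of this sketch).
const emitter = new EventEmitter()
const pubsub = {
  publish: (trigger, payload) => emitter.emit(trigger, payload),
  asyncIterator(trigger) {
    const source = on(emitter, trigger) // starts buffering events immediately
    return {
      async next() {
        const { value, done } = await source.next()
        // events.on yields the emit() arguments as an array, so unwrap the payload.
        return done ? { value: undefined, done: true } : { value: value[0], done: false }
      },
      return: () => source.return(),
      [Symbol.asyncIterator]() {
        return this
      }
    }
  }
}

const resolvers = {
  Subscription: {
    newOrder: {
      resolve: (payload) => payload.newOrder,
      subscribe: () => pubsub.asyncIterator('NEW_ORDER')
    }
  }
}

// Roughly what happens for one subscribed client.
async function runSubscription(maxEvents) {
  const { subscribe, resolve } = resolvers.Subscription.newOrder
  const stream = subscribe() // called once, when the client subscribes
  const responses = []
  for await (const payload of stream) {
    responses.push({ data: { newOrder: resolve(payload) } }) // called for every event
    if (responses.length === maxEvents) break // leaving the loop calls stream.return()
  }
  return responses
}

const running = runSubscription(2)
pubsub.publish('NEW_ORDER', { newOrder: { order: ['Veg Sandwich'] } })
pubsub.publish('NEW_ORDER', { newOrder: { order: ['Cheese Pizza'] } })
running.then((responses) => console.log(JSON.stringify(responses)))
```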

Publishing a trigger when a new order is placed

When the order is placed successfully, we'll have to publish a trigger of type NEW_ORDER. Modify the addOrder mutation as:

javascript
addOrder (parent, args, context, info) {
  const { customerId, restaurantId, order } = args
  const orderObj = new Order({ customerId, restaurantId, order })

  return orderObj.save()
    .then((result) => {
      const order = { ...result._doc }
      // Notify the subscribers; the resolve function reads payload.newOrder
      pubsub.publish('NEW_ORDER', { newOrder: order })
      return order
    })
    .catch((err) => {
      console.error(err)
      // Rethrow so that the client receives an error instead of an empty result
      throw err
    })
}

Notice how we're calling the publish method when the promise gets resolved. The publish method takes the trigger name as the first argument and the payload as the second argument. The order variable contains the details of the newly saved order. We'll see this in action in a moment.
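
If you'd like to convince yourself that the trigger is published only after a successful save, here's a small simulation. FakeOrder is a made-up stand-in for the Mongoose model and the pubsub object is a thin wrapper around EventEmitter, so treat it as a sketch rather than project code.

```javascript
const { EventEmitter } = require('events')

const emitter = new EventEmitter()
const pubsub = { publish: (trigger, payload) => emitter.emit(trigger, payload) }

// Hypothetical stand-in for the Order model: save() resolves with a document-like object.
class FakeOrder {
  constructor(fields) {
    this.fields = fields
  }

  save() {
    if (!this.fields.restaurantId) {
      return Promise.reject(new Error('restaurantId is required'))
    }
    return Promise.resolve({ _doc: { _id: 'order-1', ...this.fields } })
  }
}

function addOrder(parent, args) {
  return new FakeOrder(args).save().then((result) => {
    const order = { ...result._doc }
    pubsub.publish('NEW_ORDER', { newOrder: order }) // reached only after a successful save
    return order
  })
}

// Record everything that gets published as NEW_ORDER.
const published = []
emitter.on('NEW_ORDER', (payload) => published.push(payload))

const attempts = Promise.all([
  addOrder(null, { customerId: 'c1', restaurantId: 'r1', order: ['Veg Sandwich'] }),
  // The second order is missing restaurantId, so save() rejects and nothing is published.
  addOrder(null, { customerId: 'c1', order: ['Cheese Pizza'] }).catch((err) => err.message)
])

attempts.then((results) => console.log(results, published.length))
```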

Side-note: Don't forget to import pubsub from pubsub.js file.

Subscriptions in Action

After you've followed all the above steps, open http://localhost:3000/graphql and create two tabs in GraphQL Playground: one for the subscription operation and another one for the mutation.

Here's my mutation:

graphql
mutation {
  addOrder(
    customerId: "5d2ad4bdca40324a8e27bb11"
    restaurantId: "5d064cfbd11f64672b2c2642"
    order: ["Veg Sandwich"]
  ) {
    order
    _id
  }
}

Please note: the values of the keys customerId, restaurantId and order might be different in your case. Check your MongoDB collection and modify the above mutation operation accordingly.

And here's my subscription operation:

graphql
subscription {
  newOrder {
    restaurantId,
    order
  }
}

When you hit play in the subscriptions tab:

Notice the loader and the Listening text at the bottom.

Let's now execute the mutation and see the changes in the subscription tab:

I got the new order details in my subscription tab. Sweet! This is working as expected. We're now getting real-time updates for our orders.

Conclusion

We implemented a basic demo of using GraphQL Subscriptions to get real-time updates. We can do more cool things with GraphQL Subscriptions. We'll cover more advanced topics, including withFilter and auth tokens for subscriptions, in the next series of articles.
