Implementing GraphQL Subscriptions using PubSub
This article is part 8 of the series on Exploring GraphQL. Check out the previous articles:
- Part 1: What is GraphQL and why Facebook felt the need to build it?
- Part 2: Fundamentals of GraphQL
- Part 3: Building a GraphQL Server using NodeJS and Express
- Part 4: How to implement Pagination and Mutation in GraphQL
- Part 5: Introducing the Apollo GraphQL Platform for implementing the GraphQL Specification
- Part 6: How to Connect MongoDB to a GraphQL Server?
- Part 7: GraphQL Subscriptions - Core Concepts
GraphQL does an amazing job in executing queries and mutations on the server. The query and mutation operations generally follow a request-response cycle.
Let's say the client opens up a connection to the server using the HTTP protocol. It sends the query and other required params along with this request. The GraphQL server parses and validates this query. This query is then executed by the corresponding resolvers and the response is sent back to the client.
If a client needs the result of some other query, it will again open up the connection and do the usual work. Please note: this connection between the server and the client is stateless in nature and is not persistent. The server is not aware of the state of the client. The client has to specifically ask for whatever it needs!
This request-response cycle model won't work for real-time data updates. We'll have to establish a persistent connection between the client and the server to stay updated about the new events. We've already seen how GraphQL solves the real-time data fetching problem using GraphQL Subscriptions in this article.
Let's recap some of the important concepts that we have learned in the previous article:
- The client can stay up-to-date with the server by constantly querying the resources. This may not be suited for a variety of use-cases. We can implement this technique for data that changes frequently over time, like in the case of stock prices.
- We can also query server just to check if the state of the data we're interested in has changed. The resources would be fetched only when the state is changed. This is called polling. We can also use the push-based approach where the server pushes the updated changes to the client.
- GraphQL uses Event-based approach to implement its subscription operation. The client subscribes for some particular events to the server. The server informs the client whenever these events trigger.
- The subscribe block in the GraphQL layer consists of a
createSource
stream that returns a source stream and a mapper that maps the source stream to the response stream. The event data is sent over the response stream to the client.
If you're interested in reading more, here's a detailed guide on what GraphQL subscriptions are and how do they work under the hood.
In this article, we're going to implement GraphQL Subscriptions using PubSub and WebSocket as the transport layer.
Let's revisit the food ordering project and check out the use-case for implementing subscriptions:
Revisiting the Food Ordering System
We've integrated the apollo-server-express
as a dependency for building the GraphQL server. We've also connected a MongoDB cluster on the backend.
Here are some of the types that we've built as part of the use-case:
- Restaurant
- Menu
- Customer
- Order
The goal is to have a seamless food ordering system in place! The customers can select any item from the menu of a restaurant and place an order.
Here are the links to the previous articles:
How to connect MongoDB to a GraphQL Server?
Introducing the Apollo GraphQL Platform for implementing the GraphQL Specification
Building a GraphQL Server using NodeJs and Express
The use-case here is to build a real-time system for keeping track of the new orders.
The solution is obvious; we're going to use GraphQL Subscriptions to solve this problem. Let's get started!
GraphQL Subscription -- How to start?
How to start? That's exactly what a lot of people thought when the core GraphQL committee released the subscription specification. There was no concrete implementation back then and there were many different ways to do a particular thing.
The Apollo GraphQL committee came up with this proposal:
Image loading...
They propose using subscriptions-transport-ws
module to establish a connection between the client and the server using WebSockets. Please note: subscriptions-transport-ws
package can be used with any of the GraphQL clients, including but not limited to Apollo and Relay. The graphql-subscriptions
module is used for executing subscriptions. We'll see these components in detail later in the article.
We're going to implement this approach in our project. Please note there are many other ways as well to implement GraphQL Subscriptions.
First off, let's see what the spec says about subscriptions:
If the operation is a subscription, the result is an event stream called the Response Stream where each event in the event stream is the result of executing the operation for each new event on an underlying Source Stream.
We've already learned about streams and have seen 3 ways of implementing streams in JavaScript as:
- Using NodeJs Streams
- Using RxJs Observables
- Using Async Iterators
We'll be using Async Iterators here. Before we start implementing this, I'd like to explain what async iterators are:
Overview on Iterators and Async Iterators
An iterator
is an object that implements the iterator protocol. The iterator protocol defines a standard way to generate a sequence of values. The iterator object should have an implementation of the next()
method, which is used for yielding (returning) the next item in the sequence.
The most common iterator is an Array. It has an iterator function on its prototype which is used for iterating over the elements in the array when we use operations like the spread operator, for-of loop, etc.
javascriptlet arr = [1,2,3,4,5] for (let element of arr) { console.log(element) } /* Output 1 2 3 4 5 */
The for-of
loop in the above code internally uses the next()
method to get the next element in the array arr
.
Please note: the output of an iterator is of the type:
javascript{ value: <VALUE>, done: false // (or true) }
The for-of
loop extracts the value key and prints it. The boolean value of false
in done indicates that the stream is not complete yet and there are more items to be processed.
The Async Iterators works the same way as iterator except they can be used to return bits of data asynchronously. This is exactly what we need in our case!
The streams receive event data over a period of time. We can call next()
on this stream to get our events in sequence. The boolean value of true
here indicates the completion signal for the stream.
The Iterators and Async Iterators terminology should not bother you anymore. With this out of the way, let's start building subscriptions for the food ordering project:
Installing the dependencies
As mentioned earlier, we would be using the subscriptions-transport-ws
and graphql-subscriptions
modules. Note that graphql-subscriptions
is a peer dependency of subscriptions-transport-ws
, so we'll only install subscriptions-transport-ws
package.
shellnpm install subscriptions-transport-ws --save
Let's explore more and understand the subscriptions-transport-ws
module in detail:
The subscriptions-transport-ws
establishes a connection between the server and the client using WebSockets. Each WebSocket message follows this structure:
javascriptinterface message { payload?: any; id?: string; type: string; }
Let's look into different types that are used for communication:
Communication from the client to the server:
Message | Details |
---|---|
GQL_CONNECTION_INIT | Client sends this message to start communication with the server. If everything is good at the server end, it will reply with GQL_CONNECTION_ACK and GQL_CONNECTION_KEEP_ALIVE . If there is an error, it will send GQL_CONNECTION_ERROR to the client over the WebSocket. |
GQL_START | This is an indication to the server to start executing the GraphQL operation. The client sends the id and the payload for executing a GraphQL operation. |
GQL_STOP | This is an indication to the server to stop executing an operation. |
GQL_CONNECTION_TERMINATE | This is used to terminate the connection. |
Communication from the server to the client:
Message | Details |
---|---|
GQL_CONNECTION_ERROR | The server sends this message to the client if there is an error in establishing a connection. |
GQL_CONNECTION_ACK | This is an indication that the server accepted the connection. |
GQL_DATA | The server sends the result of executing the GraphQL operation along with this message. |
GQL_ERROR | The server sends this message if there is an error in executing the operation. |
GQL_COMPLETE | This is an indication to the client that the GraphQL operation is completed. |
GQL_CONNECTION_KEEP_ALIVE | This is an indication to keep the connection alive. |
This is how the client and the server communicate over the WebSocket.
Let's now head over to the project and start coding!
If you get lost anywhere in the code below, you can always check out my GitHub repository and get to speed.
Adding Subscription type in the schema
Head over to the schema.js
file and add Subscription
type as:
javascripttype Subscription { newOrder (restaurantId: String): Order }
newOrder
field is what we're interested in listening to!
It returns an object of type Order
. We've already implemented the definition of type Order
in our previous articles. Here's a quick recap on all the fields that are included in the Order
type:
javascripttype Order { _id: String, customerId: String, restaurantId: String, order: [String] }
We've added Subscription
as one of the types in the Schema definition. Let's now implement the next steps!
Adding a PubSub instance
The publish-subscribe is a messaging pattern where the senders of the message called the publishers publish a message on a queue. The messages are categorized into different types. The subscribers are interested in listening to messages of some x category published on the queue.
The PubSub
class is available in the apollo-server-express
module. We'll create an instance of this class and use its publish
method to post messages tagged in respective categories.
I suggest creating a new file to handle the PubSub configurations. Let's add pubsub.js
at the root level in the project directory.
Add the following code in pubsub.js
file:
javascriptconst { PubSub } = require('apollo-server-express') const pubsub = new PubSub() module.exports = pubsub
PubSub
is of type PubSubEngine
and here's a snippet of PubSubEngine
from graphql-subscriptions
module:
javascriptexport abstract class PubSubEngine { public abstract publish(triggerName: string, payload: any): Promise<void>; public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>; public abstract unsubscribe(subId: number); public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> { return new PubSubAsyncIterator<T>(this, triggers); } }
It has the publish
, subscribe
, unsubscribe
and asyncIterator
methods. You can check out the code for PubSubAsyncIterator
class here.
Please note: This pubsub implementation will work for a single instance of a server and doesn't scale beyond a couple of connections. You can check out these reference implementations for production usage.
Subscription Server
Let's now set up our Subscriptions server. As we already know GraphQL operations are transport-agnostic, we can implement this in two ways:
- All the three operations -
query
,mutation
, andsubscription
will operate on WebSocket query
andmutation
will operate on HTTP andsubscription
will operate on WebSocket
We're going to implement the second option here. Apollo 2 makes it super easy to integrate Subscription server along with the HTTP endpoints for query
and mutation
operations. You'll have to add the following code in your server.js
file:
javascriptserver.installSubscriptionHandlers(httpServer)
The httpServer
refers to the express server. The installSubscriptionHandlers
function does all the heavy lifting for us and sets up the subscription server. The subscription server is connected to the client using websocket.
You'll also need to add the HTTP endpoint
value for the GraphQL server while initializing it. After doing all these changes, your server.js
file should look like this:
javascriptconst express = require('express') const { ApolloServer } = require('apollo-server-express') const { createServer } = require('http') const mongoose = require('mongoose') const typeDefs = require('./schema') const resolvers = require('./resolvers') const app = express() const server = new ApolloServer({ typeDefs, resolvers, playground: { endpoint: 'http://localhost:3000/graphql', settings: { 'editor.theme': 'light' } } }) server.applyMiddleware({ app }) const httpServer = createServer(app) server.installSubscriptionHandlers(httpServer) mongoose .connect(`mongodb+srv://${process.env.mongoUserName}:${process.env.mongoUserPassword}@cluster0-yhukr.mongodb.net/${process.env.mongoDatabase}?retryWrites=true&w=majority`) .then( (res) => { httpServer.listen(3000, () => { console.log('ssss') }) }) .catch( (err) => { console.error('Error while connecting to MongoDB', err); })
Handling Resolvers
This is the main bit! We'll do 2 important things in this section:
Adding a resolver for Subscription type:
javascriptSubscription: { newOrder: { resolve: (payload) => { return payload.newOrder }, subscribe: () => { return pubsub.asyncIterator('NEW_ORDER') } } }
Include the above Subscription
object as one of the keys of the resolvers
object. The key newOrder
is what we're interested in listening to! You can add any custom name of your choice here.
Please note: newOrder
is an object here that has subscribe
and resolve
functions. The subscribe
method gets executed when a trigger of type NEW_ORDER
is published. It returns the new event of type NEW_ORDER
.
The resolve
method is optional. If you'd want to do some processing on the event payload, you can do that in the resolve
method.
Publishing a trigger when a new order is placed
When the order is placed successfully, we'll have to publish a trigger of type NEW_ORDER
. Modify the addOrder
mutation as:
javascriptaddOrder (parent, args, context, info) { const { customerId, restaurantId, order } = args const orderObj = new Order({ customerId, restaurantId, order }) return orderObj.save() .then (result => { const order = { ...result._doc } pubsub.publish('NEW_ORDER', { newOrder: order }) return order }) .catch (err => { console.error(err) }) }
Notice how we're calling the publish
method when the promise gets resolved. The publish
method takes trigger name as the first argument and payload as the second argument. The order
variable contains the details of the updated order. We'll see this in action in a moment.
Side-note: Don't forget to import pubsub
from pubsub.js
file.
Subscriptions in Action
After you've followed all the above actions, hit http:localhost:3000/graphql
and open two tabs in the graphiql--one for the subscription operation and another one for mutation.
Here's my mutation:
javascriptmutation { addOrder( customerId: "5d2ad4bdca40324a8e27bb11" restaurantId: "5d064cfbd11f64672b2c2642" order: ["Veg Sandwich"] ) { order _id } }
Please note: the values of the keys customerId
, restaurantId
and order
might be different in your case. Check your mongoDB collection and modify the above mutation operation accordingly.
And here's my subscription operation:
javascriptsubscription { newOrder { restaurantId, order } }
When you hit play in the subscriptions tab:
Image loading...
Notice the loader and the Listening text at the bottom.
Let's now execute the mutation and see the changes in the subscription tab:
Image loading...
I got the new order details in my subscription tab. Sweet! This is working as expected. We're now getting real-time updates for our orders.
Conclusion
We implemented a basic demo on using GraphQL Subscriptions to get real-time updates. We can do more cool things with GraphQL Subscriptions. We'll do more advanced stuff including withFilter
, Auth tokens for subscriptions in the next series of articles.