diff --git a/backend/src/schema/resolvers/messages.spec.ts b/backend/src/schema/resolvers/messages.spec.ts index 628034be5..1679b0c34 100644 --- a/backend/src/schema/resolvers/messages.spec.ts +++ b/backend/src/schema/resolvers/messages.spec.ts @@ -3,11 +3,13 @@ import Factory, { cleanDatabase } from '../../db/factories' import { getNeode, getDriver } from '../../db/neo4j' import { createRoomMutation, roomQuery } from '../../graphql/rooms' import { createMessageMutation, messageQuery, markMessagesAsSeen } from '../../graphql/messages' -import createServer from '../../server' +import createServer, { pubsub } from '../../server' const driver = getDriver() const neode = getNeode() +const pubsubSpy = jest.spyOn(pubsub, 'publish') + let query let mutate let authenticatedUser @@ -58,6 +60,10 @@ describe('Message', () => { }) describe('create message', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + describe('unauthenticated', () => { it('throws authorization error', async () => { await expect( @@ -80,7 +86,7 @@ describe('Message', () => { }) describe('room does not exist', () => { - it('returns null', async () => { + it('returns null and does not publish subscription', async () => { await expect( mutate({ mutation: createMessageMutation(), @@ -95,6 +101,7 @@ describe('Message', () => { CreateMessage: null, }, }) + expect(pubsubSpy).not.toBeCalled() }) }) @@ -110,7 +117,7 @@ describe('Message', () => { }) describe('user chats in room', () => { - it('returns the message', async () => { + it('returns the message and publishes subscription', async () => { await expect( mutate({ mutation: createMessageMutation(), @@ -135,6 +142,10 @@ describe('Message', () => { }, }, }) + expect(pubsubSpy).toBeCalledWith('ROOM_COUNT_UPDATED', { + roomCountUpdated: '1', + userId: 'other-chatting-user', + }) }) describe('room is updated as well', () => { diff --git a/backend/src/schema/resolvers/messages.ts b/backend/src/schema/resolvers/messages.ts index 984d17cc2..a908f3fd8 100644 --- a/backend/src/schema/resolvers/messages.ts +++ b/backend/src/schema/resolvers/messages.ts @@ -1,5 +1,22 @@ import { neo4jgraphql } from 'neo4j-graphql-js' import Resolver from './helpers/Resolver' +import { getUnreadRoomsCount } from './rooms' +import { pubsub, ROOM_COUNT_UPDATED } from '../../server' + +const setMessagesAsDistributed = async (undistributedMessagesIds, session) => { + return session.writeTransaction(async (transaction) => { + const setDistributedCypher = ` + MATCH (m:Message) WHERE m.id IN $undistributedMessagesIds + SET m.distributed = true + RETURN m { .* } + ` + const setDistributedTxResponse = await transaction.run(setDistributedCypher, { + undistributedMessagesIds, + }) + const messages = await setDistributedTxResponse.records.map((record) => record.get('m')) + return messages + }) +} export default { Query: { @@ -20,27 +37,15 @@ export default { const undistributedMessagesIds = resolved .filter((msg) => !msg.distributed && msg.senderId !== context.user.id) .map((msg) => msg.id) - if (undistributedMessagesIds.length > 0) { - const session = context.driver.session() - const writeTxResultPromise = session.writeTransaction(async (transaction) => { - const setDistributedCypher = ` - MATCH (m:Message) WHERE m.id IN $undistributedMessagesIds - SET m.distributed = true - RETURN m { .* } - ` - const setDistributedTxResponse = await transaction.run(setDistributedCypher, { - undistributedMessagesIds, - }) - const messages = await setDistributedTxResponse.records.map((record) => record.get('m')) - return messages - }) - try { - await writeTxResultPromise - } finally { - session.close() + const session = context.driver.session() + try { + if (undistributedMessagesIds.length > 0) { + await setMessagesAsDistributed(undistributedMessagesIds, session) } - // send subscription to author to updated the messages + } finally { + session.close() } + // send subscription to author to updated the messages } return resolved.reverse() }, @@ -57,7 +62,9 @@ export default { MATCH (currentUser:User { id: $currentUserId })-[:CHATS_IN]->(room:Room { id: $roomId }) OPTIONAL MATCH (currentUser)-[:AVATAR_IMAGE]->(image:Image) OPTIONAL MATCH (m:Message)-[:INSIDE]->(room) - WITH MAX(m.indexId) as maxIndex, room, currentUser, image + OPTIONAL MATCH (room)<-[:CHATS_IN]-(recipientUser:User) + WHERE NOT recipientUser.id = $currentUserId + WITH MAX(m.indexId) as maxIndex, room, currentUser, image, recipientUser CREATE (currentUser)-[:CREATED]->(message:Message { createdAt: toString(datetime()), id: apoc.create.uuid(), @@ -70,6 +77,7 @@ export default { SET room.lastMessageAt = toString(datetime()) RETURN message { .*, + recipientId: recipientUser.id, senderId: currentUser.id, username: currentUser.name, avatar: image.url, @@ -81,13 +89,25 @@ export default { roomId, content, }) + const [message] = await createMessageTxResponse.records.map((record) => record.get('message'), ) + return message }) try { const message = await writeTxResultPromise + if (message) { + const roomCountUpdated = await getUnreadRoomsCount(message.recipientId, session) + + // send subscriptions + await pubsub.publish(ROOM_COUNT_UPDATED, { + roomCountUpdated, + userId: message.recipientId, + }) + } + return message } catch (error) { throw new Error(error) diff --git a/backend/src/schema/resolvers/rooms.ts b/backend/src/schema/resolvers/rooms.ts index 8460977f1..5e931a446 100644 --- a/backend/src/schema/resolvers/rooms.ts +++ b/backend/src/schema/resolvers/rooms.ts @@ -1,7 +1,31 @@ import { neo4jgraphql } from 'neo4j-graphql-js' import Resolver from './helpers/Resolver' +import { pubsub, ROOM_COUNT_UPDATED } from '../../server' +import { withFilter } from 'graphql-subscriptions' + +export const getUnreadRoomsCount = async (userId, session) => { + return session.readTransaction(async (transaction) => { + const unreadRoomsCypher = ` + MATCH (:User { id: $userId })-[:CHATS_IN]->(room:Room)<-[:INSIDE]-(message:Message)<-[:CREATED]-(sender:User) + WHERE NOT sender.id = $userId AND NOT message.seen + RETURN toString(COUNT(DISTINCT room)) AS count + ` + const unreadRoomsTxResponse = await transaction.run(unreadRoomsCypher, { userId }) + return unreadRoomsTxResponse.records.map((record) => record.get('count'))[0] + }) +} export default { + Subscription: { + roomCountUpdated: { + subscribe: withFilter( + () => pubsub.asyncIterator(ROOM_COUNT_UPDATED), + (payload, variables) => { + return payload.userId === variables.userId + }, + ), + }, + }, Query: { Room: async (object, params, context, resolveInfo) => { if (!params.filter) params.filter = {} @@ -15,17 +39,8 @@ export default { user: { id: currentUserId }, } = context const session = context.driver.session() - const readTxResultPromise = session.readTransaction(async (transaction) => { - const unreadRoomsCypher = ` - MATCH (:User { id: $currentUserId })-[:CHATS_IN]->(room:Room)<-[:INSIDE]-(message:Message)<-[:CREATED]-(sender:User) - WHERE NOT sender.id = $currentUserId AND NOT message.seen - RETURN toString(COUNT(DISTINCT room)) AS count - ` - const unreadRoomsTxResponse = await transaction.run(unreadRoomsCypher, { currentUserId }) - return unreadRoomsTxResponse.records.map((record) => record.get('count'))[0] - }) try { - const count = await readTxResultPromise + const count = await getUnreadRoomsCount(currentUserId, session) return count } finally { session.close() diff --git a/backend/src/schema/types/type/Room.gql b/backend/src/schema/types/type/Room.gql index 8b9982dda..fdce6865b 100644 --- a/backend/src/schema/types/type/Room.gql +++ b/backend/src/schema/types/type/Room.gql @@ -55,3 +55,7 @@ type Query { ): [Room] UnreadRooms: Int } + +type Subscription { + roomCountUpdated(userId: ID!): Int +} diff --git a/backend/src/server.ts b/backend/src/server.ts index b4d63c007..feceeb9eb 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -14,6 +14,8 @@ import bodyParser from 'body-parser' import { graphqlUploadExpress } from 'graphql-upload' export const NOTIFICATION_ADDED = 'NOTIFICATION_ADDED' +// export const CHAT_MESSAGE_ADDED = 'CHAT_MESSAGE_ADDED' +export const ROOM_COUNT_UPDATED = 'ROOM_COUNT_UPDATED' const { REDIS_DOMAIN, REDIS_PORT, REDIS_PASSWORD } = CONFIG let prodPubsub, devPubsub const options = { diff --git a/webapp/components/Chat/Chat.vue b/webapp/components/Chat/Chat.vue index 0ec77a9c5..43994ef5d 100644 --- a/webapp/components/Chat/Chat.vue +++ b/webapp/components/Chat/Chat.vue @@ -60,10 +60,10 @@