paint-brush
Hoe ek die WebSocket-skaalprobleem opgelos het sonder om die bank te breekdeur@axotion
15,358 lesings
15,358 lesings

Hoe ek die WebSocket-skaalprobleem opgelos het sonder om die bank te breek

deur Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Te lank; Om te lees

Ek moes lank gelede 'n skaalbare stelsel skep wat in staat sou wees om honderde gelyktydige verbindings teen nie baie hoë koste te hanteer nie, en met 'n redelike, maar nie onmiddellike tyd van reaksie nie. Ek wou slegs geselekteerde geleenthede na websocket-kliënte stuur. Ek het 'n Redis pub-sub-module geskep, aangesien ek gedink het dat my gebeure, om in ander gevalle sigbaar te wees, deur die Redis Pub-sub-patroon oorgedra moet word.
featured image - Hoe ek die WebSocket-skaalprobleem opgelos het sonder om die bank te breek
Kamil Fronczak HackerNoon profile picture

Lank gelede het ek myself in 'n situasie bevind waar ek 'n skaalbare stelsel moes skep wat in staat sou wees om honderde gelyktydige verbindings te hanteer teen nie baie hoë koste nie, en met redelike, maar nie onmiddellike tyd van reaksie nie.


My eerste gedagtes? Kom ons skuif alle skep/wysig/vee aksies na die tou en stel gebruikers in kennis of hul aksies geslaag het of nie via WebSocket.


Maar destyds het ek nie veel ondervinding met WebSockets in produksie gehad nie, so my eerste stap was om te ondersoek hoe dit werk met behulp van tutoriale, stapeloorloop en ander bronne.


So, na 'n geruime tyd, het ek 'n kern van hoe dit moet werk gekry en begin om 'n kode voor te berei en vir 'n rukkie rond te mors met 'n lastoetsinstrument om hoë verkeer te simuleer.

Die eerste probleem

Sommige vrae en antwoorde het voorgestel dat u die intekenmetode op die Redis-instansie op die gekoppelde WebSocket-kliënt bel.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Maar op hierdie manier skep ons 'n nuwe verbinding met Redis, so die geheuegebruik in ons toepassing en Redis-verbindingspoel neem toe. (Redis laat slegs 10k verbindings na een geval toe)


Dit was 'n groot nee vir my, want ek moes geheuegebruik tot 'n minimum verlaag.


Op die oomblik noem baie artikels gelukkig dat jy nie 'n nuwe Redis-verbinding op elke WebSocket-kliënt moet skep nie.

Die tweede probleem

Nadat ek 'n groot stuk besigheidskode geskep het, toe ek die deel met websockets begin het, het 'n vraag by my opgekom - hoe om dit op 'n behoorlike en veilige manier te skep?


Ek het reeds 'n paar gebeurtenisse in die stelsel gehad, en sommige van hulle was gereed om addisioneel via WebSockets gepubliseer te word, die res van hulle was bedoel om binne die stelsel te bly.


My goue belofte was dat ek nie kode drasties hoef te verander nie en steeds net geselekteerde gebeurtenisse na websocket-kliënte sal kan stuur.


Dit is hoekom ek aanvanklik 'n Redis pub-sub-module geskep het, aangesien ek gedink het dat my gebeure, om in ander gevalle sigbaar te wees, deur die Redis pub-sub-patroon oorgedra moet word.


Moenie oorweldig voel deur na die module hieronder te kyk nie, want ek sal later besonderhede in 'n gebruiksgeval verduidelik


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Hierdie module sorg vir die skep/blootstelling van 'n Pub Redis-kliënt en die blootlegging van 'n addisionele metode - registerEvents, wat verantwoordelik is om na gegewe gebeurtenisse op Redis pub-sub te luister en dit weer via gebeurtenis-uitsender uit te stuur.


Dit kan vir eers 'n bietjie mistig wees. Waarom gebeure heruitstuur? Hoekom moet ons vir daardie geleenthede registreer? Wat is EVENT_EMITTER_TOKEN en EVENT_SUBSCRIBER_TOKEN en hoekom moet ons dit uitvoer?


Dit sal duideliker wees met werklike gebruik, so kom ons skep 'n gebruiksgeval - kletsboodskappe. Ons wil in staat wees om boodskappe via HTTP POST te stuur en dit via WebSocket aan die voorkant te ontvang.


Kom ons begin

Publiseer geleenthede

Hier is 'n module daarvoor

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


En 'n gebeurtenis wat hierdie module sal uitstuur nadat 'n POST-versoek ontvang is


 export class NewMessageEvent { constructor(public readonly message: string) {} }


In die kontroleerder moet ons dit moontlik maak om gebeurtenisse vir beide ons stelsel en die Redis-kroegry uit te stuur. Ons sal toegedraai gebruik

EventEmitter2 daarvoor


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


En dan kan ons dit in ons beheerder gebruik


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Maar voor dit moet ons ons geleentheid verbeter met PublishableEventInterface om RedisEventEmitter toe te laat om ons geleentheid te vang en dit in die Redis-kroegry uit te stuur.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Wonderlik, ons stuur nou ons geleenthede soos voorheen, maar nou, as hulle as publiseerbaar gemerk is, sal hulle in die Redis-kroeg-tou beland.

Maar nou moet ons dit moontlik maak om daardie gebeurtenisse op WebSocket te ontvang, reg?

Gebeurtenisse ontvang

Dus, kom ons kyk na ons gebruikerskletsmodule


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Soos u kan sien, het ons die metode wat vroeër genoem is, gebruik - registerEvents.

Danksy daardie metode het ons vir RedisEventPubSubModule gesê dat dit moet luister vir ons NewMessageEvent-gebeurtenis in die Redis-kroeg-sub-tou op die publishableEventName-kenmerk.


Dus, as enige NewMessageEvent-gebeurtenis plaasvind, sal dit weer uitgestuur word as 'n normale NewMessageEvent-gebeurtenis, maar onder die publishableEventName-kenmerk.


Dit is die moeite werd om te noem dat dit op 1 instansie of 1 000 instansies sal werk. So selfs as ons na 'n groot aantal gevalle skaal, sal elkeen van hulle dit ontvang en hierdie gebeurtenis weer binne die stelsel uitstuur.


So, nou het ons die vermoë om gebeurtenisse uit te saai en daarna te luister. Nou moet ons dit by ons websocket-kliënte aflewer.

Websocket Gateway

Kom ons kyk na Websocket Gateway


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


So daar is 'n ding, in die konstruktor het ons EVENT_SUBSCRIBER_TOKEN, watter tipe EventSubscriberInterface is. Maar wat doen dit regtig? Dit is hoe dit onder die enjinkap lyk


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


Dit is net 'n omhulsel vir EventEmitter2, wat ons gebruik in die metode createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Ons gebruik hierdie toegedraaide EventEmitter2 om dinamiese luisteraars op publishableName te skep wanneer websokkliënte koppel en verwyder wanneer hulle ontkoppel word.


Dan doen ons niks meer as om rxjs-stroom te skep om die websocket-verbinding te behou en boodskappe van die luisteraar af te stuur via observer.next(message); wanneer 'n nuwe boodskap verskyn.

Hoe sal hierdie geleentheid ons luisteraars bereik?


As jy terugkeer na die eerste stukkie kode, ons Redis pub sub module, dan kan jy dit raaksien in die registerEvents metode

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Wat basies luister vir gebeure op die kroegry, en dit dan weer uitstuur via die gebeurtenis-uitsender.


So, kom ons som op wat ons hier gedoen het

  • Ons gebruik steeds ons gebeure in die stelsel soos ons gebruik het via EventEmitter2, maar as ons na ons gekoppelde websocket-kliënte wil publiseer, dan hoef ons net PublishableInterface te implementeer

  • Ons skep nie nuwe Redis-verbindings op elke gekoppelde websokkliënt nie

  • Ons kan ons stelsel opskaal na X instansies en dit sal steeds op dieselfde manier optree - elke gekoppelde kliënt sal 'n kopie van die gebeurtenis via websocket kry, maak nie saak aan watter instansie hulle gekoppel sal word nie


Werkkode en voorbeeld is hier beskikbaar: https://github.com/axotion/nestjs-events-websocket