Skip to content

Commit

Permalink
Update server-kafka.ts
Browse files Browse the repository at this point in the history
try fix from #10113 (comment)
  • Loading branch information
RaiMX committed Oct 7, 2022
1 parent 93d1401 commit 2082b1e
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,31 +200,24 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
this.send(replayStream$, publish);
}

private combineStreamsAndThrowIfRetriable(
response$: Observable<any>,
replayStream$: ReplaySubject<unknown>,
) {
return new Promise<void>((resolve, reject) => {
let isPromiseResolved = false;
response$.subscribe({
next: val => {
replayStream$.next(val);
if (!isPromiseResolved) {
isPromiseResolved = true;
resolve();
}
},
error: err => {
if (err instanceof KafkaRetriableException && !isPromiseResolved) {
isPromiseResolved = true;
reject(err);
}
replayStream$.error(err);
},
complete: () => replayStream$.complete(),
});
private combineStreamsAndThrowIfRetriable(response$: Observable<any>, replayStream$: ReplaySubject<unknown>) {
return new Promise<void>((resolve, reject) => {
response$.subscribe({
next: (val) => {
replayStream$.next(val);
resolve();
},
error: (err) => {
if (err instanceof KafkaRetriableException) {
reject(err);
}
replayStream$.error(err);
resolve();
},
complete: () => replayStream$.complete(),
});
}
});
}

public async sendMessage(
message: OutgoingResponse,
Expand Down

0 comments on commit 2082b1e

Please sign in to comment.