Skip to content

Commit

Permalink
fix(subjects): All subjects will now correctly be closed after `com…
Browse files Browse the repository at this point in the history
…plete()` and `error()`. Removed `ObjectUnsubscribedError`. Unsubscribed subjects will no longer throw.

BREAKING CHANGE: `ObjectUnsubscribedError` is no longer exported. The constructor was long deprecated. If you need a similar error, you can construct and throw your own. RxJS no longer throws this error.

BREAKING CHANGE: `Subject#closed` is no longer writeable. That was never an intended API. If you need to silently close a `Subject`, call `Subject#unsubscribe()`.

BREAKING CHANGE: Windows from `windowToggle` will no longer through if leaked and subscribed outside of the main subscription. There's no real use case for this behavior. If you find you needed this behavior, please file and issue.

fixes #7200
fixes #7209
  • Loading branch information
benlesh committed Apr 4, 2023
1 parent dbbae80 commit c71d2fc
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 119 deletions.
3 changes: 0 additions & 3 deletions spec-dtslint/errors-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
ArgumentOutOfRangeError,
EmptyError,
NotFoundError,
ObjectUnsubscribedError,
SequenceError,
TimeoutError,
UnsubscriptionError
Expand All @@ -15,7 +14,6 @@ it('should deprecate error construction', () => {
error = new ArgumentOutOfRangeError(); // $ExpectDeprecation
error = new EmptyError(); // $ExpectDeprecation
error = new NotFoundError('message'); // $ExpectDeprecation
error = new ObjectUnsubscribedError(); // $ExpectDeprecation
error = new SequenceError('message'); // $ExpectDeprecation
error = new TimeoutError(); // $ExpectDeprecation
error = new UnsubscriptionError([]); // $ExpectDeprecation
Expand All @@ -28,7 +26,6 @@ it('should not deprecate instanceof use', () => {
b = error instanceof ArgumentOutOfRangeError; // $ExpectNoDeprecation
b = error instanceof EmptyError; // $ExpectNoDeprecation
b = error instanceof NotFoundError; // $ExpectNoDeprecation
b = error instanceof ObjectUnsubscribedError; // $ExpectNoDeprecation
b = error instanceof SequenceError; // $ExpectNoDeprecation
b = error instanceof TimeoutError; // $ExpectNoDeprecation
b = error instanceof UnsubscriptionError; // $ExpectNoDeprecation
Expand Down
59 changes: 31 additions & 28 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config, throwError, concat, Subscription } from 'rxjs';
import { Subject, Observable, AsyncSubject, Observer, of, config, Subscription } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { catchError, delay, map, mergeMap } from 'rxjs/operators';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -345,33 +345,35 @@ describe('Subject', () => {
subscription2.unsubscribe();
subject.unsubscribe();

expect(() => {
subject.subscribe(
{ next: function (x) {

const subscription = subject.subscribe(
{
next: function (x) {
results3.push(x);
}, error: function (err) {
},
error: function (err) {
expect(false).to.equal('should not throw error: ' + err.toString());
} }
);
}).to.throw(ObjectUnsubscribedError);
}
}
);

expect(subscription.closed).to.be.true;

expect(results1).to.deep.equal([1, 2, 3, 4, 5]);
expect(results2).to.deep.equal([3, 4, 5]);
expect(results3).to.deep.equal([]);
});

it('should not allow values to be nexted after it is unsubscribed', (done) => {
it('should not allow values to be nexted after it is unsubscribed', () => {
const subject = new Subject<string>();
const expected = ['foo'];

subject.subscribe(function (x) {
expect(x).to.equal(expected.shift());
});
const results: any[] = []
subject.subscribe(x => results.push(x));

subject.next('foo');
subject.unsubscribe();
expect(() => subject.next('bar')).to.throw(ObjectUnsubscribedError);
done();
subject.next('bar')

expect(results).to.deep.equal(['foo']);
});

it('should clean out unsubscribed subscribers', (done) => {
Expand Down Expand Up @@ -552,21 +554,22 @@ describe('Subject', () => {
source.subscribe(subject);
});

it('should throw ObjectUnsubscribedError when emit after unsubscribed', () => {
it('should be closed after unsubscribed', () => {
const subject = new Subject<string>();
subject.unsubscribe();
expect(subject.closed).to.be.true;
});

expect(() => {
subject.next('a');
}).to.throw(ObjectUnsubscribedError);

expect(() => {
subject.error('a');
}).to.throw(ObjectUnsubscribedError);
it('should be closed after error', () => {
const subject = new Subject<string>();
subject.error('bad');
expect(subject.closed).to.be.true;
});

expect(() => {
subject.complete();
}).to.throw(ObjectUnsubscribedError);
it('should be closed after complete', () => {
const subject = new Subject<string>();
subject.complete();
expect(subject.closed).to.be.true;
});

it('should not next after completed', () => {
Expand Down
1 change: 0 additions & 1 deletion spec/index-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ describe('index', () => {
it('should export error types', () => {
expect(index.ArgumentOutOfRangeError).to.exist;
expect(index.EmptyError).to.exist;
expect(index.ObjectUnsubscribedError).to.exist;
expect(index.UnsubscriptionError).to.exist;
expect(index.TimeoutError).to.exist;
});
Expand Down
9 changes: 4 additions & 5 deletions spec/operators/windowToggle-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { Observable, NEVER, of, ObjectUnsubscribedError, EMPTY } from 'rxjs';
import { Observable, NEVER, of, EMPTY } from 'rxjs';
import { windowToggle, tap, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -243,7 +243,7 @@ describe('windowToggle', () => {
});
});

it('should dispose window Subjects if the outer is unsubscribed early', () => {
it('should close window Subjects if the outer is unsubscribed early', () => {
rxTestScheduler.run(({ hot, cold, expectObservable, expectSubscriptions, time }) => {
const open = cold(' o-------------------------|');
const e1 = hot(' --a--b--c--d--e--f--g--h--|');
Expand All @@ -265,9 +265,8 @@ describe('windowToggle', () => {
expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
rxTestScheduler.schedule(() => {
expect(() => {
window.subscribe();
}).to.throw(ObjectUnsubscribedError);
const subscription = window.subscribe();
expect(subscription.closed).to.be.true;
}, late);
});
});
Expand Down
25 changes: 20 additions & 5 deletions spec/subjects/BehaviorSubject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { BehaviorSubject, Subject, ObjectUnsubscribedError, of } from 'rxjs';
import { BehaviorSubject, Subject, of } from 'rxjs';
import { tap, mergeMapTo } from 'rxjs/operators';
import { asInteropSubject } from '../helpers/interop-helper';
import { TestScheduler } from 'rxjs/testing';
Expand All @@ -26,12 +26,27 @@ describe('BehaviorSubject', () => {
}).to.throw(Error, 'derp');
});

it('should throw an ObjectUnsubscribedError if getValue() is called and the BehaviorSubject has been unsubscribed', () => {
it('should be closed if unsubscribed', () => {
const subject = new BehaviorSubject('hi there');
subject.unsubscribe();
expect(() => {
subject.getValue();
}).to.throw(ObjectUnsubscribedError);
expect(subject.closed).to.be.true;
});

it('should not be closed if not unsubscribed', () => {
const subject = new BehaviorSubject('hi there');
expect(subject.closed).to.be.false;
});

it('should be closed if it has received an error', () => {
const subject = new BehaviorSubject('hi there');
subject.error(new Error('derp'));
expect(subject.closed).to.be.true;
});

it('should be closed if it has received a complete notification', () => {
const subject = new BehaviorSubject('hi there');
subject.complete();
expect(subject.closed).to.be.true;
});

it('should have a getValue() method to retrieve the current value', () => {
Expand Down
16 changes: 0 additions & 16 deletions spec/util/ObjectUnsubscribedError-spec.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export { firstValueFrom } from './internal/firstValueFrom';
export { ArgumentOutOfRangeError } from './internal/util/ArgumentOutOfRangeError';
export { EmptyError } from './internal/util/EmptyError';
export { NotFoundError } from './internal/util/NotFoundError';
export { ObjectUnsubscribedError } from './internal/util/ObjectUnsubscribedError';
export { SequenceError } from './internal/util/SequenceError';
export { TimeoutError } from './internal/operators/timeout';
export { UnsubscriptionError } from './internal/util/UnsubscriptionError';
Expand Down
6 changes: 3 additions & 3 deletions src/internal/AsyncSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ export class AsyncSubject<T> extends Subject<T> {

/** @internal */
protected _checkFinalizedStatuses(subscriber: Subscriber<T>) {
const { hasError, _hasValue, _value, thrownError, isStopped, _isComplete } = this;
const { hasError, _hasValue, _value, thrownError, _closed, _isComplete } = this;
if (hasError) {
subscriber.error(thrownError);
} else if (isStopped || _isComplete) {
} else if (_closed || _isComplete) {
_hasValue && subscriber.next(_value!);
subscriber.complete();
}
}

next(value: T): void {
if (!this.isStopped) {
if (!this._closed) {
this._value = value;
this._hasValue = true;
}
Expand Down
1 change: 0 additions & 1 deletion src/internal/BehaviorSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export class BehaviorSubject<T> extends Subject<T> {
if (hasError) {
throw thrownError;
}
this._throwIfClosed();
return _value;
}

Expand Down
5 changes: 2 additions & 3 deletions src/internal/ReplaySubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ export class ReplaySubject<T> extends Subject<T> {
}

next(value: T): void {
const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
if (!isStopped) {
const { _closed, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
if (!_closed) {
_buffer.push(value);
!_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
}
Expand All @@ -67,7 +67,6 @@ export class ReplaySubject<T> extends Subject<T> {

/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
this._throwIfClosed();
this._trimBuffer();

const subscription = this._innerSubscribe(subscriber);
Expand Down
43 changes: 19 additions & 24 deletions src/internal/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Observable } from './Observable';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { Observer, SubscriptionLike, TeardownLogic } from './types';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';

/**
* A Subject is a special type of Observable that allows values to be
Expand All @@ -12,7 +11,15 @@ import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
* Subject, and you can call next to feed values as well as error and complete.
*/
export class Subject<T> extends Observable<T> implements SubscriptionLike {
closed = false;
/** @internal */
_closed = false;

/**
* Will return true if this subject has been closed and is no longer accepting new values.
*/
get closed() {
return this._closed;
}

private currentObservers = new Map<Subscription, Observer<T>>();

Expand Down Expand Up @@ -52,21 +59,13 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
super();
}

/** @internal */
protected _throwIfClosed() {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
}

protected _clearObservers() {
this.currentObservers.clear();
this.observerSnapshot = undefined;
}

next(value: T) {
this._throwIfClosed();
if (!this.isStopped) {
if (!this._closed) {
const { observers } = this;
const len = observers.length;
for (let i = 0; i < len; i++) {
Expand All @@ -76,9 +75,8 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
}

error(err: any) {
this._throwIfClosed();
if (!this.isStopped) {
this.hasError = this.isStopped = true;
if (!this._closed) {
this.hasError = this._closed = true;
this.thrownError = err;
const { observers } = this;
const len = observers.length;
Expand All @@ -90,9 +88,8 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
}

complete() {
this._throwIfClosed();
if (!this.isStopped) {
this.isStopped = true;
if (!this._closed) {
this._closed = true;
const { observers } = this;
const len = observers.length;
for (let i = 0; i < len; i++) {
Expand All @@ -103,7 +100,7 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
}

unsubscribe() {
this.isStopped = this.closed = true;
this._closed = true;
this._clearObservers();
}

Expand All @@ -113,23 +110,21 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {

/** @internal */
protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
this._throwIfClosed();
return super._trySubscribe(subscriber);
}

/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
// this._throwIfClosed();
this._checkFinalizedStatuses(subscriber);
return this._innerSubscribe(subscriber);
}

/** @internal */
protected _innerSubscribe(subscriber: Subscriber<any>) {
const { hasError, isStopped, currentObservers } = this;
if (hasError || isStopped) {
if (this.hasError || this._closed) {
return Subscription.EMPTY;
}
const { currentObservers } = this;
const subscription = new Subscription(() => {
currentObservers.delete(subscription);
this.observerSnapshot = undefined;
Expand All @@ -141,10 +136,10 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {

/** @internal */
protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
const { hasError, thrownError, isStopped } = this;
const { hasError, thrownError, _closed } = this;
if (hasError) {
subscriber.error(thrownError);
} else if (isStopped) {
} else if (_closed) {
subscriber.complete();
}
}
Expand Down
Loading

0 comments on commit c71d2fc

Please sign in to comment.