Skip to content

Commit

Permalink
Merge pull request ReactiveX#3477 from akarnokd/FromArray1x
Browse files Browse the repository at this point in the history
1.x: add a source OnSubscribe which works from an array directly
  • Loading branch information
akarnokd committed Dec 15, 2015
2 parents 5e3540f + 0e45a7e commit 8d3a0c5
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 18 deletions.
43 changes: 25 additions & 18 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,14 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
* @see <a href="https://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}

/**
Expand Down Expand Up @@ -1448,7 +1455,7 @@ public final static <T> Observable<T> just(final T value) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2) {
return from(Arrays.asList(t1, t2));
return from((T[])new Object[] { t1, t2 });
}

/**
Expand All @@ -1474,7 +1481,7 @@ public final static <T> Observable<T> just(T t1, T t2) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3) {
return from(Arrays.asList(t1, t2, t3));
return from((T[])new Object[] { t1, t2, t3 });
}

/**
Expand Down Expand Up @@ -1502,7 +1509,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
return from(Arrays.asList(t1, t2, t3, t4));
return from((T[])new Object[] { t1, t2, t3, t4 });
}

/**
Expand Down Expand Up @@ -1532,7 +1539,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
return from(Arrays.asList(t1, t2, t3, t4, t5));
return from((T[])new Object[] { t1, t2, t3, t4, t5 });
}

/**
Expand Down Expand Up @@ -1564,7 +1571,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6 });
}

/**
Expand Down Expand Up @@ -1598,7 +1605,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7 });
}

/**
Expand Down Expand Up @@ -1634,7 +1641,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8 });
}

/**
Expand Down Expand Up @@ -1672,7 +1679,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
}

/**
Expand Down Expand Up @@ -1712,7 +1719,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 });
}

/**
Expand Down Expand Up @@ -1845,7 +1852,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return merge(from(Arrays.asList(t1, t2)));
return merge(new Observable[] { t1, t2 });
}

/**
Expand All @@ -1871,7 +1878,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
return merge(from(Arrays.asList(t1, t2, t3)));
return merge(new Observable[] { t1, t2, t3 });
}

/**
Expand Down Expand Up @@ -1899,7 +1906,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
return merge(from(Arrays.asList(t1, t2, t3, t4)));
return merge(new Observable[] { t1, t2, t3, t4 });
}

/**
Expand Down Expand Up @@ -1929,7 +1936,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5)));
return merge(new Observable[] { t1, t2, t3, t4, t5 });
}

/**
Expand Down Expand Up @@ -1961,7 +1968,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6 });
}

/**
Expand Down Expand Up @@ -1995,7 +2002,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7 });
}

/**
Expand Down Expand Up @@ -2031,7 +2038,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8 });
}

/**
Expand Down Expand Up @@ -2069,7 +2076,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
}

/**
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeFromArray.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.OnSubscribe;

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
/** */
private static final long serialVersionUID = 3534218984725836979L;

final Subscriber<? super T> child;
final T[] array;

int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

void fastPath() {
final Subscriber<? super T> child = this.child;

for (T t : array) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(t);
}

if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;

long e = 0L;
int i = index;

for (;;) {

while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(array[i]);

i++;

if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}

r--;
e--;
}

r = get() + e;

if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
}
Loading

0 comments on commit 8d3a0c5

Please sign in to comment.