Skip to content

Commit

Permalink
Working on cancel forward
Browse files Browse the repository at this point in the history
  • Loading branch information
vpicaver committed Mar 18, 2020
1 parent cfe3f46 commit 3dc6491
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 32 deletions.
84 changes: 52 additions & 32 deletions asyncfuture.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,14 @@ void runInMainThread(F func) {
* @param contextObject Determine the receiver callback
*/

template <typename T, typename Finished, typename Canceled>
template <typename T, typename Finished, typename Canceled, typename Progress, typename ProgressRange>
void watch(QFuture<T> future,
const QObject* owner,
const QObject* contextObject,
Finished finished,
Canceled canceled,
std::function<void (int value)> progress = nullptr,
std::function<void (int min, int max)> progressRange = nullptr) {
Progress progress,
ProgressRange progressRange) {

Q_ASSERT(owner);
QPointer<const QObject> ownerAlive = owner;
Expand Down Expand Up @@ -392,19 +392,17 @@ void watch(QFuture<T> future,
canceled();
});

if(progress) {
QObject::connect(watcher, &QFutureWatcher<T>::progressValueChanged,
contextObject, [=](int value) {
progress(value);
});
}

if(progressRange) {
QObject::connect(watcher, &QFutureWatcher<T>::progressRangeChanged,
contextObject, [=](int min, int max) {
progressRange(min, max);
});
}
QObject::connect(watcher, &QFutureWatcher<T>::progressValueChanged,
contextObject, [=](int value) {
progress(value);
});

QObject::connect(watcher, &QFutureWatcher<T>::progressRangeChanged,
contextObject, [=](int min, int max) {
progressRange(min, max);
});


} else {
QObject::connect(watcher, &QFutureWatcher<T>::finished,
Expand All @@ -430,19 +428,16 @@ void watch(QFuture<T> future,
});


if(progress) {
QObject::connect(watcher, &QFutureWatcher<T>::progressValueChanged,
[=](int value) {
progress(value);
});
}
QObject::connect(watcher, &QFutureWatcher<T>::progressValueChanged,
[=](int value) {
progress(value);
});

QObject::connect(watcher, &QFutureWatcher<T>::progressRangeChanged,
[=](int min, int max) {
progressRange(min, max);
});

if(progressRange) {
QObject::connect(watcher, &QFutureWatcher<T>::progressRangeChanged,
[=](int min, int max) {
progressRange(min, max);
});
}
}

if ((QThread::currentThread() != QCoreApplication::instance()->thread()) &&
Expand Down Expand Up @@ -581,7 +576,10 @@ class DeferredFuture : public QObject, public QFutureInterface<T>{
this,
nullptr,
onFinished,
onCanceled);
onCanceled,
[](int){},
[](int,int){}
);

track(future);
}
Expand All @@ -604,7 +602,9 @@ class DeferredFuture : public QObject, public QFutureInterface<T>{
this,
nullptr,
onFinished,
onCanceled);
onCanceled,
[](int){},
[](int,int){});
// It don't track for the first level of future
}

Expand Down Expand Up @@ -642,7 +642,10 @@ class DeferredFuture : public QObject, public QFutureInterface<T>{
this,
nullptr,
onFinished,
onCanceled);
onCanceled,
[](int){},
[](int,int){}
);
}

void incWeakRefCount() {
Expand Down Expand Up @@ -736,6 +739,11 @@ class DeferredFuture : public QObject, public QFutureInterface<T>{
mutex.unlock();
}

void setParent(QFuture<void> future) {
setParentProgressValue(future.progressValue());
setParentProgressRange(future.progressMinimum(), future.progressMaximum());
}

protected:
DeferredFuture(QObject* parent = nullptr): QObject(parent),
QFutureInterface<T>(QFutureInterface<T>::Running),
Expand Down Expand Up @@ -860,7 +868,8 @@ class CombinedFuture: public DeferredFuture<void> {
cancelFutureAt(index, trackProgress);
decWeakRefCount();
},
progressFunc
progressFunc,
[](int,int){}
);
}

Expand Down Expand Up @@ -1140,7 +1149,6 @@ static QFuture<DeferredType> execute(QFuture<T> future, const QObject* contextOb

auto defer = DeferredFuture<DeferredType>::create();

// qDebug() << "Future:" << future.progressMaximum();
defer->setParentProgressValue(future.progressValue());
defer->setParentProgressRange(future.progressMinimum(), future.progressMaximum());

Expand Down Expand Up @@ -1170,6 +1178,18 @@ static QFuture<DeferredType> execute(QFuture<T> future, const QObject* contextOb
defer->cancel(contextObject, &QObject::destroyed);
}


//Watch the defer future and propgate changes up to the parent future
auto futurePtr = QSharedPointer<QFuture<void>>::create(future);
watch(defer->future(),
contextObject,
contextObject,
[]() {}, //onComplete
[futurePtr]() { futurePtr->cancel(); },
[](int){},
[](int,int){}
);

return defer->future();
}

Expand Down
51 changes: 51 additions & 0 deletions tests/asyncfutureunittests/bugtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,56 @@ void BugTests::test_chained_obserable_progress()

QCOMPARE(nextExecuted2, true);
QCOMPARE(nextFuture2.progressValue(), ints.size() * 3);
}

void BugTests::test_forward_canceled() {
QVector<int> ints(100);
std::iota(ints.begin(), ints.end(), ints.size());
std::function<int (int)> func = [](int x)->int {
QThread::msleep(100);
return x * x;
};
QFuture<int> mappedFuture = QtConcurrent::mapped(ints, func);

bool completed1 = false;
bool canceled1 = false;
AsyncFuture::observe(mappedFuture).subscribe(
[&completed1]{ completed1 = true; },
[&canceled1]{ canceled1 = true; }
);

bool started = false;
bool completed2 = false;
bool canceled2 = false;
bool nextFutureCanceled = false;

auto nextFuture = AsyncFuture::observe(mappedFuture).subscribe([ints, func, &completed2, &canceled2, &started](){
started = true;
QFuture<int> mappedFuture2 = QtConcurrent::mapped(ints, func);

AsyncFuture::observe(mappedFuture2).subscribe(
[&completed2]{ completed2 = true; },
[&canceled2]{ canceled2 = true; }
);

return mappedFuture2;
},
[&nextFutureCanceled]() {
nextFutureCanceled = true;
}

).future();

observe(timeout(50)).subscribe([&nextFuture](){
nextFuture.cancel();
});

await(nextFuture);

QCOMPARE(completed1, false);
QCOMPARE(completed2, false);
QCOMPARE(started, false);
QCOMPARE(nextFutureCanceled, true);
QCOMPARE(canceled1, true);
QCOMPARE(canceled2, false); //This was never started, so it can't be cancelled
}
2 changes: 2 additions & 0 deletions tests/asyncfutureunittests/bugtests.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ private slots:
void test_combiner_handle_nested_progress();
void test_combiner_combiner_handle_nested_progress();
void test_chained_obserable_progress();

void test_forward_canceled();
};

#endif // BUGTESTS_H

0 comments on commit 3dc6491

Please sign in to comment.