Skip to content

Commit

Permalink
fix(engine): signal long polling on unlock
Browse files Browse the repository at this point in the history
When a task is unlocked, it can potentially be
locked by already pending long-polling requests
in a similar way then when a task is initially
created.

This commit adds the missing functionality to
signal pending long polling requests when unlocking
a task.

related to #CAM-9195, #299
  • Loading branch information
meyerdan authored and koevskinikola committed Jul 16, 2018
1 parent b65f4bc commit f3b684c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,60 +83,72 @@ public class ExternalTaskEntity implements ExternalTask, DbEntity, HasDbRevision

protected String businessKey;

@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
@Override
public String getTopicName() {
return topicName;
}
public void setTopicName(String topic) {
this.topicName = topic;
}
@Override
public String getWorkerId() {
return workerId;
}
public void setWorkerId(String workerId) {
this.workerId = workerId;
}
@Override
public Date getLockExpirationTime() {
return lockExpirationTime;
}
public void setLockExpirationTime(Date lockExpirationTime) {
this.lockExpirationTime = lockExpirationTime;
}
@Override
public String getExecutionId() {
return executionId;
}
public void setExecutionId(String executionId) {
this.executionId = executionId;
}
@Override
public String getProcessDefinitionKey() {
return processDefinitionKey;
}
public void setProcessDefinitionKey(String processDefinitionKey) {
this.processDefinitionKey = processDefinitionKey;
}
@Override
public String getActivityId() {
return activityId;
}
public void setActivityId(String activityId) {
this.activityId = activityId;
}
@Override
public String getActivityInstanceId() {
return activityInstanceId;
}
public void setActivityInstanceId(String activityInstanceId) {
this.activityInstanceId = activityInstanceId;
}
@Override
public int getRevision() {
return revision;
}
@Override
public void setRevision(int revision) {
this.revision = revision;
}
@Override
public int getRevisionNext() {
return revision + 1;
}
Expand All @@ -146,33 +158,39 @@ public int getSuspensionState() {
public void setSuspensionState(int suspensionState) {
this.suspensionState = suspensionState;
}
@Override
public boolean isSuspended() {
return suspensionState == SuspensionState.SUSPENDED.getStateCode();
}
@Override
public String getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(String processInstanceId) {
this.processInstanceId = processInstanceId;
}
@Override
public String getProcessDefinitionId() {
return processDefinitionId;
}
public void setProcessDefinitionId(String processDefinitionId) {
this.processDefinitionId = processDefinitionId;
}
@Override
public String getTenantId() {
return tenantId;
}
public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}
@Override
public Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
@Override
public String getErrorMessage() {
return errorMessage;
}
Expand All @@ -199,6 +217,7 @@ public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}

@Override
public Object getPersistentState() {
Map<String, Object> persistentState = new HashMap<String, Object>();
persistentState.put("topic", topicName);
Expand Down Expand Up @@ -444,6 +463,10 @@ public String toString() {
public void unlock() {
workerId = null;
lockExpirationTime = null;

Context.getCommandContext()
.getExternalTaskManager()
.fireExternalTaskAvailableEvent();
}

public static ExternalTaskEntity createAndInsert(ExecutionEntity execution, String topic, long priority) {
Expand All @@ -458,7 +481,7 @@ public static ExternalTaskEntity createAndInsert(ExecutionEntity execution, Stri
externalTask.setTenantId(execution.getTenantId());
externalTask.setPriority(priority);

ProcessDefinitionEntity processDefinition = (ProcessDefinitionEntity) execution.getProcessDefinition();
ProcessDefinitionEntity processDefinition = execution.getProcessDefinition();
externalTask.setProcessDefinitionKey(processDefinition.getKey());

externalTask.insert();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ExternalTaskEntity findExternalTaskById(String id) {

public void insert(ExternalTaskEntity externalTask) {
getDbEntityManager().insert(externalTask);
fireExternalTaskCreatedEvent();
fireExternalTaskAvailableEvent();
}

public void delete(ExternalTaskEntity externalTask) {
Expand Down Expand Up @@ -147,8 +147,7 @@ protected ListQueryParameterObject configureParameterizedQuery(Object parameter)
return getTenantManager().configureQuery(parameter);
}

public void fireExternalTaskCreatedEvent() {

public void fireExternalTaskAvailableEvent() {
Context.getCommandContext()
.getTransactionContext()
.addTransactionListener(TransactionState.COMMITTED, new TransactionListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
package org.camunda.bpm.engine.test.api.externaltask;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;

import org.camunda.bpm.engine.externaltask.LockedExternalTask;
import org.camunda.bpm.engine.impl.ProcessEngineImpl;
import org.camunda.bpm.engine.impl.util.SingleConsumerCondition;
import org.camunda.bpm.engine.test.ProcessEngineRule;
Expand Down Expand Up @@ -95,4 +97,26 @@ public void shouldSignalConditionOnTaskCreateMultipleTimes() {
verify(condition, times(2)).signal();
}

@Test
public void shouldSignalConditionOnUnlock() {

// given

rule.getRuntimeService()
.startProcessInstanceByKey("theProcess");

reset(condition); // clear signal for create

LockedExternalTask lockedTask = rule.getExternalTaskService().fetchAndLock(1, "theWorker")
.topic("theTopic", 10000)
.execute()
.get(0);

// when
rule.getExternalTaskService().unlock(lockedTask.getId());

// then
verify(condition, times(1)).signal();
}

}

0 comments on commit f3b684c

Please sign in to comment.