[OPCUA CLIENT]: the event callback is never entered when a data point subscription is active at the same time #2556

MaoniinoaM opened this issue Mar 8, 2019 · 0 comments


Description

I use open62541 as an OPC UA client and subscribe to data points and events at the same time. When I trigger an event on the server side, execution never reaches the event subscription callback. When I subscribe only to the event, the callback is entered as expected after I trigger an event.
Can you give me some hints?
Here is the OPC UA part of my code.

`MS_BOOL OPCUA_create_Client(MS_OPCUA_CLIENT_HANDLE_PTR_TYPE pHandle)
{
UA_ByteString* remoteCertificate = NULL;

UA_EndpointDescription* endpointArray      = NULL;
size_t                  endpointArraySize  = 0;
UA_StatusCode           code               = UA_STATUSCODE_GOOD;
MS_BOOL                 retVal             = MS_TRUE;

UA_ByteString           certificate        = {0};
UA_ByteString           privateKey         = {0};

UA_ClientConfig         clientConfig       = UA_ClientConfig_default;


pHandle->pClient = UA_Client_new(clientConfig);

remoteCertificate = UA_ByteString_new();
code = UA_Client_getEndpoints(pHandle->pClient, pHandle->pClientAttr->pServerUrl, &endpointArraySize, &endpointArray);
if (code != UA_STATUSCODE_GOOD) {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Getting endpoint... FAIL: %s.\r\n", UA_StatusCode_name(code));
    retVal = MS_FALSE;
}

if (retVal == MS_TRUE) {
    if (pHandle->pClientAttr->logInfo.isAnonymous == MS_TRUE) {
        code = UA_Client_connect(pHandle->pClient, pHandle->pClientAttr->pServerUrl);
    } else {
        code = UA_Client_connect_username(pHandle->pClient, pHandle->pClientAttr->pServerUrl
            , pHandle->pClientAttr->logInfo.pUsrName
            , pHandle->pClientAttr->logInfo.pPasswd);
    }

    if (code != UA_STATUSCODE_GOOD) {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Connecting server... FAIL: %s.\r\n", UA_StatusCode_name(code));
        retVal = MS_FALSE;
    }
}


// encryption start
if (retVal == MS_TRUE && pHandle->pClientAttr->securityPolicyType != MS_OPCUA_SECURE_POLICY_NONE) {
    UA_String securityPolicyUri = UA_STRING(pHandle->pClientAttr->pSecurityPolicyUri);
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: %i endpoints found.\r\n", (MS_INT32)endpointArraySize);

    for (size_t endPointCount = 0; endPointCount < endpointArraySize; endPointCount++) {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: URL of endpoint %i is %.*s / %.*s.\r\n", (MS_INT32)endPointCount
            , (MS_INT32)endpointArray[endPointCount].endpointUrl.length, endpointArray[endPointCount].endpointUrl.data
            , (MS_INT32)endpointArray[endPointCount].securityPolicyUri.length,
            endpointArray[endPointCount].securityPolicyUri.data);

        if (endpointArray[endPointCount].securityMode != UA_MESSAGESECURITYMODE_SIGNANDENCRYPT) {
            continue;
        }

        if (UA_String_equal(&endpointArray[endPointCount].securityPolicyUri, &securityPolicyUri)) {
            UA_ByteString_copy(&endpointArray[endPointCount].serverCertificate, remoteCertificate);
            break;
        }
    }

    if (UA_ByteString_equal(remoteCertificate, &UA_BYTESTRING_NULL)) {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Server does not support Security Basic128Rsa15 Mode of"
            " UA_MESSAGESECURITYMODE_SIGNANDENCRYPT.\r\n");
        retVal = MS_FALSE;
    }

    if (retVal == MS_TRUE) {
        // Disconnects the client internally
        // Prepare secure connection
        UA_Client_delete(pHandle->pClient);
        pHandle->pClient = NULL;
    }

    /*
    UA_STACKARRAY(UA_ByteString, trustList, trustListSize);
    for (size_t trustListCount = 0; trustListCount < trustListSize; trustListCount++) {
        trustList[trustListCount] = load_OPCUA_Cert_File(argv[trustListCount+3]);
    }
    */
    if (retVal == MS_TRUE) {
        clientConfig.secureChannelLifeTime = 0xFFFFFFFF; // keep alive max time

        certificate = OPCUA_load_Cert_File(OPCUA_CERT_FILE_PATH);
        privateKey  = OPCUA_load_Cert_File(OPCUA_PRIVATE_KEY_FILE_PATH);

        if (certificate.data == NULL || privateKey.data == NULL) {
            MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Certificate or private key is NULL.\r\n");
            retVal = MS_FALSE;
        }
    }

    if (retVal == MS_TRUE) {
        if (pHandle->pClientAttr->securityPolicyType == MS_OPCUA_SECURE_POLICY_BASIC128RSA15) {
            pHandle->pClient = UA_Client_secure_new(clientConfig, certificate, privateKey, remoteCertificate
                , NULL, 0, NULL, 0, UA_SecurityPolicy_Basic128Rsa15);
        } else {
            pHandle->pClient = UA_Client_secure_new(clientConfig, certificate, privateKey, remoteCertificate
                , NULL, 0, NULL, 0, UA_SecurityPolicy_Basic256Sha256);
        }

        if (pHandle->pClient == NULL) {
            retVal = MS_FALSE;
        }
    }

    /*
    for(size_t deleteCount = 0; deleteCount < trustListSize; deleteCount++) {
        UA_ByteString_clear(&trustList[deleteCount]);
    }
    */
    if (retVal == MS_TRUE) {
        if (pHandle->pClientAttr->logInfo.isAnonymous == MS_TRUE) {
            code = UA_Client_connect(pHandle->pClient, pHandle->pClientAttr->pServerUrl);
        } else {
            code = UA_Client_connect_username(pHandle->pClient, pHandle->pClientAttr->pServerUrl
                , pHandle->pClientAttr->logInfo.pUsrName
                , pHandle->pClientAttr->logInfo.pPasswd);
        }

        if (code != UA_STATUSCODE_GOOD) {
            MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Connecting \"%s\"... FAIL: %s.\r\n"
                , pHandle->pClientAttr->pServerUrl, UA_StatusCode_name(code));

            retVal = MS_FALSE;
        }
    }
}
// encryption end


if (endpointArray != NULL) {
    UA_Array_delete(endpointArray, endpointArraySize, &UA_TYPES[UA_TYPES_ENDPOINTDESCRIPTION]);
}

if (certificate.data != NULL) {
    UA_ByteString_clear(&certificate);
}
if (privateKey.data != NULL) {
    UA_ByteString_clear(&privateKey);
}

if (remoteCertificate != NULL) {
    UA_ByteString_delete(remoteCertificate); /* Free the ByteString allocated above */
}

if (retVal != MS_TRUE) {
    if (pHandle->pClient != NULL) {
        UA_Client_delete(pHandle->pClient);
        pHandle->pClient = NULL;
    }
} else {
    // set up OPCUA subscription.
    OPCUA_build_Subscription(pHandle);
    OPCUA_build_Subscription_Event(pHandle);

    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Connecting \"%s\"... SUCCESS.\r\n"
        , pHandle->pClientAttr->pServerUrl);
}

return retVal;

}

MS_VOID OPCUA_build_Subscription(MS_OPCUA_CLIENT_HANDLE_PTR_TYPE pHandle)
{
LIST_ENTRY_PTR_TYPE pHead = &pHandle->pClientAttr->dataPointHead;
LIST_PTR_TYPE pos = NULL;

UA_NodeId           nodeId = {0};

MS_OPCUA_DP_NODE_PTR_TYPE pNode  = NULL;

UA_CreateSubscriptionRequest  request;
UA_CreateSubscriptionResponse response;
UA_MonitoredItemCreateRequest monRequest;
UA_MonitoredItemCreateResult  monResponse;

request = UA_CreateSubscriptionRequest_default();
    // request.requestedPublishingInterval = 1000;
    // request.requestedMaxKeepAliveCount  = 100;

response = UA_Client_Subscriptions_create(pHandle->pClient, request, NULL, NULL, NULL);

if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD) {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Creating subscription... SUCCESS: %u.\r\n"
        , response.subscriptionId);
} else {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Creating subscription... FAIL.\r\n");

    return;
}

LIST_FOR_EACH(pos, pHead) {
    pNode = NULL;
    pNode = CONTAINER_OF(pos, MS_OPCUA_DP_NODE_TYPE, list);
    if (pNode == NULL) {
        continue;
    }

    if (pNode->readType != MS_OPCUA_READ_TYPE_SUBSCRIPTION) {
        continue;
    }


    if (pNode->nodeIDType == MS_OPCUA_NODEID_TYPE_STRING) {
        nodeId = UA_NODEID_STRING(pNode->nameSpace, pNode->pNodeIDStr);
    } else {
        nodeId = UA_NODEID_NUMERIC(pNode->nameSpace, pNode->nodeIDNum);
    }
    monRequest = UA_MonitoredItemCreateRequest_default(nodeId);

    monResponse = UA_Client_MonitoredItems_createDataChange(pHandle->pClient
        , response.subscriptionId, UA_TIMESTAMPSTORETURN_BOTH
        , monRequest, (MS_VOID_PTR)pNode, OPCUA_handle_Subscription, NULL);

    if(monResponse.statusCode == UA_STATUSCODE_GOOD) {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Monitoring \"%s\"... SUCCESS: %u.\r\n", pNode->pValName
            , monResponse.monitoredItemId);
    } else {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Monitoring \"%s\"... FAIL.\r\n", pNode->pValName);
    }
}

}

MS_VOID OPCUA_handle_Subscription(UA_Client *client, UA_UInt32 subId, void *subContext,
UA_UInt32 monId, void *monContext, UA_DataValue *value)
{
MS_OPCUA_DP_NODE_PTR_TYPE pNode = (MS_OPCUA_DP_NODE_PTR_TYPE)monContext;
MS_MCL_DP_PTR_TYPE pDataPoint = NULL;
UA_Variant * pVal = NULL;
UA_DateTimeStruct dts = {0};

MS_INT32               strLen     = 0;

// MS_LOG(MS_DEBUG_LEVEL, "handle thread : %d.\r\n", GetCurrentThreadId());

if (value->status == UA_STATUSCODE_GOOD) {
    pVal = &value->value;

    pDataPoint = (MS_MCL_DP_PTR_TYPE)malloc(sizeof (MS_MCL_DP_TYPE));
    if (pDataPoint == NULL) {
        return;
    }

    memset(pDataPoint, 0, sizeof (MS_MCL_DP_TYPE));

    OPCUA_store_Raw_Data_Point(pDataPoint, pNode, pVal);

    if (pDataPoint->valType != MS_MCL_DP_TYPE_NULL) {
        strLen = strlen(pNode->pUUID);

        pDataPoint->pUUID = malloc(strLen+1);
        memset(pDataPoint->pUUID, 0, strLen+1);
        memcpy(pDataPoint->pUUID, pNode->pUUID, strLen);

        pDataPoint->pTimeStamp = malloc(32);
        memset(pDataPoint->pTimeStamp, 0, 32);

        // Here we assume the time source from OPCUA server is UTC time.
        // If not, please do some modification of hour.
        dts = UA_DateTime_toStruct(value->sourceTimestamp);

        // Bounded write: the buffer allocated above is 32 bytes.
        snprintf(pDataPoint->pTimeStamp, 32, "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ", dts.year, dts.month
                , dts.day, dts.hour, dts.min, dts.sec, dts.milliSec);

        // send the DataPoint to MindSphere.
        MCL_insert_DataPoint_Queue(pDataPoint);
    } else {
        free(pDataPoint);
    }
} else {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Subscribe value error.\r\n");
}

}

MS_VOID OPCUA_build_Subscription_Event(MS_OPCUA_CLIENT_HANDLE_PTR_TYPE pHandle)
{
LIST_ENTRY_PTR_TYPE pHead = &pHandle->pClientAttr->eventHead;
LIST_PTR_TYPE pos = NULL;

UA_NodeId                     nodeId      = {0};

MS_OPCUA_EVENT_NODE_PTR_TYPE  pNode       = NULL;

UA_CreateSubscriptionRequest  request;
UA_CreateSubscriptionResponse response;
UA_MonitoredItemCreateRequest monRequest;
UA_MonitoredItemCreateResult  monResponse;

UA_EventFilter                filter;

request = UA_CreateSubscriptionRequest_default();
// request.requestedPublishingInterval = 1000;
// request.requestedMaxKeepAliveCount  = 100;

response = UA_Client_Subscriptions_create(pHandle->pClient, request, NULL, NULL, NULL);

if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD) {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Creating subscription... SUCCESS: %u.\r\n"
        , response.subscriptionId);
} else {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Creating subscription... FAIL.\r\n");

    // Without a subscription there is nothing to attach monitored items to.
    return;
}

LIST_FOR_EACH(pos, pHead) {
    pNode = NULL;
    pNode = CONTAINER_OF(pos, MS_OPCUA_EVENT_NODE_TYPE, list);
    if (pNode == NULL) {
        continue;
    }

    if (pNode->nodeIDType == MS_OPCUA_NODEID_TYPE_STRING) {
        nodeId = UA_NODEID_STRING(pNode->nameSpace, pNode->pNodeIDStr);
    } else {
        nodeId = UA_NODEID_NUMERIC(pNode->nameSpace, pNode->nodeIDNum);
    }

    UA_MonitoredItemCreateRequest_init(&monRequest);
    monRequest.itemToMonitor.nodeId = nodeId;
    monRequest.itemToMonitor.attributeId = UA_ATTRIBUTEID_EVENTNOTIFIER;
    monRequest.monitoringMode = UA_MONITORINGMODE_REPORTING;


    UA_EventFilter_init(&filter);
    filter.selectClauses = OPCUA_setup_SelectClauses(&pNode->filterHead);
    filter.selectClausesSize = pNode->filterHead.elemNum;

    monRequest.requestedParameters.filter.encoding = UA_EXTENSIONOBJECT_DECODED;
    monRequest.requestedParameters.filter.content.decoded.data = &filter;
    monRequest.requestedParameters.filter.content.decoded.type = &UA_TYPES[UA_TYPES_EVENTFILTER];


    monResponse = UA_Client_MonitoredItems_createEvent(pHandle->pClient
        , response.subscriptionId, UA_TIMESTAMPSTORETURN_BOTH
        , monRequest, pNode, OPCUA_handle_Subscription_Event, NULL);

    if(monResponse.statusCode == UA_STATUSCODE_GOOD) {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Monitoring \"%s\"... SUCCESS: %u.\r\n", pNode->pEventName
            , monResponse.monitoredItemId);
    } else {
        MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Monitoring \"%s\"... FAIL: %s.\r\n", pNode->pEventName
            , UA_StatusCode_name(monResponse.statusCode));

       // UA_Client_delete(pHandle->pClient);
    }
}

}

MS_VOID OPCUA_handle_Subscription_Event(UA_Client *client, UA_UInt32 subId, void *subContext,
UA_UInt32 monId, void *monContext, size_t nEventFields, UA_Variant *eventFields)
{
MS_OPCUA_EVENT_NODE_PTR_TYPE pEventNode = (MS_OPCUA_EVENT_NODE_PTR_TYPE)monContext;
MS_OPCUA_EVENT_FILTER_NODE_PTR_TYPE pFilterNode = NULL;
MS_MCL_EVENT_PTR_TYPE pEvent = NULL;
UA_Variant * pVal = NULL;

LIST_ENTRY_PTR_TYPE          pFilterHead = &pEventNode->filterHead;
LIST_PTR_TYPE                pos        = 0;
MS_INT32                     i          = 0;

MS_BOOL                      retVal     = MS_TRUE;


pEvent = (MS_MCL_EVENT_PTR_TYPE)malloc(sizeof (MS_MCL_EVENT_TYPE));
if (pEvent == NULL) {
    return;
}

memset(pEvent, 0, sizeof (MS_MCL_EVENT_TYPE));

LIST_FOR_EACH(pos, pFilterHead) {
    pFilterNode = NULL;
    pFilterNode = CONTAINER_OF(pos, MS_OPCUA_EVENT_FILTER_NODE_TYPE, list);
    if (pFilterNode == NULL) {
        continue;
    }

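    // Stop if fewer fields were delivered than select clauses were requested.
    if ((size_t)i >= nEventFields) {
        retVal = MS_FALSE;
        break;
    }

    pVal = &eventFields[i];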

    retVal &= OPCUA_store_Raw_Event(pEvent, pFilterNode, pVal);

    // MCL_add_Time_Stamp(&pEvent->pTimeStamp);
    i++;
}

if (retVal == MS_TRUE) {
    // send the Event to MindSphere.
    MCL_insert_Event_Queue(pEvent);
} else {
    MS_LOG(MS_DEBUG_LEVEL, "[OPCUA]: Storing event... FAIL.\r\n");

    MCL_free_Event(pEvent);
}

}

UA_SimpleAttributeOperand * OPCUA_setup_SelectClauses(LIST_ENTRY_PTR_TYPE pFilterHead)
{
LIST_PTR_TYPE pos = NULL;
size_t i = 0;

MS_OPCUA_EVENT_FILTER_NODE_PTR_TYPE pNode = NULL;

UA_SimpleAttributeOperand * selectClauses = (UA_SimpleAttributeOperand *)
    UA_Array_new(pFilterHead->elemNum, &UA_TYPES[UA_TYPES_SIMPLEATTRIBUTEOPERAND]);
if (!selectClauses) {
    return NULL;
}


for(i = 0; i < pFilterHead->elemNum; ++i) {
    UA_SimpleAttributeOperand_init(&selectClauses[i]);
}

i = 0;
LIST_FOR_EACH(pos, pFilterHead) {
    pNode = NULL;
    pNode = CONTAINER_OF(pos, MS_OPCUA_EVENT_FILTER_NODE_TYPE, list);
    if (pNode == NULL) {
        continue;
    }

    selectClauses[i].typeDefinitionId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEEVENTTYPE);
    selectClauses[i].browsePathSize = 1;
    selectClauses[i].browsePath = (UA_QualifiedName*)
        UA_Array_new(selectClauses[i].browsePathSize, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
    if (!selectClauses[i].browsePath) {
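        // The array was allocated with UA_Array_new, so release all of its elements with UA_Array_delete.
        UA_Array_delete(selectClauses, pFilterHead->elemNum, &UA_TYPES[UA_TYPES_SIMPLEATTRIBUTEOPERAND]);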

        return NULL;
    }
    selectClauses[i].attributeId = UA_ATTRIBUTEID_VALUE;
    selectClauses[i].browsePath[0] = UA_QUALIFIEDNAME_ALLOC(0, pNode->pItemName);

    i++;
}

return selectClauses;

}`
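
A side note on the data-change handler above: it copies the UUID and formats the timestamp without checking the allocations or the buffer size. Below is a minimal sketch of a checked version, written in plain C so it compiles without open62541. `DateParts`, `dup_string` and `format_timestamp` are hypothetical names, and `DateParts` only stands in for the fields the handler reads from `UA_DateTimeStruct`. This is most likely unrelated to the missing event callback.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the calendar fields the handler reads
 * from UA_DateTimeStruct; this is not an open62541 type. */
typedef struct {
    int year, month, day, hour, min, sec, milliSec;
} DateParts;

/* Copy a NUL-terminated string into fresh heap memory.
 * Returns NULL if src is NULL or the allocation fails. */
static char *dup_string(const char *src)
{
    if (src == NULL) {
        return NULL;
    }
    size_t len = strlen(src);
    char *copy = malloc(len + 1);
    if (copy == NULL) {
        return NULL;
    }
    memcpy(copy, src, len + 1); /* includes the terminating NUL */
    return copy;
}

/* Write an ISO 8601 UTC timestamp into buf without overrunning it.
 * Returns 0 on success, -1 if buf is too small or formatting failed. */
static int format_timestamp(char *buf, size_t bufSize, const DateParts *d)
{
    assert(buf != NULL && d != NULL);
    int n = snprintf(buf, bufSize, "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
                     d->year, d->month, d->day, d->hour, d->min, d->sec,
                     d->milliSec);
    return (n < 0 || (size_t)n >= bufSize) ? -1 : 0;
}
```

In the handler, a `NULL` from `dup_string` or a `-1` from `format_timestamp` would mean freeing the partially built data point instead of queueing it.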

Background Information / Reproduction Steps
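
As background on the event callback: `OPCUA_handle_Subscription_Event` walks the filter list and reads `eventFields[i]` for each entry, so the list length and `nEventFields` have to agree. Below is a minimal sketch of that pairing with an explicit bound, in plain C with hypothetical stand-in types. `FilterNode`, `Field` and `pair_fields` are not part of open62541 or of the code above. `Field` stands in for `UA_Variant`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins: FilterNode models one entry of the select-clause
 * list, Field models one value delivered to the event callback. */
typedef struct FilterNode {
    const char *itemName;
    struct FilterNode *next;
} FilterNode;

typedef struct {
    int value;
} Field;

/* Pair each select clause with the field at the same position and copy
 * the field values into out (capacity outCap).
 * Returns the number of pairs, or -1 if the list has more entries than
 * fields were delivered or than out can hold. */
static int pair_fields(const FilterNode *head, const Field *fields,
                       size_t nFields, int *out, size_t outCap)
{
    assert(out != NULL || outCap == 0);
    size_t i = 0;
    for (const FilterNode *node = head; node != NULL; node = node->next) {
        if (i >= nFields || i >= outCap) {
            return -1; /* never index past what the caller provided */
        }
        out[i] = fields[i].value;
        i++;
    }
    return (int)i;
}
```

A return value of -1 corresponds to the case where the callback should discard the event instead of reading past the end of `eventFields`.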

Checklist

Please provide the following information:

  • open62541 Version (release number or git tag): main branch
  • Other OPC UA SDKs used (client or server): RF650R
  • Operating system: WINDOWS & LINUX
  • Logs (with UA_LOGLEVEL set as low as necessary) attached
  • Wireshark network dump attached
  • Self-contained code example attached
  • Critical issue