Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous writes to var node via UA_Server_writeValue() API while it is subscribed to data change notification fails after some time #2363

Open
bvinodkumar2008 opened this issue Jan 11, 2019 · 12 comments

Comments

@bvinodkumar2008
Copy link

bvinodkumar2008 commented Jan 11, 2019

Description

On continuously calling UA_Server_writeValue() API for updating the opcua variable node in server.c and subscribing to this data change notification in client.c, i see this error in server open62541_test: ../open62541.c:48832: UA_NodeMap_releaseNode: Assertion `entry->refCount > 0' failed.

Background Information / Reproduction Steps

  1. Run server
gcc -std=c99 open62541.c server.c -pthread -o server
./server
  1. Run client
gcc -std=c99 open62541.c client.c -pthread -o client
./client

server.c

#include <signal.h>
#include "open62541.h"
#include <pthread.h>

static UA_Server *server = NULL;
static UA_Boolean serverRunning = true;
UA_ServerConfig *serverConfig = NULL;
static char dataToPublish[100];
static char errorMsg[100];

void signalHandler(int sig) {
    serverRunning = false;
}

/* This function provides data to the subscriber */
static UA_StatusCode
readPublishedData(UA_Server *server,
                const UA_NodeId *sessionId,
                void *sessionContext,
                const UA_NodeId *nodeId, void *nodeContext,
                UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
                UA_DataValue *data) {
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "In readPublishedData() function");
    data->hasValue = true;
    UA_String str = UA_STRING(dataToPublish);
    UA_Variant_setScalarCopy(&data->value, &str, &UA_TYPES[UA_TYPES_STRING]);
	return UA_STATUSCODE_GOOD;
}

static UA_StatusCode
writePublishedData(UA_Server *server,
                 const UA_NodeId *sessionId, void *sessionContext,
                 const UA_NodeId *nodeId, void *nodeContext,
                 const UA_NumericRange *range, const UA_DataValue *data) {

    return UA_STATUSCODE_GOOD;
}

static UA_UInt16
addTopicDataSourceVariable(UA_Server *server, char *namespace, char *topic) {

    UA_UInt16 namespaceIndex = UA_Server_addNamespace(server, namespace);
    if (namespaceIndex == 0) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA_Server_addNamespace has failed");
        return 100;
    }
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%u:namespaceIndex created for namespace: %s\n", namespaceIndex, namespace);

    UA_VariableAttributes attr = UA_VariableAttributes_default;
    attr.description = UA_LOCALIZEDTEXT("en-US", topic);
    attr.displayName = UA_LOCALIZEDTEXT("en-US", topic);
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId currentNodeId = UA_NODEID_STRING(namespaceIndex, topic);
    UA_QualifiedName currentName = UA_QUALIFIEDNAME(namespaceIndex, topic);
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_NodeId variableTypeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);

    UA_DataSource topicDataSource;
    topicDataSource.read = readPublishedData;
    topicDataSource.write = writePublishedData;
    UA_Server_addDataSourceVariableNode(server, currentNodeId, parentNodeId,
                                        parentReferenceNodeId, currentName,
                                        variableTypeNodeId, attr,
                                        topicDataSource, NULL, NULL);
    return namespaceIndex;
}

static void*
startServer(void *ptr) {
    /* run server */
    UA_StatusCode retval = UA_Server_run(server, &serverRunning);
    if (retval != UA_STATUSCODE_GOOD) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
    }
    return NULL;
}

char*
serverPublish(int nsIndex, char *topic, char* data) {

    /* check if server is started or not */
    if (server == NULL) {
        char str[] = "UA_Server instance is not instantiated";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return NULL;
    }

    /* writing the data to the opcua variable */
    UA_Variant *val = UA_Variant_new();
    strcpy(dataToPublish, data);
    UA_Variant_setScalarCopy(val, dataToPublish, &UA_TYPES[UA_TYPES_STRING]);
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "nsIndex: %d, topic:%s\n", nsIndex, topic);

    UA_StatusCode retval = UA_Server_writeValue(server, UA_NODEID_STRING(nsIndex, topic), *val);
    if (retval == UA_STATUSCODE_GOOD) {
        UA_Variant_delete(val);
        return "0";
    }
    sprintf(errorMsg, "%s", UA_StatusCode_name(retval));
    UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", errorMsg);
    UA_Variant_delete(val);
    return errorMsg;
}

/* cleanupServer deletes the memory allocated for server configuration */
static void
cleanupServer() {
    UA_Server_delete(server);
    UA_ServerConfig_delete(serverConfig);
}

int main(int argc, char** argv)
{
    signal(SIGINT, signalHandler); /* catch ctrl-c */

    /* Create a server listening on port 4840 */
    serverConfig = UA_ServerConfig_new_default();

    UA_DurationRange range = {5.0, 10.0};
    serverConfig->publishingIntervalLimits = range;
    serverConfig->samplingIntervalLimits = range;

    /* Initiate server instance */
    server = UA_Server_new(serverConfig);

    char *namespace = "namespace";
    char *varName = "test";

    /* add datasource variable */
    UA_Int16 nsIndex = addTopicDataSourceVariable(server, namespace, varName);

    pthread_t serverThread;
    if (pthread_create(&serverThread, NULL, startServer, NULL)) {
        char str[] = "server pthread creation to start server failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    char result[100];

    int i = 0;
    while (1) {
        sprintf(result, "Hello %d", i);
        char *errorMsg = serverPublish(nsIndex, varName, result);
        if(strcmp(errorMsg, "0")) {
            printf("serverPublish() API failed, error: %s", errorMsg);
            return -1;
        }
        printf("Publishing [%s]\n", result);
        i++;
    }
    cleanupServer();
}

client.c

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http:https://creativecommons.org/publicdomain/zero/1.0/ for more information. */

/**
 * Client disconnect handling
 * --------------------------
 * This example shows you how to handle a client disconnect, e.g., if the server
 * is shut down while the client is connected. You just need to call connect
 * again and the client will automatically reconnect.
 *
 * This example is very similar to the tutorial_client_firststeps.c. */

#include "open62541.h"

#include <signal.h>

UA_Boolean running = true;

static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
    running = 0;
}

static void
handlerCallback(UA_Client *client, UA_UInt32 subId, void *subContext,
                           UA_UInt32 monId, void *monContext, UA_DataValue *data) {
    UA_Variant *value = &data->value;
    if(UA_Variant_isScalar(value)) {
        if (value->type == &UA_TYPES[UA_TYPES_STRING]) {
            UA_String str = *(UA_String*)value->data;
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Data: %.*s", (int) str.length, str.data);
        }
    }
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback (UA_Client *client, UA_UInt32 subId, void *subContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static void
stateCallback (UA_Client *client, UA_ClientState clientState) {
    switch(clientState) {
        case UA_CLIENTSTATE_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
        break;
        case UA_CLIENTSTATE_WAITING_FOR_ACK:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Waiting for ack");
        break;
        case UA_CLIENTSTATE_CONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A TCP connection to the server is open");
        break;
        case UA_CLIENTSTATE_SECURECHANNEL:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A SecureChannel to the server is open");
        break;
        case UA_CLIENTSTATE_SESSION:{
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
            /* A new session was created. We need to create the subscription. */
            /* Create a subscription */
            UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
            UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
                                                                                    NULL, NULL, deleteSubscriptionCallback);

            if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Create subscription succeeded, id %u", response.subscriptionId);
            else
                return;

            UA_NodeId nodeId = UA_NODEID_STRING(2, "test");
            /* Add a MonitoredItem */
            UA_MonitoredItemCreateRequest monRequest =
                UA_MonitoredItemCreateRequest_default(nodeId);

            UA_MonitoredItemCreateResult monResponse =
                UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
                                                          UA_TIMESTAMPSTORETURN_BOTH,
                                                          monRequest, NULL, handlerCallback, NULL);
            if(monResponse.statusCode == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Monitoring UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME', id %u",
                            monResponse.monitoredItemId);
        }
        break;
        case UA_CLIENTSTATE_SESSION_RENEWED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A session with the server is open (renewed)");
            /* The session was renewed. We don't need to recreate the subscription. */
        break;
        case UA_CLIENTSTATE_SESSION_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Session disconnected");
        break;
    }
    return;
}

int
main(void) {
    signal(SIGINT, stopHandler); /* catches ctrl-c */

    UA_ClientConfig config = UA_ClientConfig_default;
    /* Set stateCallback */
    config.stateCallback = stateCallback;
    config.subscriptionInactivityCallback = subscriptionInactivityCallback;

    UA_Client *client = UA_Client_new(config);

    /* Endless loop runAsync */
    while(running) {
        /* if already connected, this will return GOOD and do nothing */
        /* if the connection is closed/errored, the connection will be reset and then reconnected */
        /* Alternatively you can also use UA_Client_getState to get the current state */
        UA_StatusCode retval = UA_Client_connect(client, "opc.tcp:https://localhost:4840");
        if(retval != UA_STATUSCODE_GOOD) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                         "Not connected. Retrying to connect in 1 second");
            /* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
            /* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
            UA_sleep_ms(1000);
            continue;
        }

        UA_Client_run_iterate(client, 1000);
    };

    /* Clean up */
    UA_Client_delete(client); /* Disconnects the client internally */
    return UA_STATUSCODE_GOOD;
}

Checklist

Please provide the following information:

  • [bbafc24] open62541 Version (release number or git tag):
  • [None] Other OPC UA SDKs used (client or server):
  • [Ubuntu 16.04] Operating system:
  • [300] Logs (with UA_LOGLEVEL set as low as necessary) attached
  • [No] Wireshark network dump attached
  • [Yes] Self-contained code example attached
  • [Yes] Critical issue - continuous writes to variable node
@Pro
Copy link
Member

Pro commented Jan 11, 2019

That's probably because you are using the server in multiple threads, i.e. in the startServer and serverPublish method.

Multithreading is currently not supported. Therefore you need to make sure that you only access the server sequentially. E.g. with mutex or similar.

Please try that and check if the error is stil there.

@bvinodkumar2008
Copy link
Author

Thanks for quick response. I did try acquiring mutex lock before running UA_Server_writeValue() API but even with that I was still get this issue. By the way, I had not enabled multithreading feature on the compiled open62451, should I enable and try?

@Pro
Copy link
Member

Pro commented Jan 11, 2019

You need to use a mutext not only for the UA_Server_writeValue, but everywhere, where you are using the server variable. Especially you need to change

static void*
startServer(void *ptr) {
    /* run server */
    UA_StatusCode retval = UA_Server_run(server, &serverRunning);
    if (retval != UA_STATUSCODE_GOOD) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
    }
    return NULL;
}

to smth like:

static void*
startServer(void *ptr) {
    /* run server */
   UA_StatusCode retval = UA_Server_run_startup(server);
  // check for GOOD
  while (serverRunning) {
   // lock mutex
   UA_Server_run_iterate(server, 100);
   // unlock mutex
}
UA_Server_run_shutdown(server);
    return NULL;
}

Otherwise you still do not have all the server instances mutexed.

The multithreading feature should not be enabled, this is in very alpha state.

@bvinodkumar2008
Copy link
Author

Thank you for the inputs. Will try that and get back to you.

@bvinodkumar2008
Copy link
Author

I used your logic in startServer but saw that the call was blocked there with just 1 publish, so i've commented out for now. I'm sharing the files with you to see if i'm doing anything wrong in there that could be causing this.

server.c:

#include <signal.h>
#include "open62541.h"
#include <pthread.h>

static UA_Server *server = NULL;
static UA_Boolean serverRunning = true;
UA_ServerConfig *serverConfig = NULL;
static char dataToPublish[100];
static char errorMsg[100];
pthread_mutex_t serverLock;

void signalHandler(int sig) {
    serverRunning = false;
}

/* This function provides data to the subscriber */
static UA_StatusCode
readPublishedData(UA_Server *server,
                const UA_NodeId *sessionId,
                void *sessionContext,
                const UA_NodeId *nodeId, void *nodeContext,
                UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
                UA_DataValue *data) {
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "In readPublishedData() function");
    data->hasValue = true;
    UA_String str = UA_STRING(dataToPublish);
    UA_Variant_setScalarCopy(&data->value, &str, &UA_TYPES[UA_TYPES_STRING]);
	return UA_STATUSCODE_GOOD;
}

static UA_StatusCode
writePublishedData(UA_Server *server,
                 const UA_NodeId *sessionId, void *sessionContext,
                 const UA_NodeId *nodeId, void *nodeContext,
                 const UA_NumericRange *range, const UA_DataValue *data) {

    return UA_STATUSCODE_GOOD;
}

static UA_UInt16
addTopicDataSourceVariable(UA_Server *server, char *namespace, char *topic) {

    UA_UInt16 namespaceIndex = UA_Server_addNamespace(server, namespace);
    if (namespaceIndex == 0) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA_Server_addNamespace has failed");
        return 100;
    }
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%u:namespaceIndex created for namespace: %s\n", namespaceIndex, namespace);

    UA_VariableAttributes attr = UA_VariableAttributes_default;
    attr.description = UA_LOCALIZEDTEXT("en-US", topic);
    attr.displayName = UA_LOCALIZEDTEXT("en-US", topic);
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId currentNodeId = UA_NODEID_STRING(namespaceIndex, topic);
    UA_QualifiedName currentName = UA_QUALIFIEDNAME(namespaceIndex, topic);
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_NodeId variableTypeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);

    UA_DataSource topicDataSource;
    topicDataSource.read = readPublishedData;
    topicDataSource.write = writePublishedData;
    UA_Server_addDataSourceVariableNode(server, currentNodeId, parentNodeId,
                                        parentReferenceNodeId, currentName,
                                        variableTypeNodeId, attr,
                                        topicDataSource, NULL, NULL);
    return namespaceIndex;
}

static void*
startServer(void *ptr) {
    /* run server */
    UA_StatusCode retval = UA_Server_run_startup(server);
    if (retval != UA_STATUSCODE_GOOD) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
        return NULL;
    }
    while (serverRunning) {
        // pthread_mutex_lock(&serverLock);
        UA_Server_run_iterate(server, 100);
        // pthread_mutex_unlock(&serverLock); 
    }
    UA_Server_run_shutdown(server); 
    return NULL;
}

char*
serverPublish(int nsIndex, char *topic, char* data) {

    /* check if server is started or not */
    if (server == NULL) {
        char str[] = "UA_Server instance is not instantiated";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return NULL;
    }

    /* writing the data to the opcua variable */
    UA_Variant *val = UA_Variant_new();
    strcpy(dataToPublish, data);
    UA_Variant_setScalarCopy(val, dataToPublish, &UA_TYPES[UA_TYPES_STRING]);
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "nsIndex: %d, topic:%s\n", nsIndex, topic);

    // pthread_mutex_lock(&serverLock); 
    UA_StatusCode retval = UA_Server_writeValue(server, UA_NODEID_STRING(nsIndex, topic), *val);
    if (retval == UA_STATUSCODE_GOOD) {
        UA_Variant_delete(val);
        return "0";
    }
    // pthread_mutex_unlock(&serverLock); 
    sprintf(errorMsg, "%s", UA_StatusCode_name(retval));
    UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", errorMsg);
    UA_Variant_delete(val);
    return errorMsg;
}

/* cleanupServer deletes the memory allocated for server configuration */
static void
cleanupServer() {
    UA_Server_delete(server);
    UA_ServerConfig_delete(serverConfig);
}

int main(int argc, char** argv)
{
    signal(SIGINT, signalHandler); /* catch ctrl-c */

    /* Create a server listening on port 4840 */
    serverConfig = UA_ServerConfig_new_default();

    UA_DurationRange range = {5.0, 10.0};
    serverConfig->publishingIntervalLimits = range;
    serverConfig->samplingIntervalLimits = range;

    /* Initiate server instance */
    server = UA_Server_new(serverConfig);

    char *namespace = "namespace";
    char *varName = "test";

    /* add datasource variable */
    UA_Int16 nsIndex = addTopicDataSourceVariable(server, namespace, varName);

    if (pthread_mutex_init(&serverLock, NULL) != 0) { 
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "mutex init has failed!"); 
        return -1; 
    } 

    pthread_t serverThread;
    if (pthread_create(&serverThread, NULL, startServer, NULL)) {
        char str[] = "server pthread creation to start server failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    char result[100];

    int i = 0;
    while (1) {
        sprintf(result, "Hello %d", i);
        char *errorMsg = serverPublish(nsIndex, varName, result);
        if(strcmp(errorMsg, "0")) {
            printf("serverPublish() API failed, error: %s", errorMsg);
            return -1;
        }
        printf("Publishing [%s]\n", result);
        i++;
    }
    cleanupServer();
    pthread_mutex_destroy(&serverLock); 
}

client.c:

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http:https://creativecommons.org/publicdomain/zero/1.0/ for more information. */

/**
 * Client disconnect handling
 * --------------------------
 * This example shows you how to handle a client disconnect, e.g., if the server
 * is shut down while the client is connected. You just need to call connect
 * again and the client will automatically reconnect.
 *
 * This example is very similar to the tutorial_client_firststeps.c. */

#include "open62541.h"

#include <signal.h>

UA_Boolean running = true;

static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
    running = 0;
}

static void
handlerCallback(UA_Client *client, UA_UInt32 subId, void *subContext,
                           UA_UInt32 monId, void *monContext, UA_DataValue *data) {
    UA_Variant *value = &data->value;
    if(UA_Variant_isScalar(value)) {
        if (value->type == &UA_TYPES[UA_TYPES_STRING]) {
            UA_String str = *(UA_String*)value->data;
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Data: %.*s", (int) str.length, str.data);
        }
    }
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback (UA_Client *client, UA_UInt32 subId, void *subContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static void
stateCallback (UA_Client *client, UA_ClientState clientState) {
    switch(clientState) {
        case UA_CLIENTSTATE_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
        break;
        case UA_CLIENTSTATE_WAITING_FOR_ACK:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Waiting for ack");
        break;
        case UA_CLIENTSTATE_CONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A TCP connection to the server is open");
        break;
        case UA_CLIENTSTATE_SECURECHANNEL:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A SecureChannel to the server is open");
        break;
        case UA_CLIENTSTATE_SESSION:{
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
            /* A new session was created. We need to create the subscription. */
            /* Create a subscription */
            UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
            UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
                                                                                    NULL, NULL, deleteSubscriptionCallback);

            if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Create subscription succeeded, id %u", response.subscriptionId);
            else
                return;

            UA_NodeId nodeId = UA_NODEID_STRING(2, "test");
            /* Add a MonitoredItem */
            UA_MonitoredItemCreateRequest monRequest =
                UA_MonitoredItemCreateRequest_default(nodeId);

            UA_MonitoredItemCreateResult monResponse =
                UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
                                                          UA_TIMESTAMPSTORETURN_BOTH,
                                                          monRequest, NULL, handlerCallback, NULL);
            if(monResponse.statusCode == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Monitoring UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME', id %u",
                            monResponse.monitoredItemId);
        }
        break;
        case UA_CLIENTSTATE_SESSION_RENEWED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A session with the server is open (renewed)");
            /* The session was renewed. We don't need to recreate the subscription. */
        break;
        case UA_CLIENTSTATE_SESSION_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Session disconnected");
        break;
    }
    return;
}

int
main(void) {
    signal(SIGINT, stopHandler); /* catches ctrl-c */

    UA_ClientConfig config = UA_ClientConfig_default;
    /* Set stateCallback */
    config.stateCallback = stateCallback;
    config.subscriptionInactivityCallback = subscriptionInactivityCallback;

    UA_Client *client = UA_Client_new(config);

    /* Endless loop runAsync */
    while(running) {
        /* if already connected, this will return GOOD and do nothing */
        /* if the connection is closed/errored, the connection will be reset and then reconnected */
        /* Alternatively you can also use UA_Client_getState to get the current state */
        UA_StatusCode retval = UA_Client_connect(client, "opc.tcp:https://localhost:4840");
        if(retval != UA_STATUSCODE_GOOD) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                         "Not connected. Retrying to connect in 1 second");
            /* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
            /* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
            UA_sleep_ms(1000);
            continue;
        }

        UA_Client_run_iterate(client, 1000);
    };

    /* Clean up */
    UA_Client_delete(client); /* Disconnects the client internally */
    return UA_STATUSCODE_GOOD;
}

@bvinodkumar2008
Copy link
Author

Hi,
Were you able to repro the issue with the samples I shared? Let me know if I I'm doing anything wrong in the samples which could potentially be causing this.

@bvinodkumar2008
Copy link
Author

Hi @Pro ,
The above issue of call blocking was occuring as i wasn't releasing the mutex on success of writeValue API in serverPublish() method. Once i fixed it, the server snippet you shared is working fine but even with this change i'm running into the refCount > 0 assertion issue. The issue could be easily reproduced with the shared code below.

Let me know if you want me to try anything else.

`server.c:

#include <signal.h>
#include "open62541.h"
#include <pthread.h>

static UA_Server *server = NULL;
static UA_Boolean serverRunning = true;
UA_ServerConfig *serverConfig = NULL;
static char dataToPublish[100];
static char errorMsg[100];
pthread_mutex_t serverLock;

void signalHandler(int sig) {
    serverRunning = false;
}

/* This function provides data to the subscriber */
static UA_StatusCode
readPublishedData(UA_Server *server,
                const UA_NodeId *sessionId,
                void *sessionContext,
                const UA_NodeId *nodeId, void *nodeContext,
                UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
                UA_DataValue *data) {
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "In readPublishedData() function");
    data->hasValue = true;
    UA_String str = UA_STRING(dataToPublish);
    UA_Variant_setScalarCopy(&data->value, &str, &UA_TYPES[UA_TYPES_STRING]);
	return UA_STATUSCODE_GOOD;
}

static UA_StatusCode
writePublishedData(UA_Server *server,
                 const UA_NodeId *sessionId, void *sessionContext,
                 const UA_NodeId *nodeId, void *nodeContext,
                 const UA_NumericRange *range, const UA_DataValue *data) {

    return UA_STATUSCODE_GOOD;
}

static UA_UInt16
addTopicDataSourceVariable(UA_Server *server, char *namespace, char *topic) {

    UA_UInt16 namespaceIndex = UA_Server_addNamespace(server, namespace);
    if (namespaceIndex == 0) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA_Server_addNamespace has failed");
        return 100;
    }
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%u:namespaceIndex created for namespace: %s\n", namespaceIndex, namespace);

    UA_VariableAttributes attr = UA_VariableAttributes_default;
    attr.description = UA_LOCALIZEDTEXT("en-US", topic);
    attr.displayName = UA_LOCALIZEDTEXT("en-US", topic);
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId currentNodeId = UA_NODEID_STRING(namespaceIndex, topic);
    UA_QualifiedName currentName = UA_QUALIFIEDNAME(namespaceIndex, topic);
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_NodeId variableTypeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);

    UA_DataSource topicDataSource;
    topicDataSource.read = readPublishedData;
    topicDataSource.write = writePublishedData;
    UA_Server_addDataSourceVariableNode(server, currentNodeId, parentNodeId,
                                        parentReferenceNodeId, currentName,
                                        variableTypeNodeId, attr,
                                        topicDataSource, NULL, NULL);
    return namespaceIndex;
}

static void*
startServer(void *ptr) {
    /* run server */
    // pthread_mutex_lock(&serverLock);
    UA_StatusCode retval = UA_Server_run_startup(server);
    // pthread_mutex_unlock(&serverLock);
    if (retval != UA_STATUSCODE_GOOD) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
        return NULL;
    }
    while (serverRunning) {
        pthread_mutex_lock(&serverLock);
        UA_Server_run_iterate(server, 100);
        pthread_mutex_unlock(&serverLock); 
    }
    UA_Server_run_shutdown(server); 
    return NULL;
}

char*
serverPublish(int nsIndex, char *topic, char* data) {

    /* check if server is started or not */
    if (server == NULL) {
        char str[] = "UA_Server instance is not instantiated";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return NULL;
    }

    /* writing the data to the opcua variable */
    UA_Variant *val = UA_Variant_new();
    strcpy(dataToPublish, data);
    UA_Variant_setScalarCopy(val, dataToPublish, &UA_TYPES[UA_TYPES_STRING]);
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "nsIndex: %d, topic:%s\n", nsIndex, topic);

    pthread_mutex_lock(&serverLock); 
    UA_StatusCode retval = UA_Server_writeValue(server, UA_NODEID_STRING(nsIndex, topic), *val);
    pthread_mutex_unlock(&serverLock); 
    if (retval == UA_STATUSCODE_GOOD) {
        UA_Variant_delete(val);
        return "0";
    }
    sprintf(errorMsg, "%s", UA_StatusCode_name(retval));
    UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", errorMsg);
    UA_Variant_delete(val);
    return errorMsg;
}

/* cleanupServer deletes the memory allocated for server configuration */
static void
cleanupServer() {
    UA_Server_delete(server);
    UA_ServerConfig_delete(serverConfig);
}

int main(int argc, char** argv)
{
    signal(SIGINT, signalHandler); /* catch ctrl-c */

    /* Create a server listening on port 4840 */
    serverConfig = UA_ServerConfig_new_default();

    UA_DurationRange range = {5.0, 10.0};
    serverConfig->publishingIntervalLimits = range;
    serverConfig->samplingIntervalLimits = range;

    /* Initiate server instance */
    server = UA_Server_new(serverConfig);

    char *namespace = "namespace";
    char *varName = "test";

    /* add datasource variable */
    UA_Int16 nsIndex = addTopicDataSourceVariable(server, namespace, varName);

    if (pthread_mutex_init(&serverLock, NULL) != 0) { 
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "mutex init has failed!"); 
        return -1; 
    } 

    pthread_t serverThread;
    if (pthread_create(&serverThread, NULL, startServer, NULL)) {
        char str[] = "server pthread creation to start server failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    char result1[1000] = "Hello";
    char result[1000];

    int i = 0;
    for(int j = 0; j < 50; j++)
        strcat(result1, " Hello");

    while (1) {
        sprintf(result, "%s %d", result1, i);
        char *errorMsg = serverPublish(nsIndex, varName, result);
        if(strcmp(errorMsg, "0")) {
            printf("serverPublish() API failed, error: %s", errorMsg);
            return -1;
        }
        printf("Publishing [%s]\n\n\n\n", result);
        i++;
    }
    cleanupServer();
    pthread_mutex_destroy(&serverLock); 
}

client.c:

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http:https://creativecommons.org/publicdomain/zero/1.0/ for more information. */

/**
 * Client disconnect handling
 * --------------------------
 * This example shows you how to handle a client disconnect, e.g., if the server
 * is shut down while the client is connected. You just need to call connect
 * again and the client will automatically reconnect.
 *
 * This example is very similar to the tutorial_client_firststeps.c. */

#include "open62541.h"

#include <signal.h>

UA_Boolean running = true;

static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
    running = 0;
}

static void
handlerCallback(UA_Client *client, UA_UInt32 subId, void *subContext,
                           UA_UInt32 monId, void *monContext, UA_DataValue *data) {
    UA_Variant *value = &data->value;
    if(UA_Variant_isScalar(value)) {
        if (value->type == &UA_TYPES[UA_TYPES_STRING]) {
            UA_String str = *(UA_String*)value->data;
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Data: %.*s", (int) str.length, str.data);
        }
    }
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback (UA_Client *client, UA_UInt32 subId, void *subContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static void
stateCallback (UA_Client *client, UA_ClientState clientState) {
    switch(clientState) {
        case UA_CLIENTSTATE_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
        break;
        case UA_CLIENTSTATE_WAITING_FOR_ACK:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Waiting for ack");
        break;
        case UA_CLIENTSTATE_CONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A TCP connection to the server is open");
        break;
        case UA_CLIENTSTATE_SECURECHANNEL:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A SecureChannel to the server is open");
        break;
        case UA_CLIENTSTATE_SESSION:{
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
            /* A new session was created. We need to create the subscription. */
            /* Create a subscription */
            UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
            UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
                                                                                    NULL, NULL, deleteSubscriptionCallback);

            if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Create subscription succeeded, id %u", response.subscriptionId);
            else
                return;

            UA_NodeId nodeId = UA_NODEID_STRING(2, "test");
            /* Add a MonitoredItem */
            UA_MonitoredItemCreateRequest monRequest =
                UA_MonitoredItemCreateRequest_default(nodeId);

            UA_MonitoredItemCreateResult monResponse =
                UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
                                                          UA_TIMESTAMPSTORETURN_BOTH,
                                                          monRequest, NULL, handlerCallback, NULL);
            if(monResponse.statusCode == UA_STATUSCODE_GOOD)
                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                            "Monitoring NODEID(%d, %s), id %u",
                            2, "test", monResponse.monitoredItemId);
        }
        break;
        case UA_CLIENTSTATE_SESSION_RENEWED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "A session with the server is open (renewed)");
            /* The session was renewed. We don't need to recreate the subscription. */
        break;
        case UA_CLIENTSTATE_SESSION_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Session disconnected");
        break;
    }
    return;
}

int
main(void) {
    signal(SIGINT, stopHandler); /* catches ctrl-c */

    UA_ClientConfig config = UA_ClientConfig_default;
    /* Set stateCallback */
    config.stateCallback = stateCallback;
    config.subscriptionInactivityCallback = subscriptionInactivityCallback;

    UA_Client *client = UA_Client_new(config);

    /* Endless loop runAsync */
    while(running) {
        /* if already connected, this will return GOOD and do nothing */
        /* if the connection is closed/errored, the connection will be reset and then reconnected */
        /* Alternatively you can also use UA_Client_getState to get the current state */
        UA_StatusCode retval = UA_Client_connect(client, "opc.tcp:https://localhost:4840");
        if(retval != UA_STATUSCODE_GOOD) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                         "Not connected. Retrying to connect in 1 second");
            /* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
            /* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
            UA_sleep_ms(1000);
            continue;
        }

        UA_Client_run_iterate(client, 1000);
    };

    /* Clean up */
    UA_Client_delete(client); /* Disconnects the client internally */
    return UA_STATUSCODE_GOOD;
}

@Pro
Copy link
Member

Pro commented Jan 16, 2019

I could reproduce the issue locally. The server asserts on this line, but only if the client is reading:

UA_assert(entry->refCount > 0);

I can not wrap my head around the cause, how this may happen. As far as I can see, your calls are correctly mutexed.

Maybe @jpfr can have a look at it

@bvinodkumar2008
Copy link
Author

Hi @Pro, @jpfr ,

For now, i just added an if check instead of doing the assert at

UA_assert(entry->refCount > 0);
like below to get the server and client working. Do you think there would be any side affects of doing so?

Workaround:

 if (entry->refCount > 0) {
   --entry->refCount;
   cleanupEntry(entry);
 }

@jpfr
Copy link
Member

jpfr commented Jan 21, 2019

FYI, we have continued this discussion in private communication.
This issue will be updated/resolved when ready.

@bvinodkumar2008
Copy link
Author

Hi @jpfr,
The below workaround that you shared to my intel email id was working without any refCount issues.

The locking with mutexes looks correct. So the above stack trace should be impossible.
Maybe there was a memory corruption.

I made two changes to the code:
1.	For setting the new value, the char-array is wrapped in a UA_String structure.
(This could have led to a memory-corruption as UA_String was expected on the other end…)
2.	The mutex is now heap-allocated. To make memory corruptions on the mutex less likely.
This has been running for 45minutes without a problem now.
And the server comes out squaky-clean even with Valgrind instrumentation.

The server code, with major changes highlighted, is pasted here:
http:https://paste.debian.net/1061757/

We have some ideas to make open62541 thread-safe internally. So that this wrapping with mutexes is no longer required.
But this will require some effort and extensive testing before going into production.

But when i tested this with open62541 version (https://github.com/open62541/open62541/tree/cca3ead344186f9c0aac0e177263c48c52e49c9c) and with one additional change in the API i.e., using UA_Client_MonitoredItems_createDataChanges()
instead of UA_Client_MonitoredItems_createDataChange(), I'm again running into the refCount issue.
I'm attaching below the sample codes for server and client below:

server.c

#include <signal.h>
#include "open62541.h"
#include <pthread.h>

static UA_Server *server = NULL;
static UA_Boolean serverRunning = true;
static UA_ServerConfig *serverConfig = NULL;
static char dataToPublish[1000];
static pthread_mutex_t *serverLock = NULL;
static char errorMsg[100];

void signalHandler(int sig) {
    serverRunning = false;
}

char* printTime() {

    int millisec;
    struct tm* tm_info;
    struct timeval tv;

    gettimeofday(&tv, NULL);

    millisec = tv.tv_usec / 1000.0;
    if (millisec >= 1000) {
        millisec -= 1000;
        tv.tv_sec++;
    }

    char buffer[26];
    static char timeStr[100];
    tm_info = localtime(&tv.tv_sec);
    strftime(buffer, 26, "%Y:%m:%d %H:%M:%S", tm_info);
    sprintf(timeStr, "%s.%03d", buffer, millisec);
    return timeStr;
}

/* This function provides data to the subscriber */
static UA_StatusCode
readPublishedData(UA_Server *server,
                const UA_NodeId *sessionId,
                void *sessionContext,
                const UA_NodeId *nodeId, void *nodeContext,
                UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
                UA_DataValue *data) {
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "In readPublishedData() function");
    data->hasValue = true;
    UA_String str = UA_STRING(dataToPublish);
    UA_Variant_setScalarCopy(&data->value, &str, &UA_TYPES[UA_TYPES_STRING]);
	return UA_STATUSCODE_GOOD;
}

static UA_StatusCode
writePublishedData(UA_Server *server,
                 const UA_NodeId *sessionId, void *sessionContext,
                 const UA_NodeId *nodeId, void *nodeContext,
                 const UA_NumericRange *range, const UA_DataValue *data) {

    return UA_STATUSCODE_GOOD;
}

static void
addTopicDataSourceVariable(UA_Server *server, char *namespace, char *topic, size_t *namespaceIndex) {

    UA_StatusCode ret = UA_Server_getNamespaceByName(server, UA_STRING(namespace), namespaceIndex);
    if (ret == UA_STATUSCODE_GOOD) {
        UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Namespace: %s exist.",
                     namespace);
    } else {
        *namespaceIndex = UA_Server_addNamespace(server, namespace);
        if (*namespaceIndex == 0) {
            static char str[] = "UA_Server_addNamespace() has failed";
            UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s for namespace: %s", str, namespace);
            return;
        }
    }
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%lu:namespaceIndex created for namespace: %s\n", *namespaceIndex, namespace);

    UA_VariableAttributes attr = UA_VariableAttributes_default;
    attr.description = UA_LOCALIZEDTEXT("en-US", topic);
    attr.displayName = UA_LOCALIZEDTEXT("en-US", topic);
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId currentNodeId = UA_NODEID_STRING(*namespaceIndex, topic);
    char *topicContext = (char*) malloc(strlen(topic) + 1);
    ret = UA_Server_getNodeContext(server, currentNodeId, (void**) &topicContext);
    if (topicContext != NULL) {
        UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Topic context: %s", topicContext);
        if (!strcmp(topicContext, topic)) {
            static char str[] = "Topic already exist for namespace";
            UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", str);
            return;
        }
    }
    UA_QualifiedName currentName = UA_QUALIFIEDNAME(*namespaceIndex, topic);
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_NodeId variableTypeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);

    UA_DataSource topicDataSource;
    topicDataSource.read = readPublishedData;
    topicDataSource.write = writePublishedData;
    ret = UA_Server_addDataSourceVariableNode(server, currentNodeId, parentNodeId,
                                              parentReferenceNodeId, currentName,
                                              variableTypeNodeId, attr,
                                              topicDataSource, topic, NULL);
    if (ret != UA_STATUSCODE_GOOD) {
        static char str[] = "UA_Server_addDataSourceVariableNode() has failed";
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s \
                    for namespace: %s and topic: %s. Error code: %s", str, namespace, topic, UA_StatusCode_name(ret));
        return;
    }
}

static void*
startServer(void *ptr) {

    /* run server */
    UA_StatusCode retval = UA_Server_run_startup(server);
    if (retval != UA_STATUSCODE_GOOD) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
        return NULL;
    }
    while (serverRunning) {
        pthread_mutex_lock(serverLock);
        UA_Server_run_iterate(server, false);
        pthread_mutex_unlock(serverLock);
    }
    UA_Server_run_shutdown(server);
    return NULL;
}

char*
serverPublish(int nsIndex, char *topic, char* data) {

    /* check if server is started or not */
    if (server == NULL) {
        char str[] = "UA_Server instance is not instantiated";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return NULL;
    }

    if (serverLock == NULL) {
        char str[] = "pthread serverLock instance is not instantiated";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return NULL;
    }

    /* writing the data to the opcua variable */
    UA_Variant *val = UA_Variant_new();


    strcpy(dataToPublish, data);
    UA_String myString = UA_STRING(dataToPublish);
    UA_Variant_setScalarCopy(val, &myString, &UA_TYPES[UA_TYPES_STRING]);

    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "nsIndex: %d, topic:%s\n", nsIndex, topic);

    /*sleep for mininum publishing interval in ms*/
    UA_sleep_ms(5);

    pthread_mutex_lock(serverLock);
    UA_StatusCode retval = UA_Server_writeValue(server, UA_NODEID_STRING(nsIndex, topic), *val);
    pthread_mutex_unlock(serverLock);

    if (retval == UA_STATUSCODE_GOOD) {
        UA_Variant_delete(val);
        return "0";
    }
    sprintf(errorMsg, "%s", UA_StatusCode_name(retval));
    UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", errorMsg);
    UA_Variant_delete(val);
    return errorMsg;
}

/* cleanupServer deletes the memory allocated for server configuration */
static void
cleanupServer() {
    UA_Server_delete(server);
    UA_ServerConfig_delete(serverConfig);
}


/* loadFile parses the certificate file.
 *
 * @param  path               specifies the file name given in argv[]
 * @return Returns the file content after parsing */
static UA_INLINE UA_ByteString
loadFile(const char *const path) {
    UA_ByteString fileContents = UA_STRING_NULL;

    /* Open the file */
    FILE *fp = fopen(path, "rb");
    if(!fp) {
        return fileContents;
    }

    /* Get the file length, allocate the data and read */
    fseek(fp, 0, SEEK_END);
    fileContents.length = (size_t)ftell(fp);
    fileContents.data = (UA_Byte *)UA_malloc(fileContents.length * sizeof(UA_Byte));
    if(fileContents.data) {
        fseek(fp, 0, SEEK_SET);
        size_t read = fread(fileContents.data, sizeof(UA_Byte), fileContents.length, fp);
        if(read != fileContents.length)
            UA_ByteString_deleteMembers(&fileContents);
    } else {
        fileContents.length = 0;
    }
    fclose(fp);

    return fileContents;
}

int main(int argc, char** argv) {
    signal(SIGINT, signalHandler); /* catch ctrl-c */
    signal(SIGTERM, signalHandler);

    if(argc < 3) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "Missing arguments. Arguments are "
                     "<server-certificate.der> <private-key.der> "
                     "[<trustlist1.crl>, ...]");
        return 1;
    }

     /* Load certificate and private key */
    UA_ByteString certificate = loadFile(argv[1]);
    UA_ByteString privateKey = loadFile(argv[2]);

    /* Load the trustlist */
    size_t trustListSize = 0;
    if(argc > 3)
        trustListSize = (size_t)argc-3;
    UA_STACKARRAY(UA_ByteString, trustList, trustListSize);
    for(size_t i = 0; i < trustListSize; i++) {
        trustList[i] = loadFile(argv[i+3]);
          if(trustList[i].length == 0) {
            static char str[] = "Unable to load trusted cert file";
            UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "%s", str);
            return -1;
          }
    }

    /* Loading of a revocation list currently unsupported */
    UA_ByteString *revocationList = NULL;
    size_t revocationListSize = 0;

    printf("TrustListSize: %d", trustListSize);

   /* Initiate server config */
    serverConfig =
        UA_ServerConfig_new_basic256sha256(4840, &certificate, &privateKey,
                                          trustList, trustListSize,
                                          revocationList, revocationListSize);
    if(!serverConfig) {
        static char str[] = "Could not create the server config";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    for(int i = 0; i < serverConfig->endpointsSize; i++) {
        if(serverConfig->endpoints[i].securityMode != UA_MESSAGESECURITYMODE_SIGNANDENCRYPT) {
            serverConfig->endpoints[i].userIdentityTokens = NULL;
            serverConfig->endpoints[i].userIdentityTokensSize = 0;
        }
    }

    UA_ServerConfig_set_customHostname(serverConfig, UA_STRING("localhost"));

    UA_DurationRange range = {5.0, 5.0};
    serverConfig->publishingIntervalLimits = range;
    serverConfig->samplingIntervalLimits = range;

    /* Initiate server instance */
    server = UA_Server_new(serverConfig);
    if(server == NULL) {
        static char str[] = "UA_Server_new() API failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    char *namespace = "namespace";
    char *topics[] = {"topic1", "topic2"};

    serverLock = malloc(sizeof(pthread_mutex_t));

    if (!serverLock || pthread_mutex_init(serverLock, NULL) != 0) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "mutex init has failed!");
        return -1;
    }

    pthread_t serverThread;
    if (pthread_create(&serverThread, NULL, startServer, NULL)) {
        char str[] = "server pthread creation to start server failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    UA_Int16 nsIndex;
    char result[1000];

    long count = 0;

    while (1) {
        for (long i = 0; i < sizeof(topics)/sizeof(char*); i++) {
            sprintf(result, "%s %ld", topics[i], count);

            size_t nsIndex;
            /* add datasource variable */
            addTopicDataSourceVariable(server, namespace, topics[i], &nsIndex);
            char *errorMsg = serverPublish(nsIndex, topics[i], result);
            if(strcmp(errorMsg, "0")) {
                printf("serverPublish() API failed, error: %s", errorMsg);
                return -1;
            }
            printf("%s %s Publishing...[%s]\n\n", __DATE__,  printTime(), result);
        }
        count++;
    }

    pthread_join(serverThread, NULL);

    cleanupServer();
    pthread_mutex_destroy(serverLock);

    free(serverLock);
}

client.c

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http:https://creativecommons.org/publicdomain/zero/1.0/ for more information. */

/**
 * Client disconnect handling
 * --------------------------
 * This example shows you how to handle a client disconnect, e.g., if the server
 * is shut down while the client is connected. You just need to call connect
 * again and the client will automatically reconnect.
 *
 * This example is very similar to the tutorial_client_firststeps.c. */

#include "open62541.h"

#include <signal.h>
#include <pthread.h>
#define CONNECTION_STRING  "opc.tcp:https://localhost:4840"

UA_Boolean running = true;
static UA_Client *client = NULL;
static UA_ClientConfig* clientConfig = NULL;
static bool clientExited = false;

static UA_Int16
getNamespaceIndex(char *ns,
                  char *topic) {
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Browsing nodes in objects folder:\n");
    char nodeIdentifier[100];
    UA_BrowseRequest bReq;
    UA_BrowseRequest_init(&bReq);
    bReq.requestedMaxReferencesPerNode = 0;
    bReq.nodesToBrowse = UA_BrowseDescription_new();
    bReq.nodesToBrowseSize = 1;
    bReq.nodesToBrowse[0].nodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER); /* browse objects folder */
    bReq.nodesToBrowse[0].resultMask = UA_BROWSERESULTMASK_ALL; /* return everything */
    UA_BrowseResponse bResp = UA_Client_Service_browse(client, bReq);
    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%-9s %-16s %-16s %-16s\n", "NAMESPACE", "NODEID", "BROWSE NAME", "DISPLAY NAME");
    for(size_t i = 0; i < bResp.resultsSize; ++i) {
        for(size_t j = 0; j < bResp.results[i].referencesSize; ++j) {
            UA_ReferenceDescription *ref = &(bResp.results[i].references[j]);
            if(ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_STRING) {
                int len = (int)ref->nodeId.nodeId.identifier.string.length;
                UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%-9d %-16.*s %-16.*s %-16.*s\n", ref->nodeId.nodeId.namespaceIndex,
                       (int)ref->nodeId.nodeId.identifier.string.length,
                       ref->nodeId.nodeId.identifier.string.data,
                       (int)ref->browseName.name.length, ref->browseName.name.data,
                       (int)ref->displayName.text.length, ref->displayName.text.data);
                strncpy(nodeIdentifier, (char*)ref->nodeId.nodeId.identifier.string.data, len);
                UA_Int16 nodeNs = ref->nodeId.nodeId.namespaceIndex;
                if (!strcmp(topic, nodeIdentifier)) {
                    UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Node Id exists !!!\n");
                    UA_BrowseRequest_deleteMembers(&bReq);
                    UA_BrowseResponse_deleteMembers(&bResp);
                    return nodeNs;
                }
            }
        }
    }
    UA_BrowseRequest_deleteMembers(&bReq);
    UA_BrowseResponse_deleteMembers(&bResp);
    return -1;
}

static void signalHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
    clientExited = true;
}

static void
handlerCallback(UA_Client *client, UA_UInt32 subId, void *subContext,
                           UA_UInt32 monId, void *monContext, UA_DataValue *data) {
    UA_Variant *value = &data->value;
    if(UA_Variant_isScalar(value)) {
        if (value->type == &UA_TYPES[UA_TYPES_STRING]) {
            UA_String str = *(UA_String*)value->data;
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Data: %.*s", (int) str.length, str.data);
        }
    }
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback (UA_Client *client, UA_UInt32 subId, void *subContext) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static UA_Int16
createSubscription() {

    UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
    request.requestedPublishingInterval = 0;

    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "In %s...!", __FUNCTION__);
    UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
                                                                            NULL, NULL, deleteSubscriptionCallback);

    if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD) {
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Create subscription succeeded, id %u", response.subscriptionId);
    } else {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Create subscription failed");
        return 1;
    }
    int subId = response.subscriptionId;

    UA_MonitoredItemCreateRequest *items= (UA_MonitoredItemCreateRequest*) malloc(2 * sizeof(UA_MonitoredItemCreateRequest));
    UA_Client_DataChangeNotificationCallback *subCallbacks = (UA_Client_DataChangeNotificationCallback*) malloc(2 * sizeof(UA_Client_DataChangeNotificationCallback));
    UA_Client_DeleteMonitoredItemCallback *deleteCallbacks = (UA_Client_DeleteMonitoredItemCallback*) malloc(2 * sizeof(UA_Client_DeleteMonitoredItemCallback));
    void **contexts = (void*) malloc(2 * sizeof(void*));

    char *topic;
    char *namespace = "namespace";
    size_t namespaceIndex;
    UA_NodeId nodeId;
    char *topics[] = {"topic1", "topic2"};
    int topicItems = sizeof(topic)/sizeof(char*);
    for(int i = 0; i < topicItems; i++) {
        topic = topics[i];
        namespaceIndex = getNamespaceIndex(namespace, topic);
        nodeId = UA_NODEID_STRING(namespaceIndex, topic);

        items[i] = UA_MonitoredItemCreateRequest_default(nodeId);
        subCallbacks[i] = handlerCallback;

        contexts[i] = (void*) topic;
        deleteCallbacks[i] = NULL;
    }

    UA_CreateMonitoredItemsRequest createRequest;
    UA_CreateMonitoredItemsRequest_init(&createRequest);
    createRequest.subscriptionId = subId;
    createRequest.timestampsToReturn = UA_TIMESTAMPSTORETURN_BOTH;
    createRequest.itemsToCreate = items;
    createRequest.itemsToCreateSize = topicItems;
    UA_CreateMonitoredItemsResponse createResponse =
       UA_Client_MonitoredItems_createDataChanges(client, createRequest, contexts,
                                                   subCallbacks, deleteCallbacks);

    for(int i = 0; i < 2; i++) {
        if (createResponse.results[0].statusCode == UA_STATUSCODE_GOOD) {
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"MonitorItemId: %d created successfully for topic: %s\n",
                        createResponse.results[0].monitoredItemId, topics[i]);
        }
    }

    return 0;
}

static void stateCallback(UA_Client *client,
                          UA_ClientState clientState) {
    switch(clientState) {
        case UA_CLIENTSTATE_WAITING_FOR_ACK:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is waiting for ACK");
            break;
        case UA_CLIENTSTATE_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
            break;
        case UA_CLIENTSTATE_CONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A TCP connection to the server is open");
            break;
        case UA_CLIENTSTATE_SECURECHANNEL:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A SecureChannel to the server is open");
            break;
        case UA_CLIENTSTATE_SESSION:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
            break;
        case UA_CLIENTSTATE_SESSION_RENEWED:
            /* The session was renewed. We don't need to recreate the subscription. */
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open (renewed)");
            break;
        case UA_CLIENTSTATE_SESSION_DISCONNECTED:
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Session disconnected");
            break;
    }
    return;
}


/* loadFile parses the certificate file.
 *
 * @param  path               specifies the file name given in argv[]
 * @return Returns the file content after parsing */
static UA_INLINE UA_ByteString
loadFile(const char *const path) {
    UA_ByteString fileContents = UA_STRING_NULL;

    /* Open the file */
    FILE *fp = fopen(path, "rb");
    if(!fp) {
        return fileContents;
    }

    /* Get the file length, allocate the data and read */
    fseek(fp, 0, SEEK_END);
    fileContents.length = (size_t)ftell(fp);
    fileContents.data = (UA_Byte *)UA_malloc(fileContents.length * sizeof(UA_Byte));
    if(fileContents.data) {
        fseek(fp, 0, SEEK_SET);
        size_t read = fread(fileContents.data, sizeof(UA_Byte), fileContents.length, fp);
        if(read != fileContents.length)
            UA_ByteString_deleteMembers(&fileContents);
    } else {
        fileContents.length = 0;
    }
    fclose(fp);

    return fileContents;
}

char*
clientContextCreate(char *hostname, int port, char *certificateFile, char *privateKeyFile, char **trustedCerts, size_t trustedListSize) {

    UA_ByteString *revocationList = NULL;
    size_t revocationListSize = 0;

    UA_ByteString*          remoteCertificate  = NULL;
    UA_StatusCode           retval             = UA_STATUSCODE_GOOD;
    /* endpointArray is used to hold the available endpoints in the server
     * endpointArraySize is used to hold the number of endpoints available */
    UA_EndpointDescription* endpointArray      = NULL;
    size_t                  endpointArraySize  = 0;

    /* Load certificate and private key */
    UA_ByteString           certificate        = loadFile(certificateFile);
    UA_ByteString           privateKey         = loadFile(privateKeyFile);


    UA_STACKARRAY(UA_ByteString, trustList, trustedListSize);
    for(size_t trustListCount = 0; trustListCount < trustedListSize; trustListCount++) {
        trustList[trustListCount] = loadFile(trustedCerts[trustListCount]);
          if(trustList[trustListCount].length == 0) {
            static char str[] = "Unable to load trusted cert file";
            UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "%s", str);
            return "-1";
        }
    }
    client = UA_Client_new();
    clientConfig = UA_Client_getConfig(client);
    /* Set stateCallback */
    clientConfig->timeout = 1000;
    clientConfig->stateCallback = stateCallback;
    clientConfig->subscriptionInactivityCallback = subscriptionInactivityCallback;

    clientConfig->securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT;
    UA_ClientConfig_setDefaultEncryption(clientConfig, certificate, privateKey,
                                         trustList, trustedListSize,
                                         revocationList, revocationListSize);

    UA_ByteString_clear(&certificate);
    UA_ByteString_clear(&privateKey);
    for(size_t deleteCount = 0; deleteCount < trustedListSize; deleteCount++) {
        UA_ByteString_clear(&trustList[deleteCount]);
    }

    /* Secure client connect */
    clientConfig->securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT; /* require encryption */
    retval = UA_Client_connect(client, CONNECTION_STRING);
    if(retval != UA_STATUSCODE_GOOD) {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Error: %s", UA_StatusCode_name(retval));
        UA_Client_delete(client);
        return "-1";
    }

    return "0";
}


/* Runs iteratively the client to auto-reconnect and re-subscribe to the last subscribed topic of the client */
static void*
runClient(void *ptr) {

    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "In runClient...\n");
    while(!clientExited) {
        /* if already connected, this will return GOOD and do nothing */
        /* if the connection is closed/errored, the connection will be reset and then reconnected */
        /* Alternatively you can also use UA_Client_getState to get the current state */
        UA_ClientState clientState = UA_Client_getState(client);
        if (clientState == UA_CLIENTSTATE_DISCONNECTED) {
            UA_StatusCode retval = UA_Client_connect(client, CONNECTION_STRING);
            if(retval != UA_STATUSCODE_GOOD) {
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Error: %s", UA_StatusCode_name(retval));
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Not connected. Retrying to connect in 1 second");
                /* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
                /* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
                UA_sleep_ms(1000);
                continue;
            }
        }

        UA_Client_run_iterate(client, 1000);
    }
    return NULL;

}

int main(int argc, char* argv[]) {
    signal(SIGINT, signalHandler); /* catch ctrl-c */
    signal(SIGTERM, signalHandler);

    if(argc < 3) {
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                     "Missing arguments. Arguments are "
                     "<client-certificate.der> <private-key.der> "
                     "[<trustlist1.crl>, ...]");
        return 1;
    }

    size_t trustListSize = 0;
    char **trustList = NULL;

    if (argc > 3) {
        trustListSize = (size_t)argc-3;
    }

    trustList = (char **) malloc(trustListSize * sizeof(char *));
    // UA_STACKARRAY(UA_ByteString, trustList, trustListSize);

    for(size_t i = 0; i < trustListSize; i++)
        trustList[i] = (char*)argv[i+3];

    char* errorMsg = clientContextCreate("localhost", 4840, argv[1], argv[2], trustList, trustListSize);
    if(strcmp(errorMsg, "0")) {
        printf("clientContextCreate() API failed, error: %s\n", errorMsg);
        return -1;
    }
    printf("clientContextCreate() API successfully executed!\n");
    pthread_t clientThread;

    if (createSubscription() == 1) {
        static char str[] = "Subscription failed!";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }

    if (pthread_create(&clientThread, NULL, runClient, NULL)) {
        static char str[] = "pthread creation to run the client thread iteratively failed";
        UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
            str);
        return -1;
    }
    while(1) {
        sleep(120);
    }
}

Steps to run server and client:

Server

$ gcc -std=c99 server.c open62541.c -lmbedtls -lmbedx509 -lmbedcrypto -pthread -o server
$ ./server /etc/ssl/opcua/opcua_server_certificate.der /etc/ssl/opcua/opcua_server_key.der /etc/ssl/ca/ca_certificate.der

Client

$ gcc -std=c99 client.c open62541.c -lmbedtls -lmbedx509 -lmbedcrypto -pthread -o client
$ ./client /etc/ssl/opcua/opcua_client_certificate.der /etc/ssl/opcua/opcua_client_key.der /etc/ssl/ca/ca_certificate.der

You can use certs generated by the cert tool (https://github.com/open62541/open62541/tree/cca3ead344186f9c0aac0e177263c48c52e49c9c/tools/certs) for supplying to server and client programs. What i've noticed is, after some 34k+ publishes (in around few mins) of server and client running, server is failing with below refCount issue

server: open62541.c:47869: NodeMap_releaseNode: Assertion `entry->refCount > 0' failed.
Aborted (core dumped)

Let me know if you need any additional details.

Regards,
Vinod

@cochicde
Copy link
Contributor

Hi,

in your last example, addTopicDataSourceVariable is inside the loop. Is that intended? Also, this function access the server, but there's no mutex around it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants