Continuous writes to var node via UA_Server_writeValue() API while it is subscribed to data change notification fails after some time #2363
Comments
That's probably because you are using the server from more than one thread. Multithreading is currently not supported, so you need to make sure that you only access the server sequentially, e.g. with a mutex or similar. Please try that and check if the error is still there. |
Thanks for the quick response. I did try acquiring a mutex lock before calling the UA_Server_writeValue() API, but even with that I still got this issue. By the way, I had not enabled the multithreading feature in the compiled open62541; should I enable it and try? |
You need to use a mutex not only for UA_Server_writeValue, but everywhere you are using the
to something like:
Otherwise not all accesses to the server instance are protected by the mutex. The multithreading feature should not be enabled; it is in a very alpha state. |
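To illustrate the advice about locking every access, here is a minimal, self-contained sketch. It deliberately does not use open62541: `FakeServer`, `lockedAccess` and `runLockedDemo` are hypothetical names invented for this example. One thread plays the role of the loop that would call UA_Server_run_iterate, the other plays the role of the code that would call UA_Server_writeValue, and both go through the same mutex.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for a server object that is not thread-safe. */
typedef struct {
    int busy;      /* set while some call is "inside" the server */
    long counter;  /* state that every call mutates */
} FakeServer;

static FakeServer fakeServer = {0, 0};
static pthread_mutex_t serverLock = PTHREAD_MUTEX_INITIALIZER;

/* Every entry point takes the same lock before touching the server. */
static void lockedAccess(FakeServer *s) {
    pthread_mutex_lock(&serverLock);
    assert(!s->busy);  /* would fire if two threads ever overlapped here */
    s->busy = 1;
    s->counter++;
    s->busy = 0;
    pthread_mutex_unlock(&serverLock);
}

/* Plays the role of the thread that iterates the server. */
static void *iterateThread(void *arg) {
    for(int i = 0; i < 2000; i++)
        lockedAccess((FakeServer *)arg);
    return NULL;
}

/* Plays the role of the thread that writes values. */
static void *writerThread(void *arg) {
    for(int i = 0; i < 2000; i++)
        lockedAccess((FakeServer *)arg);
    return NULL;
}

/* Runs both threads to completion and returns the final counter. */
long runLockedDemo(void) {
    pthread_t a, b;
    fakeServer.counter = 0;
    pthread_create(&a, NULL, iterateThread, &fakeServer);
    pthread_create(&b, NULL, writerThread, &fakeServer);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return fakeServer.counter;
}
```

Because both threads serialize on `serverLock`, no increment is lost and the overlap check never fires. In the real program the same lock would also have to cover startup, node creation and shutdown calls, not just the write.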
Thank you for the inputs. Will try that and get back to you. |
I used your logic in server.c:
client.c:
|
Hi, |
Hi @Pro, let me know if you want me to try anything else.
server.c:
#include <signal.h>
#include <stdio.h>   /* printf, sprintf, snprintf */
#include <string.h>  /* strcat, strcmp, strcpy */
#include "open62541.h"
#include <pthread.h>
static UA_Server *server = NULL;
static UA_Boolean serverRunning = true;
UA_ServerConfig *serverConfig = NULL;
static char dataToPublish[1000]; /* sized to hold the 1000-byte messages built in main() */
static char errorMsg[100];
pthread_mutex_t serverLock;
void signalHandler(int sig) {
serverRunning = false;
}
/* This function provides data to the subscriber */
static UA_StatusCode
readPublishedData(UA_Server *server,
const UA_NodeId *sessionId,
void *sessionContext,
const UA_NodeId *nodeId, void *nodeContext,
UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
UA_DataValue *data) {
UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"In readPublishedData() function");
data->hasValue = true;
UA_String str = UA_STRING(dataToPublish);
UA_Variant_setScalarCopy(&data->value, &str, &UA_TYPES[UA_TYPES_STRING]);
return UA_STATUSCODE_GOOD;
}
static UA_StatusCode
writePublishedData(UA_Server *server,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeId, void *nodeContext,
const UA_NumericRange *range, const UA_DataValue *data) {
return UA_STATUSCODE_GOOD;
}
static UA_UInt16
addTopicDataSourceVariable(UA_Server *server, char *namespace, char *topic) {
UA_UInt16 namespaceIndex = UA_Server_addNamespace(server, namespace);
if (namespaceIndex == 0) {
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA_Server_addNamespace has failed");
return 100;
}
UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%u:namespaceIndex created for namespace: %s\n", namespaceIndex, namespace);
UA_VariableAttributes attr = UA_VariableAttributes_default;
attr.description = UA_LOCALIZEDTEXT("en-US", topic);
attr.displayName = UA_LOCALIZEDTEXT("en-US", topic);
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
/* Add the variable node to the information model */
UA_NodeId currentNodeId = UA_NODEID_STRING(namespaceIndex, topic);
UA_QualifiedName currentName = UA_QUALIFIEDNAME(namespaceIndex, topic);
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_NodeId variableTypeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);
UA_DataSource topicDataSource;
topicDataSource.read = readPublishedData;
topicDataSource.write = writePublishedData;
UA_Server_addDataSourceVariableNode(server, currentNodeId, parentNodeId,
parentReferenceNodeId, currentName,
variableTypeNodeId, attr,
topicDataSource, NULL, NULL);
return namespaceIndex;
}
static void*
startServer(void *ptr) {
/* run server */
// pthread_mutex_lock(&serverLock);
UA_StatusCode retval = UA_Server_run_startup(server);
// pthread_mutex_unlock(&serverLock);
if (retval != UA_STATUSCODE_GOOD) {
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\nServer failed to start, error: %s", UA_StatusCode_name(retval));
return NULL;
}
while (serverRunning) {
pthread_mutex_lock(&serverLock);
UA_Server_run_iterate(server, 100);
pthread_mutex_unlock(&serverLock);
}
UA_Server_run_shutdown(server);
return NULL;
}
char*
serverPublish(int nsIndex, char *topic, char* data) {
/* check if server is started or not */
if (server == NULL) {
char str[] = "UA_Server instance is not instantiated";
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
str);
return NULL;
}
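/* writing the data to the opcua variable */
UA_Variant *val = UA_Variant_new();
/* bounded copy so that dataToPublish can never be overrun */
snprintf(dataToPublish, sizeof(dataToPublish), "%s", data);
/* UA_TYPES_STRING expects a UA_String, not a raw char buffer */
UA_String str = UA_STRING(dataToPublish);
UA_Variant_setScalarCopy(val, &str, &UA_TYPES[UA_TYPES_STRING]);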
UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "nsIndex: %d, topic:%s\n", nsIndex, topic);
pthread_mutex_lock(&serverLock);
UA_StatusCode retval = UA_Server_writeValue(server, UA_NODEID_STRING(nsIndex, topic), *val);
pthread_mutex_unlock(&serverLock);
if (retval == UA_STATUSCODE_GOOD) {
UA_Variant_delete(val);
return "0";
}
sprintf(errorMsg, "%s", UA_StatusCode_name(retval));
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s", errorMsg);
UA_Variant_delete(val);
return errorMsg;
}
/* cleanupServer deletes the memory allocated for server configuration */
static void
cleanupServer() {
UA_Server_delete(server);
UA_ServerConfig_delete(serverConfig);
}
int main(int argc, char** argv)
{
signal(SIGINT, signalHandler); /* catch ctrl-c */
/* Create a server listening on port 4840 */
serverConfig = UA_ServerConfig_new_default();
UA_DurationRange range = {5.0, 10.0};
serverConfig->publishingIntervalLimits = range;
serverConfig->samplingIntervalLimits = range;
/* Initiate server instance */
server = UA_Server_new(serverConfig);
char *namespace = "namespace";
char *varName = "test";
/* add datasource variable */
UA_Int16 nsIndex = addTopicDataSourceVariable(server, namespace, varName);
if (pthread_mutex_init(&serverLock, NULL) != 0) {
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "mutex init has failed!");
return -1;
}
pthread_t serverThread;
if (pthread_create(&serverThread, NULL, startServer, NULL)) {
char str[] = "server pthread creation to start server failed";
UA_LOG_FATAL(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s",
str);
return -1;
}
char result1[1000] = "Hello";
char result[1000];
int i = 0;
for(int j = 0; j < 50; j++)
strcat(result1, " Hello");
while (1) {
sprintf(result, "%s %d", result1, i);
char *errorMsg = serverPublish(nsIndex, varName, result);
if(strcmp(errorMsg, "0")) {
printf("serverPublish() API failed, error: %s", errorMsg);
return -1;
}
printf("Publishing [%s]\n\n\n\n", result);
i++;
}
cleanupServer();
pthread_mutex_destroy(&serverLock);
}
client.c:
/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
* See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
/**
* Client disconnect handling
* --------------------------
* This example shows you how to handle a client disconnect, e.g., if the server
* is shut down while the client is connected. You just need to call connect
* again and the client will automatically reconnect.
*
* This example is very similar to the tutorial_client_firststeps.c. */
#include "open62541.h"
#include <signal.h>
UA_Boolean running = true;
static void stopHandler(int sign) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
running = 0;
}
static void
handlerCallback(UA_Client *client, UA_UInt32 subId, void *subContext,
UA_UInt32 monId, void *monContext, UA_DataValue *data) {
UA_Variant *value = &data->value;
if(UA_Variant_isScalar(value)) {
if (value->type == &UA_TYPES[UA_TYPES_STRING]) {
UA_String str = *(UA_String*)value->data;
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Data: %.*s", (int) str.length, str.data);
}
}
}
static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Subscription Id %u was deleted", subscriptionId);
}
static void
subscriptionInactivityCallback (UA_Client *client, UA_UInt32 subId, void *subContext) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}
static void
stateCallback (UA_Client *client, UA_ClientState clientState) {
switch(clientState) {
case UA_CLIENTSTATE_DISCONNECTED:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
break;
case UA_CLIENTSTATE_WAITING_FOR_ACK:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Waiting for ack");
break;
case UA_CLIENTSTATE_CONNECTED:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"A TCP connection to the server is open");
break;
case UA_CLIENTSTATE_SECURECHANNEL:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"A SecureChannel to the server is open");
break;
case UA_CLIENTSTATE_SESSION:{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
/* A new session was created. We need to create the subscription. */
/* Create a subscription */
UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
NULL, NULL, deleteSubscriptionCallback);
if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Create subscription succeeded, id %u", response.subscriptionId);
else
return;
UA_NodeId nodeId = UA_NODEID_STRING(2, "test");
/* Add a MonitoredItem */
UA_MonitoredItemCreateRequest monRequest =
UA_MonitoredItemCreateRequest_default(nodeId);
UA_MonitoredItemCreateResult monResponse =
UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
UA_TIMESTAMPSTORETURN_BOTH,
monRequest, NULL, handlerCallback, NULL);
if(monResponse.statusCode == UA_STATUSCODE_GOOD)
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Monitoring NODEID(%d, %s), id %u",
2, "test", monResponse.monitoredItemId);
}
break;
case UA_CLIENTSTATE_SESSION_RENEWED:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"A session with the server is open (renewed)");
/* The session was renewed. We don't need to recreate the subscription. */
break;
case UA_CLIENTSTATE_SESSION_DISCONNECTED:
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Session disconnected");
break;
}
return;
}
int
main(void) {
signal(SIGINT, stopHandler); /* catches ctrl-c */
UA_ClientConfig config = UA_ClientConfig_default;
/* Set stateCallback */
config.stateCallback = stateCallback;
config.subscriptionInactivityCallback = subscriptionInactivityCallback;
UA_Client *client = UA_Client_new(config);
/* Endless loop runAsync */
while(running) {
/* if already connected, this will return GOOD and do nothing */
/* if the connection is closed/errored, the connection will be reset and then reconnected */
/* Alternatively you can also use UA_Client_getState to get the current state */
UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Not connected. Retrying to connect in 1 second");
/* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
/* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
UA_sleep_ms(1000);
continue;
}
UA_Client_run_iterate(client, 1000);
};
/* Clean up */
UA_Client_delete(client); /* Disconnects the client internally */
return UA_STATUSCODE_GOOD;
} |
I could reproduce the issue locally. The server asserts on this line, but only if the client is reading: open62541/plugins/ua_nodestore_default.c Line 218 in fe01551
I cannot work out how this can happen. As far as I can see, your calls are correctly protected by the mutex. Maybe @jpfr can have a look at it. |
For now, I just replaced the assert with an if check at open62541/plugins/ua_nodestore_default.c Line 218 in fe01551
Workaround:
if (entry->refCount > 0) {
    --entry->refCount;
    cleanupEntry(entry);
} |
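For readers who want to see what the guarded release above does in isolation, here is a small self-contained sketch. The `Entry` struct, its `deleted` and `freed` flags, and `releaseGuarded` are hypothetical simplifications made up for this example; they are not the actual open62541 nodestore types. The sketch only shows the shape of the workaround: an unbalanced release is ignored instead of tripping an assertion.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified reference-counted entry. */
typedef struct {
    unsigned refCount; /* number of outstanding references */
    bool deleted;      /* entry was removed and awaits cleanup */
    bool freed;        /* set once cleanup has actually run */
} Entry;

/* Frees the entry only when it is deleted and no longer referenced. */
void cleanupEntry(Entry *entry) {
    if(entry->deleted && entry->refCount == 0)
        entry->freed = true;
}

/* Guarded release: returns false instead of asserting when the
 * count is already zero, i.e. when the release is unbalanced. */
bool releaseGuarded(Entry *entry) {
    if(entry->refCount == 0)
        return false;
    --entry->refCount;
    cleanupEntry(entry);
    return true;
}
```

Note that a guard like this only hides the unbalanced release. Whatever caused the extra release (or the corrupted counter) is still present, so this is best treated as a diagnostic aid rather than a fix.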
FYI, we have continued this discussion in private communication. |
Hi @jpfr,
But when I tested this with open62541 version (https://github.com/open62541/open62541/tree/cca3ead344186f9c0aac0e177263c48c52e49c9c) and with one additional change in the API, i.e., using server.c
client.c
Steps to run server and client:
Server
Client
You can use certs generated by the cert tool (https://github.com/open62541/open62541/tree/cca3ead344186f9c0aac0e177263c48c52e49c9c/tools/certs) to supply to the server and client programs. What I've noticed is that after some 34k+ publishes (within a few minutes of the server and client running), the server fails with the refCount issue below
Let me know if you need any additional details. Regards, |
Hi, in your last example, addTopicDataSourceVariable is inside the loop. Is that intended? Also, this function accesses the server, but there's no mutex around it. |
Description
When UA_Server_writeValue() is called continuously to update the OPC UA variable node in server.c while client.c is subscribed to data change notifications for that node, the server fails with this error: open62541_test: ../open62541.c:48832: UA_NodeMap_releaseNode: Assertion `entry->refCount > 0' failed.
Background Information / Reproduction Steps
server.c
client.c