Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: pubsub over amqp #3850

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open

Conversation

waheedejaz
Copy link
Contributor

Up for the initial review.

Basic PubSub publish over AMQP is added, works with ay AMQP 1.0 broker

NOTE:
The build system is adapted to link the qpid proton lib. At the moment proton c doesn't successfully build using all the warnings enabled in the open62541 build. This will be fixed later.

#3849

@ccvca
Copy link
Contributor

ccvca commented Aug 19, 2020

Maybe resolve the compile issues by using "libqpid-proton11-dev" and find_package()? Then there is no need to include all the sources in the open62541-project. mbedTLS is included the same way at the moment.

@jpfr
Copy link
Member

jpfr commented Aug 19, 2020

Hey there!

Wow, this looks good!
You say that proton does not support all the compiler warnings that we enable by default.
I say that we don't care. We cannot maintain 28,000 lines of code of external dependency that we don't even know.

Most systems can install proton as a dependency.
So let's handle it like we handle mbedtls/openssl.

https://packages.debian.org/stable/libs/libqpid-proton-cpp12

We can have a closer look when the line-count when only the changes to open62541 itself are in the PR.

@waheedejaz
Copy link
Contributor Author

Proton lib is linked using findPackage(). The previous commit is removed. PR rebased

@jpfr
Copy link
Member

jpfr commented Aug 27, 2020

Thanks. We will review when the [WIP] is removed from the title.
Keep up the good work!

yield immedietly after amqp link open
disconnect and delete amqp properly

Signed-off-by: Waheed Ejaz <[email protected]>
Signed-off-by: Waheed Ejaz <[email protected]>

if(!ua_amqpChannel || !amqpCtx){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub AMQP Connection creation failed. Out of memory.");
UA_Destroy_AmqpChannel(ua_amqpChannel, false);
Copy link
Contributor

@basyskom-meerkoetter basyskom-meerkoetter Sep 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this leak amqpCtx on the error path (potentially)?


if(!amqpCtx->ua_connection || !amqpCtx->driver) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub AMQP: Context creation failed. Out of memory.");
UA_Destroy_AmqpChannel(ua_amqpChannel, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here. amqpCtx and either ua_connection or driver would leak?

} else {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub AMQP Connection creation failed. Invalid Address.");
return NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaks resources

@jackybek
Copy link
Contributor

jackybek commented Apr 29, 2021

I tested your code but realised that the prefix opc.amqp:https://xxx.xxx.xxx.xxx:5672 was not changed to opc.tcp:https://xxx.xxx.xxx.xxx:5672 during runtime.
As compared to the implementation by kalycito on pubsub mqtt, when I pass in opc.mqtt:https://xxx.xxx.xxx.xxx:1883 the code changes the prefix to opc.tcp before sending to the mqtt broker.
This caused failure when open62541 tries to extract the host name from the URL. The host name returned was ‘empty string’.

To force the test to continue, i manually set the prefix to opc.tcp:https://....
a. ua_amqp_adaptor.c: UA_AmqpConnect() :
b. changes made:
UA_String myAddressUrl = UA_STRING("opc.tcp:https://192.168.1.22:5672");
UA_Connection connection = UA_ClientConnectionTCP_init( conf, myAddressUrl, 1000, NULL);
//UA_Connection connection = UA_ClientConnectionTCP_init( conf, address.url, 1000, NULL);
c. During testing, i got the following error:

[2021-04-30 07:25:59.616 (UTC+0800)] error/server ua_amqp_adaptor.c :: yieldAmqp: _write failed. ret_code=BadConnectionClosed
[2021-04-30 07:26:04.116 (UTC+0800)] info/userland received ctrl-c
[2021-04-30 07:26:04.116 (UTC+0800)] info/network Shutting down the TCP network lay

on closer look, the error is due to ctx->ua_connection.state returning a value of 1

In yieldAmqp function: inspecting ctx->ua_connection
ctx->ua_connection.state : 1


0 = UA_CONNECTIONSTATE_CLOSED : The socket has been closed and the connection will be deleted
1 = UA_CONNECTIONSTATE_OPENING : The socket is open, but the HEL/ACK handshake is not done
2 = UA_CONNECTIONSTATE_ESTABLISHED : The socket is open and the connection configured


ctx->ua_connection.channel : 0
ctx->ua_connection.sockfd : 5
ctx->ua_connection.openingDate : 0
ctx->ua_connection.handle :
......................................................
ctx->openLink : 1
ctx->sender_link_ready : 0
ctx->message : (null)
ctx->send_buffer (pn_rwbytes_t) : 0
ctx->message_buffer (pn_rwbytes_t) : 0
ctx->sequence_no : 0
ctx->acknowledged_no : 0

NB:
Other changes made to conduct the above tests:
open62541.c : UA_parseEndpointUrl() : add the check for "opc.amqp:https://"
Note: the code that follows is supposed to change the prefix to opc.tcp. It worked for opc.mqtt but it failed for opc.amqp

open62541.c : UA_ClientConnectionTCP_init() : add the following statement at the end of this function: connection.sockfd = socket(AF_INET, SOCK_STREAM, 0);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants