[Bug] grpc SubStreamHandler exception #4804

Closed
3 tasks done
9997766 opened this issue Mar 27, 2024 · 11 comments · Fixed by #4807
Labels
bug Something isn't working

Comments

9997766 commented Mar 27, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

1.10.0

What happened

There is a problem with the SubStreamHandler class. gRPC communication here is bidirectional, and I have integrated the gRPC publish and subscribe functions into my project.
The following code throws an exception every 30 seconds:

private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}

I analyzed the cause: when the exception is thrown, onCompleted should be called to close the sender, but that is not done here. As a result,
gRPC treats the connection as still alive and keeps throwing exceptions in a loop.
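The mechanism described above can be modelled without a real gRPC channel. The sketch below is a hypothetical, JDK-only model, not EventMesh code: `StreamObserverLike` mirrors the three callbacks of `io.grpc.stub.StreamObserver`, `TerminatedObserver` assumes (as the report implies) that every `onNext` on the broken stream throws, and `LogOnlySender` / `CloseOnErrorSender` are illustrative stand-ins for the original and the proposed `senderOnNext`. It also assumes the repeated exceptions come from a periodic send on the same sender.

```java
import java.util.function.Supplier;

// Stand-in for io.grpc.stub.StreamObserver: the same three callbacks.
interface StreamObserverLike<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Assumed behaviour of an observer whose call has already failed:
// every further onNext() throws.
class TerminatedObserver implements StreamObserverLike<String> {
    @Override public void onNext(String value) {
        throw new IllegalStateException("stream already terminated");
    }
    @Override public void onError(Throwable t) { }
    @Override public void onCompleted() { }
}

// A healthy stream that just counts what it receives.
class HealthyObserver implements StreamObserverLike<String> {
    int received;
    @Override public void onNext(String value) { received++; }
    @Override public void onError(Throwable t) { }
    @Override public void onCompleted() { }
}

// Mirrors the original senderOnNext: log the error, keep the same observer.
class LogOnlySender {
    private final StreamObserverLike<String> sender;
    int errors;

    LogOnlySender(StreamObserverLike<String> sender) { this.sender = sender; }

    void send(String msg) {
        try {
            sender.onNext(msg);
        } catch (Exception e) {
            errors++; // stands in for log.error(...); the broken observer is kept
        }
    }
}

// Mirrors the proposed fix: complete and drop the observer on failure,
// so the next send lazily opens a new stream (as sendSubscription does).
class CloseOnErrorSender {
    private final Supplier<StreamObserverLike<String>> openStream;
    private StreamObserverLike<String> sender;
    int errors;
    int streamsOpened;

    CloseOnErrorSender(Supplier<StreamObserverLike<String>> openStream) {
        this.openStream = openStream;
    }

    void send(String msg) {
        if (sender == null) {
            sender = openStream.get();
            streamsOpened++;
        }
        try {
            sender.onNext(msg);
        } catch (Exception e) {
            errors++;
            try {
                sender.onCompleted();
            } catch (Exception ignored) {
                // completing an already-broken stream may fail as well
            }
            sender = null; // forget the broken stream
        }
    }
}

class SenderLoopDemo {
    public static void main(String[] args) {
        LogOnlySender logOnly = new LogOnlySender(new TerminatedObserver());
        int[] opened = {0};
        // The first stream is broken; any stream opened afterwards is healthy.
        CloseOnErrorSender closing = new CloseOnErrorSender(() -> {
            opened[0]++;
            if (opened[0] == 1) {
                return new TerminatedObserver();
            }
            return new HealthyObserver();
        });
        for (int tick = 0; tick < 5; tick++) { // five periodic sends
            logOnly.send("ping");
            closing.send("ping");
        }
        System.out.println("log-only errors=" + logOnly.errors);
        System.out.println("close-on-error errors=" + closing.errors
                + ", streams opened=" + closing.streamsOpened);
    }
}
```

Under these assumptions the log-only variant prints `log-only errors=5` (it fails on every send), while the close-on-error variant prints `close-on-error errors=1, streams opened=2`: it fails once, opens a fresh stream, and then succeeds, which is the effect the proposed change aims for.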

I tried the change below, and it solved the problem.
Could you fix this soon? I am using the project, and this problem is blocking me.

@Slf4j
public class SubStreamHandler<T> extends Thread implements Serializable {

...
public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, final EventMeshGrpcClientConfig clientConfig,
        final ReceiveMsgHook listener) {
    this.consumerAsyncClient = consumerAsyncClient;
    this.clientConfig = clientConfig;
    this.listener = listener;
}

public void sendSubscription(final CloudEvent subscription) {
    synchronized (this) {
        if (this.sender == null) {
            this.sender = consumerAsyncClient.subscribeStream(createReceiver());
        }
    }
    senderOnNext(subscription);
}

private StreamObserver<CloudEvent> createReceiver() {
    return new StreamObserver<CloudEvent>() {

        @Override
        public void onNext(final CloudEvent message) {
            T msg = EventMeshCloudEventBuilder.buildMessageFromEventMeshCloudEvent(message, listener.getProtocolType());
            if (msg instanceof Set) {
                log.info("Received message from Server:{}", message);
            } else {
                log.info("Received message from Server.|seq={}|uniqueId={}|",
                        EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message));
                CloudEvent streamReply = null;
                try {
                    Optional<T> reply = listener.handle(msg);
                    if (reply.isPresent()) {
                        streamReply = buildReplyMessage(message, reply.get());
                    }
                } catch (Exception e) {
                    log.error("Error in handling reply message.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message), e);
                }
                if (streamReply != null) {
                    log.info("Sending reply message to Server.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(streamReply), EventMeshCloudEventUtils.getUniqueId(streamReply));
                    senderOnNext(streamReply);
                }
            }
        }

        @Override
        public void onError(final Throwable t) {
            log.error("Received Server side error", t);
            close(); // added: close the stream when the server reports an error
        }

        @Override
        public void onCompleted() {
            log.info("Finished receiving messages from server.");
            close();
        }
    };
}

private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) {
    final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
            clientConfig, listener.getProtocolType());

    return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessage.getAttributesMap()).putAllAttributes(cloudEvent.getAttributesMap())
            .putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
                    CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
            // Indicate that it is a subscription response
            .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
            .build();
}

@Override
public void run() {
    try {
        latch.await();
    } catch (InterruptedException e) {
        log.error("SubStreamHandler Thread interrupted", e);
        Thread.currentThread().interrupt();
    }
}

public void close() {
    if (this.sender != null) {
        senderOnComplete();
    }

    latch.countDown();

    log.info("SubStreamHandler closed.");
}

private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
        close(); // added: close the broken stream instead of only logging the error
    }
}

private void senderOnComplete() {
    try {
        synchronized (sender) {
            sender.onCompleted();
            sender = null; // added: a later sendSubscription() then opens a new stream
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onComplete", e);
    }
}

}
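In the proposal above, `close()` can be reached from both `onError` and the `catch` in `senderOnNext`, possibly on different gRPC threads. The commit messages for #4807 in this thread mention keeping `senderOnComplete` called once and using a boolean so the latch is not triggered by someone else. A minimal sketch of such a close-once guard using `AtomicBoolean.compareAndSet` follows; `GuardedHandler` is a hypothetical class, not the merged code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler showing a close-once guard; not the merged #4807 code.
class GuardedHandler {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CountDownLatch latch = new CountDownLatch(1);
    // Counts how often the simulated sender.onCompleted() step actually runs.
    final AtomicInteger completions = new AtomicInteger();

    void close() {
        // Only the first caller flips false -> true; later calls
        // (from onError or senderOnNext) return immediately.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        completions.incrementAndGet(); // stands in for senderOnComplete()
        latch.countDown();
    }

    boolean isClosed() { return closed.get(); }

    long remaining() { return latch.getCount(); }
}

class GuardedHandlerDemo {
    public static void main(String[] args) throws InterruptedException {
        GuardedHandler handler = new GuardedHandler();
        // Simulate onError and a failing senderOnNext racing to close.
        Thread a = new Thread(handler::close);
        Thread b = new Thread(handler::close);
        a.start();
        b.start();
        a.join();
        b.join();
        handler.close(); // a third, late call
        System.out.println("completions=" + handler.completions.get()
                + " latch=" + handler.remaining());
    }
}
```

Only the first caller passes `compareAndSet(false, true)`, so the completion step runs once however many paths call `close()`, and the demo prints `completions=1 latch=0`. A `synchronized` method with a plain `boolean` flag would give the same guarantee.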

How to reproduce

When an exception occurs (for example, a gRPC timeout), this method causes an endless loop:
private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}

Debug logs

no

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

9997766 added the bug (Something isn't working) label Mar 27, 2024
9997766 (Author) commented Apr 1, 2024

When will this problem be solved?

Pil0tXia (Member) commented Apr 1, 2024

Since you checked the "Yes I am willing to submit a PR!" box, we assumed you would submit a PR to fix the problem. I will try to reproduce this issue.

Pil0tXia (Member) commented Apr 2, 2024

After calling the close() method, the SubStreamHandler instance will never run again unless the EventMeshGrpcConsumer re-subscribes.
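This follows from how the class is built: `SubStreamHandler` extends `Thread`, `run()` only waits on `latch.await()`, and `close()` calls `latch.countDown()`. The JDK-only sketch below models just that lifecycle (it is not EventMesh code) to show why the same instance cannot simply resume: a `CountDownLatch` has no reset, and a `Thread` object cannot be started twice.

```java
import java.util.concurrent.CountDownLatch;

// Simplified model of SubStreamHandler's lifecycle: run() blocks on the
// latch, and close() releases it by calling countDown().
class LatchLifecycleDemo {
    // Returns true if starting the thread again is rejected.
    static boolean restartRejected(Thread t) {
        try {
            t.start();
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread handler = new Thread(() -> {
            try {
                latch.await(); // like SubStreamHandler.run()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        handler.start();

        latch.countDown(); // like close()
        handler.join();    // run() has returned; the thread is finished

        // CountDownLatch has no reset method: the count stays at zero,
        // so any later await() on it returns immediately.
        System.out.println("latch count=" + latch.getCount());
        // A Thread object cannot be started a second time either.
        System.out.println("restart rejected=" + restartRejected(handler));
    }
}
```

The demo prints `latch count=0` and `restart rejected=true`. Under this model, a handler that has been closed has to be replaced by a new one rather than restarted.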

@9997766 Do you have an idea for this issue?

9997766 (Author) commented Apr 3, 2024 via email

9997766 (Author) commented Apr 3, 2024 via email

Pil0tXia (Member) commented Apr 3, 2024

@9997766 Thanks. You may check PR #4807.

9997766 (Author) commented Apr 10, 2024

Linux

I read the code, and you solved the problem. But the main branch does not have the #4807 code yet. When will #4807 be merged into the main branch?

Pil0tXia (Member)

@9997766 We need another reviewer to merge this PR.

@pandaapo Could you please help?

9997766 (Author) commented Apr 10, 2024

No problem. But I am not so familiar with developing on GitHub. Could you submit the merge request for me?

9997766 (Author) commented Apr 10, 2024

And I found another problem.
I use nginx to load-balance the gRPC ports of two EventMesh instances, and the JDK in the application keeps reporting the following error.
It seems EventMesh has reset the gRPC connection, and the client receives this error.
I'm not sure about the specific impact at the moment, but I keep encountering this error, which raises doubts about the stability of EventMesh.

2024-04-10 17:33:49.374 ERROR 18488 [] [grpc-default-executor-0] org.apache.eventmesh.client.grpc.consumer.SubStreamHandler Received Server side error

io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
at io.grpc.Status.asRuntimeException(Status.java:535)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:479)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

Pil0tXia (Member)

@9997766

"I am not so familiar with developing on GitHub"

You may refer to this guide: https://eventmesh.apache.org/community/contribute/contribute/

"We need another reviewer to merge this PR."

I mean that at least two committers are required to merge a PR, and #4807 is waiting for one more committer's approval to be merged into master.

"And I found another problem"

It is better to open a new issue for this problem, since merging #4807 will close this issue and the problem would no longer be tracked.

This log seems to be reported by the SDK; you will need to check the Runtime side for the detailed reason.

Pil0tXia changed the title [Bug]grpc SubStreamHandler exception to [Bug] grpc SubStreamHandler exception Apr 12, 2024
mxsm pushed a commit that referenced this issue Apr 13, 2024

* Handle exception loop by closeOnError

* Lombok optimization

* some format optimization

* Avoid closing multiple times

* Remove redundant set null

* Revert "Avoid closing multiple times"

This reverts commit 774397f.

* Use synchronized latch to keep senderOnComplete called once

* Use boolean to prevent latch called by somebody else

* Remove the unique callee/caller close() of onCompleted()
xwm1992 added a commit that referenced this issue Apr 22, 2024
* [ISSUE #4812] Set up Admin Endpoints v2 (#4813)

* Remove redundant overloaded methods

* Simplify write() result param

* Add writeJson(); Add PUT; Add JavaDoc

* Rename EventHttpHandler to EventMeshHttpHandler

* Correct server thread name

* Clean up messy & non-hierarchical overloading

* No need to set headers manually any more

* Set up v1&v2 endpoints

* Set up v1&v2 response dto

* Introduce fastjson2

* Fix fastjson2 "level too large : 2048" error caused by IPAddress

* Correct @ConfigField naming

* Return properties format json key

* Add format option to query string

* Introduce Result

* Reduce duplicate builder code

* Fix all checkstyle warnings in eventmesh-runtime

* Add known dependency

* [ISSUE #4814] Migrate from fastjson 1.2.83 to fastjson2 (#4819)

* [Enhancement] Migrate from fastjson 1.2.83 to fastjson2 #4814

* fix_dependencies_problem

* fix_check

* [ISSUE #4551] modify the logic of time-consumption statistics (#4822)

* init connector runtime v2

* [ISSUE #4804] Fix SubStreamHandler exception loop by closeOnError (#4807)

* [ISSUE #4838] Deprecate unused `eventMesh.connector.plugin.type` etc. properties (#4839)

* Remove all references of `eventMesh.connector.plugin.type`

* Deprecate `eventMesh.connector.plugin.type` and sort properties

* Remove misconfigured & not-used `registerIntervalInMills`, `fetchRegistryAddrIntervalInMills`

* Remove 'defibus' related un-used usages

* Supplement #4809 for `null != object`

* [ISSUE #4832] Downgrade stale bot to v8 to resolve state cache reserving error (#4833)

* Revert stale bot to v8 to resolve state cache reserving error

* Reduce operations-per-run to default value to ease pressure

* Unify yaml to yml

* [ISSUE #4820] Bug fix EventHandler not return json (#4821)

* bug fix

* bug fix

* bug fix

* update runtime v2

* update connector runtime

* update connector runtime

* update connector runtime

* update connector runtime

* update connector runtime

---------

Co-authored-by: Pil0tXia <[email protected]>
Co-authored-by: Zaki <[email protected]>
Co-authored-by: Karson <[email protected]>
xwm1992 added a commit that referenced this issue May 31, 2024
* EventMesh function admin (#4851)

* own

* dependency

* finish registry

* EventMesh function admin (#4853)

* own

* dependency

* finish registry

* init

* Eventmesh function admin (#4854)

* own

* dependency

* finish registry

* init

* 0419

* 0419

* more discovery and move gRPC

* fix dependency

* EventMesh function connector runtime (#4858)

* [ISSUE #4931]Add Registry Module for Discovery AdminServer

* [ISSUES #4933]Add Admin Module

* [ISSUE #4935] Add and Move the Pojo Used By Both Runtime and Admin to Common

* [ISSUE #4937]fix gradle dependecy and add runtime v2

* [ISSUES #4939]add canal connector

* fix missing apache header

* fix missing apache header

* fix missing apache header

* update gradle dependencies

* fix admin server ci check error

* fix admin server ci check error

* fix ci checkStyle error

* fix ci check error

---------

Co-authored-by: sodaRyCN <[email protected]>
Co-authored-by: Pil0tXia <[email protected]>
Co-authored-by: Zaki <[email protected]>
Co-authored-by: Karson <[email protected]>
xwm1992 added a commit that referenced this issue Jul 2, 2024
* [ISSUE #4979]Canal Connector supports bidirectional data synchronization

* add bash files for admin & runtime-v2

* fix ack offset read & persist

* fix checkStyle error

* [ISSUE #4979] Canal Connector supports bidirectional data synchronization (#5011)

* [ISSUE #4979]Canal Connector supports bidirectional data synchronization

* add bash files for admin & runtime-v2

* fix ack offset read & persist

* fix checkStyle error

* fix http source connector test error

---------

Co-authored-by: sodaRyCN <[email protected]>
Co-authored-by: Pil0tXia <[email protected]>
Co-authored-by: Zaki <[email protected]>
Co-authored-by: Karson <[email protected]>