
Support streams when sending and/or receiving #939

Open
ramonsmits opened this issue Jun 10, 2020 · 18 comments
Labels
feature-request New feature or request

Comments

@ramonsmits
Contributor

Describe the feature request

The MqttApplicationMessage only supports byte[], but it would be nice if it also supported a Stream. The MQTT spec allows payloads of up to 256 MB, and using streams makes sense both when sending and when receiving. For example, the ability to pass a FileStream would be especially useful if the stream is still being written to.

Is your feature request related to a problem? Please describe.

Which project is your feature request related to?

  • Client

Describe the solution you'd like

Support streams when sending and/or receiving

Describe alternatives you've considered

The payload byte[] requires all data to be loaded into memory first, which is not ideal when dealing with large payloads and results in unneeded memory pressure.

@ramonsmits ramonsmits added the feature-request New feature or request label Jun 10, 2020
@JanEggers
Contributor

With the current serializer / API design this is difficult to implement, as MQTT requires the size of a message to be encoded in the header. That's why we serialize the message to a byte array and encode the length of that array into the header of the message. If you have ideas how to do that, I would be glad to implement it, as efficiency gains are always welcome.

@chkr1011
Collaborator

In my opinion we are able to support streams. I already started on this some time ago but stopped because I did not want to change the API, and I assume that an MQTT message is usually quite small.

The solution would be to generate the entire message without the payload, send the header, and then send the payload in chunks (4 KB by default, or configurable). We only have to add the payload length to the overall length field before sending this to the server/client. The server is another story because we have to keep the payload in memory there (or implement temporary storage, etc.).
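For illustration, a minimal sketch of that chunked approach (the helper, its parameters, and the use of a plain Stream as the channel are assumptions for the example, not the current MQTTnet API):

```csharp
using System.IO;
using System.Threading.Tasks;

static class ChunkedSendSketch
{
    // 'header' is the already-built fixed + variable header whose remaining-length
    // field already includes payload.Length; 'payload' could be a FileStream etc.
    public static async Task SendAsync(Stream channel, byte[] header, Stream payload, int chunkSize = 4096)
    {
        await channel.WriteAsync(header, 0, header.Length);

        var buffer = new byte[chunkSize];
        int read;
        while ((read = await payload.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Forward the payload in chunks instead of materializing it as one byte[].
            await channel.WriteAsync(buffer, 0, read);
        }

        await channel.FlushAsync();
    }
}
```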

@ramonsmits
Contributor Author

@JanEggers Wouldn't you be able to use Stream.Length to get the length of the Stream? Alternatively, just pass a Stream and an expected length. It's not ideal that the length must be known upfront.

The specification allows for 256 MB payloads, so the chunking approach @chkr1011 suggests would only apply if you go beyond 256 MB or, for that matter, beyond any configured limit, as the server could potentially be configured to allow less.

@mg90707

mg90707 commented Jan 26, 2021

We'd appreciate an API like this, too. Or alternatively an API that allows for array pooling (e.g. passing a byte array plus an integer that specifies how much of the array is used for the payload). We use MQTT to communicate between our services, and payloads (especially when encoded as JSON) can easily get above 85k and therefore land on the large object heap, which compromises performance.

I would think that making changes to the API that allow for pooling would be easier to implement, too. This would also not break the current API, as it is just a special case where the length is taken directly from the buffer length and could therefore just be an overload. Of course the length information would have to be passed around internally until the bytes are actually written to the TCP stream (a rough sketch of such an overload is at the end of this comment).

Edit: It might also be worthwhile to look at the way System.Text.Json handles stuff like this (ReadOnlyMemory, ReadOnlySequence etc.)?
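A rough sketch of the overload idea from the caller's perspective (the WithPayload(byte[], int) overload, the client variable, and SerializePayloadInto are hypothetical; MqttApplicationMessageBuilder itself exists):

```csharp
using System.Buffers;

// Rent an oversized buffer, serialize into it, and tell the (hypothetical) overload
// how many bytes are actually payload.
var buffer = ArrayPool<byte>.Shared.Rent(128 * 1024);
try
{
    int written = SerializePayloadInto(buffer); // placeholder for e.g. a JSON serializer

    var message = new MqttApplicationMessageBuilder()
        .WithTopic("my/topic")
        .WithPayload(buffer, written) // hypothetical overload: buffer + used length
        .Build();

    await client.PublishAsync(message);
}
finally
{
    ArrayPool<byte>.Shared.Return(buffer);
}
```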

@ramonsmits
Contributor Author

@mg90707

mg90707 commented Jan 26, 2021

@ramonsmits I think it would have to be a ReadOnlyMemory, as a Span can only exist on the stack, not the heap.

@chkr1011
Collaborator

@mg90707 Implementing a simple memory pool is a good idea and can be integrated without breaking the APIs.
I will have a look at how to achieve this.

@ramonsmits
Contributor Author

@chkr1011 You break the API when you apply a type change, right? Do you see a way to prevent this?

@chkr1011
Collaborator

When implementing an internal array pool the library will no longer allocate byte arrays for the payload. Instead it will reuse existing ones to a certain extent. The problem is that the user has to "release" the buffer somehow.

@mg90707

mg90707 commented Jan 27, 2021

It also seems that the passed payload array is copied again internally by the packet writer to build the actual message anyway, so the message payload byte array passed in by the user is pretty much generated -> copied -> subject to garbage collection (at least in my applications I don't have any use for it afterwards). That's why I thought that being able to pass something like ReadOnlyMemory would be nice, because then we could use an array pool for these very short-lived, but possibly large, payload arrays.

@chkr1011 I think the problem with using an array pool for messages that are then passed to the user is that the buffers from the pool are usually not an exact fit for the message, and with just a byte array as a payload the user has no way of knowing how long the message actually is. Also, as you mentioned, the user would now have to release the buffers (System.Text.Json's JsonDocument uses IDisposable for this). However, any of these changes would also break the public API.

Maybe using something like the shared ArrayPool would be a good fit for the packet writer, to avoid allocating a new array for every message? Of course the buffer would have to be returned to the pool reliably after sending the data (or failing to do so). But at least this would be an internal change only and therefore not break the public API, while avoiding allocating more than double the required memory for each message sent (once by the user + the copy by the library + headers + "wasted" memory due to the packet writer implementation always doubling the buffer). A sketch of the idea is below.
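A minimal sketch of a packet writer that rents its buffer from ArrayPool<byte>.Shared (all names are made up for the example; this is not the existing MqttPacketWriter):

```csharp
using System;
using System.Buffers;

sealed class PooledPacketWriterSketch : IDisposable
{
    byte[] _buffer = ArrayPool<byte>.Shared.Rent(4096);
    int _position;

    public void Write(byte[] data, int offset, int count)
    {
        if (_position + count > _buffer.Length)
        {
            // Rent a larger buffer from the shared pool instead of allocating a fresh array.
            var larger = ArrayPool<byte>.Shared.Rent(Math.Max(_buffer.Length * 2, _position + count));
            Array.Copy(_buffer, larger, _position);
            ArrayPool<byte>.Shared.Return(_buffer);
            _buffer = larger;
        }

        Array.Copy(data, offset, _buffer, _position, count);
        _position += count;
    }

    public ArraySegment<byte> GetWrittenData()
    {
        return new ArraySegment<byte>(_buffer, 0, _position);
    }

    // Must be called reliably (e.g. in a finally block) after the packet was sent or
    // sending failed, otherwise the pooling benefit is lost.
    public void Dispose()
    {
        ArrayPool<byte>.Shared.Return(_buffer);
    }
}
```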

@chkr1011
Collaborator

Yes, you are right. We have to at least change the API to ArraySegment etc. in order to deal with larger arrays from the pool.
But for me the biggest issue is that the user has to return the array to the pool. So we must make the message disposable etc. But this is maybe optional, because a buffer which was not returned to the pool will be garbage collected anyway.

But using the pool only internally first is a good idea. Maybe we should start with that.

ReadOnlyMemory is not an option in my opinion, because then we would have to wait before we can process the next message. The reason is that the buffer is reused for the next incoming message. That is why I create a new byte array for the payload in all cases.

We can avoid the copy of the payload by changing the formatter interfaces to return an array of ArraySegments. Then we can construct the header etc. and the payload is simply written to the channel after the header is sent. What do you think?

@mg90707

mg90707 commented Jan 28, 2021

Yes, you are right. We have to at least change the API to ArraySegment etc. in order to deal with larger arrays from the pool.
But for me the biggest issue is that the user has to return the array to the pool. So we must make the message disposable etc. But this is maybe optional, because a buffer which was not returned to the pool will be garbage collected anyway.

Yes, this would have to be done and would be a pretty fundamental change in my opinion.

Not returning the buffer to the pool is possible, but it also kills all pooling benefits. This is especially true when using the shared array pool, which is used by other libraries and the framework itself.

But using the pool only internally first is a good idea. Maybe we should start with that.

Agreed

ReadOnlyMemory is not an option in my opinion, because then we would have to wait before we can process the next message. The reason is that the buffer is reused for the next incoming message. That is why I create a new byte array for the payload in all cases.

Wouldn't the caller be responsible for reusing a message only after the corresponding PublishAsync task was awaited? At least for the non-managed client (which is what we use). As I understand it, the managed client has an internal queue and the publish function returns before the message is actually sent? Then I guess the managed client would have to make a copy of the message before returning to the caller.

This ownership model is something that should be defined clearly. E.g. is an MqttApplicationMessage (and therefore its payload buffer) free for reuse (for a new message or whatever the caller desires to do with the buffer) after a PublishAsync call was awaited (managed and non-managed client), or should the caller transfer ownership of the message to the library with the call? When transferring ownership anyway, something like ReadOnlyMemory for the payload is not necessary in my opinion, because the buffer is "lost" to the caller anyway.

We can avoid the copy of the payload by changing the formatter interfaces to return an array of ArraySegments. Then we can construct the header etc. and the payload is simply written to the channel after the header is sent. What do you think?

Seems like a good first step to reduce potentially big allocations + time spent copying. It should also be a completely transparent change for users of the library.

As Socket.SendAsync has an IList<ArraySegment<byte>> overload, I would hope that this would also get sent efficiently (batching the segments into one call to the underlying socket) instead of calling Socket.SendAsync with a single ArraySegment at a time. So I'd propose that the formatter also returns IList<ArraySegment<byte>> and that this is then used for Socket.SendAsync.
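For example, a sketch of such a gathered send on top of raw sockets (names are illustrative; the availability of this Task-based SendAsync overload depends on the target framework):

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Threading.Tasks;

static class GatheredSendSketch
{
    // Header and payload are handed to the socket as separate segments,
    // so the payload never has to be copied into the header buffer.
    public static Task<int> SendPacketAsync(Socket socket, ArraySegment<byte> header, ArraySegment<byte> payload)
    {
        IList<ArraySegment<byte>> segments = new[] { header, payload };
        return socket.SendAsync(segments, SocketFlags.None); // scatter/gather send in one call
    }
}
```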

@chkr1011
Collaborator

As Socket.SendAsync has an IList<ArraySegment<byte>> overload

Please keep in mind that this library also supports .NET 4.5.2 and UWP, so the question is whether all platforms support that 😄
But even if not: subsequent calls to Write are OK in my opinion, because our first goal is to get rid of some useless copied arrays.

We can also start declaring the "Payload" property obsolete and add methods like "ReadPayloadAsync" or "ReadPayloadAsStringAsync", like the HTTP client from the framework has. Then we can migrate over several releases of the library. But we should also differentiate the application message into an "Inbound" one and an "Outbound" one, because the mentioned methods make no sense when sending application messages.
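To make the idea concrete, a hypothetical shape for such an inbound message, modeled loosely on HttpContent (all names and members are made up; nothing like this exists in the library yet):

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;

public abstract class MqttInboundApplicationMessage
{
    public string Topic { get; protected set; }

    // The payload is exposed via read methods instead of a byte[] property.
    public abstract Task<Stream> ReadPayloadAsStreamAsync();

    public virtual async Task<string> ReadPayloadAsStringAsync()
    {
        using (var stream = await ReadPayloadAsStreamAsync())
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            return await reader.ReadToEndAsync();
        }
    }
}
```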

@mg90707

mg90707 commented Jan 29, 2021

But even if not: subsequent calls to Write are OK in my opinion, because our first goal is to get rid of some useless copied arrays.

I would be careful with that assumption, because in my opinion it wouldn't help to replace one performance issue with another (possibly worse) one. E.g. the headers and payload for small messages might then get sent in separate TCP packets, which would be pretty bad in networks with noticeable latency, plus additional interop? Otherwise the .NET people probably would've saved themselves the effort of having extra code paths for this.

Please keep in mind that this library also supports .NET 4.5.2 and UWP so the question is if all platforms support that 😄

According to
https://github.com/chkr1011/MQTTnet/blob/75845d6416bbc06c9d84d86ce242b7df1a33443e/Source/MQTTnet/Implementations/CrossPlatformSocket.cs#L210
BeginSend is used for .NET Framework 4.5.2, and the docs state that it has an IList<ArraySegment<byte>> overload.
For UWP, an IList<ArraySegment<byte>> can be passed via the BufferList property of the SocketAsyncEventArgs given to SendAsync.
So it should be possible to use the proper overload on all supported platforms?

@chkr1011
Collaborator

@mg90707 Sorry, I was referring to the API for WebSockets. It does not have the ability to send multiple array segments. But anyway, we can go ahead and check that. We can also create some rules so that the payload is always appended to the header buffer if the payload is smaller than x bytes. Something like one buffer if the overall size is < 4096, but multiple ones if larger.

can easily get above 85k and therefore land on the large object heap, which compromises performance.

You can already play around with the buffer settings via MqttPacketWriter.MaxBufferSize. If you have 85k payloads you can try setting it to 100k. Then the internal buffer will stay at 100k (or less) and is reused all the time.
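For example, a minimal usage of the setting mentioned above (assumes the static MaxBufferSize field in the MQTTnet.Formatter namespace; verify against the MQTTnet version in use):

```csharp
// Keep the reused internal packet-writer buffer around 100 KB instead of the default.
MQTTnet.Formatter.MqttPacketWriter.MaxBufferSize = 100 * 1024;
```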

@mg90707

mg90707 commented Feb 3, 2021

You can already play around with the buffer settings via MqttPacketWriter.MaxBufferSize. If you have 85k payloads you can try setting it to 100k. Then the internal buffer will stay at 100k (or less) and is reused all the time.

Thanks, didn't know that yet!

@ramonsmits
Contributor Author

As a note, this issue was originally about Stream support, not memory management. Would it be useful to continue this discussion in a new issue specific to memory management?

@mg90707

mg90707 commented Feb 5, 2021

@chkr1011 Just a small suggestion: wouldn't just allocating a new buffer here

https://github.com/chkr1011/MQTTnet/blob/b88924bce50671b4270282f4ca7ec10e429eaa49/Source/MQTTnet/Formatter/MqttPacketWriter.cs#L214

be more efficient, as Array.Resize also copies the old contents over to the new, shrunken buffer (at least as much as fits)? That doesn't seem necessary to me when the buffer will be overwritten with the contents of the next message anyway.
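In other words (placeholder names, for illustration only, not the actual fields in MqttPacketWriter):

```csharp
// When shrinking back to the target size and the old contents are no longer needed,
// a fresh allocation avoids the copy that Array.Resize performs.
if (_buffer.Length > MaxBufferSize)
{
    // Array.Resize(ref _buffer, MaxBufferSize); // also copies the old contents
    _buffer = new byte[MaxBufferSize];           // cheaper: the next message overwrites it anyway
}
```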
