Support streams when sending and/or receiving #939
Comments
With the current serializer / API design this is difficult to implement, as MQTT requires the size of a message to be encoded in the header. That's why we serialize the message to a byte array and encode the length of that array into the header of the message. If you have ideas how to do that, I would be glad to implement them, as some efficiency gains are always welcome.
In my opinion we are able to support streams. I already started on this some time ago but stopped because I did not want to change the API, and I assume that an MQTT message is usually quite small. The solution would be to generate the entire message without the payload and then send the header, followed by the payload in chunks (4 KB per default, or configurable); see the sketch below. We only have to add the payload length to the overall length property before sending this to the server/client. The server is another story, because we have to keep the payload in memory there (or we have to implement temporary storage etc.).
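A minimal sketch of that chunked-send idea, assuming a raw transport `Stream` and a pre-built header whose remaining-length field already includes the payload size (illustrative names, not MQTTnet's actual channel API):

```csharp
using System.IO;
using System.Threading.Tasks;

// Sketch of the chunked-send idea: the header (whose remaining-length field
// already includes the payload size) goes out first, then the payload in
// fixed-size chunks. "channel" stands in for the underlying transport stream.
static class ChunkedSendExample
{
    public static async Task SendAsync(Stream channel, byte[] header, Stream payload, int chunkSize = 4096)
    {
        await channel.WriteAsync(header, 0, header.Length);

        var buffer = new byte[chunkSize];
        int read;
        while ((read = await payload.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await channel.WriteAsync(buffer, 0, read);
        }
    }
}
```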
@JanEggers Wouldn't you be able to do […]? The specification allows for 256 MB payloads, so the chunking approach @chkr1011 suggests would only apply if you go beyond 256 MB, or beyond any configured limit, since a server could potentially be configured to allow less.
We'd appreciate an API like this, too. Or alternatively an API that allows for array pooling (e.g. passing a byte array and an integer that specifies the length of the byte array that is used for the payload); a rough sketch follows below. We use MQTT to communicate between our services, and payloads (especially encoded as JSON) can easily get above 85k and therefore land on the large object heap, which compromises performance. I would think that making changes to the API that allow for pooling would be easier to implement, too. This would also not break the current API, as it is just a special case where the length is taken directly from the buffer length and therefore could just be an overload. Of course the length information would have to get passed around internally until the bytes are actually written to the TCP stream.

Edit: It might also be worthwhile to look at the way System.Text.Json handles stuff like this (ReadOnlyMemory, ReadOnlySequence etc.)?
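One shape such a pooling-friendly overload could take from the caller's side, as a conceptual sketch only (`PublishRaw` is a made-up stand-in, not an MQTTnet method):

```csharp
using System;
using System.Buffers;

// Conceptual sketch of a pooling-friendly overload: the caller rents a buffer,
// fills it and hands over buffer + length. "PublishRaw" is a made-up stand-in
// for whatever overload the library would expose.
static class PooledPublishExample
{
    static void PublishRaw(byte[] buffer, int count)
    {
        // The library would treat new ArraySegment<byte>(buffer, 0, count) as
        // the payload and copy/write only 'count' bytes.
    }

    public static void Send(ReadOnlySpan<byte> json)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(json.Length);
        try
        {
            json.CopyTo(buffer);
            PublishRaw(buffer, json.Length); // length travels with the rented buffer
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
```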
@mg90707 a `Span<byte>`?
@ramonsmits I think it would have to be a `ReadOnlyMemory`, as Spans can only exist on the stack, not on the heap.
@mg90707 Implementing a simple memory pool is a good idea and can be integrated without breaking the APIs. |
@chkr1011 You break the API when you apply a type change, right? Do you see a way to prevent this?
When implementing an internal array pool the library will no longer allocate byte arrays for the payload. Instead it will reuse existing ones to a certain extent. The problem is that the user has to "release" the buffer somehow. |
It also seems that the passed payload array is copied again internally by the packet writer to build the actual message anyway, so the message payload byte array passed in by the user is pretty much generated -> copied -> subject to garbage collection (at least in my applications I don't have any use for it afterwards). That's why I thought that being able to pass something like `ReadOnlyMemory` would be nice, because then we would be able to use an array pool for these very short-lived, but possibly large, payload arrays.

@chkr1011 I think the problem with using an array pool for messages that are then passed to the user is that the buffers from the pool are usually not an exact fit for the message, and with just a byte array as payload the user has no way of knowing how long the message actually is. There is also, as you mentioned, the fact that the user would now have to release the buffers (System.Text.Json's JsonDocument uses IDisposable for this). However, any of these changes would also break the public API.

Maybe using something like the shared array pool would be a good fit for the packet writer, to avoid allocating a new array for every message? Of course the buffer would have to be returned to the pool reliably after sending the data / failing to do so. But at least this would be an internal change only and therefore not break the public API, while avoiding allocating more than double the required memory for each message sent (once by the user + a copy by the library + headers + "wasted" memory due to the packet writer implementation (always doubling the buffer)). A rough sketch of that internal change is below.
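A simplified sketch of what renting the writer buffer from `ArrayPool<byte>.Shared` could look like; this is not MQTTnet's actual `MqttPacketWriter`, just the grow-and-return pattern under discussion:

```csharp
using System;
using System.Buffers;

// Simplified sketch of renting the writer buffer from the shared pool.
// This is not MQTTnet's MqttPacketWriter, only the grow-and-return pattern.
sealed class PooledPacketWriter : IDisposable
{
    byte[] _buffer = ArrayPool<byte>.Shared.Rent(4096);
    int _position;

    public void Write(ReadOnlySpan<byte> data)
    {
        if (_position + data.Length > _buffer.Length)
        {
            // Rent a larger buffer and return the old one instead of new[].
            var bigger = ArrayPool<byte>.Shared.Rent(Math.Max(_buffer.Length * 2, _position + data.Length));
            Array.Copy(_buffer, bigger, _position);
            ArrayPool<byte>.Shared.Return(_buffer);
            _buffer = bigger;
        }

        data.CopyTo(_buffer.AsSpan(_position));
        _position += data.Length;
    }

    public ArraySegment<byte> WrittenData => new ArraySegment<byte>(_buffer, 0, _position);

    // Must be called reliably after the data was sent (or sending failed),
    // otherwise the buffer never goes back to the pool.
    public void Dispose() => ArrayPool<byte>.Shared.Return(_buffer);
}
```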
Yes, you are right. We would have to at least change the API to ArraySegment etc. in order to deal with larger arrays from the pool. But using the pool only internally first is a good idea; maybe we should start with that. ReadOnlyMemory is not an option in my opinion, because then we have to wait before we can process the next message. The reason is that the buffer is reused for the next incoming message. That is why I create a new byte array for the payload in all cases. We can avoid the copy of the payload by changing the formatter interfaces to return an array of ArraySegments. Then we can construct the header etc., and the payload is simply written to the channel after the header is sent. What do you think?
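The segment-based idea could look roughly like this, assuming the formatter hands back the header buffer plus the caller's payload array untouched (illustrative names only):

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Sketch of a formatter returning header + payload as segments: the user's
// payload array is never copied into the writer, it is just written to the
// channel right after the header. Names are illustrative only.
static class SegmentedSendExample
{
    public static ArraySegment<byte>[] Encode(byte[] header, byte[] payload) => new[]
    {
        new ArraySegment<byte>(header),
        new ArraySegment<byte>(payload) // no intermediate copy of the payload
    };

    public static async Task SendAsync(Stream channel, ArraySegment<byte>[] segments)
    {
        foreach (var segment in segments)
        {
            await channel.WriteAsync(segment.Array, segment.Offset, segment.Count);
        }
    }
}
```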
Yes, this would have to be done and would be a pretty fundamental change in my opinion. Not returning the buffer to the pool is possible, but it also kills all pooling benefits. This is especially true when using the shared array pool, which is used by other libraries and the framework itself.
Agreed
Wouldn't the caller be responsible for reusing a message only after the corresponding […]? This ownership model is something that should be defined clearly. E.g. is a […]
Seems like a good first step to reduce potentially big allocations + time spent copying. It should also be a completely transparent change for users of the library. As […]
Please keep in mind that this library also supports .NET 4.5.2 and UWP, so the question is whether all platforms support that 😄 We could also start by declaring the "Payload" property obsolete and add methods like "ReadPayloadAsync" or "ReadPayloadAsStringAsync", like in the HttpClient from the framework. Then we can migrate over several releases of the library. But we should also split the application message into an "Inbound" one and an "Outbound" one, because the mentioned methods make no sense when sending application messages.
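A rough shape of what that inbound side could look like, purely a sketch of the proposal (none of these members exist in the library today):

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Rough sketch of the proposed inbound message shape: the payload is exposed
// through read methods, HttpClient-style, and the old property could be phased
// out via [Obsolete]. All names here are hypothetical.
class InboundApplicationMessage
{
    readonly Stream _payloadStream;

    public InboundApplicationMessage(Stream payloadStream) => _payloadStream = payloadStream;

    [Obsolete("Use ReadPayloadAsync instead.")]
    public byte[] Payload { get; set; }

    public async Task<byte[]> ReadPayloadAsync()
    {
        using (var buffer = new MemoryStream())
        {
            await _payloadStream.CopyToAsync(buffer);
            return buffer.ToArray();
        }
    }

    public async Task<string> ReadPayloadAsStringAsync()
    {
        return Encoding.UTF8.GetString(await ReadPayloadAsync());
    }
}
```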
I would be careful with that assumption, because in my opinion it wouldn't help to replace one performance issue with another (possibly worse) one. E.g. the headers and payload for small messages might then get sent in separate TCP packets, which would be pretty bad in networks with noticeable latency + additional interop? Otherwise the .NET people probably would have saved themselves the effort of having extra code paths for this.
According to: […]
@mg90707 Sorry, I was referring to the API for WebSockets. They do not have the ability to send multiple array segments. But anyway, we can go ahead and check that. We can also create some rules so that the payload is always added to the header buffer if the payload is smaller than x bytes, e.g. one buffer if the overall size is < 4096 but multiple ones if larger.
You can already play around with the buffer settings at MqttPacketWriter.MaxBufferSize. If you have 85k payloads you can try to set that to 100k. Then the internal buffer will stay at 100k (or less) and is reused all the time.
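For example (assuming the member mentioned above is a static setting; its namespace and placement may differ between library versions):

```csharp
using MQTTnet.Formatter;

// Assumes MaxBufferSize is a static setting and lives in this namespace
// (placement differs between library versions): let ~85k payloads fit into
// the reusable writer buffer instead of forcing it to grow and shrink.
MqttPacketWriter.MaxBufferSize = 100 * 1024;
```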
Thanks, didn't know that yet!
As a note, this issue was originally about Stream support, not memory management. Would it be useful to continue this discussion in a new issue specific to memory management?
@chkr1011 Just a small suggestion: wouldn't just allocating a new buffer here be more effective, as Array.Resize also copies the old contents over to the new, shrunk buffer (at least as much as possible)? That doesn't seem necessary to me when the buffer will be overwritten with the contents of the next message anyway.
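For comparison, a tiny sketch of the two options (placeholder names, not the actual writer code):

```csharp
using System;

// Placeholder sketch, not the actual writer code: both variants end up with a
// fresh array of the target size, but only Array.Resize pays for a copy.
static class ShrinkExample
{
    // Array.Resize allocates a new array AND copies the old contents into it.
    public static byte[] ShrinkWithCopy(byte[] buffer, int size)
    {
        Array.Resize(ref buffer, size);
        return buffer;
    }

    // A plain allocation skips the copy, which is enough if the next message
    // overwrites the buffer from the start anyway.
    public static byte[] ShrinkWithoutCopy(int size) => new byte[size];
}
```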
Describe the feature request
The `MqttApplicationMessage` only supports `byte[]`, but it would be nice if it also supported a Stream object. The MQTT spec allows for 256 MB payloads, and using streams makes sense both when sending and when receiving. For example, the ability to pass a FileStream would be useful, especially if the stream is still being written to.

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Example: I am trying to do [...] but [...]
Which project is your feature request related to?
Describe the solution you'd like
Support streams when sending and/or receiving
Describe alternatives you've considered
The payload `byte[]` requires all data to be loaded into memory first, which is not ideal when dealing with large data fragments and results in unneeded memory pressure.
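For illustration, sending a file via a Stream payload could look roughly like this; the `WithPayload(Stream)` overload is the hypothetical part, the rest is ordinary MQTTnet client usage:

```csharp
using System.IO;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

// Hypothetical usage sketch: WithPayload(Stream) does not exist today; the idea
// is that the FileStream would be read and sent in chunks instead of being
// buffered into one big byte[].
static class StreamPayloadExample
{
    public static async Task PublishFileAsync(IMqttClient client, string path)
    {
        using (var file = File.OpenRead(path))
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic("files/upload")          // example topic
                .WithPayload(file)                  // assumed Stream overload
                .Build();

            await client.PublishAsync(message);
        }
    }
}
```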