Skip to content

Commit

Permalink
Merge pull request versatica#806 from nazar-pc/rust-release
Browse files Browse the repository at this point in the history
New Rust release 0.9.3
  • Loading branch information
nazar-pc committed Apr 2, 2022
2 parents 9306f8a + 61c53e7 commit b375939
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 122 deletions.
16 changes: 16 additions & 0 deletions rust/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

# 0.9.3

* Fix a segfaults in tests and under multithreaded executor
* Fix another racy deadlock situation
* Expose hierarchical dependencies of ownership of Rust data structures, now it is possible to call `consumer.transport().router().worker().worker_manager()`
* General mediasoup changes:
* ICE renomination support (PR #756).
* Update `libuv` to 1.43.0.
* TCC client optimizations for faster and more stable BWE (PR #712 by @ggarber).
* Added support for RTP abs-capture-time header (PR #761 by @oto313).
* Fix VP9 kSVC forwarding logic to not forward lower unneded layers (PR #778 by @ggarber).
* Fix update bandwidth estimation configuration and available bitrate when updating max outgoing bitrate (PR #779 by @ggarber).
* Optimize RTP header extension handling (PR #786).
* `RateCalculator`: Reset optimization (PR #785).
* Fix frozen video due to double call to `Consumer::UserOnTransportDisconnected()` (PR #788, thanks to @ggarber for exposing this issue in PR #787).

# 0.9.2

* Update `lru` dependency to fix security vulnerability
Expand Down
4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mediasoup"
version = "0.9.2"
version = "0.9.3"
description = "Cutting Edge WebRTC Video Conferencing in Rust"
categories = ["api-bindings", "multimedia", "network-programming"]
authors = ["Nazar Mokrynskyi <[email protected]>"]
Expand Down Expand Up @@ -45,7 +45,7 @@ version = "0.7.2"

[dependencies.mediasoup-sys]
path = "../worker"
version = "0.3.1"
version = "0.3.3"

[dependencies.parking_lot]
version = "0.11.2"
Expand Down
18 changes: 11 additions & 7 deletions rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ struct Inner {
mapped_pipe_transports:
Arc<Mutex<HashedMap<RouterId, Arc<AsyncMutex<Option<WeakPipeTransportPair>>>>>>,
// Make sure worker is not dropped until this router is not dropped
worker: Option<Worker>,
worker: Worker,
closed: AtomicBool,
_on_worker_close_handler: Mutex<HandlerId>,
}
Expand Down Expand Up @@ -481,7 +481,7 @@ impl Router {
data_producers,
mapped_pipe_transports,
app_data,
worker: Some(worker),
worker,
closed: AtomicBool::new(false),
_on_worker_close_handler: Mutex::new(on_worker_close_handler),
});
Expand All @@ -497,6 +497,11 @@ impl Router {
self.inner.id
}

/// Worker to which router belongs.
pub fn worker(&self) -> &Worker {
&self.inner.worker
}

/// Custom application data.
#[must_use]
pub fn app_data(&self) -> &AppData {
Expand Down Expand Up @@ -1356,17 +1361,16 @@ impl Router {
// `PipeTransport`. To prevent that, we have `HashedMap` with mapping behind `Mutex` and
// another `Mutex` on the pair of `PipeTransport`.

let mut mapped_pipe_transports = self.inner.mapped_pipe_transports.lock();

let pipe_transport_pair_mutex = mapped_pipe_transports
let pipe_transport_pair_mutex = self
.inner
.mapped_pipe_transports
.lock()
.entry(pipe_to_router_options.router.id())
.or_default()
.clone();

let mut pipe_transport_pair_guard = pipe_transport_pair_mutex.lock().await;

drop(mapped_pipe_transports);

let pipe_transport_pair = match pipe_transport_pair_guard
.as_ref()
.and_then(WeakPipeTransportPair::upgrade)
Expand Down
4 changes: 4 additions & 0 deletions rust/src/router/active_speaker_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ impl RtpObserver for ActiveSpeakerObserver {
self.inner.id
}

fn router(&self) -> &Router {
&self.inner.router
}

fn paused(&self) -> bool {
self.inner.paused.load(Ordering::SeqCst)
}
Expand Down
4 changes: 4 additions & 0 deletions rust/src/router/audio_level_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ impl RtpObserver for AudioLevelObserver {
self.inner.id
}

fn router(&self) -> &Router {
&self.inner.router
}

fn paused(&self) -> bool {
self.inner.paused.load(Ordering::SeqCst)
}
Expand Down
13 changes: 9 additions & 4 deletions rust/src/router/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ struct Inner {
current_layers: Arc<Mutex<Option<ConsumerLayers>>>,
handlers: Arc<Handlers>,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
weak_producer: WeakProducer,
closed: Arc<AtomicBool>,
// Drop subscription to consumer-specific notifications when consumer itself is dropped
Expand Down Expand Up @@ -403,7 +403,7 @@ impl Inner {
let channel = self.channel.clone();
let request = ConsumerCloseRequest {
internal: ConsumerInternal {
router_id: self.transport.router_id(),
router_id: self.transport.router().id(),
transport_id: self.transport.id(),
consumer_id: self.id,
producer_id: self.producer_id,
Expand Down Expand Up @@ -480,7 +480,7 @@ impl Consumer {
score: ConsumerScore,
preferred_layers: Option<ConsumerLayers>,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
) -> Self {
debug!("new()");

Expand Down Expand Up @@ -632,6 +632,11 @@ impl Consumer {
self.inner.producer_id
}

/// Transport to which consumer belongs.
pub fn transport(&self) -> &Arc<dyn Transport> {
&self.inner.transport
}

/// Media kind.
#[must_use]
pub fn kind(&self) -> MediaKind {
Expand Down Expand Up @@ -973,7 +978,7 @@ impl Consumer {

fn get_internal(&self) -> ConsumerInternal {
ConsumerInternal {
router_id: self.inner.transport.router_id(),
router_id: self.inner.transport.router().id(),
transport_id: self.inner.transport.id(),
consumer_id: self.inner.id,
producer_id: self.inner.producer_id,
Expand Down
15 changes: 10 additions & 5 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ struct Inner {
payload_channel: PayloadChannel,
handlers: Arc<Handlers>,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
weak_data_producer: WeakDataProducer,
closed: Arc<AtomicBool>,
// Drop subscription to consumer-specific notifications when consumer itself is dropped
Expand Down Expand Up @@ -233,7 +233,7 @@ impl Inner {
let channel = self.channel.clone();
let request = DataConsumerCloseRequest {
internal: DataConsumerInternal {
router_id: self.transport.router_id(),
router_id: self.transport.router().id(),
transport_id: self.transport.id(),
data_consumer_id: self.id,
data_producer_id: self.data_producer_id,
Expand Down Expand Up @@ -364,7 +364,7 @@ impl DataConsumer {
channel: Channel,
payload_channel: PayloadChannel,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
direct: bool,
) -> Self {
debug!("new()");
Expand Down Expand Up @@ -489,6 +489,11 @@ impl DataConsumer {
self.inner().data_producer_id
}

/// Transport to which data consumer belongs.
pub fn transport(&self) -> &Arc<dyn Transport> {
&self.inner().transport
}

/// The type of the data consumer.
#[must_use]
pub fn r#type(&self) -> DataConsumerType {
Expand Down Expand Up @@ -678,7 +683,7 @@ impl DataConsumer {

fn get_internal(&self) -> DataConsumerInternal {
DataConsumerInternal {
router_id: self.inner().transport.router_id(),
router_id: self.inner().transport.router().id(),
transport_id: self.inner().transport.id(),
data_consumer_id: self.inner().id,
data_producer_id: self.inner().data_producer_id,
Expand All @@ -696,7 +701,7 @@ impl DirectDataConsumer {
.request(
DataConsumerSendRequest {
internal: DataConsumerInternal {
router_id: self.inner.transport.router_id(),
router_id: self.inner.transport.router().id(),
transport_id: self.inner.transport.id(),
data_consumer_id: self.inner.id,
data_producer_id: self.inner.data_producer_id,
Expand Down
15 changes: 10 additions & 5 deletions rust/src/router/data_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ struct Inner {
payload_channel: PayloadChannel,
handlers: Arc<Handlers>,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
closed: AtomicBool,
_on_transport_close_handler: Mutex<HandlerId>,
}
Expand All @@ -163,7 +163,7 @@ impl Inner {
let channel = self.channel.clone();
let request = DataProducerCloseRequest {
internal: DataProducerInternal {
router_id: self.transport.router_id(),
router_id: self.transport.router().id(),
transport_id: self.transport.id(),
data_producer_id: self.id,
},
Expand Down Expand Up @@ -273,7 +273,7 @@ impl DataProducer {
channel: Channel,
payload_channel: PayloadChannel,
app_data: AppData,
transport: Box<dyn Transport>,
transport: Arc<dyn Transport>,
direct: bool,
) -> Self {
debug!("new()");
Expand Down Expand Up @@ -324,6 +324,11 @@ impl DataProducer {
self.inner().id
}

/// Transport to which data producer belongs.
pub fn transport(&self) -> &Arc<dyn Transport> {
&self.inner().transport
}

/// The type of the data producer.
#[must_use]
pub fn r#type(&self) -> DataProducerType {
Expand Down Expand Up @@ -430,7 +435,7 @@ impl DataProducer {

fn get_internal(&self) -> DataProducerInternal {
DataProducerInternal {
router_id: self.inner().transport.router_id(),
router_id: self.inner().transport.router().id(),
transport_id: self.inner().transport.id(),
data_producer_id: self.inner().id,
}
Expand All @@ -445,7 +450,7 @@ impl DirectDataProducer {
self.inner.payload_channel.notify(
DataProducerSendNotification {
internal: DataProducerInternal {
router_id: self.inner.transport.router_id(),
router_id: self.inner.transport.router().id(),
transport_id: self.inner.transport.id(),
data_producer_id: self.inner.id,
},
Expand Down
24 changes: 3 additions & 21 deletions rust/src/router/direct_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::data_structures::{AppData, SctpState};
use crate::messages::{TransportCloseRequest, TransportInternal, TransportSendRtcpNotification};
use crate::producer::{Producer, ProducerId, ProducerOptions};
use crate::router::transport::{TransportImpl, TransportType};
use crate::router::{Router, RouterId};
use crate::router::Router;
use crate::sctp_parameters::SctpParameters;
use crate::transport::{
ConsumeDataError, ConsumeError, ProduceDataError, ProduceError, RecvRtpHeaderExtensions,
Expand Down Expand Up @@ -238,16 +238,14 @@ impl fmt::Debug for DirectTransport {

#[async_trait]
impl Transport for DirectTransport {
/// Transport id.
fn id(&self) -> TransportId {
self.inner.id
}

fn router_id(&self) -> RouterId {
self.inner.router.id()
fn router(&self) -> &Router {
&self.inner.router
}

/// Custom application data.
fn app_data(&self) -> &AppData {
&self.inner.app_data
}
Expand All @@ -256,9 +254,6 @@ impl Transport for DirectTransport {
self.inner.closed.load(Ordering::SeqCst)
}

/// Create a Producer.
///
/// Transport will be kept alive as long as at least one producer instance is alive.
async fn produce(&self, producer_options: ProducerOptions) -> Result<Producer, ProduceError> {
debug!("produce()");

Expand All @@ -271,9 +266,6 @@ impl Transport for DirectTransport {
Ok(producer)
}

/// Create a Consumer.
///
/// Transport will be kept alive as long as at least one consumer instance is alive.
async fn consume(&self, consumer_options: ConsumerOptions) -> Result<Consumer, ConsumeError> {
debug!("consume()");

Expand All @@ -286,9 +278,6 @@ impl Transport for DirectTransport {
Ok(consumer)
}

/// Create a DataProducer.
///
/// Transport will be kept alive as long as at least one data producer instance is alive.
async fn produce_data(
&self,
data_producer_options: DataProducerOptions,
Expand All @@ -311,9 +300,6 @@ impl Transport for DirectTransport {
Ok(data_producer)
}

/// Create a DataConsumer.
///
/// Transport will be kept alive as long as at least one data consumer instance is alive.
async fn consume_data(
&self,
data_consumer_options: DataConsumerOptions,
Expand Down Expand Up @@ -426,10 +412,6 @@ impl TransportGeneric for DirectTransport {
}

impl TransportImpl for DirectTransport {
fn router(&self) -> &Router {
&self.inner.router
}

fn channel(&self) -> &Channel {
&self.inner.channel
}
Expand Down
10 changes: 3 additions & 7 deletions rust/src/router/pipe_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::messages::{
};
use crate::producer::{Producer, ProducerId, ProducerOptions};
use crate::router::transport::{TransportImpl, TransportType};
use crate::router::{Router, RouterId};
use crate::router::Router;
use crate::sctp_parameters::{NumSctpStreams, SctpParameters};
use crate::srtp_parameters::SrtpParameters;
use crate::transport::{
Expand Down Expand Up @@ -286,8 +286,8 @@ impl Transport for PipeTransport {
self.inner.id
}

fn router_id(&self) -> RouterId {
self.inner.router.id()
fn router(&self) -> &Router {
&self.inner.router
}

fn app_data(&self) -> &AppData {
Expand Down Expand Up @@ -451,10 +451,6 @@ impl TransportGeneric for PipeTransport {
}

impl TransportImpl for PipeTransport {
fn router(&self) -> &Router {
&self.inner.router
}

fn channel(&self) -> &Channel {
&self.inner.channel
}
Expand Down
Loading

0 comments on commit b375939

Please sign in to comment.