
Configuration references


Below is a description of the available configurations of the CosmosDB Spark Connector. Depending on the scenario, different configurations should be used to optimize performance and throughput.

Note that configuration keys are case-insensitive and, for now, configuration values are always strings.

Reading CosmosDB collection

Many of the configurations below are passed on to the Java SDK when fetching data from the CosmosDB collection; a minimal read sketch follows the list.

  • query_maxdegreeofparallelism: sets the number of concurrent operations run client side during parallel query execution in the Azure Cosmos DB database service. A positive value limits the number of concurrent operations to the set value. If it is set to less than 0, the system automatically decides the number of concurrent operations to run. Because the connector maps each collection partition to an executor, this value has no effect on the read operation.
  • query_maxbuffereditemcount: sets the maximum number of items that can be buffered client side during parallel query execution in the Azure Cosmos DB database service. A positive property value limits the number of buffered items to the set value. If it is set to less than 0, the system automatically decides the number of items to buffer.
  • query_enablescan: sets the option to enable scans for queries that could not otherwise be served because indexing was opted out on the requested paths in the Azure Cosmos DB database service.
  • query_disableruperminuteusage: disables using Request Units (RUs)/minute capacity to serve the query when the regular provisioned RUs/second are exhausted.
  • query_emitverbosetraces: sets the option to allow queries to emit verbose traces for investigation.
  • query_pagesize: sets the size of the query result page for each query request. To optimize for throughput, use a large page size to reduce the number of round trips needed to fetch query results.
  • query_custom: sets the CosmosDB query to override the default query when fetching data from CosmosDB. Note that when this is provided, it is used in place of a query with pushed-down predicates as well.
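
A minimal read sketch, assuming the Spark 2.x connector's Config helper and the cosmosDB implicits it ships; the endpoint, key, database, and collection values are placeholders:

```scala
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Placeholder account, database, and collection values.
val readConfig = Config(Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<master-key>",
  "Database"   -> "mydb",
  "Collection" -> "mycollection",
  // A larger page reduces round trips per query.
  "query_pagesize" -> "1000"
))

// The cosmosDB reader comes from the schema implicits import above.
val df = spark.read.cosmosDB(readConfig)
df.show(10)
```

Because configuration values are always strings, numeric settings such as query_pagesize are passed as quoted numbers.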

Reading CosmosDB collection change feed

  • readchangefeed: indicates that the collection content is fetched from the CosmosDB Change Feed. The default value is false.
  • changefeedqueryname: a custom string to identify the query. The connector keeps track of the collection continuation tokens for different change feed queries separately. If readchangefeed is true, this is a required configuration and cannot be empty.
  • rollingchangefeed: a boolean value indicating whether the change feed should continue making progress after executing the query, i.e. whether existing bookmarks should be moved forward when processing a change feed in bulk mode. Bookmarks that exist in the checkpoint location are only updated if rollingchangefeed is true. The default value is false.
  • changefeedusenexttoken: a boolean value to support processing failure scenarios. It is used to indicate that the current change feed batch has been handled gracefully and the RDD should use the next continuation tokens to get the subsequent batch of changes.
  • changefeedcheckpointlocation: a path to local file storage to persist continuation tokens in case of node failures.
  • changefeedstartfromthebeginning: sets whether the change feed should start from the beginning (true) or from the current point (false). By default, it starts from the current point (false).
  • changefeedstartfromdatetime: specifies the date/time from which the change feed should be processed. This setting is only applicable if no bookmarks have been persisted yet (first run). By default this setting isn't specified, and the change feed is processed either from the current date/time (if changefeedstartfromthebeginning is false) or from the beginning. This setting can only be used for single-master accounts; it is not supported for multi-master accounts. The time you specify can only have a granularity of seconds (milliseconds are ignored) and needs to be in the UTC time zone, for example "2020-08-31T17:00:00.00000Z".
  • changefeedmaxpagesperbatch: limits how many pages (and therefore how many documents) can be processed from the change feed in one batch. This is especially useful when processing the change feed from the beginning, in which case the initial batch contains essentially all change feed records, which can cause excessive resource consumption (especially memory) in the Spark executors. One page from the change feed can by default contain up to 1,000 documents, so setting changefeedmaxpagesperbatch (default Int.MaxValue) to 1000 means each batch processes at most 1 million documents. Overall throughput increases with higher values, so constraining this setting is only recommended when you see excessive resource consumption during stream initialization in the first batch. Avoid setting a batch size of 1.
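
A sketch of a change feed read combining the settings above; the query name, checkpoint path, and account values are placeholders, and a SparkSession named spark (as in the read sketch earlier) is assumed to be in scope:

```scala
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Keys are case-insensitive; all values are strings.
val changeFeedConfig = Config(Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<master-key>",
  "Database"   -> "mydb",
  "Collection" -> "mycollection",
  "ReadChangeFeed" -> "true",
  // Required whenever readchangefeed is true.
  "ChangeFeedQueryName" -> "my-incremental-job",
  "ChangeFeedStartFromTheBeginning" -> "true",
  // Persist continuation tokens so a node failure does not lose progress.
  "ChangeFeedCheckpointLocation" -> "/tmp/changefeed-checkpoint",
  // Move bookmarks forward after each bulk query.
  "RollingChangeFeed" -> "true"
))

val changesDf = spark.read.cosmosDB(changeFeedConfig)
```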

Writing to CosmosDB

  • WritingBatchSize: an integer string indicating the batch size to use when writing to the CosmosDB collection. The connector sends createDocument/upsertDocument requests asynchronously in batches. The larger the batch size, the more throughput can be achieved, as long as the cluster resources are available; conversely, specify a smaller batch size to limit the rate and RU consumption. The default batch size is 500.
  • Upsert: a boolean value string indicating whether upsertDocument should be used instead of createDocument when writing to the CosmosDB collection.
  • WriteThroughputBudget: an integer value defining the RU budget that the ingestion operations in a certain Spark job should not exceed. When not provided, the total RUs of the container can be consumed for ingestion. If this value is specified (for example 10000), the ingestion operations are limited and artificially delayed to stay within this budget (the enforcement happens client-side, so there is no hard guarantee; the actual RUs taken for ingestion can exceed the budget by a couple of percent in certain conditions). This configuration option can be useful when data ingestion happens while other workloads (for example queries/read operations via a web app) should be prioritized. By limiting the number of RUs the ingestion can take, it helps avoid throttling for other workloads.
  • MaxIngestionTaskParallelism: an integer value defining the maximum number of parallel ingestion tasks. It is usually not necessary to specify this parameter explicitly; by default, parallelism is driven by the number of partitions in the input Spark DataFrame. Only when the number of partitions in the input DataFrame is excessively higher than the number of cores available in the Spark cluster is it recommended to set this value to the number of available cores. Reducing parallelism to the number of available cores can lower ingestion latency in these cases.
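
A minimal write sketch under the same assumptions as the read sketch (Config helper, cosmosDB implicits, placeholder account values; df is the DataFrame to ingest):

```scala
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.SaveMode

val writeConfig = Config(Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<master-key>",
  "Database"   -> "mydb",
  "Collection" -> "mycollection",
  "Upsert" -> "true",                 // upsertDocument instead of createDocument
  "WritingBatchSize" -> "100",        // smaller batches limit rate and RU consumption
  "WriteThroughputBudget" -> "10000"  // soft, client-side RU cap for ingestion
))

// df is the DataFrame to ingest, e.g. the one produced by the read sketch above.
df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
```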

Streaming Writes - retry policy settings [Only available in version >= 2.1.0 of the Spark connector]

  • writestreamretrypolicy.kind: defines the retry policy kind that should be used for writes in structured streaming mode. The default value "NoRetries" means that only the SDK's retries for 429 (throttling) responses are applied; no additional retries are initiated by the Spark connector. "NoRetries" is the default to keep the behavior consistent with previous versions of the Spark connector. "Default" means that additional retries are initiated by the Spark connector, configured via the writestreamretrypolicy.* settings below.
  • writestreamretrypolicy.maxtransientretrycount: the maximum number of retry attempts for transient errors. If either the retry count or duration is exceeded, retries are stopped and the message is treated as a poison message.
  • writestreamretrypolicy.maxtransientretrydurationinms: the maximum duration of retry attempts for transient errors, in milliseconds. If either the retry count or duration is exceeded, retries are stopped and the message is treated as a poison message.
  • writestreamretrypolicy.maxtransientretrydelayinms: the maximum delay between retry attempts for transient errors, in milliseconds. The actual delay is a random value between 1 and writestreamretrypolicy.maxtransientretrydelayinms.
  • writestreamretrypolicy.poisonmessagelocation: any documents resulting in non-transient errors or exceeding the retry limits above are treated as poison messages and persisted in this location, making it possible to make progress without losing the data that hasn't been ingested successfully.
  • writestreamretrypolicy.treatunknownexceptionsastransient: a flag [true|false] indicating whether exceptions not known to the Spark connector are treated as transient errors (resulting in retries according to the limits above) or not.
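
A sketch of a structured streaming write wired up with these retry settings, assuming the connector's CosmosDBSinkProvider streaming sink and a streaming DataFrame named streamingDf; the paths and account values are placeholders:

```scala
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider

val query = streamingDf.writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .outputMode("append")
  .options(Map(
    "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
    "Masterkey"  -> "<master-key>",
    "Database"   -> "mydb",
    "Collection" -> "mycollection",
    "WritingBatchSize" -> "500",
    // Opt in to connector-side retries and dead-lettering.
    "writestreamretrypolicy.kind" -> "Default",
    "writestreamretrypolicy.maxtransientretrycount" -> "50",
    "writestreamretrypolicy.maxtransientretrydurationinms" -> "300000",
    "writestreamretrypolicy.maxtransientretrydelayinms" -> "5000",
    "writestreamretrypolicy.poisonmessagelocation" -> "/mnt/poison-messages",
    "writestreamretrypolicy.treatunknownexceptionsastransient" -> "true"
  ))
  .option("checkpointLocation", "/mnt/checkpoints/cosmos-sink")
  .start()
```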

Connection settings

  • connectionmode: sets the connection mode that the internal DocumentClient should use to communicate with CosmosDB. The values are DirectHttps (default value) and Gateway. The DirectHttps connection mode routes the requests directly to the CosmosDB partitions and provides some latency advantage.
  • connectionmaxpoolsize: sets the value of the connection pool size the internal DocumentClient should use. The default value is 100.
  • connectionidletimeout: sets the value of the timeout for idle connections in seconds. The default value is 60.
  • query_maxretryattemptsonthrottledrequests: sets the maximum number of retries in the case where the request fails because the Azure CosmosDB database service has applied rate limiting on the client. If not specified, the default value is 1000 retry attempts.
  • query_maxretrywaittimeinseconds: sets the maximum retry time in seconds. By default, it is 1000 seconds.
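
A sketch combining these connection settings into a Config map (placeholder account values; all values are passed as strings, and the chosen numbers are illustrative, not recommendations):

```scala
import com.microsoft.azure.cosmosdb.spark.config.Config

val connectionTunedConfig = Config(Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<master-key>",
  "Database"   -> "mydb",
  "Collection" -> "mycollection",
  "ConnectionMode" -> "Gateway",      // default is DirectHttps
  "ConnectionMaxPoolSize" -> "200",   // default is 100
  "ConnectionIdleTimeout" -> "120",   // seconds; default is 60
  "query_maxretryattemptsonthrottledrequests" -> "100",
  "query_maxretrywaittimeinseconds" -> "300"
))
```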