Injecting data is too slow when using Kafka for WAL #224

Open

wenhaocs opened this issue Nov 21, 2022 · 1 comment
wenhaocs commented Nov 21, 2022

Hi, I am using rocksdb-cloud with Kafka enabled for our POC. I launched a single Kafka instance with default settings on our local node, and librdkafka is using its default config. Everything works fine except the data ingestion speed. Without Kafka for WAL, our data ingestion latency averages around 15 ms at 150K records/s. However, when I log the WAL to Kafka, latency jumps to about 1 s and throughput drops to 2K records/s. I also tried exporting stats from librdkafka, and everything looks fine to me. I am posting the full stats below for reference. Can you help shed light on the possible reason? Thanks.

"name": "rdkafka#producer-1",
  "client_id": "rdkafka",
  "type": "producer",
  "ts": 16559146928249,
  "time": 1669071318,
  "age": 192011241,
  "replyq": 0,
  "msg_cnt": 0,
  "msg_size": 0,
  "msg_max": 100000,
  "msg_size_max": 1073741824,
  "simple_cnt": 0,
  "metadata_cache_cnt": 1,
  "brokers": {
    "192.168.8.215:9092/1": {
      "name": "192.168.8.215:9092/1",
      "nodeid": 1,
      "txbytes": 23489995,
      "txerrs": 0,
      "txretries": 0,
      "txidle": 436738,
      "req_timeouts": 0,
      "rx": 178,
      "rxbytes": 14458,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "rxidle": 435937,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 903,
      "connects": 1,
      "disconnects": 0,
      "int_latency": {
        "min": 5161,
        "max": 5396,
        "avg": 5269,
        "sum": 31615,
        "stddev": 70,
        "p50": 5279,
        "p75": 5343,
        "p90": 5343,
        "p95": 5407,
        "p99": 5407,
        "p99_99": 5407,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 6
      },
      "outbuf_latency": {
        "min": 55,
        "max": 188,
        "avg": 92,
        "sum": 553,
        "stddev": 45,
        "p50": 71,
        "p75": 92,
        "p90": 92,
        "p95": 188,
        "p99": 188,
        "p99_99": 188,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 6
      },
      "rtt": {
        "min": 801,
        "max": 1416,
        "avg": 1141,
        "sum": 6849,

      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 0
      },
      "req": {
        "Produce": 176,
        "ListOffsets": 0,
        "Metadata": 1,
        "FindCoordinator": 0,
        "SaslHandshake": 0,
        "ApiVersion": 1,
        "InitProducerId": 0,
        "AddPartitionsToTxn": 0,
        "AddOffsetsToTxn": 0,
        "EndTxn": 0,
        "TxnOffsetCommit": 0,
        "SaslAuthenticate": 0,
        "OffsetDeleteRequest": 0,
        "DescribeClientQuotasRequest": 0,
        "AlterClientQuotasRequest": 0,
        "DescribeUserScramCredentialsRequest": 0
      },
      "toppars": {
        "rockset.nebula-test-wh-asia-0": {
          "topic": "rockset.nebula-test-wh-asia",
          "partition": 0
        }
      }
    }
  },
  "topics": {
    "rockset.nebula-test-wh-asia": {
      "topic": "rockset.nebula-test-wh-asia",
      "age": 192010,
      "metadata_age": 192007,
      "batchsize": {
        "min": 54271,
        "max": 236533,
        "avg": 145356,
        "sum": 872140,
        "stddev":
        "outofrange": 0,
        "hdrsize": 14448,
        "cnt": 6
      },
      "batchcnt": {
        "min": 1,
        "max": 1,
        "avg": 1,
        "sum": 6,
        "stddev": 0,
        "p50": 1,
        "p75": 1,
        "p90": 1,
        "p95": 1,
        "p99": 1,
        "p99_99": 1,
        "outofrange": 0,
        "hdrsize": 8304,
        "cnt": 6
      },
      "partitions": {
        "0": {
          "partition": 0,
          "broker": 1,
          "leader": 1,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 0,
          "msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": -1001,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "ls_offset": -1001,
          "consumer_lag": -1,
          "consumer_lag_stored": -1,
          "txmsgs": 176,
          "txbytes": 23464230,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 176,
          "rx_ver_drops": 0,
          "msgs_inflight": 0,
          "next_ack_seq": 0,
          "next_err_seq": 0,
          "acked_msgid": 0
        },
        "-1": {
          "partition": -1,
          "broker": -1,
          "leader": -1,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 0,
          "msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": -1001,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "ls_offset": -1001,
          "consumer_lag": -1,
          "consumer_lag_stored": -1,
          "txmsgs": 0,
          "txbytes": 0,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 0,
          "rx_ver_drops": 0,
          "msgs_inflight": 0,
          "next_ack_seq": 0,
          "next_err_seq": 0,
          "acked_msgid": 0
        }
      }
    }
  },
  "tx": 178,
  "tx_bytes": 23489995,
  "rx": 178,
  "rx_bytes": 14458,
  "txmsgs": 176,
  "txmsg_bytes": 23464230,
  "rxmsgs": 0,
  "rxmsg_bytes": 0
}
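
For reference, stats like the dump above are what librdkafka emits when `statistics.interval.ms` is set. A minimal sketch of how they can be captured with the C++ API; the interval, broker address, and logging here are illustrative, not the actual rocksdb-cloud wiring:

```cpp
// Sketch: capture librdkafka statistics JSON via an event callback.
// Interval and output handling are illustrative assumptions.
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>

class StatsCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event &event) override {
    if (event.type() == RdKafka::Event::EVENT_STATS) {
      // event.str() is the JSON document shown above
      std::cout << event.str() << std::endl;
    }
  }
};

RdKafka::Producer *CreateProducerWithStats() {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  static StatsCb stats_cb;
  conf->set("statistics.interval.ms", "60000", errstr);  // emit stats every 60 s
  conf->set("event_cb", &stats_cb, errstr);
  conf->set("bootstrap.servers", "192.168.8.215:9092", errstr);
  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  delete conf;
  return producer;
}
```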
wenhaocs (Author) commented

I figured it out. It's because of producer->poll(500) in the code. A 500 ms timeout seems too long.
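
To illustrate the effect (this is a sketch assuming the librdkafka C++ API, not the actual rocksdb-cloud code): calling a blocking `poll()` with a large timeout after every produce ties each WAL append to the delivery round trip, while a non-blocking `poll(0)` still serves delivery reports without stalling the writer.

```cpp
// Sketch of the produce path using the librdkafka C++ API; the topic name
// and error handling are placeholders, not the actual rocksdb-cloud code.
#include <librdkafka/rdkafkacpp.h>
#include <string>

void AppendWalRecord(RdKafka::Producer *producer, const std::string &topic,
                     const std::string &record) {
  RdKafka::ErrorCode err = producer->produce(
      topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
      const_cast<char *>(record.data()), record.size(),
      /*key=*/nullptr, /*key_len=*/0,
      /*timestamp=*/0, /*msg_opaque=*/nullptr);
  if (err != RdKafka::ERR_NO_ERROR) {
    // e.g. ERR__QUEUE_FULL could be handled with a bounded blocking poll + retry
  }

  // poll(500) blocks for up to 500 ms waiting for events (delivery reports),
  // so every append can pay a broker round trip before returning.
  // poll(0) serves any pending delivery reports without blocking:
  producer->poll(0);
}
```

With the blocking call moved out of the hot path (or its timeout reduced), each append is no longer bounded by one delivery round trip per record.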
