Heka output plugin for persisting messages from the data pipeline to BigQuery.
It consumes data from a Kafka topic into an in-memory buffer, mirrored to a local file for backup, and uploads the buffer to BigQuery once it reaches a certain size.
A ticker checks for midnight and creates a new table in BigQuery each day. The intervals are defined as constants in the code.
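The buffering and daily rollover described above can be sketched roughly as follows. This is an illustrative sketch only, not the plugin's actual code: `batcher`, `maxBufferBytes`, and the `upload` callback are hypothetical names, the threshold value is made up, and the local backup file is left out for brevity.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// maxBufferBytes is a hypothetical threshold; the plugin defines its own constant.
const maxBufferBytes = 1024

// batcher accumulates newline-delimited records until a size threshold is hit.
type batcher struct {
	buf    bytes.Buffer
	day    string                              // date stamp of the current table
	upload func(day string, data []byte) error // stand-in for the BigQuery upload
}

// add appends one record and flushes when the buffer reaches the threshold.
func (b *batcher) add(record []byte) error {
	b.buf.Write(record)
	b.buf.WriteByte('\n')
	if b.buf.Len() >= maxBufferBytes {
		return b.flush()
	}
	return nil
}

// flush uploads the buffered data and resets the buffer on success.
func (b *batcher) flush() error {
	if b.buf.Len() == 0 {
		return nil
	}
	if err := b.upload(b.day, b.buf.Bytes()); err != nil {
		return err // keep the data so a later attempt can retry
	}
	b.buf.Reset()
	return nil
}

// tick is meant to be called on every ticker interval. Once the date has
// changed, it flushes what is left and switches to the new day's table.
func (b *batcher) tick(now time.Time) error {
	today := now.Format("20060102")
	if today == b.day {
		return nil
	}
	if err := b.flush(); err != nil {
		return err
	}
	b.day = today
	return nil
}

func main() {
	b := &batcher{
		day: "20151230",
		upload: func(day string, data []byte) error {
			fmt.Printf("uploading %d bytes to the table for %s\n", len(data), day)
			return nil
		},
	}
	b.add(bytes.Repeat([]byte("x"), 2000))                // exceeds the threshold, triggers an upload
	b.tick(time.Date(2015, 12, 31, 0, 0, 1, 0, time.UTC)) // date changed, rolls over
	fmt.Println("current day:", b.day)
}
```

Keeping the buffered data when an upload fails means the next flush can retry it. The backup file mentioned above presumably covers the case where the process itself restarts.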
This plugin is currently used at Wego.com.
Configuration uses TOML (the Heka plugin default). See heka_config.toml.sample
for reference.
The BigQuery schema is specified in a JSON file. See realtime_log.schema.sample
for reference.
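As a rough illustration of reading such a schema file, the sketch below assumes the standard BigQuery JSON schema layout (an array of objects with `name`, `type`, and an optional `mode`). Whether realtime_log.schema.sample follows exactly this layout is an assumption, and `field` and `parseSchema` are hypothetical names, not part of the plugin.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// field mirrors one entry of a BigQuery JSON schema (assumed layout).
type field struct {
	Name string `json:"name"`
	Type string `json:"type"`
	Mode string `json:"mode,omitempty"`
}

// parseSchema decodes a JSON schema and checks that every field
// has at least a name and a type.
func parseSchema(data []byte) ([]field, error) {
	var fields []field
	if err := json.Unmarshal(data, &fields); err != nil {
		return nil, err
	}
	for i, f := range fields {
		if f.Name == "" || f.Type == "" {
			return nil, fmt.Errorf("field %d: name and type are required", i)
		}
	}
	return fields, nil
}

func main() {
	// Made-up example fields, not taken from the sample file.
	sample := []byte(`[
	  {"name": "id", "type": "STRING", "mode": "REQUIRED"},
	  {"name": "created_at", "type": "TIMESTAMP"}
	]`)
	fields, err := parseSchema(sample)
	if err != nil {
		panic(err)
	}
	for _, f := range fields {
		fmt.Println(f.Name, f.Type, f.Mode)
	}
}
```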
The private key for BigQuery is specified by a PEM file converted (with the password removed) from the PKCS#12 (.p12) file originally obtained from the developer's console: https://console.developers.google.com/project/{project_id}/apiui/credential
More information here: https://www.openssl.org/docs/apps/pkcs12.html
Table names in BigQuery take the form {dataset_id}/{table_id}{date_stamp}, where date_stamp is formatted as YYYYMMDD, for example 20151230.
If no Encoder is specified in TOML, then the message Payload is extracted and sent.
[realtime-log-input-kafka]
type = "KafkaInput"
topic = "realtime-log"
addrs = ["kafka-a-1.bezurk.org:9092", "kafka-b-1.bezurk.org:9092"]
[realtime-log-output-bq]
type = "BqOutput"
message_matcher = "Logger == 'realtime-log-input-kafka'"
project_id = "org-project"
dataset_id = "go_realtime_log"
table_id = "log"
schema_file_path = "/var/apps/shared/config/realtime_log.schema"
service_email = "[email protected]"
pem_file_path = "/var/apps/shared/config/big_query.pem"
buffer_path = "/var/buffer/bq"
buffer_file = "realtime_log"
ticker_interval = 5
Refer to: https://hekad.readthedocs.org/en/v0.9.2/installing.html#building-hekad-with-external-plugins
Simply add this line in {heka root}/cmake/plugin_loader.cmake:
add_external_plugin(git https://github.com/aranair/heka-bigquery master)
Then run build.sh as per the documentation: `source build.sh`
Developed in collaboration with Alex while he built https://github.com/uohzxela/heka-s3, the Heka plugin that uploads to S3 for Wego.
- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Added some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create new Pull Request
MIT