-
Notifications
You must be signed in to change notification settings - Fork 2
/
scalyr.rb
1378 lines (1167 loc) · 60.3 KB
/
scalyr.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "concurrent"
require "stud/buffer"
require "socket" # for Socket.gethostname
# rubocop:disable Lint/RedundantRequireStatement
require "thread" # for safe queueing
# rubocop:enable Lint/RedundantRequireStatement
require "uri" # for escaping user input
require 'json' # for converting event object to JSON for upload
require 'manticore'
require 'rbzip2'
require 'zlib'
require 'stringio'
require 'quantile'
require 'jrjackson'
require 'scalyr/common/client'
require "scalyr/common/util"
require "scalyr/constants"
#---------------------------------------------------------------------------------------------------------------------
# Implements the Scalyr output plugin
#---------------------------------------------------------------------------------------------------------------------
class LogStash::Outputs::Scalyr < LogStash::Outputs::Base
config_name "scalyr"
concurrency :shared

# The Scalyr API write token, these are available at https://www.scalyr.com/keys. This is the only compulsory configuration field required for proper upload
config :api_write_token, :validate => :string, :required => true

# If you have an EU-based Scalyr account, please use https://eu.scalyr.com/
config :scalyr_server, :validate => :string, :default => "https://agent.scalyr.com/"

# True to perform connectivity check with Scalyr on plugin start up / register phase. This
# ensures an exception is thrown if we can't communicate with Scalyr and we don't start
# consuming events until plugin is correctly configured.
config :perform_connectivity_check, :validate => :boolean, :default => true

# server_attributes is a dictionary of key value pairs that represents/identifies the logstash aggregator server
# (where this plugin is running). Keys are arbitrary except for the 'serverHost' key which holds special meaning to
# Scalyr and is given special treatment in the Scalyr UI. All of these attributes are optional (not required for logs
# to be correctly uploaded)
config :server_attributes, :validate => :hash, :default => nil

# Related to the server_attributes dictionary above, if you do not define the 'serverHost' key in server_attributes,
# the plugin will automatically set it, using the aggregator hostname as value, if this value is true.
config :use_hostname_for_serverhost, :validate => :boolean, :default => true

# Field that represents the origin of the log event.
# (Warning: for events with an existing 'serverHost' field, it will be overwritten)
config :serverhost_field, :validate => :string, :default => 'serverHost'

# The 'logfile' fieldname has special meaning for the Scalyr UI. Traditionally, it represents the origin logfile
# which users can search for in a dedicated widget in the Scalyr UI. If your Events capture this in a different field
# you can specify that fieldname here and the Scalyr Output Plugin will rename it to 'logfile' before upload.
# (Warning: for events with an existing 'logfile' field, it will be overwritten)
config :logfile_field, :validate => :string, :default => 'logfile'

# Record field which includes the value for the "severity" field. severity is a special field which tells
# Scalyr severity / log level for a particular event. This field is a top level event field and not
# event attribute field. Actual field value must be an integer and is mapped to different severity /
# log level on DataSet server side as shown below:
#
# - 0 -> finest
# - 1 -> trace
# - 2 -> debug
# - 3 -> info
# - 4 -> warning
# - 5 -> error
# - 6 -> fatal / emergency / critical
#
# By default, if Event contains no severity field, default value of 3 (info) will be used.
config :severity_field, :validate => :string, :default => nil

# The Scalyr Output Plugin expects the main log message to be contained in the Event['message']. If your main log
# content is contained in a different field, specify it here. It will be renamed to 'message' before upload.
# (Warning: for events with an existing 'message' field, it will be overwritten)
config :message_field, :validate => :string, :default => "message"

# A list of fieldnames that are constant for any logfile. Any fields listed here will be sent to Scalyr as part of
# the `logs` array instead of inside every event to save on transmitted bytes. What constitutes a single "logfile"
# for correctness is a combination of logfile_field value and serverhost_field value. Only events with a serverHost
# value will have fields moved.
config :log_constants, :validate => :array, :default => nil

# When this option is true and session level server host is defined (either via
# server_attributes config option or via node hostname) and some events in a batch contain
# "serverHost" attributes, other nodes in a batch which don't contain it will have serverHost
# set to the session level value.
# This is needed because session level attribute has priority over event level which means
# that in case we specify serverHost on some events, other events won't have any value set
# for serverHost.
# Since this option adds some overhead and requires additional processing time, it's
# disabled by default.
config :set_session_level_serverhost_on_events, validate: :boolean, default: false

# By default, logstash will add "host" attribute which includes logstash aggregator server
# host to each event. This is not really needed and desired anymore with the fixed and improved
# serverHost attribute handling (serverHost now contains logstash aggregator hostname by
# default).
config :remove_host_attribute_from_events, validate: :boolean, default: true

# If true, nested values will be flattened (which changes keys to delimiter-separated concatenation of all
# nested keys).
config :flatten_nested_values, :validate => :boolean, :default => false
# Delimiter placed between the concatenated key components when flattening.
config :flatten_nested_values_delimiter, :validate => :string, :default => "_"
# If true, arrays are also flattened (index becomes part of the flattened key).
config :flatten_nested_arrays, :validate => :boolean, :default => true
# Work-around toggle for historical deep-flattening delimiter behavior.
config :fix_deep_flattening_delimiters, :validate => :boolean, :default => false
# Upper bound on the number of keys produced by flattening (-1 means unlimited).
config :flattening_max_key_count, :validate => :number, :default => -1

# If true, the 'tags' field will be flattened into key-values where each key is a tag and each value is set to
# :flat_tag_value
config :flatten_tags, :validate => :boolean, :default => false
config :flat_tag_prefix, :validate => :string, :default => 'tag_'
config :flat_tag_value, :default => 1

#####
## Retry settings for non deploy and non throttling related errors
####
# Initial interval in seconds between bulk retries. Doubled (by default, can be overridden using
# retry_backoff_factor config option) on each retry up to `retry_max_interval`
config :retry_initial_interval, :validate => :number, :default => 1
# How many times to retry sending an event before giving up on it
# This will result in a total of around 12 minutes of retrying / sleeping with a default value
# for retry_max_interval
config :max_retries, :validate => :number, :default => 15
# Set max interval in seconds between bulk retries.
config :retry_max_interval, :validate => :number, :default => 64
# Back off factor for retries. We default to 2 (exponential retry delay).
config :retry_backoff_factor, :validate => :number, :default => 2

#####
## Retry settings for deploy related errors
####
config :retry_initial_interval_deploy_errors, :validate => :number, :default => 30
config :max_retries_deploy_errors, :validate => :number, :default => 5
config :retry_max_interval_deploy_errors, :validate => :number, :default => 64
config :retry_backoff_factor_deploy_errors, :validate => :number, :default => 1.5

#####
## Retry settings for throttling related errors
####
config :retry_initial_interval_throttling_errors, :validate => :number, :default => 20
config :max_retries_throttling_errors, :validate => :number, :default => 6
config :retry_max_interval_throttling_errors, :validate => :number, :default => 64
config :retry_backoff_factor_throttling_errors, :validate => :number, :default => 1.5

#####
## Common retry related settings
#####
# Whether or not to send messages that failed to send a max_retries amount of times to the DLQ or just drop them
config :send_to_dlq, :validate => :boolean, :default => true

# Whether or not to verify the connection to Scalyr, only set to false for debugging.
config :ssl_verify_peer, :validate => :boolean, :default => true

# Path to SSL bundle file used to validate remote / server SSL certificate. By default, path to
# the CA bundled which is vendored / bundled with the RubyGem is used.
# If user has a specific reason to change this value (e.g. to a system ca bundle such as
# /etc/ssl/certs/ca-certificates.crt, they can update this option).
config :ssl_ca_bundle_path, :validate => :string, :default => CA_CERTS_PATH

# Unused since v0.2.7, left here for backward compatibility reasons
config :append_builtin_cert, :validate => :boolean, :default => false

# Maximum request body size (bytes) before a batch is split into multiple requests.
config :max_request_buffer, :validate => :number, :default => 5500000 # echee TODO: eliminate?
# If set, force the named encoding onto every message before upload.
config :force_message_encoding, :validate => :string, :default => nil
# If true, invalid UTF-8 byte sequences in messages are replaced rather than rejected.
config :replace_invalid_utf8, :validate => :boolean, :default => false

# Valid options are bz2, deflate, or none.
config :compression_type, :validate => :string, :default => 'deflate'

# An int containing the compression level of compression to use, from 1-9. Defaults to 6. Only
# applicable when compression type is "deflate" or "bz2".
config :compression_level, :validate => :number, :default => 6

# How often to log and report status metrics to Scalyr. Defaults to every 5
# minutes.
config :status_report_interval, :validate => :number, :default => 300

# Status will be reported regardless if the plugin receives empty batch if the plugin is
# "active" (received and processed some events during the plugin life time) and more than this
# amount of time has passed since the last reporting.
config :idle_status_report_deadline, :validate => :number, :default => 300

# True to also call send_status when multi_receive() is called with no events.
# In some situations (e.g. when logstash is configured with multiple scalyr
# plugins conditionally where most are idle) you may want to set this to false
config :report_status_for_empty_batches, :validate => :boolean, :default => true

# Set to true to also log status messages with various metrics to stdout in addition to sending
# this data to Scalyr
config :log_status_messages_to_stdout, :validate => :boolean, :default => false

# Whether or not to count status event uploads in the statistics such as request latency etc.
config :record_stats_for_status, :validate => :boolean, :default => false

# Sample rate for event level metrics (flattening time, number of attributes per event, etc,).
# It's important to set this in case there are many events coming in per seconds, because
# instrumentation does add some overhead. By default, we sample 5% of the events. Keep in
# mind that we use simple random based sampling. Maximum possible value is 1 (aka no sampling
# - record metrics for every single event).
# We use sampling since Quantile.observe() operation is more expensive than simple counter
# based metric so we need to ensure recording a metric doesn't add too much overhead.
# Based on micro benchmark, random based sampler is about 5x faster than quantile.observe()
config :event_metrics_sample_rate, :validate => :number, :default => 0.05

# Parser to attach to status events
config :status_parser, :validate => :string, :default => "logstash_plugin_metrics"

# Whether or not to create fresh quantile estimators after a status send. Depending on what you want to gather from
# these stats this might be wanted or not.
config :flush_quantile_estimates_on_status_send, :validate => :boolean, :default => false

# Causes this plugin to act as if it successfully uploaded the logs, while actually returning as quickly as possible
# after no work being done.
config :noop_mode, :validate => :boolean, :default => false

# Set to true to disable estimating the size of each serialized event to make sure we don't go over the max request
# size (5.5 MB) and split batch into multiple Scalyr requests, if needed. Since this estimation is not "free", especially
# for large batches, it may make sense to disable this option when logstash batch size is configured in a way that
# Scalyr single request limit won't be reached.
config :estimate_each_event_size, :validate => :boolean, :default => true

# Library to use for JSON serialization. Valid values are "stdlib" and "jrjackson". The latter may offer 2-4x performance
# improvements on serialization.
config :json_library, :validate => :string, :default => "stdlib"

# Manticore related options
config :http_connect_timeout, :validate => :number, :default => 10
config :http_socket_timeout, :validate => :number, :default => 10
config :http_request_timeout, :validate => :number, :default => 60
config :http_pool_max, :validate => :number, :default => 50
config :http_pool_max_per_route, :validate => :number, :default => 25
# Constructor. Delegates all plugin parameters to the LogStash base class and
# sets up the synchronization primitives used by this output instance.
def initialize(*params)
  super
  # Request statistics are accumulated across multiple threads and must be accessed through a mutex
  @stats_lock = Mutex.new
  # Serializes status transmissions (used by send_status) across worker threads.
  @send_stats = Mutex.new
end
# Shuts the plugin down: flags the upload/retry loops to stop and releases
# the HTTP client session held against the Scalyr API, if one was created.
def close
  @running = false
  @client_session&.close
end
# Logstash lifecycle hook invoked once before events are received.
# Validates configuration options, derives session level server attributes
# (serverHost etc.), builds the long lived HTTP client session, performs an
# optional connectivity check and sends the initial status line to Scalyr.
# Raises LogStash::ConfigurationError for invalid configuration values.
def register
  # This prng is used exclusively to determine when to sample statistics and no security related purpose, for this
  # reason we do not ensure thread safety for it.
  @prng = Random.new

  if @event_metrics_sample_rate < 0 or @event_metrics_sample_rate > 1
    raise LogStash::ConfigurationError, "Minimum possible value for 'event_metrics_sample_rate' is 0 (dont sample any events) and maximum is 1 (sample every event)"
  end

  @node_hostname = Socket.gethostname

  if @log_constants and not @log_constants.all? { |x| x.is_a? String }
    raise LogStash::ConfigurationError, "All elements of 'log_constants' must be strings."
  end

  if @max_request_buffer > 6000000
    @logger.warn "Maximum request buffer > 6 MB. This may result in requests being rejected by Scalyr."
  end

  if not @estimate_each_event_size
    @logger.warn("estimate_each_event_size config option is false, this means very large batches may be rejected or partially processed by the server")
  end

  if @json_library != "stdlib" and @json_library != "jrjackson"
    raise LogStash::ConfigurationError, "json_library config option needs to be either stdlib or jrjackson"
  end

  # Bind json_encode to the configured backend once, so the per-event hot
  # path does not have to re-check the config option.
  if @json_library == "stdlib"
    define_singleton_method "json_encode" do |data|
      data.to_json
    end
  elsif @json_library == "jrjackson"
    define_singleton_method "json_encode" do |data|
      JrJackson::Json.dump(data)
    end
  end

  # NOTE(review): @append_builtin_cert defaults to false (never nil), so this
  # deprecation warning fires on every start up regardless of user config -
  # confirm this is intended.
  if not @append_builtin_cert.nil?
    @logger.warn "append_builtin_cert config option has been deprecated and is unused in versions 0.2.7 and above"
  end

  @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil

  @message_encoding = nil
  if @force_message_encoding.to_s != ''
    begin
      @message_encoding = Encoding.find(@force_message_encoding)
      @logger.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      @logger.warn "Encoding '#{@force_message_encoding}' not found. Ignoring."
    end
  end

  # Evaluate any statements in string values of the server_attributes object.
  # Values of the literal form "#{...}" are eval-ed; this input comes from the
  # logstash config file (operator controlled), not from event data.
  # NOTE(review): non-String values are silently dropped from the resulting
  # hash here - confirm this is intended.
  if @server_attributes
    new_attributes = {}
    @server_attributes.each do |key, value|
      if value.is_a?( String )
        m = /^\#{(.*)}$/.match( value )
        if m
          new_attributes[key] = eval( m[1] )
        else
          new_attributes[key] = value
        end
      end
    end
    @server_attributes = new_attributes
  end

  # See if we should use the hostname as the server_attributes.serverHost (aka if fixed serverHost is not
  # defined as part of server_attributes config option)
  if @server_attributes.nil?
    @server_attributes = {}
  end

  if @use_hostname_for_serverhost
    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    unless @server_attributes.key? 'serverHost'
      @server_attributes['serverHost'] = @node_hostname
    end
  end

  # Add monitor server attribute to identify this as coming from a plugin
  @server_attributes['monitor'] = 'pluginLogstash'

  # We create a fixed copy without host here so we can reference this later if needed to avoid
  # some of the copy + manipulate overhead per batch
  @server_attributes_without_serverhost = @server_attributes.clone
  if @server_attributes_without_serverhost.key? "serverHost"
    @server_attributes_without_serverhost.delete "serverHost"
  end

  @session_server_host = @server_attributes["serverHost"]

  @scalyr_server << '/' unless @scalyr_server.end_with?('/')

  # Validate the URL
  uri = URI.parse(@scalyr_server)
  if not uri.kind_of?(URI::HTTP) and not uri.kind_of?(URI::HTTPS)
    raise LogStash::ConfigurationError, "scalyr_server configuration option value is not a valid URL. " \
      "This value needs to contain a full URL with the protocol. e.g. " \
      "https://agent.scalyr.com for US and https://eu.scalyr.com for EU"
  end

  @add_events_uri = URI(@scalyr_server) + "addEvents"

  @logger.info "Scalyr LogStash Plugin ID - #{self.id}"

  @session_id = SecureRandom.uuid

  @last_status_transmit_time_lock = Mutex.new
  @last_status_transmit_time = nil
  @last_status_ = false

  # Plugin level (either per batch or event level metrics). Other request
  # level metrics are handled by the HTTP Client class.
  @multi_receive_statistics = {
    :total_multi_receive_secs => 0,
    :total_events_processed => 0,
    :successful_events_processed => 0,
    :failed_events_processed => 0,
    :total_retry_count => 0,
    :total_retry_duration_secs => 0,
    :total_java_class_cast_errors => 0
  }
  @plugin_metrics = get_new_metrics

  # create a client session for uploading to Scalyr
  @running = true
  @client_session = Scalyr::Common::Client::ClientSession.new(
    @logger, @add_events_uri,
    @compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path,
    @record_stats_for_status, @flush_quantile_estimates_on_status_send,
    @http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route
  )

  # We also "prime" the main HTTP client here, one which is used for sending subsequent requests.
  # Here priming just means setting up the client parameters without opening any connections.
  # Since client writes certs to a temporary file there could be a race in case we don't do that
  # here since multi_receive() is multi threaded. An alternative would be to put a lock around
  # client init method (aka client_config())
  @client_session.client

  # Send a ping to verify that the configuration API key is correct and that we can establish
  # connection with Scalyr API
  connectivity_check

  # BUGFIX: previously @compression_type was interpolated twice so the
  # compression_level value was never actually logged. Also dropped the
  # redundant sprintf() around an already %-formatted string (a stray "%" in
  # a value would have raised ArgumentError).
  @logger.info(format("Started Scalyr LogStash output plugin %s (compression_type=%s,compression_level=%s,json_library=%s).",
                      PLUGIN_VERSION, @compression_type, @compression_level, @json_library), :class => self.class.name)

  # Finally, send a status line to Scalyr
  # We use a special separate short lived client session for sending the initial client status.
  # This is done to avoid the overhead in case single logstash instance has many scalyr output
  # plugins configured with conditionals and majority of them are inactive (aka receive no data).
  # This way we don't need to keep idle long running connection open.
  initial_send_status_client_session = Scalyr::Common::Client::ClientSession.new(
    @logger, @add_events_uri,
    @compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path,
    @record_stats_for_status, @flush_quantile_estimates_on_status_send,
    @http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route
  )

  # We propagate errors on initial request to better handle errors related to invalid hostname
  # or similar
  send_status(initial_send_status_client_session)
  initial_send_status_client_session.close
end # def register
# Method which performs connectivity check with Scalyr API, verifies that we can talk to the API
# and that the API token is valid.
#
# Raises LogStash::ConfigurationError when the failure clearly indicates a
# misconfiguration (unresolvable hostname or a 401 / invalid API key). All
# other errors are treated as non-fatal and only logged, so transient network
# issues at start up don't prevent the plugin from registering.
def connectivity_check
  return unless @perform_connectivity_check

  @logger.debug("Performing connectivity check against the Scalyr API")

  # An empty addEvents body is sufficient for the ping.
  body = create_multi_event_request([], nil, nil, nil)[:body]

  begin
    @client_session.send_ping(body)
  rescue Scalyr::Common::Client::ClientError, Manticore::ResolutionFailure => e
    # BUGFIX: previously only the second substring check was case-insensitive,
    # so capitalized variants of the first DNS error message were missed.
    # Downcase once and compare both substrings in lower case.
    message = e.message.to_s.downcase
    if message.include?("nodename nor servname provided") or
       message.include?("name or service not know")
      raise LogStash::ConfigurationError,
        format("Received error when trying to communicate with Scalyr API. This likely means that " \
               "the configured value for 'scalyr_server' config option is invalid. Original error: %s",
               e.message)
    end

    # For now, we consider rest of the errors non fatal and just log them and don't propagate
    # them and fail register
    @logger.warn("Received error when trying to send connectivity check request to Scalyr",
                 :error => e.message)
  rescue Scalyr::Common::Client::ServerError => e
    if e.code == 401
      raise LogStash::ConfigurationError,
        format("Received 401 from Scalyr API during connectivity check which indicates " \
               "an invalid API key. Server Response: %s", e.body)
    end
    # Non-401 server errors are intentionally swallowed here (treated as
    # transient / non-fatal for the purpose of registration).
  rescue => e
    @logger.warn("Received non-fatal error during connectivity check", :error => e.message)
  end
end
# Convenience method which builds a fresh set of plugin-level metric
# estimators. Every entry is a brand new Quantile::Estimator, so any
# previously observed samples are discarded.
def get_new_metrics
  metric_names = [
    :build_multi_duration_secs,
    :multi_receive_duration_secs,
    :multi_receive_event_count,
    :event_attributes_count,
    :flatten_values_duration_secs,
    :batches_per_multi_receive
  ]
  metric_names.each_with_object({}) do |name, metrics|
    metrics[name] = Quantile::Estimator.new
  end
end
# Receive an array of events and immediately upload them (without buffering).
# The Logstash framework will call this plugin method whenever there is a list of events to upload to Scalyr.
# The plugin is expected to retry until success, or else to write failures to the Dead-letter Queue.
# No buffering/queuing is done -- ie a synchronous upload to Scalyr is attempted and retried upon failure.
#
# If there are any network errors, exponential backoff occurs.
#
# Also note that event uploads are broken up into batches such that each batch is less than max_request_buffer.
# Increasing max_request_buffer beyond 3MB will lead to failed requests.
#
# Returns the array of successfully uploaded multi-event requests (or the
# original events array when running in noop mode; nil if an unexpected
# error escaped the processing loop and was logged).
def multi_receive(events)
  # Just return and pretend we did something if running in noop mode
  return events if @noop_mode

  begin
    records_count = events.to_a.length

    # We also time the duration of the build_multi_event_request_array method. To avoid locking twice,
    # we store the duration value here and record metric at the end.
    start_time = Time.now.to_f

    multi_event_request_array = build_multi_event_request_array(events)
    build_multi_duration_secs = Time.now.to_f - start_time

    # Loop over all array of multi-event requests, sending each multi-event to Scalyr
    batch_num = 1
    total_batches = multi_event_request_array.length unless multi_event_request_array.nil?

    result = []

    while !multi_event_request_array.to_a.empty?
      multi_event_request = multi_event_request_array.pop

      # Variables to hold information about exceptions we run into, and our handling of retries for this request. We
      # track this to log it when the retries succeed so we can be sure logs are going through.
      # General exception info we log in the error
      exc_data = nil
      # Whether the exception is commonly retried or not, for determining log level
      exc_commonly_retried = false

      # We use new and clean retry state object for each request
      # Since @running is only available directly on the output plugin instance and we don't
      # want to create a cyclic reference between output and state tracker instance we pass
      # this lambda method to the state tracker
      is_plugin_running = lambda { @running }
      retry_state = RetryStateTracker.new(@config, is_plugin_running)

      begin
        # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we
        # ignore these.
        if !multi_event_request.nil?
          @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])

          batch_num += 1
          result.push(multi_event_request)
        end

      rescue Scalyr::Common::Client::PayloadTooLargeError => e
        # if the payload is too large, we do not retry. we send to DLQ or drop it.
        exc_data = {
          :error_class => e.e_class,
          :url => e.url.to_s,
          :message => e.message,
          :batch_num => batch_num,
          :total_batches => total_batches,
          :record_count => multi_event_request[:record_count],
          :payload_size => multi_event_request[:body].bytesize,
        }
        exc_data[:code] = e.code if e.code
        if defined?(e.body) and e.body
          exc_data[:body] = Scalyr::Common::Util.truncate(e.body, 512)
        end
        exc_data[:payload] = "\tSample payload: #{multi_event_request[:body][0,1024]}..."
        log_retry_failure(multi_event_request, exc_data, 0, 0)
        next

      rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e
        previous_state = retry_state.get_state_for_error(e)
        updated_state = retry_state.sleep_for_error_and_update_state(e)

        @stats_lock.synchronize do
          @multi_receive_statistics[:total_retry_count] += 1
          @multi_receive_statistics[:total_retry_duration_secs] += updated_state[:sleep_interval]
        end

        message = "Error uploading to Scalyr (will backoff-retry)"
        exc_data = {
          :error_class => e.e_class,
          :url => e.url.to_s,
          :message => e.message,
          :batch_num => batch_num,
          :total_batches => total_batches,
          :record_count => multi_event_request[:record_count],
          :payload_size => multi_event_request[:body].bytesize,
          # retry related values
          :max_retries => updated_state[:options][:max_retries],
          :retry_backoff_factor => updated_state[:options][:retry_backoff_factor],
          :retry_max_interval => updated_state[:options][:retry_max_interval],
          :will_retry_in_seconds => updated_state[:sleep_interval],
          # to get values which include this next retry, you need to add +1
          # to :total_retries_so_far and +:sleep_interval to :total_sleep_time_so_far
          :total_retries_so_far => previous_state[:retries],
          :total_sleep_time_so_far => previous_state[:sleep],
        }
        exc_data[:code] = e.code if e.code
        if @logger.debug? and defined?(e.body) and e.body
          exc_data[:body] = e.body
        elsif defined?(e.body) and e.body
          exc_data[:body] = Scalyr::Common::Util.truncate(e.body, 512)
        end
        # BUGFIX: this previously referenced an undefined local `request`
        # which raised NameError whenever debug logging was enabled while
        # handling a retryable upload error.
        exc_data[:payload] = "\tSample payload: #{multi_event_request[:body][0,1024]}..." if @logger.debug?

        if e.is_commonly_retried?
          # well-known retriable errors should be debug
          @logger.debug(message, exc_data)
          exc_commonly_retried = true
        else
          # all other failed uploads should be warning
          @logger.warn(message, exc_data)
          exc_commonly_retried = false
        end

        retry if @running and updated_state[:retries] < updated_state[:options][:max_retries]

        log_retry_failure(multi_event_request, exc_data, updated_state[:retries], updated_state[:sleep])
        next

      rescue => e
        # Any unexpected errors should be fully logged
        @logger.error(
          "Unexpected error occurred while uploading to Scalyr (will backoff-retry)",
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace
        )
        @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request)

        updated_state = retry_state.sleep_for_error_and_update_state(e)
        exc_data = {
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace,
          :multi_event_request => multi_event_request
        }

        @stats_lock.synchronize do
          @multi_receive_statistics[:total_retry_count] += 1
          @multi_receive_statistics[:total_retry_duration_secs] += updated_state[:sleep_interval]
        end

        retry if @running and updated_state[:retries] < updated_state[:options][:max_retries]

        log_retry_failure(multi_event_request, exc_data, updated_state[:retries], updated_state[:sleep])
        next
      end

      @stats_lock.synchronize do
        @multi_receive_statistics[:total_events_processed] += multi_event_request[:logstash_events].length
        @multi_receive_statistics[:successful_events_processed] += multi_event_request[:logstash_events].length
      end

      # If the request eventually succeeded after retries, record that fact so
      # operators can confirm logs are going through.
      if !exc_data.nil?
        message = "Retry successful after error."
        if exc_commonly_retried
          @logger.debug(message, :error_data => exc_data, :retries => updated_state[:retries], :sleep_time => updated_state[:sleep_interval])
        else
          @logger.info(message, :error_data => exc_data, :retries => updated_state[:retries], :sleep_time => updated_state[:sleep_interval])
        end
      end
    end

    if records_count > 0
      @stats_lock.synchronize do
        @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
        @plugin_metrics[:build_multi_duration_secs].observe(build_multi_duration_secs)
        @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
        @plugin_metrics[:multi_receive_event_count].observe(records_count)
        @plugin_metrics[:batches_per_multi_receive].observe(total_batches)
      end
    end

    # send_status itself rate limits via status_report_interval, so calling it
    # on every batch here is acceptable.
    if @report_status_for_empty_batches or records_count > 0 or should_call_send_status_for_empty_batch()
      send_status
    end

    return result
  rescue => e
    # Any unexpected errors should be fully logged; note the error is
    # swallowed here (multi_receive returns nil in that case).
    @logger.error(
      "Unexpected error occurred while executing multi_receive.",
      :error_message => e.message,
      :error_class => e.class.name,
      :backtrace => e.backtrace
    )
  end
end # def multi_receive
# Return true if "send_status()" should be called even for an empty batch.
# NOTE: We always report status even if record count is 0 if more than X minutes have passed
# from the last status reporting for active plugins. Logstash sends empty batches for flush
# purposes quite often, but we only want to report status if we haven't reported it for a
# longer period (aka there hasn't been any data flowing in for a while). If we reported it
# for every flush event (which happens very often), this would skew metrics for regular
# events and output plugins which receive no data at all.
# We perform check against total events processed so we don't send status for output plugins
# which receive no data at all.
# Decide whether "send_status()" should run even though the current batch is empty.
#
# Returns true only when all of the following hold:
#   * the plugin has processed a meaningful amount of data (>= 100 events total),
#   * status has been transmitted at least once before (otherwise the plugin is
#     considered fully idle), and
#   * more than @idle_status_report_deadline seconds have elapsed since the last
#     status transmission.
def should_call_send_status_for_empty_batch()
  # Plugins which have seen almost no data are treated as idle - don't report.
  return false if @multi_receive_statistics[:total_events_processed] < 100
  # A nil transmit time means status was never sent, i.e. no data flowed at all.
  return false if @last_status_transmit_time.nil?
  elapsed_secs = Time.now.to_i - @last_status_transmit_time.to_i
  elapsed_secs > @idle_status_report_deadline
end
# Log a multi event request that permanently failed (retries exhausted or the
# request was non-retriable) and route its events to the dead letter queue when
# one is configured; otherwise the events are dropped with a warning.
#
# multi_event_request - request hash holding the original :logstash_events array
# exc_data            - hash describing the failure (:error_message, :code, ...)
# exc_retries         - number of retries that were attempted
# exc_sleep           - total seconds spent sleeping between retries
def log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep)
  # All failed events still count as "processed" for the plugin statistics.
  @stats_lock.synchronize do
    @multi_receive_statistics[:total_events_processed] += multi_event_request[:logstash_events].length
    @multi_receive_statistics[:failed_events_processed] += multi_event_request[:logstash_events].length
  end
  # Capture up to 5 truncated sample events to make the error log actionable
  # without blowing up the log line size.
  sample_events = Array.new
  multi_event_request[:logstash_events][0,5].each {|l_event|
    sample_events << Scalyr::Common::Util.truncate(l_event.to_hash.to_json, 256)
  }
  if exc_data[:code] == 413
    message = "Failed to send #{multi_event_request[:logstash_events].length} events due to exceeding maximum request size. Not retrying non-retriable request."
    # For PayloadTooLargeError we already include sample Scalyr payload in exc_data so there is no need
    # to include redundant sample Logstash event objects
    @logger.error(message, :error_data => exc_data)
  else
    message = "Failed to send #{multi_event_request[:logstash_events].length} events after #{exc_retries} tries."
    @logger.error(message, :error_data => exc_data, :sample_events => sample_events, :retries => exc_retries, :sleep_time => exc_sleep)
  end
  if @dlq_writer
    # BUGFIX: the exc_data hashes built in multi_receive use the :error_message key
    # (not :message), so interpolating exc_data[:message] wrote an empty reason
    # string to the DLQ. Fall back through the keys that may be present so the DLQ
    # entry always carries a useful reason.
    dlq_reason = exc_data[:message] || exc_data[:error_message] || message
    multi_event_request[:logstash_events].each {|l_event|
      @dlq_writer.write(l_event, "#{dlq_reason}")
    }
  else
    @logger.warn("Dead letter queue not configured, dropping #{multi_event_request[:logstash_events].length} events.", :sample_events => sample_events)
  end
end
# Builds an array of multi-event requests from LogStash events
# Each array element is a request that groups multiple events (to be posted to Scalyr's addEvents endpoint)
#
# This function also performs data transformations to support special fields and, optionally, flatten JSON values.
#
# Special fields are those that have special semantics to Scalyr, i.e. 'message' contains the main log message,
# 'serverHost' and 'logfile' have a dedicated search boxes to facilitate filtering. All Logstash event key/values will
# be marshalled into a Scalyr addEvents `attr` key/value unless they are identified as alternate names for special
# fields. The special fields ('message', 'serverHost', 'logfile') may be remapped from other fields (configured by setting
# 'message_field', 'serverhost_field', 'logfile_field')
#
# Values that are nested JSON may be optionally flattened (See README.md for some examples).
#
# Certain fields are removed (e.g. @timestamp and @version)
#
# Tags are either propagated as a comma-separated string, or optionally transposed into key-values where the keys
# are tag names and the values are 1 (may be configured.)
def build_multi_event_request_array(logstash_events)
if logstash_events.nil? or logstash_events.empty?
return []
end
multi_event_request_array = Array.new
total_bytes = 0
# Set of unique scalyr threads for this chunk
current_threads = Hash.new
# Create a Scalyr event object for each record in the chunk
scalyr_events = Array.new
# Track the logstash events in each chunk to send them to the dlq in case of an error
l_events = Array.new
thread_ids = Hash.new
next_id = 1 #incrementing thread id for the session
# per-logfile attributes
logs = Hash.new
logs_ids = Hash.new
next_log_id = 1
batch_has_event_level_server_host = false
logstash_events.each {|l_event|
record = l_event.to_hash
# Create optional threads hash if serverHost is non-nil
# echee: TODO I don't think threads are necessary. Too much info?
# they seem to be a second level of granularity within a logfile
serverHost = record.fetch(@serverhost_field, nil)
if serverHost
# get thread id or add a new one if we haven't seen this serverHost before
if thread_ids.key? serverHost
thread_id = thread_ids[serverHost]
else
thread_id = next_id
thread_ids[serverHost] = thread_id
next_id += 1
end
# then update the map of threads for this chunk
current_threads[serverHost] = thread_id
end
rename = lambda do |renamed_field, standard_field|
if standard_field != renamed_field
if record.key? renamed_field
if record.key? standard_field
@logger.warn "Overwriting log record field '#{standard_field}'. You are seeing this warning because in " +
"your LogStash config file you have configured the '#{renamed_field}' field to be converted to the " +
"'#{standard_field}' field, but the event already contains a field called '#{standard_field}' and "
"this is now being overwritten."
end
record[standard_field] = record[renamed_field]
record.delete(renamed_field)
end
end
end
# Rename user-specified message field -> 'message'
rename.call(@message_field, 'message')
# Fix message encoding
if @message_encoding and !record['message'].to_s.empty?
if @replace_invalid_utf8 and @message_encoding == Encoding::UTF_8
record["message"] = record["message"].encode("UTF-8", :invalid => :replace,
:undef => :replace, :replace => "<?>").force_encoding('UTF-8')
else
record["message"].force_encoding(@message_encoding)
end
end
# Rename user-specified serverHost field -> 'serverHost'
rename.call(@serverhost_field, 'serverHost')
# Rename user-specified logfile field -> 'logfile'
rename.call(@logfile_field, 'logfile')
# Rename user-specified severity field -> 'severity' (if configured)
if not @severity_field.nil?
rename.call(@severity_field, 'severity')
end
# Remove "host" attribute
if @remove_host_attribute_from_events and record.key? "host"
record.delete("host")
end
# Set a default parser if none is present in the event
if record['parser'].to_s.empty?
record['parser'] = "logstashParser"
end
# Set logfile field if empty and serverHost is supplied
if record['logfile'].to_s.empty? and serverHost
record['logfile'] = "/logstash/#{serverHost}"
end
# Rename serverHost (if exists) to __origServerHost so sources filtering works correctly
# It's important that this happens at the very end of the event processing in this function.
record_has_server_host_attribute = record.key? 'serverHost'
batch_has_event_level_server_host |= record_has_server_host_attribute
if record_has_server_host_attribute
record[EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME] = record['serverHost']
record.delete('serverHost')
end
# To reduce duplication of common event-level attributes, we "fold" them into top-level "logs" attribute
# and reference log entry inside the event
log_identifier = nil
add_log = false
if serverHost
log_identifier = serverHost + record['logfile']
end
if log_identifier and not logs.key? log_identifier
add_log = true
logs[log_identifier] = {
'id' => next_log_id,
'attrs' => Hash.new
}
if not record['logfile'].to_s.empty?
logs[log_identifier]['attrs']['logfile'] = record['logfile']
record.delete('logfile')
end
if @log_constants
@log_constants.each {|log_constant|
if record.key? log_constant
logs[log_identifier]['attrs'][log_constant] = record[log_constant]
record.delete(log_constant)
end
}
end
logs_ids[log_identifier] = next_log_id
next_log_id += 1
end
# If we already contain "logs" entry for this record, we remove duplicated serverHost from
# the event attributes since it's already part of the log level attributes which are
# referenced by the event.
if log_identifier and logs.key? log_identifier
if not record[EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME].to_s.empty?
logs[log_identifier]['attrs'][EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME] = record[EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME]
record.delete(EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME)
end
end
# Delete unwanted fields from record
record.delete('@version')
record.delete('@timestamp')
# flatten tags
if @flatten_tags and record.key? 'tags'
record['tags'].each do |tag|
record["#{@flat_tag_prefix}#{tag}"] = @flat_tag_value
end
record.delete('tags')
end
# Record per-event level metrics (flatten duration, event attributes count). Doing this for every single
# event would be somewhat expensive so we use sampling.
should_sample_event_metrics = should_sample?
# flatten record
if @flatten_nested_values
start_time = Time.now.to_f
begin
record = Scalyr::Common::Util.flatten(record, @flatten_nested_values_delimiter, @flatten_nested_arrays, @fix_deep_flattening_delimiters, @flattening_max_key_count)
rescue Scalyr::Common::Util::MaxKeyCountError => e
@logger.warn("Error while flattening record", :error_message => e.message, :sample_keys => e.sample_keys)
end
end_time = Time.now.to_f
flatten_nested_values_duration = end_time - start_time
end
if should_sample_event_metrics
@stats_lock.synchronize do
@plugin_metrics[:event_attributes_count].observe(record.count)
if @flatten_nested_values
@plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
end
end
end
severity = record['severity']
severity_int = nil
# Server won't accept the payload in case severity value is not valid. To avoid events from
# being dropped, we only set Event.sev field in case this field contains a valid value.
if not @severity_field.nil? and severity and severity.is_integer?
severity_int = severity.to_i
if severity_int >= 0 and severity_int <= 6
record.delete('severity')
else
severity_int = nil
end
end
# Use LogStash event.timestamp as the 'ts' Scalyr timestamp. Note that this may be overwritten by input
# filters so may not necessarily reflect the actual originating timestamp.
scalyr_event = {
:ts => (l_event.timestamp.time.to_f * (10**9)).round,
:attrs => record
}
# optionally set thread and referenced log file
if serverHost
scalyr_event[:thread] = thread_id.to_s
scalyr_event[:log] = logs_ids[log_identifier]
end
# optionally set severity (if available and valid)
if @severity_field and not severity_int.nil?
scalyr_event[:sev] = severity_int
end
if @estimate_each_event_size
# get json string of event to keep track of how many bytes we are sending
begin
event_json = self.json_encode(scalyr_event)
log_json = nil
if add_log
log_json = self.json_encode(logs[log_identifier])
end
rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
@logger.warn "#{e.class}: #{e.message}"
# Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
# TODO
# atime = Fluent::EventTime.new( sec, nsec )
# router.emit_error_event(serverHost, time, record, e)
scalyr_event[:attrs].each do |key, value|
@logger.debug "\t#{key} (#{value.encoding.name}): '#{value}'"
scalyr_event[:attrs][key] = value.encode(
"UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>"
).force_encoding('UTF-8')
end
event_json = self.json_encode(scalyr_event)
rescue Java::JavaLang::ClassCastException
# Most likely we ran into the issue described here: https://github.com/flori/json/issues/336
# Because of the version of jruby logstash works with we don't have the option to just update this away,
# so if we run into it we convert bignums into strings so we can get the data in at least.
# This is fixed in JRuby 9.2.7, which includes json 2.2.0
@logger.warn("Error serializing events to JSON, likely due to the presence of Bignum values. Converting Bignum values to strings.")
@stats_lock.synchronize do
@multi_receive_statistics[:total_java_class_cast_errors] += 1
end
Scalyr::Common::Util.convert_bignums(scalyr_event)
event_json = self.json_encode(scalyr_event)
log_json = nil