forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
beam_runner_api.proto
1306 lines (1087 loc) · 44.3 KB
/
beam_runner_api.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Runner API, which is the runner-independent,
* SDK-independent definition of the Beam model.
*/
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";
import "endpoints.proto";
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
// Model-wide constants. The constant itself is the string carried in each
// value's (beam_constant) option.
message BeamConstants {
  // All timestamps are expressed in milliseconds since Jan 1, 1970.
  enum Constants {
    // Lower bound for timestamps.
    MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"];

    // Upper bound for timestamps.
    MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) = "9223372036854775"];

    // The maximum timestamp for the global window.
    //
    // Triggers use maxTimestamp to set a timer's timestamp, and a timer fires
    // when the watermark passes its timestamp, so this value has to be
    // smaller than MAX_TIMESTAMP_MILLIS. One standard day is subtracted from
    // MAX_TIMESTAMP_MILLIS so that maxTimestamp stays below it even after
    // rounding up to seconds or minutes. See also GlobalWindow in the Java
    // SDK.
    GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2 [(beam_constant) = "9223371950454775"];
  }
}
// A set of id-to-message lookup tables. It is included as an optional field
// on any proto message that may contain references needing resolution.
message Components {
  // (Required) PTransforms, keyed by pipeline-scoped id.
  map<string, PTransform> transforms = 1;

  // (Required) PCollections, keyed by pipeline-scoped id.
  map<string, PCollection> pcollections = 2;

  // (Required) WindowingStrategies, keyed by pipeline-scoped id.
  map<string, WindowingStrategy> windowing_strategies = 3;

  // (Required) Coders, keyed by pipeline-scoped id.
  map<string, Coder> coders = 4;

  // (Required) Environments, keyed by pipeline-scoped id.
  map<string, Environment> environments = 5;
}
// A Pipeline is a hierarchical graph of PTransforms, linked by PCollections.
//
// It is represented by a number of by-reference maps to nodes, PCollections,
// SDK environments, UDFs, etc., which allows compact reuse and arbitrary
// graph structure.
//
// The keys of these maps are arbitrary strings; they only need to be
// internally consistent within this proto message.
message Pipeline {
  // (Required) The coders, UDFs, graph nodes, etc., that make up this
  // pipeline.
  Components components = 1;

  // (Required) The ids of all PTransforms that are not contained within
  // another PTransform.
  //
  // These must be in shallow topological order, so that traversing them
  // recursively in this order yields a recursively topological traversal.
  repeated string root_transform_ids = 2;

  // (Optional) Static display data for the pipeline. May be omitted if there
  // is none.
  DisplayData display_data = 3;
}
// An applied PTransform! This does not contain the graph data, but only the
// fields specific to a graph node that is a Runner API transform
// between PCollections.
message PTransform {
  // (Required) A unique name for the application node.
  //
  // Ideally this is stable across multiple evolutions of a pipeline, so that
  // logs and pipeline state can be associated with the node.
  //
  // If it is not stable, the runner decides what happens. Either way it must
  // always be present and unique, even if it is autogenerated.
  string unique_name = 5;

  // (Optional) A URN and payload that, together, fully define the semantics
  // of this transform.
  //
  // If absent, this must be an "anonymous" composite transform.
  //
  // For primitive transforms in the Runner API this is required, and the
  // payloads are well-defined messages. When the URN indicates ParDo it
  // is a ParDoPayload, and so on.
  //
  // TODO: document the standardized URNs and payloads
  // TODO: separate standardized payloads into a separate proto file
  //
  // For some special composite transforms the payload is also officially
  // defined:
  //
  //  - when the URN is one of the combine URNs declared in
  //    StandardPTransforms.Composites (for example COMBINE_PER_KEY) it is a
  //    CombinePayload
  //
  FunctionSpec spec = 1;

  // (Optional) If this node is a composite, a list of the ids of the
  // transforms that it contains.
  repeated string subtransforms = 2;

  // (Required) A map from local names of inputs (unique only within this map,
  // and likely embedded in the transform payload and serialized user code) to
  // PCollection ids.
  //
  // The payload for this transform may clarify the relationship of these
  // inputs. For example:
  //
  //  - for a Flatten transform they are merged
  //  - for a ParDo transform, some may be side inputs
  //
  // All inputs are recorded here so that the topological ordering of
  // the graph is consistent whether or not the payload is understood.
  //
  map<string, string> inputs = 3;

  // (Required) A map from local names of outputs (unique only within this map,
  // and likely embedded in the transform payload and serialized user code)
  // to PCollection ids.
  //
  // The URN or payload for this transform node may clarify the type and
  // relationship of these outputs. For example:
  //
  //  - for a ParDo transform, these are tags on PCollections, which will be
  //    embedded in the DoFn.
  //
  map<string, string> outputs = 4;

  // (Optional) Static display data for this PTransform application. If
  // there is none, or it is not relevant (such as use by the Fn API)
  // then it may be omitted.
  DisplayData display_data = 6;
}
// Catalog of the standard transform URNs. Each enum value carries its URN in
// the (beam_urn) option.
message StandardPTransforms {
  enum Primitives {
    // Represents Beam's parallel do operation.
    // Payload: ParDoPayload.
    PAR_DO = 0 [(beam_urn) = "beam:transform:pardo:v1"];

    // Represents Beam's flatten operation.
    // Payload: None.
    FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];

    // Represents Beam's group-by-key operation.
    // Payload: None
    GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

    // Represents the operation generating a single empty element.
    IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

    // Represents the Window.into() operation.
    // Payload: WindowIntoPayload.
    ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];

    // Represents the TestStream.
    // Payload: TestStreamPayload
    TEST_STREAM = 5 [(beam_urn) = "beam:transform:teststream:v1"];

    // Represents mapping of main input window onto side input window.
    //
    // Side input window mapping function:
    // Input: KV<nonce, MainInputWindow>
    // Output: KV<nonce, SideInputWindow>
    //
    // For each main input window, the side input window is returned. The
    // nonce is used by a runner to associate each input with its output.
    // The nonce is represented as an opaque set of bytes.
    //
    // Payload: WindowMappingFn from SideInputSpec.
    MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];

    // Used to merge windows during a GroupByKey.
    //
    // Window merging function:
    // Input: KV<nonce, iterable<OriginalWindow>>
    // Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>
    //
    // For each set of original windows, a list of all unmerged windows is
    // output alongside a map of merged window to set of consumed windows.
    // All original windows must be contained in either the unmerged original
    // window set or one of the consumed original window sets. Each original
    // window can only be part of one output set. The nonce is used by a runner
    // to associate each input with its output. The nonce is represented as an
    // opaque set of bytes.
    //
    // Payload: WindowFn from WindowingStrategy.
    MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
  }

  enum DeprecatedPrimitives {
    // Represents the operation to read a Bounded or Unbounded source.
    // Payload: ReadPayload.
    READ = 0 [(beam_urn) = "beam:transform:read:v1"];

    // Runners should move away from translating `CreatePCollectionView` and
    // treat this as part of the translation for a `ParDo` side input.
    CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
  }

  enum Composites {
    // Represents the Combine.perKey() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload
    COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];

    // Represents the Combine.globally() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload
    COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];

    // Represents the Reshuffle operation.
    RESHUFFLE = 2 [(beam_urn) = "beam:transform:reshuffle:v1"];

    // Less well-known. Payload: WriteFilesPayload.
    WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
  }

  // Payload for all of these: CombinePayload
  enum CombineComponents {
    // Represents the Pre-Combine part of a lifted Combine Per Key, as described
    // in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.ta0g6ase8z07
    // Payload: CombinePayload
    COMBINE_PER_KEY_PRECOMBINE = 0 [(beam_urn) = "beam:transform:combine_per_key_precombine:v1"];

    // Represents the Merge Accumulators part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.jco9rvatld5m
    // Payload: CombinePayload
    COMBINE_PER_KEY_MERGE_ACCUMULATORS = 1 [(beam_urn) = "beam:transform:combine_per_key_merge_accumulators:v1"];

    // Represents the Extract Outputs part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.i9i6p8gtl6ku
    // Payload: CombinePayload
    COMBINE_PER_KEY_EXTRACT_OUTPUTS = 2 [(beam_urn) = "beam:transform:combine_per_key_extract_outputs:v1"];

    // Represents the Combine Grouped Values transform, as described in the
    // following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.aj86ew4v1wk
    // Payload: CombinePayload
    COMBINE_GROUPED_VALUES = 3 [(beam_urn) = "beam:transform:combine_grouped_values:v1"];
  }

  // Payload for all of these: ParDoPayload containing the user's SDF
  enum SplittableParDoComponents {
    // Pairs the input element with its initial restriction.
    // Input: element; output: KV(element, restriction).
    PAIR_WITH_RESTRICTION = 0 [(beam_urn) = "beam:transform:sdf_pair_with_restriction:v1"];

    // Splits the restriction inside an element/restriction pair.
    // Input: KV(element, restriction); output: KV(element, restriction).
    SPLIT_RESTRICTION = 1 [(beam_urn) = "beam:transform:sdf_split_restriction:v1"];

    // Applies the DoFn to every element/restriction pair in a uniquely keyed
    // collection, in a splittable fashion.
    // Input: KV(bytes, KV(element, restriction)); output: DoFn's output.
    // The first "bytes" is an opaque unique key using the standard bytes coder.
    // Typically a runner would rewrite this into a runner-specific grouping
    // operation supporting state and timers, followed by PROCESS_ELEMENTS,
    // with some runner-specific glue code in between.
    PROCESS_KEYED_ELEMENTS = 2 [(beam_urn) = "beam:transform:sdf_process_keyed_elements:v1"];

    // Like PROCESS_KEYED_ELEMENTS, but without the unique key - just elements
    // and restrictions.
    // Input: KV(element, restriction); output: DoFn's output.
    PROCESS_ELEMENTS = 3 [(beam_urn) = "beam:transform:sdf_process_elements:v1"];

    // Splits the restriction of each element/restriction pair and returns the
    // resulting splits, with a corresponding floating point size estimate
    // for each.
    // A reasonable value for size is the number of bytes expected to be
    // produced by this (element, restriction) pair.
    // Input: KV(element, restriction)
    // Output: KV(KV(element, restriction), size)
    SPLIT_AND_SIZE_RESTRICTIONS = 4 [(beam_urn) = "beam:transform:sdf_split_and_size_restrictions:v1"];

    // Like PROCESS_ELEMENTS, but accepts the sized output produced by
    // SPLIT_AND_SIZE_RESTRICTIONS.
    // Input: KV(KV(element, restriction), size); output: DoFn's output.
    //
    // NOTE: the URN spells "element" in the singular, unlike the enum value
    // name. It is an identifier and is intentionally left unchanged here.
    PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 5 [(beam_urn) = "beam:transform:sdf_process_sized_element_and_restrictions:v1"];
  }
}
// Catalog of the standard side input access pattern URNs. Each enum value
// carries its URN in the (beam_urn) option.
message StandardSideInputTypes {
  enum Enum {
    // A view over a PCollection<V>.
    //
    // StateGetRequests performed on this side input must use
    // StateKey.IterableSideInput.
    ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];

    // A view over a PCollection<KV<K, V>>.
    //
    // StateGetRequests performed on this side input must use
    // StateKey.IterableSideInput or StateKey.MultimapSideInput.
    MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
  }
}
// A PCollection!
message PCollection {
  // (Required) A unique name for the PCollection.
  //
  // Ideally, this should be stable over multiple evolutions of a pipeline
  // for the purposes of logging and associating pipeline state with a node,
  // etc.
  //
  // If it is not stable, then the runner decides what will happen. But, most
  // importantly, it must always be here, even if it is autogenerated.
  string unique_name = 1;

  // (Required) The id of the Coder for this PCollection.
  string coder_id = 2;

  // (Required) Whether this PCollection is bounded or unbounded.
  IsBounded.Enum is_bounded = 3;

  // (Required) The id of the windowing strategy for this PCollection.
  string windowing_strategy_id = 4;

  // (Optional) Static display data for this PCollection. If there is none,
  // or it is not relevant (such as use by the Fn API), then it may be
  // omitted.
  DisplayData display_data = 5;
}
// Payload of the primitive ParDo transform.
message ParDoPayload {
  // (Required) The SdkFunctionSpec of the DoFn.
  SdkFunctionSpec do_fn = 1;

  // (Required) Additional pieces of context the DoFn may require that are
  // not otherwise represented in the payload. These may force runners to
  // execute the ParDo differently.
  repeated Parameter parameters = 2;

  // (Optional) Side inputs keyed by local input name, describing the
  // expected access pattern.
  map<string, SideInput> side_inputs = 3;

  // (Optional) State specifications keyed by local state name.
  map<string, StateSpec> state_specs = 4;

  // (Optional) Timer specifications keyed by local timer name.
  map<string, TimerSpec> timer_specs = 5;

  // Whether the DoFn is splittable.
  bool splittable = 6;

  // (Required if splittable == true) Id of the restriction coder.
  string restriction_coder_id = 7;

  // (Optional) Only set when this ParDo can request bundle finalization.
  bool requests_finalization = 8;
}
// Parameters that a UDF might require.
//
// How a runner sends these parameters to the SDK harness is the subject of
// the Fn API.
//
// How an SDK harness delivers them to the UDF is entirely up to the SDK.
// (Some SDKs may have parameters that are not represented here, if the
// runner doesn't need to do anything for them.)
//
// Here, the parameters are simply indicators to the runner that it needs to
// run the function a particular way.
//
// TODO: the evolution of the Fn API will influence what needs explicit
// representation here
message Parameter {
  // The kind of parameter being requested.
  Type.Enum type = 1;

  // Wrapper message that scopes the parameter kind enum.
  message Type {
    enum Enum {
      UNSPECIFIED = 0;
      WINDOW = 1;
      PIPELINE_OPTIONS = 2;
      RESTRICTION_TRACKER = 3;
    }
  }
}
// Specification of a piece of state, referenced from
// ParDoPayload.state_specs. At most one of the kinds below is set.
message StateSpec {
  oneof spec {
    ReadModifyWriteStateSpec read_modify_write_spec = 1;
    BagStateSpec bag_spec = 2;
    CombiningStateSpec combining_spec = 3;
    MapStateSpec map_spec = 4;
    SetStateSpec set_spec = 5;
  }
}
// The read-modify-write variant of StateSpec.
message ReadModifyWriteStateSpec {
  // NOTE(review): presumably the id of the coder for the stored value -
  // confirm.
  string coder_id = 1;
}
// The bag variant of StateSpec.
message BagStateSpec {
  // NOTE(review): presumably the id of the coder for each element of the
  // bag - confirm.
  string element_coder_id = 1;
}
// The combining variant of StateSpec.
message CombiningStateSpec {
  // NOTE(review): presumably the id of the coder for the accumulator -
  // confirm.
  string accumulator_coder_id = 1;

  // NOTE(review): presumably the SdkFunctionSpec of the CombineFn applied to
  // this state - confirm.
  SdkFunctionSpec combine_fn = 2;
}
// The map variant of StateSpec.
message MapStateSpec {
  // NOTE(review): presumably the id of the coder for map keys - confirm.
  string key_coder_id = 1;

  // NOTE(review): presumably the id of the coder for map values - confirm.
  string value_coder_id = 2;
}
// The set variant of StateSpec.
message SetStateSpec {
  // NOTE(review): presumably the id of the coder for each element of the
  // set - confirm.
  string element_coder_id = 1;
}
// Specification of a timer, referenced from ParDoPayload.timer_specs.
message TimerSpec {
  // The time domain of the timer; see TimeDomain.Enum for the possible
  // values.
  TimeDomain.Enum time_domain = 1;

  // NOTE(review): presumably the id of the coder used for this timer -
  // confirm.
  string timer_coder_id = 2;
}
// Wrapper message scoping the boundedness enum used by PCollection and
// ReadPayload.
message IsBounded {
  enum Enum {
    UNSPECIFIED = 0;
    UNBOUNDED = 1;
    BOUNDED = 2;
  }
}
// Payload of the primitive Read transform.
message ReadPayload {
  // (Required) The SdkFunctionSpec of the source for this Read.
  SdkFunctionSpec source = 1;

  // (Required) Whether the source is bounded or unbounded.
  IsBounded.Enum is_bounded = 2;

  // TODO: full audit of fields required by runners as opposed to SDK harness
}
// Payload of the WindowInto transform.
message WindowIntoPayload {
  // (Required) The SdkFunctionSpec of the WindowFn.
  SdkFunctionSpec window_fn = 1;
}
// Payload of the special-but-not-primitive Combine transform.
message CombinePayload {
  // (Required) The SdkFunctionSpec of the CombineFn.
  SdkFunctionSpec combine_fn = 1;

  // (Required) A reference to the Coder to use for accumulators of the
  // CombineFn.
  string accumulator_coder_id = 2;
}
// Payload of the test-only primitive TestStream.
message TestStreamPayload {
  // (Required) The coder for elements in the TestStream events.
  string coder_id = 1;

  // The events that make up the stream.
  repeated Event events = 2;

  // A single TestStream event; at most one of the variants is set.
  message Event {
    oneof event {
      AdvanceWatermark watermark_event = 1;
      AdvanceProcessingTime processing_time_event = 2;
      AddElements element_event = 3;
    }

    message AdvanceWatermark {
      // NOTE(review): units are not stated here; presumably milliseconds
      // since the epoch - confirm.
      int64 new_watermark = 1;
    }

    message AdvanceProcessingTime {
      // NOTE(review): units are not stated here; presumably milliseconds -
      // confirm.
      int64 advance_duration = 1;
    }

    message AddElements {
      repeated TimestampedElement elements = 1;
    }
  }

  // An encoded element together with its timestamp.
  message TimestampedElement {
    // NOTE(review): presumably encoded with the coder named by coder_id -
    // confirm.
    bytes encoded_element = 1;

    // NOTE(review): units are not stated here; presumably milliseconds
    // since the epoch - confirm.
    int64 timestamp = 2;
  }
}
// Payload of the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {
  // (Required) The SdkFunctionSpec of the FileBasedSink.
  SdkFunctionSpec sink = 1;

  // (Required) The format function.
  SdkFunctionSpec format_function = 2;

  bool windowed_writes = 3;

  bool runner_determined_sharding = 4;

  // Side inputs keyed by name.
  map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
  // (Required) A specification for the coder, as a URN plus parameters. This
  // may be a cross-language agreed-upon format, or it may be a "custom coder"
  // that can only be used by a particular SDK. It does not include component
  // coders, as it is beneficial for these to be comprehensible to a runner
  // regardless of whether the binary format is agreed-upon.
  FunctionSpec spec = 1;

  // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
  // this is a list of the components. In order for encodings to be identical,
  // the FunctionSpec in `spec` and all components must be identical,
  // recursively.
  repeated string component_coder_ids = 2;
}
// Catalog of the standard coder URNs. Each enum value carries its URN in the
// (beam_urn) option.
message StandardCoders {
  enum Enum {
    // Components: None
    BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];

    // Components: None
    STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];

    // Components: The key and value coder, in that order.
    KV = 1 [(beam_urn) = "beam:coder:kv:v1"];

    // Components: None
    BOOL = 12 [(beam_urn) = "beam:coder:bool:v1"];

    // Variable-length encoding of a 64-bit integer.
    // Components: None
    VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];

    // Encodes the floating point value as a big-endian 64-bit integer
    // according to the IEEE 754 double format bit layout.
    // Components: None
    DOUBLE = 11 [(beam_urn) = "beam:coder:double:v1"];

    // Encodes an iterable of elements.
    //
    // The encoding for an iterable [e1...eN] of known length N is
    //
    //    fixed32(N)
    //    encode(e1) encode(e2) encode(e3) ... encode(eN)
    //
    // If the length is unknown, it is batched up into groups of size b1..bM
    // and encoded as
    //
    //     fixed32(-1)
    //     varInt64(b1) encode(e1) encode(e2) ... encode(e_b1)
    //     varInt64(b2) encode(e_(b1+1)) encode(e_(b1+2)) ... encode(e_(b1+b2))
    //     ...
    //     varInt64(bM) encode(e_(N-bM+1)) encode(e_(N-bM+2)) ... encode(eN)
    //     varInt64(0)
    //
    // Components: Coder for a single element.
    ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];

    // Encodes a timer containing a timestamp and a user specified payload.
    // The encoding is represented as: timestamp payload
    //   timestamp - a big endian 8 byte integer representing
    //     millis-since-epoch. The encoded representation is shifted so that
    //     the byte representation of negative values are lexicographically
    //     ordered before the byte representation of positive values. This is
    //     typically done by subtracting -9223372036854775808 from the value
    //     and encoding it as a signed big endian integer. Example values:
    //
    //     -9223372036854775808: 00 00 00 00 00 00 00 00
    //                     -255: 7F FF FF FF FF FF FF 01
    //                       -1: 7F FF FF FF FF FF FF FF
    //                        0: 80 00 00 00 00 00 00 00
    //                        1: 80 00 00 00 00 00 00 01
    //                      256: 80 00 00 00 00 00 01 00
    //      9223372036854775807: FF FF FF FF FF FF FF FF
    //   payload - user defined data, uses the component coder
    // Components: Coder for the payload.
    TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];

    /*
     * The following coders are typically not specified manually by the user,
     * but are used at runtime and must be supported by every SDK.
     */

    // Components: None
    INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];

    // Components: The coder to attach a length prefix to
    LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];

    // Components: None
    GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];

    // Encodes an element, the window the value is in, the timestamp of the
    // element, and the pane of the element.
    // Components: The element coder and the window coder, in that order
    WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];

    // Encodes an iterable of elements, some of which may be stored elsewhere.
    //
    // The encoding for a state-backed iterable is the same as that for
    // an iterable, but the final varInt64(0) terminating the set of batches
    // may instead be replaced by
    //
    //     varInt64(-1)
    //     varInt64(len(token))
    //     token
    //
    // where token is an opaque byte string that can be used to fetch the
    // remainder of the iterable (e.g. over the state API).
    //
    // Components: Coder for a single element.
    // Experimental.
    STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];

    // Additional Standard Coders
    // --------------------------
    // The following coders are not required to be implemented for an SDK or
    // runner to support the Beam model, but enable users to take advantage of
    // schema-aware transforms.

    // Encodes a "row", an element with a known schema, defined by an
    // instance of Schema from schema.proto.
    //
    // A row is encoded as the concatenation of:
    //   - The number of attributes in the schema, encoded with
    //     beam:coder:varint:v1. This makes it possible to detect certain
    //     allowed schema changes (appending or removing columns) in
    //     long-running streaming pipelines.
    //   - A byte array representing a packed bitset indicating null fields (a
    //     1 indicating a null) encoded with beam:coder:bytes:v1. The unused
    //     bits in the last byte must be set to 0. If there are no nulls an
    //     empty byte array is encoded.
    //     The two-byte bitset (not including the length-prefix) for the row
    //     [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
    //     [0b10010001, 0b00000010]
    //   - An encoding for each non-null field, concatenated together.
    //
    // Schema types are mapped to coders as follows:
    //   AtomicType:
    //     BYTE:      not yet a standard coder (BEAM-7996)
    //     INT16:     not yet a standard coder (BEAM-7996)
    //     INT32:     beam:coder:varint:v1
    //     INT64:     beam:coder:varint:v1
    //     FLOAT:     not yet a standard coder (BEAM-7996)
    //     DOUBLE:    beam:coder:double:v1
    //     STRING:    beam:coder:string_utf8:v1
    //     BOOLEAN:   beam:coder:bool:v1
    //     BYTES:     beam:coder:bytes:v1
    //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
    //   MapType:     not yet a standard coder (BEAM-7996)
    //   RowType:     beam:coder:row:v1
    //   LogicalType: Uses the coder for its representation.
    //
    // The payload for RowCoder is an instance of Schema.
    // Components: None
    // Experimental.
    ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
  }
}
// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
// TODO: consider inlining field on PCollection
message WindowingStrategy {
  // (Required) The SdkFunctionSpec of the UDF that assigns windows, merges
  // windows, and shifts timestamps before they are combined according to the
  // OutputTime.
  SdkFunctionSpec window_fn = 1;

  // (Required) Whether or not the window fn is merging.
  //
  // This knowledge is required for many optimizations.
  MergeStatus.Enum merge_status = 2;

  // (Required) The coder for the windows of this PCollection.
  string window_coder_id = 3;

  // (Required) The trigger to use when grouping this PCollection.
  Trigger trigger = 4;

  // (Required) The accumulation mode indicates whether new panes are a full
  // replacement for prior panes or whether they are deltas to be combined
  // with other panes (the combine should correspond to whatever the upstream
  // grouping transform is).
  AccumulationMode.Enum accumulation_mode = 5;

  // (Required) The OutputTime specifies, for a grouping transform, how to
  // compute the aggregate timestamp. The window_fn will first possibly shift
  // it later, then the OutputTime takes the max, min, or ignores it and takes
  // the end of window.
  //
  // This is actually only for input to grouping transforms, but since they
  // may be introduced in runner-specific ways, it is carried along with the
  // windowing strategy.
  OutputTime.Enum output_time = 6;

  // (Required) Indicate when output should be omitted upon window expiration.
  ClosingBehavior.Enum closing_behavior = 7;

  // (Required) The duration, in milliseconds, beyond the end of a window at
  // which the window becomes droppable.
  int64 allowed_lateness = 8;

  // (Required) Indicate whether empty on-time panes should be omitted.
  //
  // NOTE(review): this field name is PascalCase, unlike every other field in
  // this message, and is spelled the same as the OnTimeBehavior message.
  // It is left as is because renaming it would change generated accessors
  // and the JSON name.
  OnTimeBehavior.Enum OnTimeBehavior = 9;

  // (Required) Whether or not the window fn assigns inputs to exactly one
  // window.
  //
  // This knowledge is required for some optimizations.
  bool assigns_to_one_window = 10;
}
// Whether a PCollection's WindowFn is non-merging, merging, or
// merging-but-already-merged. In the last case a subsequent GroupByKey is
// almost always going to do something the user does not want.
message MergeStatus {
  enum Enum {
    UNSPECIFIED = 0;

    // The WindowFn does not require merging.
    // Examples: global window, FixedWindows, SlidingWindows
    NON_MERGING = 1;

    // The WindowFn is merging and the PCollection has not had merging
    // performed.
    // Example: Sessions prior to a GroupByKey
    NEEDS_MERGE = 2;

    // The WindowFn is merging and the PCollection has had merging occur
    // already.
    // Example: Sessions after a GroupByKey
    ALREADY_MERGED = 3;
  }
}
// Whether subsequent outputs of aggregations should be entire replacement
// values or just the aggregation of inputs received since the prior output.
message AccumulationMode {
  enum Enum {
    UNSPECIFIED = 0;

    // The aggregation is discarded when it is output.
    DISCARDING = 1;

    // The aggregation is accumulated across outputs.
    ACCUMULATING = 2;

    // The aggregation emits retractions when it is output.
    RETRACTING = 3;
  }
}
// Controls whether an aggregating transform should output data when a window
// expires.
message ClosingBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // Emit output when a window expires, whether or not there has been
    // any new data since the last output.
    EMIT_ALWAYS = 1;

    // Only emit output when new data has arrived since the last output.
    EMIT_IF_NONEMPTY = 2;
  }
}
// Controls whether an aggregating transform should output data when an
// on-time pane is empty.
message OnTimeBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // Always fire the on-time pane. Even if there is no new data since
    // the previous firing, an element will be produced.
    FIRE_ALWAYS = 1;

    // Only fire the on-time pane if there is new data since the previous
    // firing.
    FIRE_IF_NONEMPTY = 2;
  }
}
// Selects the timestamp given to the output that results from aggregating a
// number of windowed, timestamped inputs.
message OutputTime {
  enum Enum {
    // Zero value; no output time has been specified.
    UNSPECIFIED = 0;

    // The output carries the timestamp of the end of the window.
    END_OF_WINDOW = 1;

    // The output carries the latest timestamp among the input elements
    // received since the last output.
    LATEST_IN_PANE = 2;

    // The output carries the earliest timestamp among the input elements
    // received since the last output.
    EARLIEST_IN_PANE = 3;
  }
}
// Enumerates the time domains of the Beam model.
message TimeDomain {
  enum Enum {
    // Zero value; no time domain has been specified.
    UNSPECIFIED = 0;

    // Event time: time from the perspective of the data.
    EVENT_TIME = 1;

    // Processing time: time from the perspective of the execution of your
    // pipeline.
    PROCESSING_TIME = 2;

    // Synchronized processing time: the minimum of the processing time of
    // all pending elements.
    //
    // Here the "processing time" of an element is the local processing time
    // at which it was emitted.
    SYNCHRONIZED_PROCESSING_TIME = 3;
  }
}
// A small DSL for expressing when to emit new aggregations from a
// GroupByKey or CombinePerKey.
//
// Each trigger is described in terms of when it is _ready_ to permit output.
message Trigger {
  // Ready once every subtrigger is ready.
  message AfterAll {
    // The subtriggers that must all be ready.
    repeated Trigger subtriggers = 1;
  }

  // Ready as soon as any one subtrigger is ready.
  message AfterAny {
    // The subtriggers, any one of which being ready suffices.
    repeated Trigger subtriggers = 1;
  }

  // Begins with the first subtrigger and is ready when the _current_
  // subtrigger is ready. After output, the current trigger advances by one.
  message AfterEach {
    // The subtriggers, in the order in which they become current.
    repeated Trigger subtriggers = 1;
  }

  // Ready once the input watermark has passed the end of the window.
  //
  // May carry implicitly-repeated subtriggers for early and late firings.
  // The trigger transitions between those subtriggers when the end of the
  // window is reached.
  message AfterEndOfWindow {
    // (Optional) Trigger governing output before the end of the window.
    Trigger early_firings = 1;

    // (Optional) Trigger governing output after the end of the window.
    Trigger late_firings = 2;
  }

  // Once input has arrived, ready when the specified delay has passed.
  message AfterProcessingTime {
    // (Required) Transforms applied, in order, to the timestamp of an
    // arriving element.
    repeated TimestampTransform timestamp_transforms = 1;
  }

  // Ready whenever upstream processing time has fully caught up with the
  // arrival time of an input element.
  message AfterSynchronizedProcessingTime {}

  // The default trigger. Equivalent to Repeat { AfterEndOfWindow }, but
  // denoted specially to indicate that the user did not alter the
  // triggering.
  message Default {}

  // Ready whenever the requisite number of input elements have arrived.
  message ElementCount {
    // The requisite number of input elements.
    int32 element_count = 1;
  }

  // Never ready. The only outputs are an ON_TIME output and a final output
  // at window expiration.
  message Never {}

  // Always ready. Expressible as ElementCount(1) too, but this form is more
  // explicit.
  message Always {}

  // Ready whenever either of its subtriggers is ready, but finishes output
  // when the finally subtrigger fires.
  message OrFinally {
    // (Required) Trigger governing main output; may fire repeatedly.
    Trigger main = 1;

    // (Required) Trigger governing termination of output.
    Trigger finally = 2;
  }

  // Ready whenever the subtrigger is ready; state is reset when the
  // subtrigger completes.
  message Repeat {
    // (Required) Trigger that is run repeatedly.
    Trigger subtrigger = 1;
  }

  // The full disjoint union of possible triggers; at most one is set.
  //
  // Field numbers identify each case on the wire and must not be changed.
  // `always` is listed alphabetically but carries number 12, so the numbers
  // below are intentionally not in ascending order.
  oneof trigger {
    AfterAll after_all = 1;
    AfterAny after_any = 2;
    AfterEach after_each = 3;
    AfterEndOfWindow after_end_of_window = 4;
    AfterProcessingTime after_processing_time = 5;
    AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
    Always always = 12;
    Default default = 7;
    ElementCount element_count = 8;
    Never never = 9;
    OrFinally or_finally = 10;
    Repeat repeat = 11;
  }
}
// Specifies a transformation to apply to a timestamp.
//
// Used primarily by AfterProcessingTime triggers, which transform the
// arrival time of input into a target time for firing.
message TimestampTransform {
  // Shifts a timestamp by a fixed delay.
  message Delay {
    // (Required) The delay, in milliseconds.
    int64 delay_millis = 1;
  }

  // Quantizes delays to a period, with an offset.
  message AlignTo {
    // Field numbers here begin at 3; they are part of the wire format and
    // must stay as they are.

    // (Required) The duration, in milliseconds, to which delays should be
    // quantized.
    int64 period = 3;

    // (Required) The offset from 0, in milliseconds, for the quantization
    // given by `period`.
    int64 offset = 4;
  }

  // The transformation to apply; at most one is set.
  oneof timestamp_transform {
    Delay delay = 1;
    AlignTo align_to = 2;
  }
}
// A specification for how to "side input" a PCollection.
message SideInput {