Skip to content

Commit

Permalink
[FLINK-6138] [table] Create the ListStateDescriptor with the aggregat…
Browse files Browse the repository at this point in the history
…ionStateType instead of a serializer.

this closes apache#3581
  • Loading branch information
sunjincheng121 authored and wuchong committed Mar 21, 2017
1 parent 17dd915 commit e141355
Showing 1 changed file with 1 addition and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction(
}

override def initializeState(context: FunctionInitializationContext): Unit = {
val stateSerializer =
aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)
val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
}
}

0 comments on commit e141355

Please sign in to comment.