forked from typelevel/frameless
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FramelessInternals.scala
39 lines (28 loc) · 1.41 KB
/
FramelessInternals.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.types.{ObjectType, UserDefinedType}
import scala.reflect.ClassTag
/**
 * Thin forwarding layer over Spark SQL members.
 *
 * This object is declared inside the `org.apache.spark.sql` package, so it can
 * reach members that are package-restricted there and re-expose them through
 * ordinary public methods.
 * NOTE(review): which of the forwarded members are actually restricted depends
 * on the Spark version in use — confirm before removing any of them.
 */
object FramelessInternals {

  /**
   * Builds the Catalyst `ObjectType` describing the runtime class of `A`.
   *
   * @tparam A the type whose runtime class is wrapped
   * @param classTag evidence carrying the runtime class of `A`
   * @return an `ObjectType` over `classTag.runtimeClass`
   */
  def objectTypeFor[A](implicit classTag: ClassTag[A]): ObjectType =
    ObjectType(classTag.runtimeClass)

  /**
   * Resolves a (possibly multi-part) column name against the analyzed plan of `ds`.
   *
   * @param ds       the dataset whose analyzed plan is searched
   * @param colNames the parts of the column name to resolve
   * @return the matching named expression
   * @throws AnalysisException when the name cannot be resolved; the message lists
   *                           the field names of the dataset's schema
   */
  def resolveExpr(ds: Dataset[_], colNames: Seq[String]): NamedExpression = {
    val analyzedPlan = ds.toDF.queryExecution.analyzed
    val resolver = ds.sparkSession.sessionState.analyzer.resolver

    analyzedPlan.resolve(colNames, resolver).getOrElse {
      // Render the name parts as a dotted path; interpolating the Seq directly
      // would leak its collection toString (e.g. "List(a, b)") into the message.
      val requested = colNames.mkString(".")
      val available = ds.schema.fieldNames.mkString(", ")
      throw new AnalysisException(
        s"""Cannot resolve column name "$requested" among ($available)""")
    }
  }

  /** Returns the Catalyst expression backing `column`. */
  def expr(column: Column): Expression = column.expr

  /** Alias of [[expr]]: returns the Catalyst expression backing `column`. */
  def column(column: Column): Expression = column.expr

  /** Returns the logical plan of `ds`. */
  def logicalPlan(ds: Dataset[_]): LogicalPlan = ds.logicalPlan

  /**
   * Creates a `QueryExecution` for `plan` using the session state of the
   * Spark session that owns `ds`.
   */
  def executePlan(ds: Dataset[_], plan: LogicalPlan): QueryExecution =
    ds.sparkSession.sessionState.executePlan(plan)

  /**
   * Instantiates a `Dataset[T]` directly from a SQL context, a logical plan
   * and an encoder.
   */
  def mkDataset[T](sqlContext: SQLContext, plan: LogicalPlan, encoder: Encoder[T]): Dataset[T] =
    new Dataset(sqlContext, plan, encoder)

  /** Forwards to `Dataset.ofRows` to build a `DataFrame` from a logical plan. */
  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
    Dataset.ofRows(sparkSession, logicalPlan)

  /**
   * Public alias for Spark's `UserDefinedType`.
   * NOTE(review): presumably exists because `UserDefinedType` is not accessible
   * from outside Spark's packages in the targeted Spark version — confirm.
   */
  type PublicUserDefinedType[A >: Null] = UserDefinedType[A]
}