While digging through Apache Spark SQL's source code, I realized that a deeper look at it is a must for anyone interested in the implementation details of any SQL-based data processing engine. Spark SQL has three major API modules: the Data Source API, the Catalyst API and the Catalog API. This series of posts concentrates mostly on Catalyst, correlating the query processing stages (Parsing, Binding, Validation, Optimization, Code generation, Query execution) to highlight the entry points for further exploration.
PREREQUISITE: Overview of Spark Catalyst. You can start with this link.
NOTE: Code snippets in these posts refer to Spark SQL 2.0.1.
As we know, since 2.0 SparkSession is the new entry point for the DataFrame (i.e. Dataset[Row]) and Dataset APIs. Every SparkSession is associated with a SessionState and also exposes a SQLContext wrapper for backward compatibility.
org.apache.spark.sql.SparkSession

/**
 * State isolated across sessions, including SQL configurations, temporary tables, registered
 * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
 */
@transient
private[sql] lazy val sessionState: SessionState = {
  SparkSession.reflect[SessionState, SparkSession](
    SparkSession.sessionStateClassName(sparkContext.conf),
    self)
}

/**
 * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
 *
 * @since 2.0.0
 */
@transient
val sqlContext: SQLContext = new SQLContext(this)
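As a quick illustration (a minimal sketch; the builder settings below are just placeholders), the public entry point looks like this, with the SQLContext available purely as a backward-compatible wrapper around the same session:

import org.apache.spark.sql.SparkSession

// Hypothetical setup to illustrate the entry point.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalyst-tour")
  .getOrCreate()

// Both calls go through the same underlying SessionState;
// sqlContext is only kept for backward compatibility.
spark.sql("SELECT 1").show()
spark.sqlContext.sql("SELECT 1").show()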
Every SessionState has instances of SQLConf, SessionCatalog, Analyzer, SparkOptimizer, SparkSqlParser, SparkPlanner and many more.
org.apache.spark.sql.internal.SessionState

/**
 * Internal catalog for managing table and database states.
 */
lazy val catalog = new SessionCatalog(
  sparkSession.sharedState.externalCatalog,
  functionResourceLoader,
  functionRegistry,
  conf,
  newHadoopConf())

/**
 * Logical query plan analyzer for resolving unresolved attributes and relations.
 */
lazy val analyzer: Analyzer = {
  new Analyzer(catalog, conf) {
    override val extendedResolutionRules =
      PreprocessTableInsertion(conf) ::
      new FindDataSourceTable(sparkSession) ::
      DataSourceAnalysis(conf) ::
      (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

    override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
  }
}

/**
 * Logical query plan optimizer.
 */
lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

/**
 * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
 */
lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)

/**
 * Planner that converts optimized logical plans to physical plans.
 */
def planner: SparkPlanner =
  new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
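SessionState itself is private[sql] (as the SparkSession snippet above shows), but several of its components are reachable through public wrappers on SparkSession. A small sketch of those public surfaces:

// Public counterparts of some SessionState internals (Spark 2.0.x):
spark.conf.set("spark.sql.shuffle.partitions", "8")  // backed by SQLConf
spark.catalog.listTables().show()                    // backed by the SessionCatalog
spark.experimental.extraStrategies                   // extra strategies picked up by the SparkPlanner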
Parsing:
Spark SQL uses ANTLR4 to parse SQL statements into a parse tree (see SqlBaseParser, whose rules are generated from the grammar in SqlBase.g4). The SparkSqlParser mentioned in the code snippet above extends AbstractSqlParser and uses an overridden version of AstBuilder, namely SparkSqlAstBuilder, to transform the tree into a LogicalPlan, Expression, TableIdentifier or DataType.
org.apache.spark.sql.catalyst.parser.ParseDriver

abstract class AbstractSqlParser extends ParserInterface with Logging {

  /** Creates/Resolves DataType for a given SQL string. */
  def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
    // TODO add this to the parser interface.
    astBuilder.visitSingleDataType(parser.singleDataType())
  }

  /** Creates Expression for a given SQL string. */
  override def parseExpression(sqlText: String): Expression = parse(sqlText) { parser =>
    astBuilder.visitSingleExpression(parser.singleExpression())
  }

  /** Creates TableIdentifier for a given SQL string. */
  override def parseTableIdentifier(sqlText: String): TableIdentifier = parse(sqlText) { parser =>
    astBuilder.visitSingleTableIdentifier(parser.singleTableIdentifier())
  }

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
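The parser can also be exercised on its own. A small sketch using CatalystSqlParser, the generic implementation shipped with the catalyst module (SparkSqlParser layers Spark-specific statements and variable substitution on top of it); the plan shown in the comment is only approximate:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Roughly: 'Project ['name]
//          +- 'Filter ('age > 15)
//             +- 'UnresolvedRelation `sometable`
val plan = CatalystSqlParser.parsePlan("SELECT name FROM sometable WHERE age > 15")
println(plan.treeString)

// The same driver also parses expressions and data types.
val dataType = CatalystSqlParser.parseDataType("array<struct<a:int,b:string>>")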
Let's take this command that you may have executed on Spark:

sparkSession.sql("select * from sometable")

SparkSession calls parsePlan (shown above) to get the LogicalPlan while creating the DataFrame:
org.apache.spark.sql.SparkSession

/**
 * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
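Note that sql() only parses the statement (and runs the analysis check) while building the Dataset; nothing is executed until an action is called. A quick sketch, assuming a temporary view named sometable is already registered:

val df = spark.sql("select * from sometable")
println(df.queryExecution.logical)   // the plan returned by parsePlan, still unresolved
df.show()                            // execution happens only here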
Similarly, for something like val filteredDs = ds.filter("age > 15"), the parser's parseExpression is called to get the equivalent Catalyst Expression used to construct the DataFrame, i.e. Dataset[Row]:
org.apache.spark.sql.Dataset

/**
 * Filters rows using the given SQL expression.
 * {{{
 *   peopleDs.filter("age > 15")
 * }}}
 *
 * @group typedrel
 * @since 1.6.0
 */
def filter(conditionExpr: String): Dataset[T] = {
  filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}
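In other words, the string form of filter is just SQL-fragment parsing in front of the usual Column machinery. A sketch, assuming ds has an age column:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.functions.{col, expr}

// The unresolved Catalyst expression the string is parsed into, roughly ('age > 15):
println(CatalystSqlParser.parseExpression("age > 15"))

// Equivalent ways to express the same filter:
val viaString = ds.filter("age > 15")        // goes through parseExpression internally
val viaExpr   = ds.filter(expr("age > 15"))  // functions.expr parses the same SQL fragment
val viaColumn = ds.filter(col("age") > 15)   // built from Column operations, no SQL parsing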
parseTableIdentifier resolves the table name with the help of the Catalog API to create the DataFrame:
org.apache.spark.sql.SparkSession

/**
 * Returns the specified table as a [[DataFrame]].
 *
 * @since 2.0.0
 */
def table(tableName: String): DataFrame = {
  table(sessionState.sqlParser.parseTableIdentifier(tableName))
}

private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
  Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
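A minimal sketch of that path (the view name and columns are placeholders): register a temporary view so the catalog can resolve it, then look it up by name:

// Register something the SessionCatalog can resolve, then look it up by name.
val people = spark.range(0, 5).selectExpr("id", "id + 20 as age")
people.createOrReplaceTempView("sometable")

val resolved = spark.table("sometable")   // parseTableIdentifier + catalog lookup
resolved.show()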
So, every DataFrame is associated with a LogicalPlan, which is the initial QueryPlan that ultimately gets transformed into actual RDD code.
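All of those intermediate plans (and the final RDD) can be inspected on the Dataset's QueryExecution; df.explain(true) prints most of them at once. A quick sketch:

val df = spark.sql("SELECT age FROM sometable WHERE age > 15")
val qe = df.queryExecution

println(qe.logical)        // parsed, unresolved logical plan
println(qe.analyzed)       // after the Analyzer resolves attributes and relations
println(qe.optimizedPlan)  // after the SparkOptimizer rules
println(qe.sparkPlan)      // physical plan chosen by the SparkPlanner
println(qe.executedPlan)   // physical plan after preparation (exchanges, codegen)
qe.toRdd                   // the RDD[InternalRow] that actually runs on the cluster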