最近把一些 SQL 的执行引擎从 Hive 改成了 Spark,却发现执行反而更慢。这些 SQL 主要是 insert overwrite 操作,从执行计划可以看到,用到的是 InsertIntoHiveTable:
spark-sql> explain insert overwrite table test2 select * from test1;
== Physical Plan ==
InsertIntoHiveTable MetastoreRelation temp, test2, true, false
+- HiveTableScan [id#20], MetastoreRelation temp, test1
Time taken: 0.404 seconds, Fetched 1 row(s)
跟进代码
// org.apache.spark.sql.hive.execution.InsertIntoHiveTable

/**
 * Triggers the insert (by forcing `sideEffectResult`) and wraps the resulting rows in a
 * single-partition RDD.
 */
protected override def doExecute(): RDD[InternalRow] = {
  sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
}

/**
 * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
 * `org.apache.hadoop.hive.serde2.SerDe` and the
 * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
 *
 * The body runs in three phases: validate the partition spec, write the child plan's output
 * to a temporary location (`saveAsHiveFile`), and finally ask the external catalog to load
 * that output into the table or partition (`loadTable` / `loadPartition` /
 * `loadDynamicPartitions`).
 *
 * Note: this is run once and then kept to avoid double insertions.
 */
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
  // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
  // instances within the closure, since Serializer is not serializable while TableDesc is.
  val tableDesc = table.tableDesc
  val tableLocation = table.hiveQlTable.getDataLocation
  val tmpLocation = getExternalTmpPath(tableLocation)
  val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
  val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

  if (isCompressed) {
    // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    // to store compression information.
    hadoopConf.set("mapred.output.compress", "true")
    fileSinkConf.setCompressed(true)
    fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
    fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
  }

  val numDynamicPartitions = partition.values.count(_.isEmpty)
  val numStaticPartitions = partition.values.count(_.nonEmpty)
  // Dynamic partitions (no value given) are represented by an empty string.
  val partitionSpec = partition.map {
    case (key, Some(value)) => key -> value
    case (key, None) => key -> ""
  }

  // All partition column names in the format of "<column name 1>/<column name 2>/..."
  val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
  val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

  // By this time, the partition map must match the table's partition columns
  if (partitionColumnNames.toSet != partition.keySet) {
    throw new SparkException(
      s"""Requested partitioning does not match the ${table.tableName} table:
         |Requested partitions: ${partition.keys.mkString(",")}
         |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
  }

  // Validate partition spec if there exist any dynamic partitions
  if (numDynamicPartitions > 0) {
    // Report error if dynamic partitioning is not enabled
    if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
      throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    }

    // Report error if dynamic partition strict mode is on but no static partition is found
    if (numStaticPartitions == 0 &&
      hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
      throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    }

    // Report error if any static partition appears after a dynamic partition
    val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
      throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    }
  }

  val jobConf = new JobConf(hadoopConf)
  val jobConfSer = new SerializableJobConf(jobConf)

  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  val writerContainer = if (numDynamicPartitions > 0) {
    // Dynamic partition columns are the trailing ones (static-after-dynamic was rejected above).
    val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
    new SparkHiveDynamicPartitionWriterContainer(
      jobConf,
      fileSinkConf,
      dynamicPartColNames,
      child.output)
  } else {
    new SparkHiveWriterContainer(
      jobConf,
      fileSinkConf,
      child.output)
  }

  @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
  // Write phase: run the child plan and write its rows out through the writer container.
  saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

  val outputPath = FileOutputFormat.getOutputPath(jobConf)
  // TODO: Correctly set holdDDLTime.
  // In most of the time, we should have holdDDLTime = false.
  // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
  val holdDDLTime = false
  // Load phase: hand the written output over to the external catalog.
  if (partition.nonEmpty) {
    if (numDynamicPartitions > 0) {
      externalCatalog.loadDynamicPartitions(
        db = table.catalogTable.database,
        table = table.catalogTable.identifier.table,
        outputPath.toString,
        partitionSpec,
        overwrite,
        numDynamicPartitions,
        holdDDLTime = holdDDLTime)
    } else {
      // scalastyle:off
      // ifNotExists is only valid with static partition, refer to
      // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
      // scalastyle:on
      val oldPart =
        externalCatalog.getPartitionOption(
          table.catalogTable.database,
          table.catalogTable.identifier.table,
          partitionSpec)

      var doHiveOverwrite = overwrite

      if (oldPart.isEmpty || !ifNotExists) {
        // SPARK-18107: Insert overwrite runs much slower than hive-client.
        // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
        // version and we may not want to catch up new Hive version every time. We delete the
        // Hive partition first and then load data file into the Hive partition.
        if (oldPart.nonEmpty && overwrite) {
          oldPart.get.storage.locationUri.foreach { uri =>
            val partitionPath = new Path(uri)
            val fs = partitionPath.getFileSystem(hadoopConf)
            if (fs.exists(partitionPath)) {
              if (!fs.delete(partitionPath, true)) {
                throw new RuntimeException(
                  "Cannot remove partition directory '" + partitionPath.toString)
              }
              // Don't let Hive do overwrite operation since it is slower.
              doHiveOverwrite = false
            }
          }
        }

        // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
        // which is currently considered as a Hive native command.
        val inheritTableSpecs = true
        externalCatalog.loadPartition(
          table.catalogTable.database,
          table.catalogTable.identifier.table,
          outputPath.toString,
          partitionSpec,
          isOverwrite = doHiveOverwrite,
          holdDDLTime = holdDDLTime,
          inheritTableSpecs = inheritTableSpecs)
      }
    }
  } else {
    externalCatalog.loadTable(
      table.catalogTable.database,
      table.catalogTable.identifier.table,
      outputPath.toString, // TODO: URI
      overwrite,
      holdDDLTime)
  }

  // Attempt to delete the staging directory and the inclusive files. If failed, the files are
  // expected to be dropped at the normal termination of VM since deleteOnExit is used.
  try {
    createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
  } catch {
    case NonFatal(e) =>
      logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
  }

  // un-cache this table.
  sqlContext.sparkSession.catalog.uncacheTable(table.catalogTable.identifier.quotedString)
  sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

  // It would be nice to just return the childRdd unchanged so insert operations could be chained,
  // however for now we return an empty list to simplify compatibility checks with hive, which
  // does not return anything for insert operations.
  // TODO: implement hive compatibility as rules.
  Seq.empty[InternalRow]
}
insert overwrite 的执行分为三步:select、write 和 load。前两步没什么问题,问题主要出在最后一步 load。下面以 loadPartition 为例看一下执行过程:
org.apache.spark.sql.hive.HiveExternalCatalog
/**
 * Loads the data under `loadPath` into one partition of `db`.`table` through the Hive client.
 *
 * The partition spec is rebuilt in the table's partition-column order, with lower-cased
 * column names, before it is handed to the client.
 *
 * @param db database of the target table
 * @param table name of the target table; it must exist (checked by `requireTableExists`)
 * @param loadPath location of the data to load
 * @param partition partition spec; it is looked up by every partition column name of the table
 * @param isOverwrite passed through to the client unchanged
 * @param holdDDLTime passed through to the client unchanged
 * @param inheritTableSpecs passed through to the client unchanged
 */
override def loadPartition(
    db: String,
    table: String,
    loadPath: String,
    partition: TablePartitionSpec,
    isOverwrite: Boolean,
    holdDDLTime: Boolean,
    inheritTableSpecs: Boolean): Unit = withClient {
  requireTableExists(db, table)

  // LinkedHashMap keeps insertion order, i.e. the order of the table's partition columns.
  val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
  getTable(db, table).partitionColumnNames.foreach { colName =>
    // Hive metastore is not case preserving and keeps partition columns with lower cased names,
    // and Hive will validate the column names in partition spec to make sure they are partition
    // columns. Here we lowercase the column names before passing the partition spec to Hive
    // client, to satisfy Hive.
    orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
  }

  client.loadPartition(
    loadPath,
    db,
    table,
    orderedPartitionSpec,
    isOverwrite,
    holdDDLTime,
    inheritTableSpecs)
}
这里会调用HiveClientImpl.loadPartition
org.apache.spark.sql.hive.client.HiveClientImpl
/**
 * Loads the data under `loadPath` into a partition of `dbName`.`tableName` by delegating to
 * the version-specific Hive shim.
 *
 * The table is fetched first so that its `isStoredAsSubDirectories` flag can be forwarded
 * to the shim as `isSkewedStoreAsSubdir`.
 *
 * @param loadPath location of the data to load
 * @param dbName database of the target table
 * @param tableName name of the target table
 * @param partSpec ordered partition spec (column name -> value)
 * @param replace forwarded to the shim unchanged
 * @param holdDDLTime forwarded to the shim unchanged
 * @param inheritTableSpecs forwarded to the shim unchanged
 */
def loadPartition(
    loadPath: String,
    dbName: String,
    tableName: String,
    partSpec: java.util.LinkedHashMap[String, String],
    replace: Boolean,
    holdDDLTime: Boolean,
    inheritTableSpecs: Boolean): Unit = withHiveState {
  val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
  shim.loadPartition(
    client,
    new Path(loadPath), // TODO: Use URI
    s"$dbName.$tableName",
    partSpec,
    replace,
    holdDDLTime,
    inheritTableSpecs,
    isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
}
这里会调用Shim_v0_12.loadPartition
org.apache.spark.sql.hive.client.Shim_v0_12
/**
 * Reflective handle to `Hive.loadPartition(Path, String, Map, boolean x 4)`, resolved on
 * first use.
 */
private lazy val loadPartitionMethod =
  findMethod(
    classOf[Hive],
    "loadPartition",
    classOf[Path],
    classOf[String],
    classOf[JMap[String, String]],
    JBoolean.TYPE,
    JBoolean.TYPE,
    JBoolean.TYPE,
    JBoolean.TYPE)

/**
 * Invokes `Hive.loadPartition` reflectively on the given `hive` instance, forwarding every
 * argument in order.
 */
override def loadPartition(
    hive: Hive,
    loadPath: Path,
    tableName: String,
    partSpec: JMap[String, String],
    replace: Boolean,
    holdDDLTime: Boolean,
    inheritTableSpecs: Boolean,
    isSkewedStoreAsSubdir: Boolean): Unit = {
  // Method.invoke takes objects, so the Scala Booleans are boxed to java.lang.Boolean first.
  val replaceArg: JBoolean = replace
  val holdDdlTimeArg: JBoolean = holdDDLTime
  val inheritSpecsArg: JBoolean = inheritTableSpecs
  val skewedSubdirArg: JBoolean = isSkewedStoreAsSubdir

  loadPartitionMethod.invoke(
    hive,
    loadPath,
    tableName,
    partSpec,
    replaceArg,
    holdDdlTimeArg,
    inheritSpecsArg,
    skewedSubdirArg)
}
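这里会反射调用 Hive 的类 Hive.loadPartition(注:上面贴的是 Shim_v0_12 的 7 参数版本;下面 Hive 1.2 的方法多了 isSrcLocal、isAcid 两个参数,对应的是更高版本 Shim 中重写的 loadPartition,调用方式同样是反射)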
org.apache.hadoop.hive.ql.metadata.Hive (1.2版本)
public void loadPartition(Path loadPath, String tableName, MappartSpec, boolean replace, boolean holdDDLTime, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException { Table tbl = this.getTable(tableName); this.loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs, isSkewedStoreAsSubdir, isSrcLocal, isAcid); } public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, boolean replace, boolean holdDDLTime, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); Partition newTPart = null; try { Partition oldPart = this.getPartition(tbl, partSpec, false); Path oldPartPath = null; if (oldPart != null) { oldPartPath = oldPart.getDataLocation(); } Path newPartPath = null; FileSystem oldPartPathFS; if (inheritTableSpecs) { Path partPath = new Path(tbl.getDataLocation(), Warehouse.makePartPath(partSpec)); newPartPath = new Path(tblDataLocationPath.toUri().getScheme(), tblDataLocationPath.toUri().getAuthority(), partPath.toUri().getPath()); if (oldPart != null) { oldPartPathFS = oldPartPath.getFileSystem(this.getConf()); FileSystem loadPathFS = loadPath.getFileSystem(this.getConf()); if (FileUtils.equalsFileSystem(oldPartPathFS, loadPathFS)) { newPartPath = oldPartPath; } } } else { newPartPath = oldPartPath; } List newFiles = null; if (replace) { replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, this.getConf(), isSrcLocal); } else { newFiles = new ArrayList(); oldPartPathFS = tbl.getDataLocation().getFileSystem(this.conf); copyFiles(this.conf, loadPath, newPartPath, oldPartPathFS, isSrcLocal, isAcid, newFiles); } boolean forceCreate = !holdDDLTime; newTPart = this.getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs, newFiles); if (!holdDDLTime && isSkewedStoreAsSubdir) { org.apache.hadoop.hive.metastore.api.Partition 
newCreatedTpart = newTPart.getTPartition(); SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo(); Map
, String> skewedColValueLocationMaps = this.constructListBucketingLocationMap(newPartPath, skewedInfo); skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps); newCreatedTpart.getSd().setSkewedInfo(skewedInfo); this.alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart)); this.getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs, newFiles); return new Partition(tbl, newCreatedTpart); } else { return newTPart; } } catch (IOException var20) { LOG.error(StringUtils.stringifyException(var20)); throw new HiveException(var20); } catch (MetaException var21) { LOG.error(StringUtils.stringifyException(var21)); throw new HiveException(var21); } catch (InvalidOperationException var22) { LOG.error(StringUtils.stringifyException(var22)); throw new HiveException(var22); } } protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, boolean isSrcLocal) throws HiveException { try { FileSystem destFs = destf.getFileSystem(conf); boolean inheritPerms = HiveConf.getBoolVar(conf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); FileSystem srcFs; FileStatus[] srcs; try { srcFs = srcf.getFileSystem(conf); srcs = srcFs.globStatus(srcf); } catch (IOException var20) { throw new HiveException("Getting globStatus " + srcf.toString(), var20); } if (srcs == null) { LOG.info("No sources specified to move: " + srcf); } else { List
> result = checkPaths(conf, destFs, srcs, srcFs, destf, true); if (oldPath != null) { try { FileSystem fs2 = oldPath.getFileSystem(conf); if (fs2.exists(oldPath)) { if (FileUtils.isSubDir(oldPath, destf, fs2)) { FileUtils.trashFilesUnderDir(fs2, oldPath, conf); } if (inheritPerms) { inheritFromTable(tablePath, destf, conf, destFs); } } } catch (Exception var19) { LOG.warn("Directory " + oldPath.toString() + " cannot be removed: " + var19, var19); } } if (srcs.length == 1 && srcs[0].isDir()) { Path destfp = destf.getParent(); if (!destFs.exists(destfp)) { boolean success = destFs.mkdirs(destfp); if (!success) { LOG.warn("Error creating directory " + destf.toString()); } if (inheritPerms && success) { inheritFromTable(tablePath, destfp, conf, destFs); } } Iterator i$ = result.iterator(); while(i$.hasNext()) { List sdpairs = (List)i$.next(); Iterator i$ = sdpairs.iterator(); while(i$.hasNext()) { Path[] sdpair = (Path[])i$.next(); Path destParent = sdpair[1].getParent(); FileSystem destParentFs = destParent.getFileSystem(conf); if (!destParentFs.isDirectory(destParent)) { boolean success = destFs.mkdirs(destParent); if (!success) { LOG.warn("Error creating directory " + destParent); } if (inheritPerms && success) { inheritFromTable(tablePath, destParent, conf, destFs); } } if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) { throw new IOException("Unable to move file/directory from " + sdpair[0] + " to " + sdpair[1]); } } } } else { if (!destFs.exists(destf)) { boolean success = destFs.mkdirs(destf); if (!success) { LOG.warn("Error creating directory " + destf.toString()); } if (inheritPerms && success) { inheritFromTable(tablePath, destf, conf, destFs); } } Iterator i$ = result.iterator(); while(i$.hasNext()) { List sdpairs = (List)i$.next(); Iterator i$ = sdpairs.iterator(); while(i$.hasNext()) { Path[] sdpair = (Path[])i$.next(); if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) { throw new IOException("Error moving: " + 
sdpair[0] + " into: " + sdpair[1]); } } } } } } catch (IOException var21) { throw new HiveException(var21.getMessage(), var21); } } public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf) throws FileNotFoundException, IOException { FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER); boolean result = true; FileStatus[] arr$ = statuses; int len$ = statuses.length; for(int i$ = 0; i$ < len$; ++i$) { FileStatus status = arr$[i$]; result &= moveToTrash(fs, status.getPath(), conf); } return result; }
Hive 在执行 loadPartition 时,如果是覆盖写(replace 为 true),会调用 replaceFiles;当旧分区目录已经存在时,replaceFiles 会调用 trashFilesUnderDir,而 trashFilesUnderDir 会在一个 for 循环里逐个把文件移到回收站;
spark执行loadPartition的时候,直接反射调用hive的逻辑,为什么还会比hive执行慢很多呢?
这时注意到hive用的版本是2.1,spark2.1.1里依赖的hive版本是1.2,对比hive1.2和hive2.1之间的代码发现,确实有差别,以下是hive2.1的代码:
org.apache.hadoop.hive.ql.metadata.Hive(2.1版本)
/** * Trashes or deletes all files under a directory. Leaves the directory as is. * @param fs FileSystem to use * @param f path of directory * @param conf hive configuration * @param forceDelete whether to force delete files if trashing does not succeed * @return true if deletion successful * @throws IOException */ private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf) throws IOException { FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER); boolean result = true; final List> futures = new LinkedList<>(); final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; final SessionState parentSession = SessionState.get(); for (final FileStatus status : statuses) { if (null == pool) { result &= FileUtils.moveToTrash(fs, status.getPath(), conf); } else { futures.add(pool.submit(new Callable () { @Override public Boolean call() throws Exception { SessionState.setCurrentSessionState(parentSession); return FileUtils.moveToTrash(fs, status.getPath(), conf); } })); } } if (null != pool) { pool.shutdown(); for (Future future : futures) { try { result &= future.get(); } catch (InterruptedException | ExecutionException e) { LOG.error("Failed to delete: ",e); pool.shutdownNow(); throw new IOException(e); } } } return result; }
可以看到在hive2.1里删除文件用到了线程池,而在hive1.2里是在for循环里串行删除,所以当文件很多时,hive2.1比hive1.2(即spark2.1.1)就会快非常多;
Spark 依赖 Hive 的方式是直接反射调用,而 Hive 1.2 和 Hive 2.1 之间很多类的方法接口都有调整,很难直接升级。所以遇到这个问题,只能修改 Spark 所依赖的 Hive 1.2 中的 trashFilesUnderDir 代码(从上面 replaceFiles 的调用可以看到,1.2 版本里该方法位于 FileUtils),同样改为用线程池的方式来删除文件,问题解决。