[Original] Troubleshooting notes (21): Spark insert overwrite is very slow, even slower than Hive
Published: 2019-06-12


Recently I moved some SQL jobs from Hive to Spark and found they actually ran slower. The SQL is mostly insert overwrite statements, and the physical plan shows they go through InsertIntoHiveTable:

spark-sql> explain insert overwrite table test2 select * from test1;

== Physical Plan ==
InsertIntoHiveTable MetastoreRelation temp, test2, true, false
+- HiveTableScan [id#20], MetastoreRelation temp, test1
Time taken: 0.404 seconds, Fetched 1 row(s)

Let's follow the code.

org.apache.spark.sql.hive.execution.InsertIntoHiveTable

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
  }

  /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
   * `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   *
   * Note: this is run once and then kept to avoid double insertions.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = table.tableDesc
    val tableLocation = table.hiveQlTable.getDataLocation
    val tmpLocation = getExternalTmpPath(tableLocation)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

    if (isCompressed) {
      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
      // to store compression information.
      hadoopConf.set("mapred.output.compress", "true")
      fileSinkConf.setCompressed(true)
      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
    }

    val numDynamicPartitions = partition.values.count(_.isEmpty)
    val numStaticPartitions = partition.values.count(_.nonEmpty)
    val partitionSpec = partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) => key -> ""
    }

    // All partition column names in the format of "<column name>/<column name>/..."
    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

    // By this time, the partition map must match the table's partition columns
    if (partitionColumnNames.toSet != partition.keySet) {
      throw new SparkException(
        s"""Requested partitioning does not match the ${table.tableName} table:
           |Requested partitions: ${partition.keys.mkString(",")}
           |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
    }

    // Validate partition spec if there exist any dynamic partitions
    if (numDynamicPartitions > 0) {
      // Report error if dynamic partitioning is not enabled
      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
      }

      // Report error if dynamic partition strict mode is on but no static partition is found
      if (numStaticPartitions == 0 &&
        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
      }

      // Report error if any static partition appears after a dynamic partition
      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
      }
    }

    val jobConf = new JobConf(hadoopConf)
    val jobConfSer = new SerializableJobConf(jobConf)

    // When speculation is on and output committer class name contains "Direct", we should warn
    // users that they may loss data if they are using a direct output committer.
    val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    val writerContainer = if (numDynamicPartitions > 0) {
      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
      new SparkHiveDynamicPartitionWriterContainer(
        jobConf,
        fileSinkConf,
        dynamicPartColNames,
        child.output)
    } else {
      new SparkHiveWriterContainer(
        jobConf,
        fileSinkConf,
        child.output)
    }

    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

    val outputPath = FileOutputFormat.getOutputPath(jobConf)
    // TODO: Correctly set holdDDLTime.
    // In most of the time, we should have holdDDLTime = false.
    // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
    val holdDDLTime = false
    if (partition.nonEmpty) {
      if (numDynamicPartitions > 0) {
        externalCatalog.loadDynamicPartitions(
          db = table.catalogTable.database,
          table = table.catalogTable.identifier.table,
          outputPath.toString,
          partitionSpec,
          overwrite,
          numDynamicPartitions,
          holdDDLTime = holdDDLTime)
      } else {
        // scalastyle:off
        // ifNotExists is only valid with static partition, refer to
        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
        // scalastyle:on
        val oldPart =
          externalCatalog.getPartitionOption(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            partitionSpec)

        var doHiveOverwrite = overwrite

        if (oldPart.isEmpty || !ifNotExists) {
          // SPARK-18107: Insert overwrite runs much slower than hive-client.
          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
          // version and we may not want to catch up new Hive version every time. We delete the
          // Hive partition first and then load data file into the Hive partition.
          if (oldPart.nonEmpty && overwrite) {
            oldPart.get.storage.locationUri.foreach { uri =>
              val partitionPath = new Path(uri)
              val fs = partitionPath.getFileSystem(hadoopConf)
              if (fs.exists(partitionPath)) {
                if (!fs.delete(partitionPath, true)) {
                  throw new RuntimeException(
                    "Cannot remove partition directory '" + partitionPath.toString)
                }
                // Don't let Hive do overwrite operation since it is slower.
                doHiveOverwrite = false
              }
            }
          }

          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
          // which is currently considered as a Hive native command.
          val inheritTableSpecs = true
          externalCatalog.loadPartition(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            outputPath.toString,
            partitionSpec,
            isOverwrite = doHiveOverwrite,
            holdDDLTime = holdDDLTime,
            inheritTableSpecs = inheritTableSpecs)
        }
      }
    } else {
      externalCatalog.loadTable(
        table.catalogTable.database,
        table.catalogTable.identifier.table,
        outputPath.toString, // TODO: URI
        overwrite,
        holdDDLTime)
    }

    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
    try {
      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
    } catch {
      case NonFatal(e) =>
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }

    // un-cache this table.
    sqlContext.sparkSession.catalog.uncacheTable(table.catalogTable.identifier.quotedString)
    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
    // however for now we return an empty list to simplify compatibility checks with hive, which
    // does not return anything for insert operations.
    // TODO: implement hive compatibility as rules.
    Seq.empty[InternalRow]
  }

An insert overwrite runs in three steps: select, write, and load. The first two are not the problem; the slowdown is in the final load step. Taking loadPartition as an example, here is the execution path:

org.apache.spark.sql.hive.HiveExternalCatalog

  override def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
      // and Hive will validate the column names in partition spec to make sure they are partition
      // columns. Here we Lowercase the column names before passing the partition spec to Hive
      // client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadPartition(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      isOverwrite,
      holdDDLTime,
      inheritTableSpecs)
  }

This calls HiveClientImpl.loadPartition:

org.apache.spark.sql.hive.client.HiveClientImpl

  def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String],
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = withHiveState {
    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
    shim.loadPartition(
      client,
      new Path(loadPath), // TODO: Use URI
      s"$dbName.$tableName",
      partSpec,
      replace,
      holdDDLTime,
      inheritTableSpecs,
      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
  }

This in turn calls Shim_v0_12.loadPartition:

org.apache.spark.sql.hive.client.Shim_v0_12

  override def loadPartition(
      hive: Hive,
      loadPath: Path,
      tableName: String,
      partSpec: JMap[String, String],
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit = {
    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
  }

  private lazy val loadPartitionMethod =
    findMethod(
      classOf[Hive],
      "loadPartition",
      classOf[Path],
      classOf[String],
      classOf[JMap[String, String]],
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE)

which reflectively invokes Hive's Hive.loadPartition:

org.apache.hadoop.hive.ql.metadata.Hive (version 1.2)

public void loadPartition(Path loadPath, String tableName, Map<String, String> partSpec,
    boolean replace, boolean holdDDLTime, boolean inheritTableSpecs,
    boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException {
  Table tbl = this.getTable(tableName);
  this.loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
      isSkewedStoreAsSubdir, isSrcLocal, isAcid);
}

public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
    boolean replace, boolean holdDDLTime, boolean inheritTableSpecs,
    boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException {
  Path tblDataLocationPath = tbl.getDataLocation();
  Partition newTPart = null;

  try {
    Partition oldPart = this.getPartition(tbl, partSpec, false);
    Path oldPartPath = null;
    if (oldPart != null) {
      oldPartPath = oldPart.getDataLocation();
    }

    Path newPartPath = null;
    FileSystem oldPartPathFS;
    if (inheritTableSpecs) {
      Path partPath = new Path(tbl.getDataLocation(), Warehouse.makePartPath(partSpec));
      newPartPath = new Path(tblDataLocationPath.toUri().getScheme(),
          tblDataLocationPath.toUri().getAuthority(), partPath.toUri().getPath());
      if (oldPart != null) {
        oldPartPathFS = oldPartPath.getFileSystem(this.getConf());
        FileSystem loadPathFS = loadPath.getFileSystem(this.getConf());
        if (FileUtils.equalsFileSystem(oldPartPathFS, loadPathFS)) {
          newPartPath = oldPartPath;
        }
      }
    } else {
      newPartPath = oldPartPath;
    }

    List<Path> newFiles = null;
    if (replace) {
      replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, this.getConf(), isSrcLocal);
    } else {
      newFiles = new ArrayList();
      oldPartPathFS = tbl.getDataLocation().getFileSystem(this.conf);
      copyFiles(this.conf, loadPath, newPartPath, oldPartPathFS, isSrcLocal, isAcid, newFiles);
    }

    boolean forceCreate = !holdDDLTime;
    newTPart = this.getPartition(tbl, partSpec, forceCreate, newPartPath.toString(),
        inheritTableSpecs, newFiles);
    if (!holdDDLTime && isSkewedStoreAsSubdir) {
      org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
      SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
      Map<List<String>, String> skewedColValueLocationMaps =
          this.constructListBucketingLocationMap(newPartPath, skewedInfo);
      skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
      newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
      this.alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
      this.getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs, newFiles);
      return new Partition(tbl, newCreatedTpart);
    } else {
      return newTPart;
    }
  } catch (IOException var20) {
    LOG.error(StringUtils.stringifyException(var20));
    throw new HiveException(var20);
  } catch (MetaException var21) {
    LOG.error(StringUtils.stringifyException(var21));
    throw new HiveException(var21);
  } catch (InvalidOperationException var22) {
    LOG.error(StringUtils.stringifyException(var22));
    throw new HiveException(var22);
  }
}

protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
    HiveConf conf, boolean isSrcLocal) throws HiveException {
  try {
    FileSystem destFs = destf.getFileSystem(conf);
    boolean inheritPerms = HiveConf.getBoolVar(conf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);

    FileSystem srcFs;
    FileStatus[] srcs;
    try {
      srcFs = srcf.getFileSystem(conf);
      srcs = srcFs.globStatus(srcf);
    } catch (IOException var20) {
      throw new HiveException("Getting globStatus " + srcf.toString(), var20);
    }

    if (srcs == null) {
      LOG.info("No sources specified to move: " + srcf);
    } else {
      List<List<Path[]>> result = checkPaths(conf, destFs, srcs, srcFs, destf, true);
      if (oldPath != null) {
        try {
          FileSystem fs2 = oldPath.getFileSystem(conf);
          if (fs2.exists(oldPath)) {
            if (FileUtils.isSubDir(oldPath, destf, fs2)) {
              FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
            }

            if (inheritPerms) {
              inheritFromTable(tablePath, destf, conf, destFs);
            }
          }
        } catch (Exception var19) {
          LOG.warn("Directory " + oldPath.toString() + " cannot be removed: " + var19, var19);
        }
      }

      if (srcs.length == 1 && srcs[0].isDir()) {
        Path destfp = destf.getParent();
        if (!destFs.exists(destfp)) {
          boolean success = destFs.mkdirs(destfp);
          if (!success) {
            LOG.warn("Error creating directory " + destf.toString());
          }

          if (inheritPerms && success) {
            inheritFromTable(tablePath, destfp, conf, destFs);
          }
        }

        Iterator i$ = result.iterator();

        while(i$.hasNext()) {
          List<Path[]> sdpairs = (List)i$.next();
          Iterator i$ = sdpairs.iterator();

          while(i$.hasNext()) {
            Path[] sdpair = (Path[])i$.next();
            Path destParent = sdpair[1].getParent();
            FileSystem destParentFs = destParent.getFileSystem(conf);
            if (!destParentFs.isDirectory(destParent)) {
              boolean success = destFs.mkdirs(destParent);
              if (!success) {
                LOG.warn("Error creating directory " + destParent);
              }

              if (inheritPerms && success) {
                inheritFromTable(tablePath, destParent, conf, destFs);
              }
            }

            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
              throw new IOException("Unable to move file/directory from " + sdpair[0] + " to " + sdpair[1]);
            }
          }
        }
      } else {
        if (!destFs.exists(destf)) {
          boolean success = destFs.mkdirs(destf);
          if (!success) {
            LOG.warn("Error creating directory " + destf.toString());
          }

          if (inheritPerms && success) {
            inheritFromTable(tablePath, destf, conf, destFs);
          }
        }

        Iterator i$ = result.iterator();

        while(i$.hasNext()) {
          List<Path[]> sdpairs = (List)i$.next();
          Iterator i$ = sdpairs.iterator();

          while(i$.hasNext()) {
            Path[] sdpair = (Path[])i$.next();
            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
              throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
            }
          }
        }
      }
    }
  } catch (IOException var21) {
    throw new HiveException(var21.getMessage(), var21);
  }
}

public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf)
    throws FileNotFoundException, IOException {
  FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
  boolean result = true;
  FileStatus[] arr$ = statuses;
  int len$ = statuses.length;

  for(int i$ = 0; i$ < len$; ++i$) {
    FileStatus status = arr$[i$];
    result &= moveToTrash(fs, status.getPath(), conf);
  }

  return result;
}

When Hive executes loadPartition, if the partition directory already exists it calls replaceFiles; replaceFiles calls trashFilesUnderDir, which moves the existing files to the trash one at a time.
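To get a feel for how expensive this serial pattern is, here is a small standalone probe (my own sketch, not code from Hive or Spark) that lists a directory and moves its files to the HDFS trash one by one, the same way the loop above does. The directory argument is hypothetical; each trash move is a blocking NameNode round trip, so the total time grows roughly linearly with the number of files.

// SerialTrashProbe.java -- rough timing sketch, assumes a Hadoop 2.x client classpath
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class SerialTrashProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]);                // e.g. an old partition directory (hypothetical)
    FileSystem fs = dir.getFileSystem(conf);

    FileStatus[] statuses = fs.listStatus(dir);  // one entry per data file
    long start = System.currentTimeMillis();
    for (FileStatus status : statuses) {
      // Same shape as the Hive 1.2 loop: one blocking trash move per file.
      Trash.moveToAppropriateTrash(fs, status.getPath(), conf);
    }
    long elapsed = System.currentTimeMillis() - start;
    System.out.println(statuses.length + " files trashed serially in " + elapsed + " ms");
  }
}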

But Spark's loadPartition simply invokes Hive's logic via reflection, so why is it still so much slower than Hive?

At this point I noticed that the Hive deployment is version 2.1, while Spark 2.1.1 depends on Hive 1.2. Comparing the code between Hive 1.2 and Hive 2.1 shows a real difference. Here is the Hive 2.1 code:

org.apache.hadoop.hive.ql.metadata.Hive (version 2.1)

  /**
   * Trashes or deletes all files under a directory. Leaves the directory as is.
   * @param fs FileSystem to use
   * @param f path of directory
   * @param conf hive configuration
   * @param forceDelete whether to force delete files if trashing does not succeed
   * @return true if deletion successful
   * @throws IOException
   */
  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
      throws IOException {
    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
    boolean result = true;
    final List<Future<Boolean>> futures = new LinkedList<>();
    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
    final SessionState parentSession = SessionState.get();
    for (final FileStatus status : statuses) {
      if (null == pool) {
        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
      } else {
        futures.add(pool.submit(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            SessionState.setCurrentSessionState(parentSession);
            return FileUtils.moveToTrash(fs, status.getPath(), conf);
          }
        }));
      }
    }
    if (null != pool) {
      pool.shutdown();
      for (Future<Boolean> future : futures) {
        try {
          result &= future.get();
        } catch (InterruptedException | ExecutionException e) {
          LOG.error("Failed to delete: ", e);
          pool.shutdownNow();
          throw new IOException(e);
        }
      }
    }
    return result;
  }

As you can see, Hive 2.1 deletes the files through a thread pool (sized by ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), whereas Hive 1.2 deletes them serially inside a for loop. When a partition contains many files, Hive 2.1 is therefore far faster than Hive 1.2, which is what Spark 2.1.1 embeds.

Spark depends on Hive by calling it directly via reflection, and because many class and method signatures changed between Hive 1.2 and Hive 2.1, upgrading the bundled Hive is difficult. So the only practical fix was to modify the Hive.trashFilesUnderDir code used by Spark and switch it to a thread pool for deleting files as well, which solved the problem.
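For reference, below is a minimal sketch of what such a patch could look like, modeled on the Hive 2.1 code above rather than on the exact change that was deployed. The fixed pool size of 16 and the reuse of the enclosing class's moveToTrash helper and HIDDEN_FILES_PATH_FILTER constant are assumptions; the class would also need the java.util.concurrent imports (ExecutorService, Executors, Callable, Future, ExecutionException).

// Hypothetical patch sketch for the Hive 1.2 sources bundled with Spark:
// replace the serial for loop with a bounded thread pool, mirroring Hive 2.1.
public static boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
    throws FileNotFoundException, IOException {
  final FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
  final ExecutorService pool = Executors.newFixedThreadPool(16);   // assumed pool size
  final List<Future<Boolean>> futures = new ArrayList<>(statuses.length);
  for (final FileStatus status : statuses) {
    // each per-file trash move becomes an independent task instead of a step in a serial loop
    futures.add(pool.submit(new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        return moveToTrash(fs, status.getPath(), conf);            // existing helper in this class
      }
    }));
  }
  pool.shutdown();
  boolean result = true;
  try {
    for (Future<Boolean> future : futures) {
      result &= future.get();
    }
  } catch (InterruptedException | ExecutionException e) {
    pool.shutdownNow();
    throw new IOException(e);
  }
  return result;
}

Capping the pool keeps the extra NameNode load bounded while still overlapping the per-file trash moves, which is the same trade-off Hive 2.1 makes with ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.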

 

Reposted from: https://www.cnblogs.com/barneywill/p/10154922.html
