diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index b8491173b10a..83d00942edd5 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -143,7 +143,7 @@ public void initialize(Configuration conf, Properties serDeProperties, this.projectedSchema = projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf); - if (!IcebergTableUtil.isFanoutEnabled(Maps.fromProperties(serDeProperties))) { + if (!IcebergTableUtil.isFanoutEnabled(serDeProperties::getProperty)) { // ClusteredWriter requires that records are ordered by partition keys. // Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting. HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 65852f1a8553..3a4b1f1f8d7d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -1657,10 +1657,7 @@ public static Schema schema(Configuration config) { @VisibleForTesting static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map map) { Properties props = tableDesc.getProperties(); - - Maps.fromProperties(props).entrySet().stream() - .filter(entry -> !map.containsKey(entry.getKey())) // map overrides tableDesc properties - .forEach(entry -> map.put(entry.getKey(), entry.getValue())); + props.forEach((key, value) -> map.putIfAbsent((String) key, (String) value)); String location; Schema schema; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 386473f46fee..fe1a06d32acb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -115,7 +115,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.Pair; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.StructProjection; import org.slf4j.Logger; @@ -436,7 +435,12 @@ public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOpera } public static boolean isFanoutEnabled(Map props) { - return PropertyUtil.propertyAsBoolean(props, InputFormatConfig.WRITE_FANOUT_ENABLED, true); + return isFanoutEnabled(props::getOrDefault); + } + + public static boolean isFanoutEnabled(BinaryOperator props) { + return Boolean.parseBoolean( + props.apply(InputFormatConfig.WRITE_FANOUT_ENABLED, "true")); } public static void performMetadataDelete(Table icebergTable, String branchName, SearchArgument sarg) { diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index b5f5eb31587b..193137058c1f 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -240,7 +240,7 @@ private JobConf jobConf(Table table, int taskNum) { tableDesc.getProperties().setProperty(InputFormatConfig.CATALOG_NAME, table.properties() .get(InputFormatConfig.CATALOG_NAME)); HiveIcebergStorageHandler.overlayTableProperties(conf, tableDesc, propMap); - propMap.forEach((key, value) -> conf.set(key, value)); + propMap.forEach(conf::set); return conf; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 909b22cf8a47..c1944805417f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -484,8 +485,7 @@ private BucketMetaData(int bucketId, int copyNumber) { * @return true, if the tblProperties contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY} */ public static boolean isCompactionTable(Properties tblProperties) { - return tblProperties != null && - StringUtils.isNotBlank((String) tblProperties.get(COMPACTOR_TABLE_PROPERTY)); + return tblProperties != null && isCompactionTable(tblProperties::getProperty); } /** @@ -494,7 +494,11 @@ public static boolean isCompactionTable(Properties tblProperties) { * @return true, if the parameters contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY} */ public static boolean isCompactionTable(Map parameters) { - return StringUtils.isNotBlank(parameters.get(COMPACTOR_TABLE_PROPERTY)); + return isCompactionTable(parameters::get); + } + + private static boolean isCompactionTable(UnaryOperator props) { + return StringUtils.isNotBlank(props.apply(COMPACTOR_TABLE_PROPERTY)); } /** @@ -762,7 +766,7 @@ public static AcidOperationalProperties parseString(String propertiesStr) { AcidOperationalProperties obj = new AcidOperationalProperties(); String[] options = propertiesStr.split("\\|"); for (String option : options) { - if (option.trim().length() == 0) continue; // ignore empty strings + if (option.trim().isEmpty()) continue; // ignore empty strings switch (option) { case SPLIT_UPDATE_STRING: obj.setSplitUpdate(true); @@ -1954,17 +1958,17 @@ private static boolean isDirUsable(Path child, long visibilityTxnId, List } public static boolean isTablePropertyTransactional(Properties props) { - String resultStr = (String) props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); - if (resultStr == null) { - resultStr = (String) props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); - } - return Boolean.parseBoolean(resultStr); + return isTablePropertyTransactional(props::getProperty); } public static boolean isTablePropertyTransactional(Map parameters) { - String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return isTablePropertyTransactional(parameters::get); + } + + private static boolean isTablePropertyTransactional(UnaryOperator prope) { + String resultStr = prope.apply(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); if (resultStr == null) { - resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + resultStr = prope.apply(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); } return Boolean.parseBoolean(resultStr); } @@ -2206,16 +2210,19 @@ private static long getLastFlushLength(FileSystem fs, FileStatus deltaFile) thro * @return true if table is an INSERT_ONLY table, false otherwise */ public static boolean isInsertOnlyTable(Map params) { - String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - return INSERTONLY_TRANSACTIONAL_PROPERTY.equalsIgnoreCase(transactionalProp); + return isInsertOnlyTable(params::get); } public static boolean isInsertOnlyTable(Table table) { - return isTransactionalTable(table) && getAcidOperationalProperties(table).isInsertOnly(); + return isTransactionalTable(table) && isInsertOnlyTable(table::getProperty); } public static boolean isInsertOnlyTable(Properties params) { - String transactionalProp = (String) params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + return isInsertOnlyTable(params::getProperty); + } + + private static boolean isInsertOnlyTable(UnaryOperator props) { + String transactionalProp = props.apply(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); return INSERTONLY_TRANSACTIONAL_PROPERTY.equalsIgnoreCase(transactionalProp); }