Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void initialize(Configuration conf, Properties serDeProperties,
this.projectedSchema =
projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);

if (!IcebergTableUtil.isFanoutEnabled(Maps.fromProperties(serDeProperties))) {
if (!IcebergTableUtil.isFanoutEnabled(serDeProperties::getProperty)) {
// ClusteredWriter requires that records are ordered by partition keys.
// Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,10 +1657,7 @@ public static Schema schema(Configuration config) {
@VisibleForTesting
static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
Properties props = tableDesc.getProperties();

Maps.fromProperties(props).entrySet().stream()
.filter(entry -> !map.containsKey(entry.getKey())) // map overrides tableDesc properties
.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
props.forEach((key, value) -> map.putIfAbsent((String) key, (String) value));

String location;
Schema schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
Expand Down Expand Up @@ -436,7 +435,12 @@ public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOpera
}

public static boolean isFanoutEnabled(Map<String, String> props) {
return PropertyUtil.propertyAsBoolean(props, InputFormatConfig.WRITE_FANOUT_ENABLED, true);
return isFanoutEnabled(props::getOrDefault);
}

public static boolean isFanoutEnabled(BinaryOperator<String> props) {
return Boolean.parseBoolean(
props.apply(InputFormatConfig.WRITE_FANOUT_ENABLED, "true"));
}

public static void performMetadataDelete(Table icebergTable, String branchName, SearchArgument sarg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private JobConf jobConf(Table table, int taskNum) {
tableDesc.getProperties().setProperty(InputFormatConfig.CATALOG_NAME, table.properties()
.get(InputFormatConfig.CATALOG_NAME));
HiveIcebergStorageHandler.overlayTableProperties(conf, tableDesc, propMap);
propMap.forEach((key, value) -> conf.set(key, value));
propMap.forEach(conf::set);
return conf;
}

Expand Down
37 changes: 22 additions & 15 deletions ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -484,8 +485,7 @@ private BucketMetaData(int bucketId, int copyNumber) {
* @return true, if the tblProperties contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
*/
public static boolean isCompactionTable(Properties tblProperties) {
return tblProperties != null &&
StringUtils.isNotBlank((String) tblProperties.get(COMPACTOR_TABLE_PROPERTY));
return tblProperties != null && isCompactionTable(tblProperties::getProperty);
}

/**
Expand All @@ -494,7 +494,11 @@ public static boolean isCompactionTable(Properties tblProperties) {
* @return true, if the parameters contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
*/
public static boolean isCompactionTable(Map<String, String> parameters) {
return StringUtils.isNotBlank(parameters.get(COMPACTOR_TABLE_PROPERTY));
return isCompactionTable(parameters::get);
}

private static boolean isCompactionTable(UnaryOperator<String> props) {
return StringUtils.isNotBlank(props.apply(COMPACTOR_TABLE_PROPERTY));
}

/**
Expand Down Expand Up @@ -762,7 +766,7 @@ public static AcidOperationalProperties parseString(String propertiesStr) {
AcidOperationalProperties obj = new AcidOperationalProperties();
String[] options = propertiesStr.split("\\|");
for (String option : options) {
if (option.trim().length() == 0) continue; // ignore empty strings
if (option.trim().isEmpty()) continue; // ignore empty strings
switch (option) {
case SPLIT_UPDATE_STRING:
obj.setSplitUpdate(true);
Expand Down Expand Up @@ -1954,17 +1958,17 @@ private static boolean isDirUsable(Path child, long visibilityTxnId, List<Path>
}

public static boolean isTablePropertyTransactional(Properties props) {
String resultStr = (String) props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = (String) props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return Boolean.parseBoolean(resultStr);
return isTablePropertyTransactional(props::getProperty);
}

public static boolean isTablePropertyTransactional(Map<String, String> parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
return isTablePropertyTransactional(parameters::get);
}

private static boolean isTablePropertyTransactional(UnaryOperator<String> prope) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prope -> props

String resultStr = prope.apply(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
resultStr = prope.apply(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return Boolean.parseBoolean(resultStr);
}
Expand Down Expand Up @@ -2206,16 +2210,19 @@ private static long getLastFlushLength(FileSystem fs, FileStatus deltaFile) thro
* @return true if table is an INSERT_ONLY table, false otherwise
*/
public static boolean isInsertOnlyTable(Map<String, String> params) {
String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return INSERTONLY_TRANSACTIONAL_PROPERTY.equalsIgnoreCase(transactionalProp);
return isInsertOnlyTable(params::get);
}

public static boolean isInsertOnlyTable(Table table) {
return isTransactionalTable(table) && getAcidOperationalProperties(table).isInsertOnly();
return isTransactionalTable(table) && isInsertOnlyTable(table::getProperty);
}

public static boolean isInsertOnlyTable(Properties params) {
String transactionalProp = (String) params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return isInsertOnlyTable(params::getProperty);
}

private static boolean isInsertOnlyTable(UnaryOperator<String> props) {
String transactionalProp = props.apply(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return INSERTONLY_TRANSACTIONAL_PROPERTY.equalsIgnoreCase(transactionalProp);
}

Expand Down
Loading