This article shares how to handle map, array, and struct field types when Spark stores Parquet data to Hive. The topic is quite practical, so it is shared here for reference; follow along and take a look.
從網(wǎng)站建設(shè)到定制行業(yè)解決方案,為提供成都網(wǎng)站建設(shè)、成都網(wǎng)站制作服務(wù)體系,各種行業(yè)企業(yè)客戶提供網(wǎng)站建設(shè)解決方案,助力業(yè)務(wù)快速發(fā)展。成都創(chuàng)新互聯(lián)將不斷加快創(chuàng)新步伐,提供優(yōu)質(zhì)的建站服務(wù)。
To better explain the cause of the problem, its symptom, and the solutions, start with the following example:
-- 創(chuàng)建存儲(chǔ)格式為parquet的Hive非分區(qū)表
CREATE EXTERNAL TABLE `t1`(
`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t1';
-- 創(chuàng)建存儲(chǔ)格式為parquet的Hive分區(qū)表
CREATE EXTERNAL TABLE `t2`(
`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t2';
insert into table t1 values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));
insert into table t2 partition(dt='20200101') values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));
t1表正常執(zhí)行,但對(duì)t2執(zhí)行上述insert語句時(shí),報(bào)如下異常:
Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
... 23 more
From the DDL, the only difference between t1 and t2 is that t2 is partitioned, and the error message alone gives no hint as to why partitioning triggers the problem, so we have to see what the source code does differently (to keep things brief, the walkthrough below goes straight to the relevant code paths).
The exception message is empty fields are illegal, so the key questions are where "empty fields" is detected and what handling leads to the throw; that means looking at what startField and endField do in MessageColumnIO:
public void startField(String field, int index) {
    try {
        if (MessageColumnIO.DEBUG) {
            this.log("startField(" + field + ", " + index + ")");
        }
        this.currentColumnIO = ((GroupColumnIO)this.currentColumnIO).getChild(index);
        // in MessageColumnIO, startField first sets emptyField to true;
        // the flag is only cleared once a value (or nested group) is written
        this.emptyField = true;
        if (MessageColumnIO.DEBUG) {
            this.printState();
        }
    } catch (RuntimeException var4) {
        throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
    }
}

// endField checks emptyField to decide whether to throw
public void endField(String field, int index) {
    if (MessageColumnIO.DEBUG) {
        this.log("endField(" + field + ", " + index + ")");
    }
    this.currentColumnIO = this.currentColumnIO.getParent();
    // if emptyField is still true at this point, nothing was written for the
    // field between startField and endField, so throw
    if (this.emptyField) {
        throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
    } else {
        this.fieldsWritten[this.currentLevel].markWritten(index);
        this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
        if (MessageColumnIO.DEBUG) {
            this.printState();
        }
    }
}
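To make the failure mode concrete, here is a minimal standalone model of that bookkeeping. RecordConsumerModel is a made-up illustrative class, not the real MessageColumnIORecordConsumer; it only tracks the emptyField flag (in the real consumer, writing a value or starting a nested group clears it):

// toy model of the emptyField bookkeeping shown above
class RecordConsumerModel {
    private boolean emptyField = false;

    void startField(String field) {
        emptyField = true;              // startField raises the flag
    }

    void addValue(Object value) {
        emptyField = false;             // any written value clears it
    }

    void endField(String field) {
        if (emptyField) {               // nothing was written since startField
            throw new IllegalStateException(
                "empty fields are illegal, the field should be ommited completely instead");
        }
    }

    public static void main(String[] args) {
        RecordConsumerModel consumer = new RecordConsumerModel();
        // what writeMap does for map(): startField on the repeated group,
        // the entry loop never runs, endField follows immediately -> throws
        consumer.startField("map_col");
        consumer.endField("map_col");
    }
}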
針對(duì)map做處理的一些源碼:
private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
    // Get the internal map structure (MAP_KEY_VALUE)
    GroupType repeatedType = type.getType(0).asGroupType();
    recordConsumer.startGroup();
    // startField is called for the repeated key_value group unconditionally,
    // even when the map has no entries
    recordConsumer.startField(repeatedType.getName(), 0);
    Map<?, ?> mapValues = inspector.getMap(value);
    Type keyType = repeatedType.getType(0);
    String keyName = keyType.getName();
    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
    Type valuetype = repeatedType.getType(1);
    String valueName = valuetype.getName();
    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
    // for an empty map this loop body never runs, so nothing is written
    // between startField and endField and emptyField stays true
    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
        recordConsumer.startGroup();
        if (keyValue != null) {
            // write key element
            Object keyElement = keyValue.getKey();
            // recordConsumer here is MessageColumnIO's MessageColumnIORecordConsumer;
            // see the startField/endField handling above
            recordConsumer.startField(keyName, 0);
            // writeValue dispatches primitive types (int, boolean, varchar, ...)
            // to writePrimitive, shown below
            writeValue(keyElement, keyInspector, keyType);
            recordConsumer.endField(keyName, 0);
            // write value element
            Object valueElement = keyValue.getValue();
            if (valueElement != null) {
                // same startField/endField pairing as for the key
                recordConsumer.startField(valueName, 1);
                writeValue(valueElement, valueInspector, valuetype);
                recordConsumer.endField(valueName, 1);
            }
        }
        recordConsumer.endGroup();
    }
    // for map() this endField is what fires the "empty fields are illegal" exception
    recordConsumer.endField(repeatedType.getName(), 0);
    recordConsumer.endGroup();
}
private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
    // a null value returns without writing anything; the enclosing
    // endField will then see emptyField == true and throw
    if (value == null) {
        return;
    }
    switch (inspector.getPrimitiveCategory()) {
        // a VOID PrimitiveCategory likewise returns without writing
        case VOID:
            return;
        case DOUBLE:
            recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
            break;
        // the handling for boolean, float, byte, int and the remaining
        // primitive types is similar and omitted here
        ...
Map is just the example here; array and struct hit the same problem, and in the source HiveFileFormat -> DataWritableWriter handles all three in a similar way. The same issue is discussed on the Hive tracker at https://issues.apache.org/jira/browse/HIVE-11625. Some ways to deal with it:
1. If you cannot change the table schema, or the underlying write path is HiveFileFormat anyway: make sure the map, array, and struct fields are never empty, for example by rewriting empty collections to NULL before the insert, as in the sketch below. A NULL column value is skipped before startField is ever called, which is legal, whereas an empty collection reaches endField with nothing written and triggers the exception.
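A minimal sketch of that rewrite, assuming Spark SQL with Hive support and the t1/t2 tables from above (the size()/IF() combination is one illustrative way to do it, not the only one):

import org.apache.spark.sql.SparkSession;

public class EmptyCollectionWorkaround {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("empty-collection-workaround")
                .enableHiveSupport()
                .getOrCreate();

        // turn empty map/array columns into NULLs before the HiveFileFormat
        // write path sees them; NULL fields are omitted entirely, empty ones throw
        spark.sql(
            "INSERT INTO TABLE t2 PARTITION (dt = '20200101') " +
            "SELECT id, " +
            "       IF(size(map_col) = 0, NULL, map_col) AS map_col, " +
            "       IF(size(arr_col) = 0, NULL, arr_col) AS arr_col, " +
            "       struct_col " +
            "FROM t1");

        spark.stop();
    }
}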
-- 這種方式本質(zhì)上還是用ParquetFileFormat,并且是內(nèi)部表,生產(chǎn)中不建議直接使用這種方式
CREATE TABLE `test`(
`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
USING parquet
OPTIONS(`serialization.format` '1');
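As a quick check (a sketch, assuming an active SparkSession named spark with Hive support), the row that failed for t2 should go through against this table, because the datasource path writes with Spark's own Parquet support instead of DataWritableWriter:

// the same row that triggered the exception on t2; the empty map is
// acceptable here because the repeated group is simply omitted
spark.sql("insert into table test values(1, map(), array('1,1,1'), named_struct('A','1','B','1'))");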
3. 存儲(chǔ)時(shí)指定ParquetFileFormat
Thanks for reading! That wraps up "how to handle map, array, and struct field types when Spark stores Parquet data to Hive". Hopefully the content above is of some help; if you found the article useful, feel free to share it so more people can see it.
當(dāng)前標(biāo)題:Spark存儲(chǔ)Parquet數(shù)據(jù)到Hive時(shí)如何對(duì)map、array、struct字段類型進(jìn)行處理
本文網(wǎng)址:http://m.rwnh.cn/article42/jdgdhc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、網(wǎng)站維護(hù)、、手機(jī)網(wǎng)站建設(shè)、網(wǎng)站導(dǎo)航、Google
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)