Hive: using multiple characters as the column delimiter
Published: 2019-06-27


When you create a table, Hive only lets you specify a single character as the field delimiter; multi-character delimiters are not supported. To use a multi-character delimiter you have to supply your own InputFormat, the key part of which is overriding the record reader's next() method. The code is shown below.

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        // Hand each split to the custom record reader that rewrites the delimiter
        return new MyRecordReader((FileSplit) genericSplit, job);
    }
}

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    // Constructor used by MyHiveInputFormat: positions the reader at the start of the split
    public MyRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Create the decompressor if the file is compressed
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file on its file system
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset, Configuration job)
            throws IOException {
        this.maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // Rewrite the two-character delimiter "##" to the single character "#"
            String strReplace = value.toString().replace("##", "#");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            if (newSize == 0)
                return false;
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
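Both classes have to be compiled against the Hadoop and Hive client libraries and packaged into a jar. The JDBC test further down assumes that jar was deployed as /usr/lib/hive/lib/hiveInput.jar and is registered in the session with add jar before the table is queried.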

Table DDL: once you have a custom inputformat/outputformat, it must be named explicitly in the CREATE TABLE statement.

create external table testHiveInput(
    id int,
    name string,
    age int)
row format delimited fields terminated by '#'
stored as
    INPUTFORMAT 'hiveStream.MyHiveInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/user/hdfs/hiveInput';
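Note that only the read path is customized here: the OUTPUTFORMAT is just Hive's standard text output format, and the single-character field delimiter '#' has to match the character that MyRecordReader rewrites the two-character "##" into.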

Test data:

1##Tom##22
2##Jerry##22
3##Jeny##22
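Before going through Hive at all, the rewrite logic in next() can be exercised locally via the InputStream constructor of MyRecordReader. The following is only a rough sketch (the MyRecordReaderTest class and its main() harness are invented here for illustration, and it assumes the Hadoop client jars are on the classpath):

package hiveStream;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class MyRecordReaderTest {
    public static void main(String[] args) throws Exception {
        // The same sample rows as above, delimited by the two-character "##"
        byte[] data = "1##Tom##22\n2##Jerry##22\n3##Jeny##22\n"
                .getBytes(StandardCharsets.UTF_8);
        MyRecordReader reader = new MyRecordReader(
                new ByteArrayInputStream(data), 0, data.length, Integer.MAX_VALUE);
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        // Each value should come back with "##" collapsed to "#", e.g. "1#Tom#22"
        while (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
        }
        reader.close();
    }
}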

Test code (querying the data through JDBC):

public static void testHive() throws Exception {
    String sql = "select id,name,age from testHiveInput";
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    String url = "jdbc:hive2://xxx.xxx.xxx.xxx:10000";
    Connection conn = DriverManager.getConnection(url, "hive", "passwd");
    Statement stmt = conn.createStatement();
    // Make the jar containing MyHiveInputFormat/MyRecordReader visible to the session
    stmt.execute("add jar /usr/lib/hive/lib/hiveInput.jar");
    ResultSet rs = stmt.executeQuery(sql);
    while (rs.next()) {
        System.out.println(rs.getString("id") + " " + rs.getString("name")
                + " " + rs.getString("age"));
    }
}
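With the three sample rows above, the loop should print 1 Tom 22, 2 Jerry 22 and 3 Jeny 22, since each row is rewritten by MyRecordReader before the table's field delimiter is applied.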

Reposted from: https://my.oschina.net/u/1167806/blog/200808
